1use anyhow::{anyhow, Result};
16use byteorder::{BigEndian, ReadBytesExt};
17use chrono::{DateTime, NaiveDateTime, Utc};
18use log::{debug, warn};
19use postgres_types::Oid;
20use rust_decimal::Decimal;
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::io::{Cursor, Read};
24use std::sync::Arc;
25use uuid::Uuid;
26
27use super::types::{
28 ColumnInfo, PostgresValue, RelationInfo, ReplicaIdentity, TransactionInfo, WalMessage,
29};
30
// Nothing in the visible code references this constant, hence the `dead_code` allowance.
// NOTE(review): presumably the pgoutput protocol version this decoder targets; confirm.
#[allow(dead_code)]
const PGOUTPUT_VERSION: u32 = 1;
33
/// Stateful decoder for pgoutput messages.
///
/// State carried between calls to `decode_message`: the relation metadata announced by
/// Relation messages and the transaction opened by the latest Begin message.
pub struct PgOutputDecoder {
    /// Relation metadata keyed by relation ID. Filled by `decode_relation` and consulted
    /// when decoding insert, update and delete tuples.
    relations: HashMap<u32, RelationInfo>,
    /// Transaction recorded by `decode_begin`; reset to `None` by `decode_commit`.
    current_transaction: Option<TransactionInfo>,
}
38
/// `Default` simply delegates to [`PgOutputDecoder::new`].
impl Default for PgOutputDecoder {
    fn default() -> Self {
        Self::new()
    }
}
44
45impl PgOutputDecoder {
/// Creates a decoder with no known relations and no open transaction.
pub fn new() -> Self {
    Self {
        relations: HashMap::new(),
        current_transaction: None,
    }
}
52
53 pub fn decode_message(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
54 if data.is_empty() {
55 return Ok(None);
56 }
57
58 let msg_type = data[0];
59 let payload = &data[1..];
60
61 match msg_type {
62 b'B' => self.decode_begin(payload),
63 b'C' => self.decode_commit(payload),
64 b'O' => self.decode_origin(payload),
65 b'R' => self.decode_relation(payload),
66 b'Y' => self.decode_type(payload),
67 b'I' => self.decode_insert(payload),
68 b'U' => self.decode_update(payload),
69 b'D' => self.decode_delete(payload),
70 b'T' => self.decode_truncate(payload),
71 b'M' => self.decode_message_logical(payload),
72 _ => {
73 warn!("Unknown pgoutput message type: 0x{msg_type:02x}");
74 Ok(None)
75 }
76 }
77 }
78
79 fn decode_begin(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
80 let mut cursor = Cursor::new(data);
81
82 let final_lsn = cursor.read_u64::<BigEndian>()?;
83 let commit_timestamp = cursor.read_i64::<BigEndian>()?;
84 let xid = cursor.read_u32::<BigEndian>()?;
85
86 let timestamp = postgres_epoch_to_datetime(commit_timestamp)?;
88
89 let transaction = TransactionInfo {
90 xid,
91 commit_lsn: final_lsn,
92 commit_timestamp: timestamp,
93 };
94
95 self.current_transaction = Some(transaction.clone());
96
97 debug!("Begin transaction: xid={xid}, lsn={final_lsn:x}");
98 Ok(Some(WalMessage::Begin(transaction)))
99 }
100
101 fn decode_commit(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
102 let mut cursor = Cursor::new(data);
103
104 let _flags = cursor.read_u8()?;
105 let commit_lsn = cursor.read_u64::<BigEndian>()?;
106 let _end_lsn = cursor.read_u64::<BigEndian>()?;
107 let commit_timestamp = cursor.read_i64::<BigEndian>()?;
108
109 let timestamp = postgres_epoch_to_datetime(commit_timestamp)?;
110
111 let transaction = TransactionInfo {
112 xid: self
113 .current_transaction
114 .as_ref()
115 .map(|t| t.xid)
116 .unwrap_or(0),
117 commit_lsn,
118 commit_timestamp: timestamp,
119 };
120
121 self.current_transaction = None;
122
123 debug!("Commit transaction: lsn={commit_lsn:x}");
124 Ok(Some(WalMessage::Commit(transaction)))
125 }
126
/// Handles an Origin (`'O'`) message.
///
/// The payload is not inspected; the message is dropped by returning `Ok(None)`.
fn decode_origin(&mut self, _data: &[u8]) -> Result<Option<WalMessage>> {
    Ok(None)
}
131
132 fn decode_relation(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
133 let mut cursor = Cursor::new(data);
134
135 let relation_id = cursor.read_u32::<BigEndian>()?;
136 let namespace = read_cstring(&mut cursor)?;
137 let name = read_cstring(&mut cursor)?;
138 let replica_identity = match cursor.read_u8()? {
139 b'd' => ReplicaIdentity::Default,
140 b'n' => ReplicaIdentity::Nothing,
141 b'f' => ReplicaIdentity::Full,
142 b'i' => ReplicaIdentity::Index,
143 other => {
144 warn!("Unknown replica identity: {}", other as char);
145 ReplicaIdentity::Default
146 }
147 };
148
149 let column_count = cursor.read_u16::<BigEndian>()?;
150 let mut columns = Vec::with_capacity(column_count as usize);
151
152 for _ in 0..column_count {
153 let flags = cursor.read_u8()?;
154 let is_key = (flags & 1) != 0;
155 let column_name = read_cstring(&mut cursor)?;
156 let type_oid = cursor.read_u32::<BigEndian>()?;
157 let type_modifier = cursor.read_i32::<BigEndian>()?;
158
159 columns.push(ColumnInfo {
160 name: column_name,
161 type_oid: Oid::from(type_oid),
162 type_modifier,
163 is_key,
164 });
165 }
166
167 let relation = RelationInfo {
168 id: relation_id,
169 namespace,
170 name: name.clone(),
171 replica_identity,
172 columns,
173 };
174
175 debug!(
176 "Relation: id={}, namespace={}, name={}, columns={}",
177 relation_id,
178 relation.namespace,
179 name,
180 relation.columns.len()
181 );
182
183 self.relations.insert(relation_id, relation.clone());
184 Ok(Some(WalMessage::Relation(relation)))
185 }
186
/// Handles a Type (`'Y'`) message.
///
/// The payload is not inspected; the message is dropped by returning `Ok(None)`.
fn decode_type(&mut self, _data: &[u8]) -> Result<Option<WalMessage>> {
    Ok(None)
}
191
192 fn decode_insert(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
193 debug!("Decoding insert message, data length: {}", data.len());
194 let mut cursor = Cursor::new(data);
195
196 let relation_id = cursor.read_u32::<BigEndian>()?;
197 let tuple_type = cursor.read_u8()?;
198
199 debug!(
200 "Insert: relation_id={}, tuple_type={}",
201 relation_id, tuple_type as char
202 );
203
204 if tuple_type != b'N' {
205 return Err(anyhow!(
206 "Expected 'N' tuple type for insert, got: {tuple_type}"
207 ));
208 }
209
210 let relation = self
211 .relations
212 .get(&relation_id)
213 .ok_or_else(|| anyhow!("Unknown relation ID: {relation_id}"))?;
214
215 debug!(
216 "Insert for table: {}, expected columns: {}",
217 relation.name,
218 relation.columns.len()
219 );
220 let tuple = self.decode_tuple_data(&mut cursor, &relation.columns)?;
221
222 debug!(
223 "Insert: relation={}, columns={}",
224 relation.name,
225 tuple.len()
226 );
227 Ok(Some(WalMessage::Insert { relation_id, tuple }))
228 }
229
230 fn decode_update(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
231 let mut cursor = Cursor::new(data);
232
233 let relation_id = cursor.read_u32::<BigEndian>()?;
234 let tuple_type = cursor.read_u8()?;
235
236 let relation = self
237 .relations
238 .get(&relation_id)
239 .ok_or_else(|| anyhow!("Unknown relation ID: {relation_id}"))?;
240
241 let mut old_tuple = None;
242 let new_tuple;
243
244 match tuple_type {
245 b'K' | b'O' => {
246 old_tuple = Some(self.decode_tuple_data(&mut cursor, &relation.columns)?);
248 let next_type = cursor.read_u8()?;
249 if next_type != b'N' {
250 return Err(anyhow!("Expected 'N' after old tuple, got: {next_type}"));
251 }
252 new_tuple = self.decode_tuple_data(&mut cursor, &relation.columns)?;
253 }
254 b'N' => {
255 new_tuple = self.decode_tuple_data(&mut cursor, &relation.columns)?;
257 }
258 _ => {
259 return Err(anyhow!("Unknown tuple type for update: {tuple_type}"));
260 }
261 }
262
263 debug!(
264 "Update: relation={}, has_old={}",
265 relation.name,
266 old_tuple.is_some()
267 );
268 Ok(Some(WalMessage::Update {
269 relation_id,
270 old_tuple,
271 new_tuple,
272 }))
273 }
274
275 fn decode_delete(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
276 let mut cursor = Cursor::new(data);
277
278 let relation_id = cursor.read_u32::<BigEndian>()?;
279 let tuple_type = cursor.read_u8()?;
280
281 if tuple_type != b'K' && tuple_type != b'O' {
282 return Err(anyhow!(
283 "Expected 'K' or 'O' tuple type for delete, got: {tuple_type}"
284 ));
285 }
286
287 let relation = self
288 .relations
289 .get(&relation_id)
290 .ok_or_else(|| anyhow!("Unknown relation ID: {relation_id}"))?;
291
292 let old_tuple = self.decode_tuple_data(&mut cursor, &relation.columns)?;
293
294 debug!("Delete: relation={}", relation.name);
295 Ok(Some(WalMessage::Delete {
296 relation_id,
297 old_tuple,
298 }))
299 }
300
301 fn decode_truncate(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
302 let mut cursor = Cursor::new(data);
303
304 let relation_count = cursor.read_u32::<BigEndian>()?;
305 let _options = cursor.read_u8()?;
306
307 let mut relation_ids = Vec::with_capacity(relation_count as usize);
308 for _ in 0..relation_count {
309 relation_ids.push(cursor.read_u32::<BigEndian>()?);
310 }
311
312 debug!("Truncate: {relation_count} relations");
313 Ok(Some(WalMessage::Truncate { relation_ids }))
314 }
315
/// Handles a Message (`'M'`) message.
///
/// The payload is not inspected; the message is dropped by returning `Ok(None)`.
fn decode_message_logical(&mut self, _data: &[u8]) -> Result<Option<WalMessage>> {
    Ok(None)
}
320
321 fn decode_tuple_data(
322 &self,
323 cursor: &mut Cursor<&[u8]>,
324 columns: &[ColumnInfo],
325 ) -> Result<Vec<PostgresValue>> {
326 let start_pos = cursor.position();
327 let total_len = cursor.get_ref().len();
328 debug!("decode_tuple_data: start position={start_pos}, total buffer length={total_len}");
329
330 let column_count = cursor.read_u16::<BigEndian>()? as usize;
331 debug!("Decoding {column_count} columns");
332
333 if column_count != columns.len() {
334 warn!(
335 "Column count mismatch: expected {}, got {}",
336 columns.len(),
337 column_count
338 );
339 }
340
341 let mut values = Vec::with_capacity(column_count);
342
343 for i in 0..column_count {
344 let column = columns
345 .get(i)
346 .ok_or_else(|| anyhow!("Column index out of bounds: {i}"))?;
347
348 let tuple_type = cursor.read_u8()?;
349 debug!(
350 "Column {}: type={} ({}), oid={}",
351 i, tuple_type as char, tuple_type, column.type_oid
352 );
353
354 let value = match tuple_type {
355 b'n' => PostgresValue::Null,
356 b'u' => PostgresValue::Null, b't' => {
358 let length = cursor.read_u32::<BigEndian>()? as usize;
359 let pos = cursor.position() as usize;
361 let available = cursor.get_ref().len() - pos;
362 debug!(
363 "Column {i} text value: length={length}, pos={pos}, available={available}"
364 );
365 if available < length {
366 return Err(anyhow!("Not enough data for column {} ({}): need {} bytes, have {} bytes at position {}",
367 i, column.name, length, available, pos));
368 }
369 let mut data = vec![0u8; length];
370 cursor.read_exact(&mut data).map_err(|e| {
371 anyhow!(
372 "Failed to read {} bytes for column {} ({}): {}",
373 length,
374 i,
375 column.name,
376 e
377 )
378 })?;
379 debug!(
380 "Successfully read {} bytes for column {}, decoding type OID {}",
381 length, i, column.type_oid
382 );
383 self.decode_column_value(&data, column.type_oid)?
384 }
385 _ => {
386 return Err(anyhow!(
387 "Unknown tuple data type: {} ({})",
388 tuple_type as char,
389 tuple_type
390 ));
391 }
392 };
393
394 values.push(value);
395 }
396
397 Ok(values)
398 }
399
400 fn decode_column_value(&self, data: &[u8], type_oid: Oid) -> Result<PostgresValue> {
401 let oid_value = type_oid;
403 match oid_value {
404 16 => {
405 Ok(PostgresValue::Bool(data[0] != 0))
407 }
408 21 => {
409 if data.len() == 2 {
411 let mut cursor = Cursor::new(data);
413 Ok(PostgresValue::Int2(cursor.read_i16::<BigEndian>()?))
414 } else {
415 let text = String::from_utf8_lossy(data);
417 let value = text
418 .trim()
419 .parse::<i16>()
420 .map_err(|e| anyhow!("Failed to parse int2 from '{text}': {e}"))?;
421 Ok(PostgresValue::Int2(value))
422 }
423 }
424 23 => {
425 if data.len() == 4 {
427 let mut cursor = Cursor::new(data);
429 Ok(PostgresValue::Int4(cursor.read_i32::<BigEndian>()?))
430 } else {
431 let text = String::from_utf8_lossy(data);
433 let value = text
434 .trim()
435 .parse::<i32>()
436 .map_err(|e| anyhow!("Failed to parse int4 from '{text}': {e}"))?;
437 Ok(PostgresValue::Int4(value))
438 }
439 }
440 20 => {
441 if data.len() == 8 {
443 let mut cursor = Cursor::new(data);
445 Ok(PostgresValue::Int8(cursor.read_i64::<BigEndian>()?))
446 } else {
447 let text = String::from_utf8_lossy(data);
449 let value = text
450 .trim()
451 .parse::<i64>()
452 .map_err(|e| anyhow!("Failed to parse int8 from '{text}': {e}"))?;
453 Ok(PostgresValue::Int8(value))
454 }
455 }
456 700 => {
457 if data.len() == 4 {
459 let mut cursor = Cursor::new(data);
461 Ok(PostgresValue::Float4(cursor.read_f32::<BigEndian>()?))
462 } else {
463 let text = String::from_utf8_lossy(data);
465 let value = text
466 .trim()
467 .parse::<f32>()
468 .map_err(|e| anyhow!("Failed to parse float4 from '{text}': {e}"))?;
469 Ok(PostgresValue::Float4(value))
470 }
471 }
472 701 => {
473 if data.len() == 8 {
475 let mut cursor = Cursor::new(data);
477 Ok(PostgresValue::Float8(cursor.read_f64::<BigEndian>()?))
478 } else {
479 let text = String::from_utf8_lossy(data);
481 let value = text
482 .trim()
483 .parse::<f64>()
484 .map_err(|e| anyhow!("Failed to parse float8 from '{text}': {e}"))?;
485 Ok(PostgresValue::Float8(value))
486 }
487 }
488 1700 => {
489 if data.len() < 8
492 || (!data.is_empty() && data[0] >= b'0' && data[0] <= b'9')
493 || (!data.is_empty() && (data[0] == b'-' || data[0] == b'+' || data[0] == b'.'))
494 {
495 let text = String::from_utf8_lossy(data);
497 let value = Decimal::from_str_exact(text.trim())
498 .map_err(|e| anyhow!("Failed to parse numeric from '{text}': {e}"))?;
499 Ok(PostgresValue::Numeric(value))
500 } else {
501 Ok(PostgresValue::Numeric(decode_numeric(data)?))
503 }
504 }
505 25 | 1043 | 19 => {
506 Ok(PostgresValue::Text(
508 String::from_utf8_lossy(data).to_string(),
509 ))
510 }
511 1042 => {
512 let s = String::from_utf8_lossy(data).trim_end().to_string();
514 Ok(PostgresValue::Char(s))
515 }
516 2950 => {
517 if data.len() != 16 {
519 return Err(anyhow!("Invalid UUID length: {}", data.len()));
520 }
521 let uuid = Uuid::from_slice(data)?;
522 Ok(PostgresValue::Uuid(uuid))
523 }
524 1114 => {
525 if data.len() == 8 {
527 let mut cursor = Cursor::new(data);
529 let micros = cursor.read_i64::<BigEndian>()?;
530 let timestamp = postgres_epoch_to_naive_datetime(micros)?;
531 Ok(PostgresValue::Timestamp(timestamp))
532 } else {
533 let text = String::from_utf8_lossy(data);
535 let timestamp =
536 NaiveDateTime::parse_from_str(text.trim(), "%Y-%m-%d %H:%M:%S%.f")
537 .or_else(|_| {
538 NaiveDateTime::parse_from_str(text.trim(), "%Y-%m-%d %H:%M:%S")
539 })
540 .map_err(|e| anyhow!("Failed to parse timestamp from '{text}': {e}"))?;
541 Ok(PostgresValue::Timestamp(timestamp))
542 }
543 }
544 1184 => {
545 if data.len() == 8 {
547 let mut cursor = Cursor::new(data);
549 let micros = cursor.read_i64::<BigEndian>()?;
550 let timestamp = postgres_epoch_to_datetime(micros)?;
551 Ok(PostgresValue::TimestampTz(timestamp))
552 } else {
553 let text = String::from_utf8_lossy(data);
555 let timestamp = DateTime::parse_from_rfc3339(text.trim())
557 .or_else(|_| {
558 DateTime::parse_from_str(text.trim(), "%Y-%m-%d %H:%M:%S%.f%z")
559 })
560 .map_err(|e| anyhow!("Failed to parse timestamptz from '{text}': {e}"))?
561 .with_timezone(&Utc);
562 Ok(PostgresValue::TimestampTz(timestamp))
563 }
564 }
565 1082 => {
566 let mut cursor = Cursor::new(data);
568 let days = cursor.read_i32::<BigEndian>()?;
569 let date = postgres_epoch_to_date(days)?;
570 Ok(PostgresValue::Date(date))
571 }
572 1083 => {
573 let mut cursor = Cursor::new(data);
575 let micros = cursor.read_i64::<BigEndian>()?;
576 let time = postgres_time_to_naive_time(micros)?;
577 Ok(PostgresValue::Time(time))
578 }
579 114 | 3802 => {
580 let json_str = if oid_value == 3802 {
582 String::from_utf8_lossy(&data[1..]).to_string()
584 } else {
585 String::from_utf8_lossy(data).to_string()
586 };
587 let value: JsonValue = serde_json::from_str(&json_str)?;
588 Ok(PostgresValue::Json(value))
589 }
590 17 => {
591 Ok(PostgresValue::Bytea(data.to_vec()))
593 }
594 _ => {
595 warn!("Unknown type OID {oid_value}, treating as text");
597 Ok(PostgresValue::Text(
598 String::from_utf8_lossy(data).to_string(),
599 ))
600 }
601 }
602 }
603
/// Returns the cached metadata for `relation_id`, or `None` when no Relation message
/// with that ID has been decoded by this decoder.
pub fn get_relation(&self, relation_id: u32) -> Option<&RelationInfo> {
    self.relations.get(&relation_id)
}
607}
608
609fn read_cstring(cursor: &mut Cursor<&[u8]>) -> Result<String> {
610 let mut buffer = Vec::new();
611 loop {
612 let byte = cursor.read_u8()?;
613 if byte == 0 {
614 break;
615 }
616 buffer.push(byte);
617 }
618 Ok(String::from_utf8_lossy(&buffer).to_string())
619}
620
621fn postgres_epoch_to_datetime(micros: i64) -> Result<DateTime<Utc>> {
622 const POSTGRES_EPOCH: i64 = 946684800000000; let unix_micros = micros + POSTGRES_EPOCH;
625 let secs = unix_micros / 1_000_000;
626 let nanos = ((unix_micros % 1_000_000) * 1000) as u32;
627
628 DateTime::from_timestamp(secs, nanos).ok_or_else(|| anyhow!("Invalid timestamp"))
629}
630
631fn postgres_epoch_to_naive_datetime(micros: i64) -> Result<NaiveDateTime> {
632 let dt = postgres_epoch_to_datetime(micros)?;
633 Ok(dt.naive_utc())
634}
635
636fn postgres_epoch_to_date(days: i32) -> Result<chrono::NaiveDate> {
637 const POSTGRES_DATE_EPOCH: i32 = 10957; let unix_days = days + POSTGRES_DATE_EPOCH;
640
641 let epoch =
642 chrono::NaiveDate::from_ymd_opt(1970, 1, 1).ok_or_else(|| anyhow!("Invalid epoch date"))?;
643
644 Ok(epoch + chrono::Duration::days(unix_days as i64))
645}
646
647fn postgres_time_to_naive_time(micros: i64) -> Result<chrono::NaiveTime> {
648 let total_secs = micros / 1_000_000;
649 let hours = (total_secs / 3600) as u32;
650 let minutes = ((total_secs % 3600) / 60) as u32;
651 let seconds = (total_secs % 60) as u32;
652 let nanos = ((micros % 1_000_000) * 1000) as u32;
653
654 chrono::NaiveTime::from_hms_nano_opt(hours, minutes, seconds, nanos)
655 .ok_or_else(|| anyhow!("Invalid time"))
656}
657
658pub fn decode_column_value_text(
660 text: &str,
661 type_oid: i32,
662) -> Result<drasi_core::models::ElementValue> {
663 use drasi_core::models::ElementValue;
664
665 match type_oid as u32 {
666 16 => {
667 let value = text.parse::<bool>().or_else(|_| match text {
669 "t" => Ok(true),
670 "f" => Ok(false),
671 _ => Err(anyhow!("Invalid boolean value")),
672 })?;
673 Ok(ElementValue::Bool(value))
674 }
675 21 => {
676 let value = text.parse::<i16>()?;
678 Ok(ElementValue::Integer(value as i64))
679 }
680 23 => {
681 let value = text.parse::<i32>()?;
683 Ok(ElementValue::Integer(value as i64))
684 }
685 20 => {
686 let value = text.parse::<i64>()?;
688 Ok(ElementValue::Integer(value))
689 }
690 700 => {
691 let value = text.parse::<f32>()?;
693 Ok(ElementValue::Float(ordered_float::OrderedFloat(
694 value as f64,
695 )))
696 }
697 701 => {
698 let value = text.parse::<f64>()?;
700 Ok(ElementValue::Float(ordered_float::OrderedFloat(value)))
701 }
702 1700 => {
703 let value = text.parse::<f64>()?;
705 Ok(ElementValue::Float(ordered_float::OrderedFloat(value)))
706 }
707 25 | 1043 | 19 => {
708 Ok(ElementValue::String(Arc::from(text)))
710 }
711 1114 | 1184 => {
712 Ok(ElementValue::String(Arc::from(text)))
714 }
715 1082 => {
716 Ok(ElementValue::String(Arc::from(text)))
718 }
719 2950 => {
720 Ok(ElementValue::String(Arc::from(text)))
722 }
723 _ => {
724 Ok(ElementValue::String(Arc::from(text)))
726 }
727 }
728}
729
730fn decode_numeric(data: &[u8]) -> Result<Decimal> {
731 if data.len() < 8 {
732 return Err(anyhow!("Numeric data too short"));
733 }
734
735 let mut cursor = Cursor::new(data);
736 let ndigits = cursor.read_u16::<BigEndian>()?;
737 let weight = cursor.read_i16::<BigEndian>()?;
738 let sign = cursor.read_u16::<BigEndian>()?;
739 let dscale = cursor.read_u16::<BigEndian>()?;
740
741 if sign == 0xC000 {
742 return Ok(Decimal::ZERO);
744 }
745
746 let mut digits = Vec::with_capacity(ndigits as usize);
747 for _ in 0..ndigits {
748 digits.push(cursor.read_u16::<BigEndian>()?);
749 }
750
751 let mut result = Decimal::ZERO;
754 let base = Decimal::from(10000);
755
756 for (i, &digit) in digits.iter().enumerate() {
757 let power = weight as i32 - i as i32;
758 let digit_value = Decimal::from(digit as i64);
759 let multiplier = if power >= 0 {
760 let mut result = Decimal::ONE;
761 for _ in 0..power {
762 result *= base;
763 }
764 result
765 } else {
766 let mut result = Decimal::ONE;
767 for _ in 0..(-power) {
768 result /= base;
769 }
770 result
771 };
772 result += digit_value * multiplier;
773 }
774
775 if sign == 0x4000 {
776 result = -result;
777 }
778
779 if dscale > 0 {
781 let mut scale_divisor = Decimal::ONE;
782 for _ in 0..dscale {
783 scale_divisor *= Decimal::from(10);
784 }
785 result /= scale_divisor;
786 }
787
788 Ok(result)
789}