Skip to main content

drasi_source_postgres/
decoder.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{anyhow, Result};
16use byteorder::{BigEndian, ReadBytesExt};
17use chrono::{DateTime, NaiveDateTime, Utc};
18use log::{debug, warn};
19use postgres_types::Oid;
20use rust_decimal::Decimal;
21use serde_json::Value as JsonValue;
22use std::collections::HashMap;
23use std::io::{Cursor, Read};
24use std::sync::Arc;
25use uuid::Uuid;
26
27use super::types::{
28    ColumnInfo, PostgresValue, RelationInfo, ReplicaIdentity, TransactionInfo, WalMessage,
29};
30
31#[allow(dead_code)]
32const PGOUTPUT_VERSION: u32 = 1;
33
34pub struct PgOutputDecoder {
35    relations: HashMap<u32, RelationInfo>,
36    current_transaction: Option<TransactionInfo>,
37}
38
39impl Default for PgOutputDecoder {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl PgOutputDecoder {
46    pub fn new() -> Self {
47        Self {
48            relations: HashMap::new(),
49            current_transaction: None,
50        }
51    }
52
53    pub fn decode_message(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
54        if data.is_empty() {
55            return Ok(None);
56        }
57
58        let msg_type = data[0];
59        let payload = &data[1..];
60
61        match msg_type {
62            b'B' => self.decode_begin(payload),
63            b'C' => self.decode_commit(payload),
64            b'O' => self.decode_origin(payload),
65            b'R' => self.decode_relation(payload),
66            b'Y' => self.decode_type(payload),
67            b'I' => self.decode_insert(payload),
68            b'U' => self.decode_update(payload),
69            b'D' => self.decode_delete(payload),
70            b'T' => self.decode_truncate(payload),
71            b'M' => self.decode_message_logical(payload),
72            _ => {
73                warn!("Unknown pgoutput message type: 0x{msg_type:02x}");
74                Ok(None)
75            }
76        }
77    }
78
79    fn decode_begin(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
80        let mut cursor = Cursor::new(data);
81
82        let final_lsn = cursor.read_u64::<BigEndian>()?;
83        let commit_timestamp = cursor.read_i64::<BigEndian>()?;
84        let xid = cursor.read_u32::<BigEndian>()?;
85
86        // Convert PostgreSQL timestamp to DateTime
87        let timestamp = postgres_epoch_to_datetime(commit_timestamp)?;
88
89        let transaction = TransactionInfo {
90            xid,
91            commit_lsn: final_lsn,
92            commit_timestamp: timestamp,
93        };
94
95        self.current_transaction = Some(transaction.clone());
96
97        debug!("Begin transaction: xid={xid}, lsn={final_lsn:x}");
98        Ok(Some(WalMessage::Begin(transaction)))
99    }
100
101    fn decode_commit(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
102        let mut cursor = Cursor::new(data);
103
104        let _flags = cursor.read_u8()?;
105        let commit_lsn = cursor.read_u64::<BigEndian>()?;
106        let _end_lsn = cursor.read_u64::<BigEndian>()?;
107        let commit_timestamp = cursor.read_i64::<BigEndian>()?;
108
109        let timestamp = postgres_epoch_to_datetime(commit_timestamp)?;
110
111        let transaction = TransactionInfo {
112            xid: self
113                .current_transaction
114                .as_ref()
115                .map(|t| t.xid)
116                .unwrap_or(0),
117            commit_lsn,
118            commit_timestamp: timestamp,
119        };
120
121        self.current_transaction = None;
122
123        debug!("Commit transaction: lsn={commit_lsn:x}");
124        Ok(Some(WalMessage::Commit(transaction)))
125    }
126
127    fn decode_origin(&mut self, _data: &[u8]) -> Result<Option<WalMessage>> {
128        // Origin messages are informational, we can skip them for now
129        Ok(None)
130    }
131
132    fn decode_relation(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
133        let mut cursor = Cursor::new(data);
134
135        let relation_id = cursor.read_u32::<BigEndian>()?;
136        let namespace = read_cstring(&mut cursor)?;
137        let name = read_cstring(&mut cursor)?;
138        let replica_identity = match cursor.read_u8()? {
139            b'd' => ReplicaIdentity::Default,
140            b'n' => ReplicaIdentity::Nothing,
141            b'f' => ReplicaIdentity::Full,
142            b'i' => ReplicaIdentity::Index,
143            other => {
144                warn!("Unknown replica identity: {}", other as char);
145                ReplicaIdentity::Default
146            }
147        };
148
149        let column_count = cursor.read_u16::<BigEndian>()?;
150        let mut columns = Vec::with_capacity(column_count as usize);
151
152        for _ in 0..column_count {
153            let flags = cursor.read_u8()?;
154            let is_key = (flags & 1) != 0;
155            let column_name = read_cstring(&mut cursor)?;
156            let type_oid = cursor.read_u32::<BigEndian>()?;
157            let type_modifier = cursor.read_i32::<BigEndian>()?;
158
159            columns.push(ColumnInfo {
160                name: column_name,
161                type_oid: Oid::from(type_oid),
162                type_modifier,
163                is_key,
164            });
165        }
166
167        let relation = RelationInfo {
168            id: relation_id,
169            namespace,
170            name: name.clone(),
171            replica_identity,
172            columns,
173        };
174
175        debug!(
176            "Relation: id={}, namespace={}, name={}, columns={}",
177            relation_id,
178            relation.namespace,
179            name,
180            relation.columns.len()
181        );
182
183        self.relations.insert(relation_id, relation.clone());
184        Ok(Some(WalMessage::Relation(relation)))
185    }
186
187    fn decode_type(&mut self, _data: &[u8]) -> Result<Option<WalMessage>> {
188        // Type messages describe custom types, we'll handle them later if needed
189        Ok(None)
190    }
191
192    fn decode_insert(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
193        debug!("Decoding insert message, data length: {}", data.len());
194        let mut cursor = Cursor::new(data);
195
196        let relation_id = cursor.read_u32::<BigEndian>()?;
197        let tuple_type = cursor.read_u8()?;
198
199        debug!(
200            "Insert: relation_id={}, tuple_type={}",
201            relation_id, tuple_type as char
202        );
203
204        if tuple_type != b'N' {
205            return Err(anyhow!(
206                "Expected 'N' tuple type for insert, got: {tuple_type}"
207            ));
208        }
209
210        let relation = self
211            .relations
212            .get(&relation_id)
213            .ok_or_else(|| anyhow!("Unknown relation ID: {relation_id}"))?;
214
215        debug!(
216            "Insert for table: {}, expected columns: {}",
217            relation.name,
218            relation.columns.len()
219        );
220        let tuple = self.decode_tuple_data(&mut cursor, &relation.columns)?;
221
222        debug!(
223            "Insert: relation={}, columns={}",
224            relation.name,
225            tuple.len()
226        );
227        Ok(Some(WalMessage::Insert { relation_id, tuple }))
228    }
229
230    fn decode_update(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
231        let mut cursor = Cursor::new(data);
232
233        let relation_id = cursor.read_u32::<BigEndian>()?;
234        let tuple_type = cursor.read_u8()?;
235
236        let relation = self
237            .relations
238            .get(&relation_id)
239            .ok_or_else(|| anyhow!("Unknown relation ID: {relation_id}"))?;
240
241        let mut old_tuple = None;
242        let new_tuple;
243
244        match tuple_type {
245            b'K' | b'O' => {
246                // Has old tuple (key or old)
247                old_tuple = Some(self.decode_tuple_data(&mut cursor, &relation.columns)?);
248                let next_type = cursor.read_u8()?;
249                if next_type != b'N' {
250                    return Err(anyhow!("Expected 'N' after old tuple, got: {next_type}"));
251                }
252                new_tuple = self.decode_tuple_data(&mut cursor, &relation.columns)?;
253            }
254            b'N' => {
255                // Only new tuple
256                new_tuple = self.decode_tuple_data(&mut cursor, &relation.columns)?;
257            }
258            _ => {
259                return Err(anyhow!("Unknown tuple type for update: {tuple_type}"));
260            }
261        }
262
263        debug!(
264            "Update: relation={}, has_old={}",
265            relation.name,
266            old_tuple.is_some()
267        );
268        Ok(Some(WalMessage::Update {
269            relation_id,
270            old_tuple,
271            new_tuple,
272        }))
273    }
274
275    fn decode_delete(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
276        let mut cursor = Cursor::new(data);
277
278        let relation_id = cursor.read_u32::<BigEndian>()?;
279        let tuple_type = cursor.read_u8()?;
280
281        if tuple_type != b'K' && tuple_type != b'O' {
282            return Err(anyhow!(
283                "Expected 'K' or 'O' tuple type for delete, got: {tuple_type}"
284            ));
285        }
286
287        let relation = self
288            .relations
289            .get(&relation_id)
290            .ok_or_else(|| anyhow!("Unknown relation ID: {relation_id}"))?;
291
292        let old_tuple = self.decode_tuple_data(&mut cursor, &relation.columns)?;
293
294        debug!("Delete: relation={}", relation.name);
295        Ok(Some(WalMessage::Delete {
296            relation_id,
297            old_tuple,
298        }))
299    }
300
301    fn decode_truncate(&mut self, data: &[u8]) -> Result<Option<WalMessage>> {
302        let mut cursor = Cursor::new(data);
303
304        let relation_count = cursor.read_u32::<BigEndian>()?;
305        let _options = cursor.read_u8()?;
306
307        let mut relation_ids = Vec::with_capacity(relation_count as usize);
308        for _ in 0..relation_count {
309            relation_ids.push(cursor.read_u32::<BigEndian>()?);
310        }
311
312        debug!("Truncate: {relation_count} relations");
313        Ok(Some(WalMessage::Truncate { relation_ids }))
314    }
315
316    fn decode_message_logical(&mut self, _data: &[u8]) -> Result<Option<WalMessage>> {
317        // Logical messages are application-specific, skip for now
318        Ok(None)
319    }
320
321    fn decode_tuple_data(
322        &self,
323        cursor: &mut Cursor<&[u8]>,
324        columns: &[ColumnInfo],
325    ) -> Result<Vec<PostgresValue>> {
326        let start_pos = cursor.position();
327        let total_len = cursor.get_ref().len();
328        debug!("decode_tuple_data: start position={start_pos}, total buffer length={total_len}");
329
330        let column_count = cursor.read_u16::<BigEndian>()? as usize;
331        debug!("Decoding {column_count} columns");
332
333        if column_count != columns.len() {
334            warn!(
335                "Column count mismatch: expected {}, got {}",
336                columns.len(),
337                column_count
338            );
339        }
340
341        let mut values = Vec::with_capacity(column_count);
342
343        for i in 0..column_count {
344            let column = columns
345                .get(i)
346                .ok_or_else(|| anyhow!("Column index out of bounds: {i}"))?;
347
348            let tuple_type = cursor.read_u8()?;
349            debug!(
350                "Column {}: type={} ({}), oid={}",
351                i, tuple_type as char, tuple_type, column.type_oid
352            );
353
354            let value = match tuple_type {
355                b'n' => PostgresValue::Null,
356                b'u' => PostgresValue::Null, // Unchanged TOAST value
357                b't' => {
358                    let length = cursor.read_u32::<BigEndian>()? as usize;
359                    // Ensure we have enough data to read
360                    let pos = cursor.position() as usize;
361                    let available = cursor.get_ref().len() - pos;
362                    debug!(
363                        "Column {i} text value: length={length}, pos={pos}, available={available}"
364                    );
365                    if available < length {
366                        return Err(anyhow!("Not enough data for column {} ({}): need {} bytes, have {} bytes at position {}", 
367                            i, column.name, length, available, pos));
368                    }
369                    let mut data = vec![0u8; length];
370                    cursor.read_exact(&mut data).map_err(|e| {
371                        anyhow!(
372                            "Failed to read {} bytes for column {} ({}): {}",
373                            length,
374                            i,
375                            column.name,
376                            e
377                        )
378                    })?;
379                    debug!(
380                        "Successfully read {} bytes for column {}, decoding type OID {}",
381                        length, i, column.type_oid
382                    );
383                    self.decode_column_value(&data, column.type_oid)?
384                }
385                _ => {
386                    return Err(anyhow!(
387                        "Unknown tuple data type: {} ({})",
388                        tuple_type as char,
389                        tuple_type
390                    ));
391                }
392            };
393
394            values.push(value);
395        }
396
397        Ok(values)
398    }
399
400    fn decode_column_value(&self, data: &[u8], type_oid: Oid) -> Result<PostgresValue> {
401        // Map common PostgreSQL type OIDs to decoders
402        let oid_value = type_oid;
403        match oid_value {
404            16 => {
405                // bool
406                Ok(PostgresValue::Bool(data[0] != 0))
407            }
408            21 => {
409                // int2
410                if data.len() == 2 {
411                    // Binary format
412                    let mut cursor = Cursor::new(data);
413                    Ok(PostgresValue::Int2(cursor.read_i16::<BigEndian>()?))
414                } else {
415                    // Text format
416                    let text = String::from_utf8_lossy(data);
417                    let value = text
418                        .trim()
419                        .parse::<i16>()
420                        .map_err(|e| anyhow!("Failed to parse int2 from '{text}': {e}"))?;
421                    Ok(PostgresValue::Int2(value))
422                }
423            }
424            23 => {
425                // int4
426                if data.len() == 4 {
427                    // Binary format
428                    let mut cursor = Cursor::new(data);
429                    Ok(PostgresValue::Int4(cursor.read_i32::<BigEndian>()?))
430                } else {
431                    // Text format - parse as string
432                    let text = String::from_utf8_lossy(data);
433                    let value = text
434                        .trim()
435                        .parse::<i32>()
436                        .map_err(|e| anyhow!("Failed to parse int4 from '{text}': {e}"))?;
437                    Ok(PostgresValue::Int4(value))
438                }
439            }
440            20 => {
441                // int8
442                if data.len() == 8 {
443                    // Binary format
444                    let mut cursor = Cursor::new(data);
445                    Ok(PostgresValue::Int8(cursor.read_i64::<BigEndian>()?))
446                } else {
447                    // Text format
448                    let text = String::from_utf8_lossy(data);
449                    let value = text
450                        .trim()
451                        .parse::<i64>()
452                        .map_err(|e| anyhow!("Failed to parse int8 from '{text}': {e}"))?;
453                    Ok(PostgresValue::Int8(value))
454                }
455            }
456            700 => {
457                // float4
458                if data.len() == 4 {
459                    // Binary format
460                    let mut cursor = Cursor::new(data);
461                    Ok(PostgresValue::Float4(cursor.read_f32::<BigEndian>()?))
462                } else {
463                    // Text format
464                    let text = String::from_utf8_lossy(data);
465                    let value = text
466                        .trim()
467                        .parse::<f32>()
468                        .map_err(|e| anyhow!("Failed to parse float4 from '{text}': {e}"))?;
469                    Ok(PostgresValue::Float4(value))
470                }
471            }
472            701 => {
473                // float8
474                if data.len() == 8 {
475                    // Binary format
476                    let mut cursor = Cursor::new(data);
477                    Ok(PostgresValue::Float8(cursor.read_f64::<BigEndian>()?))
478                } else {
479                    // Text format
480                    let text = String::from_utf8_lossy(data);
481                    let value = text
482                        .trim()
483                        .parse::<f64>()
484                        .map_err(|e| anyhow!("Failed to parse float8 from '{text}': {e}"))?;
485                    Ok(PostgresValue::Float8(value))
486                }
487            }
488            1700 => {
489                // numeric
490                // Check if it's text format (pgoutput default) or binary format
491                if data.len() < 8
492                    || (!data.is_empty() && data[0] >= b'0' && data[0] <= b'9')
493                    || (!data.is_empty() && (data[0] == b'-' || data[0] == b'+' || data[0] == b'.'))
494                {
495                    // Text format - parse as string
496                    let text = String::from_utf8_lossy(data);
497                    let value = Decimal::from_str_exact(text.trim())
498                        .map_err(|e| anyhow!("Failed to parse numeric from '{text}': {e}"))?;
499                    Ok(PostgresValue::Numeric(value))
500                } else {
501                    // Binary format
502                    Ok(PostgresValue::Numeric(decode_numeric(data)?))
503                }
504            }
505            25 | 1043 | 19 => {
506                // text, varchar, name
507                Ok(PostgresValue::Text(
508                    String::from_utf8_lossy(data).to_string(),
509                ))
510            }
511            1042 => {
512                // char/bpchar
513                let s = String::from_utf8_lossy(data).trim_end().to_string();
514                Ok(PostgresValue::Char(s))
515            }
516            2950 => {
517                // uuid
518                if data.len() != 16 {
519                    return Err(anyhow!("Invalid UUID length: {}", data.len()));
520                }
521                let uuid = Uuid::from_slice(data)?;
522                Ok(PostgresValue::Uuid(uuid))
523            }
524            1114 => {
525                // timestamp
526                if data.len() == 8 {
527                    // Binary format
528                    let mut cursor = Cursor::new(data);
529                    let micros = cursor.read_i64::<BigEndian>()?;
530                    let timestamp = postgres_epoch_to_naive_datetime(micros)?;
531                    Ok(PostgresValue::Timestamp(timestamp))
532                } else {
533                    // Text format - parse PostgreSQL timestamp string
534                    let text = String::from_utf8_lossy(data);
535                    let timestamp =
536                        NaiveDateTime::parse_from_str(text.trim(), "%Y-%m-%d %H:%M:%S%.f")
537                            .or_else(|_| {
538                                NaiveDateTime::parse_from_str(text.trim(), "%Y-%m-%d %H:%M:%S")
539                            })
540                            .map_err(|e| anyhow!("Failed to parse timestamp from '{text}': {e}"))?;
541                    Ok(PostgresValue::Timestamp(timestamp))
542                }
543            }
544            1184 => {
545                // timestamptz
546                if data.len() == 8 {
547                    // Binary format
548                    let mut cursor = Cursor::new(data);
549                    let micros = cursor.read_i64::<BigEndian>()?;
550                    let timestamp = postgres_epoch_to_datetime(micros)?;
551                    Ok(PostgresValue::TimestampTz(timestamp))
552                } else {
553                    // Text format - parse PostgreSQL timestamptz string
554                    let text = String::from_utf8_lossy(data);
555                    // PostgreSQL sends timestamptz in ISO 8601 format
556                    let timestamp = DateTime::parse_from_rfc3339(text.trim())
557                        .or_else(|_| {
558                            DateTime::parse_from_str(text.trim(), "%Y-%m-%d %H:%M:%S%.f%z")
559                        })
560                        .map_err(|e| anyhow!("Failed to parse timestamptz from '{text}': {e}"))?
561                        .with_timezone(&Utc);
562                    Ok(PostgresValue::TimestampTz(timestamp))
563                }
564            }
565            1082 => {
566                // date
567                let mut cursor = Cursor::new(data);
568                let days = cursor.read_i32::<BigEndian>()?;
569                let date = postgres_epoch_to_date(days)?;
570                Ok(PostgresValue::Date(date))
571            }
572            1083 => {
573                // time
574                let mut cursor = Cursor::new(data);
575                let micros = cursor.read_i64::<BigEndian>()?;
576                let time = postgres_time_to_naive_time(micros)?;
577                Ok(PostgresValue::Time(time))
578            }
579            114 | 3802 => {
580                // json, jsonb
581                let json_str = if oid_value == 3802 {
582                    // jsonb has a version byte
583                    String::from_utf8_lossy(&data[1..]).to_string()
584                } else {
585                    String::from_utf8_lossy(data).to_string()
586                };
587                let value: JsonValue = serde_json::from_str(&json_str)?;
588                Ok(PostgresValue::Json(value))
589            }
590            17 => {
591                // bytea
592                Ok(PostgresValue::Bytea(data.to_vec()))
593            }
594            _ => {
595                // Default to text representation for unknown types
596                warn!("Unknown type OID {oid_value}, treating as text");
597                Ok(PostgresValue::Text(
598                    String::from_utf8_lossy(data).to_string(),
599                ))
600            }
601        }
602    }
603
604    pub fn get_relation(&self, relation_id: u32) -> Option<&RelationInfo> {
605        self.relations.get(&relation_id)
606    }
607}
608
609fn read_cstring(cursor: &mut Cursor<&[u8]>) -> Result<String> {
610    let mut buffer = Vec::new();
611    loop {
612        let byte = cursor.read_u8()?;
613        if byte == 0 {
614            break;
615        }
616        buffer.push(byte);
617    }
618    Ok(String::from_utf8_lossy(&buffer).to_string())
619}
620
621fn postgres_epoch_to_datetime(micros: i64) -> Result<DateTime<Utc>> {
622    // PostgreSQL epoch is 2000-01-01 00:00:00
623    const POSTGRES_EPOCH: i64 = 946684800000000; // microseconds since Unix epoch
624    let unix_micros = micros + POSTGRES_EPOCH;
625    let secs = unix_micros / 1_000_000;
626    let nanos = ((unix_micros % 1_000_000) * 1000) as u32;
627
628    DateTime::from_timestamp(secs, nanos).ok_or_else(|| anyhow!("Invalid timestamp"))
629}
630
631fn postgres_epoch_to_naive_datetime(micros: i64) -> Result<NaiveDateTime> {
632    let dt = postgres_epoch_to_datetime(micros)?;
633    Ok(dt.naive_utc())
634}
635
636fn postgres_epoch_to_date(days: i32) -> Result<chrono::NaiveDate> {
637    // PostgreSQL date epoch is 2000-01-01
638    const POSTGRES_DATE_EPOCH: i32 = 10957; // days since Unix epoch (1970-01-01)
639    let unix_days = days + POSTGRES_DATE_EPOCH;
640
641    let epoch =
642        chrono::NaiveDate::from_ymd_opt(1970, 1, 1).ok_or_else(|| anyhow!("Invalid epoch date"))?;
643
644    Ok(epoch + chrono::Duration::days(unix_days as i64))
645}
646
647fn postgres_time_to_naive_time(micros: i64) -> Result<chrono::NaiveTime> {
648    let total_secs = micros / 1_000_000;
649    let hours = (total_secs / 3600) as u32;
650    let minutes = ((total_secs % 3600) / 60) as u32;
651    let seconds = (total_secs % 60) as u32;
652    let nanos = ((micros % 1_000_000) * 1000) as u32;
653
654    chrono::NaiveTime::from_hms_nano_opt(hours, minutes, seconds, nanos)
655        .ok_or_else(|| anyhow!("Invalid time"))
656}
657
658/// Decode a column value from text format (used by bootstrap)
659pub fn decode_column_value_text(
660    text: &str,
661    type_oid: i32,
662) -> Result<drasi_core::models::ElementValue> {
663    use drasi_core::models::ElementValue;
664
665    match type_oid as u32 {
666        16 => {
667            // bool
668            let value = text.parse::<bool>().or_else(|_| match text {
669                "t" => Ok(true),
670                "f" => Ok(false),
671                _ => Err(anyhow!("Invalid boolean value")),
672            })?;
673            Ok(ElementValue::Bool(value))
674        }
675        21 => {
676            // int2
677            let value = text.parse::<i16>()?;
678            Ok(ElementValue::Integer(value as i64))
679        }
680        23 => {
681            // int4
682            let value = text.parse::<i32>()?;
683            Ok(ElementValue::Integer(value as i64))
684        }
685        20 => {
686            // int8
687            let value = text.parse::<i64>()?;
688            Ok(ElementValue::Integer(value))
689        }
690        700 => {
691            // float4
692            let value = text.parse::<f32>()?;
693            Ok(ElementValue::Float(ordered_float::OrderedFloat(
694                value as f64,
695            )))
696        }
697        701 => {
698            // float8
699            let value = text.parse::<f64>()?;
700            Ok(ElementValue::Float(ordered_float::OrderedFloat(value)))
701        }
702        1700 => {
703            // numeric/decimal
704            let value = text.parse::<f64>()?;
705            Ok(ElementValue::Float(ordered_float::OrderedFloat(value)))
706        }
707        25 | 1043 | 19 => {
708            // text, varchar, name
709            Ok(ElementValue::String(Arc::from(text)))
710        }
711        1114 | 1184 => {
712            // timestamp, timestamptz
713            Ok(ElementValue::String(Arc::from(text)))
714        }
715        1082 => {
716            // date
717            Ok(ElementValue::String(Arc::from(text)))
718        }
719        2950 => {
720            // uuid
721            Ok(ElementValue::String(Arc::from(text)))
722        }
723        _ => {
724            // Default to string for unknown types
725            Ok(ElementValue::String(Arc::from(text)))
726        }
727    }
728}
729
730fn decode_numeric(data: &[u8]) -> Result<Decimal> {
731    if data.len() < 8 {
732        return Err(anyhow!("Numeric data too short"));
733    }
734
735    let mut cursor = Cursor::new(data);
736    let ndigits = cursor.read_u16::<BigEndian>()?;
737    let weight = cursor.read_i16::<BigEndian>()?;
738    let sign = cursor.read_u16::<BigEndian>()?;
739    let dscale = cursor.read_u16::<BigEndian>()?;
740
741    if sign == 0xC000 {
742        // NaN
743        return Ok(Decimal::ZERO);
744    }
745
746    let mut digits = Vec::with_capacity(ndigits as usize);
747    for _ in 0..ndigits {
748        digits.push(cursor.read_u16::<BigEndian>()?);
749    }
750
751    // Convert PostgreSQL numeric to Decimal
752    // This is a simplified implementation
753    let mut result = Decimal::ZERO;
754    let base = Decimal::from(10000);
755
756    for (i, &digit) in digits.iter().enumerate() {
757        let power = weight as i32 - i as i32;
758        let digit_value = Decimal::from(digit as i64);
759        let multiplier = if power >= 0 {
760            let mut result = Decimal::ONE;
761            for _ in 0..power {
762                result *= base;
763            }
764            result
765        } else {
766            let mut result = Decimal::ONE;
767            for _ in 0..(-power) {
768                result /= base;
769            }
770            result
771        };
772        result += digit_value * multiplier;
773    }
774
775    if sign == 0x4000 {
776        result = -result;
777    }
778
779    // Apply scale
780    if dscale > 0 {
781        let mut scale_divisor = Decimal::ONE;
782        for _ in 0..dscale {
783            scale_divisor *= Decimal::from(10);
784        }
785        result /= scale_divisor;
786    }
787
788    Ok(result)
789}