Skip to main content

drasi_source_postgres/
types.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::{DateTime, NaiveDateTime, Utc};
16use postgres_types::Oid;
17use rust_decimal::Decimal;
18use serde_json::Value as JsonValue;
19use std::collections::HashMap;
20use uuid::Uuid;
21
22#[derive(Debug, Clone)]
23pub enum PostgresValue {
24    Null,
25    Bool(bool),
26    Int2(i16),
27    Int4(i32),
28    Int8(i64),
29    Float4(f32),
30    Float8(f64),
31    Numeric(Decimal),
32    Text(String),
33    Varchar(String),
34    Char(String),
35    Uuid(Uuid),
36    Timestamp(NaiveDateTime),
37    TimestampTz(DateTime<Utc>),
38    Date(chrono::NaiveDate),
39    Time(chrono::NaiveTime),
40    Json(JsonValue),
41    Jsonb(JsonValue),
42    Array(Vec<PostgresValue>),
43    Composite(HashMap<String, PostgresValue>),
44    Bytea(Vec<u8>),
45}
46
47#[derive(Debug, Clone)]
48pub struct ColumnInfo {
49    pub name: String,
50    pub type_oid: Oid,
51    pub type_modifier: i32,
52    pub is_key: bool,
53}
54
55#[derive(Debug, Clone)]
56pub struct RelationInfo {
57    pub id: u32,
58    pub namespace: String,
59    pub name: String,
60    pub replica_identity: ReplicaIdentity,
61    pub columns: Vec<ColumnInfo>,
62}
63
/// Replica-identity setting of a relation.
///
/// The variant names appear to mirror PostgreSQL's `REPLICA IDENTITY` options
/// (`DEFAULT`, `NOTHING`, `FULL`, `USING INDEX`) — confirm against the decoder
/// that produces these values.
///
/// Equality on a field-less enum is total, so `Eq` and `Hash` are derived
/// alongside `PartialEq`; this lets the type be used in sets and as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicaIdentity {
    /// Default setting.
    Default,
    /// No replica identity.
    Nothing,
    /// Full-row replica identity.
    Full,
    /// Index-based replica identity.
    Index,
}
71
72#[derive(Debug, Clone)]
73pub struct TransactionInfo {
74    pub xid: u32,
75    pub commit_lsn: u64,
76    pub commit_timestamp: DateTime<Utc>,
77}
78
79#[derive(Debug, Clone)]
80pub enum WalMessage {
81    Begin(TransactionInfo),
82    Commit(TransactionInfo),
83    Relation(RelationInfo),
84    Insert {
85        relation_id: u32,
86        tuple: Vec<PostgresValue>,
87    },
88    Update {
89        relation_id: u32,
90        old_tuple: Option<Vec<PostgresValue>>,
91        new_tuple: Vec<PostgresValue>,
92    },
93    Delete {
94        relation_id: u32,
95        old_tuple: Vec<PostgresValue>,
96    },
97    Truncate {
98        relation_ids: Vec<u32>,
99    },
100}
101
/// Details of a replication slot.
///
/// Derives `PartialEq`/`Eq` so slot descriptions can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicationSlotInfo {
    /// Name of the slot.
    pub slot_name: String,
    /// Consistent point reported for the slot, kept as text (presumably an
    /// LSN in PostgreSQL's textual form — confirm against the caller).
    pub consistent_point: String,
    /// Name of the associated snapshot, if there is one.
    pub snapshot_name: Option<String>,
    /// Name of the output plugin used by the slot.
    pub output_plugin: String,
    /// The earliest WAL position retained by this slot (`restart_lsn` from pg_replication_slots).
    /// Only populated when querying an existing slot via `get_replication_slot_info`.
    pub restart_lsn: Option<String>,
}
112
/// Progress report carrying three log positions and a reply flag.
///
/// The field names suggest this mirrors the "standby status update" message of
/// PostgreSQL's streaming replication protocol — confirm against the code that
/// serialises it.
///
/// Derives `PartialEq`/`Eq` for direct comparison and `Default` for an
/// all-zero update with `reply_requested` set to `false`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct StandbyStatusUpdate {
    /// Log position reported as written.
    pub write_lsn: u64,
    /// Log position reported as flushed.
    pub flush_lsn: u64,
    /// Log position reported as applied.
    pub apply_lsn: u64,
    /// Whether a reply is requested.
    pub reply_requested: bool,
}
120
121impl PostgresValue {
122    pub fn to_json(&self) -> JsonValue {
123        match self {
124            PostgresValue::Null => JsonValue::Null,
125            PostgresValue::Bool(b) => JsonValue::Bool(*b),
126            PostgresValue::Int2(i) => JsonValue::Number((*i).into()),
127            PostgresValue::Int4(i) => JsonValue::Number((*i).into()),
128            PostgresValue::Int8(i) => JsonValue::Number((*i).into()),
129            PostgresValue::Float4(f) => {
130                if let Some(n) = serde_json::Number::from_f64(*f as f64) {
131                    JsonValue::Number(n)
132                } else {
133                    JsonValue::Null
134                }
135            }
136            PostgresValue::Float8(f) => {
137                if let Some(n) = serde_json::Number::from_f64(*f) {
138                    JsonValue::Number(n)
139                } else {
140                    JsonValue::Null
141                }
142            }
143            PostgresValue::Numeric(d) => {
144                // Convert Decimal to Number via string parsing
145                // This ensures precision is maintained and the value is valid JSON number
146                d.to_string()
147                    .parse::<serde_json::Number>()
148                    .map(JsonValue::Number)
149                    .unwrap_or(JsonValue::Null)
150            }
151            PostgresValue::Text(s) | PostgresValue::Varchar(s) | PostgresValue::Char(s) => {
152                JsonValue::String(s.clone())
153            }
154            PostgresValue::Uuid(u) => JsonValue::String(u.to_string()),
155            PostgresValue::Timestamp(ts) => JsonValue::String(ts.to_string()),
156            PostgresValue::TimestampTz(ts) => JsonValue::String(ts.to_rfc3339()),
157            PostgresValue::Date(d) => JsonValue::String(d.to_string()),
158            PostgresValue::Time(t) => JsonValue::String(t.to_string()),
159            PostgresValue::Json(j) | PostgresValue::Jsonb(j) => j.clone(),
160            PostgresValue::Array(arr) => {
161                JsonValue::Array(arr.iter().map(|v| v.to_json()).collect())
162            }
163            PostgresValue::Composite(map) => {
164                let obj: serde_json::Map<String, JsonValue> =
165                    map.iter().map(|(k, v)| (k.clone(), v.to_json())).collect();
166                JsonValue::Object(obj)
167            }
168            PostgresValue::Bytea(bytes) => JsonValue::String(base64::encode(bytes)),
169        }
170    }
171}
172
/// Minimal Base64 encoder (standard alphabet with `=` padding) used to render
/// binary values as JSON strings.
mod base64 {
    /// Standard Base64 alphabet, indexed by 6-bit value.
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    /// Character that fills the unused positions of a final, partial group.
    const PAD: char = '=';

    /// Encodes `input` as padded Base64.
    ///
    /// Every group of up to three input bytes yields exactly four output
    /// characters, so the output length is always a multiple of four. Empty
    /// input yields an empty string.
    pub fn encode(input: &[u8]) -> String {
        let groups = input.chunks(3);
        // Four characters per group: size the buffer once instead of growing it.
        let mut encoded = String::with_capacity(groups.len() * 4);

        for group in groups {
            // `chunks(3)` never yields an empty slice, so index 0 always exists;
            // the second and third bytes are absent only in the final group.
            let first = group[0];
            let second = group.get(1).copied();
            let third = group.get(2).copied();

            // Missing bytes contribute zero bits to the neighbouring symbol.
            let second_bits = second.unwrap_or(0);
            let third_bits = third.unwrap_or(0);

            encoded.push(symbol(first >> 2));
            encoded.push(symbol(((first & 0x03) << 4) | (second_bits >> 4)));
            encoded.push(second.map_or(PAD, |byte| symbol(((byte & 0x0f) << 2) | (third_bits >> 6))));
            encoded.push(third.map_or(PAD, |byte| symbol(byte & 0x3f)));
        }

        encoded
    }

    /// Maps a 6-bit value (`0..=63`) to its alphabet character.
    fn symbol(sextet: u8) -> char {
        char::from(ALPHABET[usize::from(sextet)])
    }
}
206
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Parses `literal` as a `Decimal`, wraps it in `PostgresValue::Numeric`
    /// and returns its JSON conversion.
    fn numeric_json(literal: &str) -> JsonValue {
        let decimal = Decimal::from_str(literal).unwrap();
        PostgresValue::Numeric(decimal).to_json()
    }

    /// Decimal values must come out as JSON numbers, not strings.
    #[test]
    fn test_decimal_to_json_as_number() {
        let json = numeric_json("123.45");

        assert!(json.is_number(), "Decimal should be serialized as a number");
        assert_eq!(json.as_f64(), Some(123.45));
    }

    /// A decimal without a fractional part is still a JSON number.
    #[test]
    fn test_decimal_integer_to_json() {
        let json = numeric_json("100");

        assert!(
            json.is_number(),
            "Integer decimal should be serialized as a number"
        );
        assert_eq!(json.as_f64(), Some(100.0));
    }

    /// Small fractional values keep their numeric form.
    #[test]
    fn test_decimal_small_value_to_json() {
        let json = numeric_json("0.00001");

        assert!(
            json.is_number(),
            "Small decimal should be serialized as a number"
        );
        assert_eq!(json.as_f64(), Some(0.00001));
    }

    /// Negative decimals keep their sign and numeric form.
    #[test]
    fn test_decimal_negative_to_json() {
        let json = numeric_json("-999.99");

        assert!(
            json.is_number(),
            "Negative decimal should be serialized as a number"
        );
        assert_eq!(json.as_f64(), Some(-999.99));
    }

    /// Zero is serialized as the number zero.
    #[test]
    fn test_decimal_zero_to_json() {
        let json = numeric_json("0");

        assert!(
            json.is_number(),
            "Zero decimal should be serialized as a number"
        );
        assert_eq!(json.as_f64(), Some(0.0));
    }
}