1use chrono::{DateTime, NaiveDateTime, Utc};
16use postgres_types::Oid;
17use rust_decimal::Decimal;
18use serde_json::Value as JsonValue;
19use std::collections::HashMap;
20use uuid::Uuid;
21
22#[derive(Debug, Clone)]
23pub enum PostgresValue {
24 Null,
25 Bool(bool),
26 Int2(i16),
27 Int4(i32),
28 Int8(i64),
29 Float4(f32),
30 Float8(f64),
31 Numeric(Decimal),
32 Text(String),
33 Varchar(String),
34 Char(String),
35 Uuid(Uuid),
36 Timestamp(NaiveDateTime),
37 TimestampTz(DateTime<Utc>),
38 Date(chrono::NaiveDate),
39 Time(chrono::NaiveTime),
40 Json(JsonValue),
41 Jsonb(JsonValue),
42 Array(Vec<PostgresValue>),
43 Composite(HashMap<String, PostgresValue>),
44 Bytea(Vec<u8>),
45}
46
47#[derive(Debug, Clone)]
48pub struct ColumnInfo {
49 pub name: String,
50 pub type_oid: Oid,
51 pub type_modifier: i32,
52 pub is_key: bool,
53}
54
55#[derive(Debug, Clone)]
56pub struct RelationInfo {
57 pub id: u32,
58 pub namespace: String,
59 pub name: String,
60 pub replica_identity: ReplicaIdentity,
61 pub columns: Vec<ColumnInfo>,
62}
63
/// Replica identity setting of a relation.
///
/// The variant names presumably mirror PostgreSQL's `REPLICA IDENTITY`
/// options (`DEFAULT`, `NOTHING`, `FULL`, `USING INDEX`) — confirm against
/// the code that decodes relation messages.
///
/// The enum is field-less, so equality is total and hashing is trivial:
/// `Eq` and `Hash` are derived alongside `PartialEq` so values can be used
/// as map/set keys and satisfy `Eq` bounds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicaIdentity {
    /// The `Default` setting.
    Default,
    /// The `Nothing` setting.
    Nothing,
    /// The `Full` setting.
    Full,
    /// The `Index` setting.
    Index,
}
71
72#[derive(Debug, Clone)]
73pub struct TransactionInfo {
74 pub xid: u32,
75 pub commit_lsn: u64,
76 pub commit_timestamp: DateTime<Utc>,
77}
78
79#[derive(Debug, Clone)]
80pub enum WalMessage {
81 Begin(TransactionInfo),
82 Commit(TransactionInfo),
83 Relation(RelationInfo),
84 Insert {
85 relation_id: u32,
86 tuple: Vec<PostgresValue>,
87 },
88 Update {
89 relation_id: u32,
90 old_tuple: Option<Vec<PostgresValue>>,
91 new_tuple: Vec<PostgresValue>,
92 },
93 Delete {
94 relation_id: u32,
95 old_tuple: Vec<PostgresValue>,
96 },
97 Truncate {
98 relation_ids: Vec<u32>,
99 },
100}
101
/// Details describing a replication slot; every field is kept in textual form.
// NOTE(review): the fields resemble the result of `CREATE_REPLICATION_SLOT`
// — confirm against the code that populates this struct.
#[derive(Debug, Clone)]
pub struct ReplicationSlotInfo {
    /// Name of the slot.
    pub slot_name: String,
    /// Consistent point reported for the slot, as text.
    pub consistent_point: String,
    /// Name of an associated snapshot, when there is one.
    pub snapshot_name: Option<String>,
    /// Name of the output plugin used by the slot.
    pub output_plugin: String,
    /// Restart LSN of the slot as text, when known.
    pub restart_lsn: Option<String>,
}
112
/// Progress report made of three LSN positions and a reply flag.
// NOTE(review): the shape matches the streaming-replication "standby status
// update" message — confirm against the code that serialises it.
#[derive(Debug, Clone)]
pub struct StandbyStatusUpdate {
    /// Write position, as a raw 64-bit LSN.
    pub write_lsn: u64,
    /// Flush position, as a raw 64-bit LSN.
    pub flush_lsn: u64,
    /// Apply position, as a raw 64-bit LSN.
    pub apply_lsn: u64,
    /// Whether a reply is requested from the peer.
    pub reply_requested: bool,
}
120
121impl PostgresValue {
122 pub fn to_json(&self) -> JsonValue {
123 match self {
124 PostgresValue::Null => JsonValue::Null,
125 PostgresValue::Bool(b) => JsonValue::Bool(*b),
126 PostgresValue::Int2(i) => JsonValue::Number((*i).into()),
127 PostgresValue::Int4(i) => JsonValue::Number((*i).into()),
128 PostgresValue::Int8(i) => JsonValue::Number((*i).into()),
129 PostgresValue::Float4(f) => {
130 if let Some(n) = serde_json::Number::from_f64(*f as f64) {
131 JsonValue::Number(n)
132 } else {
133 JsonValue::Null
134 }
135 }
136 PostgresValue::Float8(f) => {
137 if let Some(n) = serde_json::Number::from_f64(*f) {
138 JsonValue::Number(n)
139 } else {
140 JsonValue::Null
141 }
142 }
143 PostgresValue::Numeric(d) => {
144 d.to_string()
147 .parse::<serde_json::Number>()
148 .map(JsonValue::Number)
149 .unwrap_or(JsonValue::Null)
150 }
151 PostgresValue::Text(s) | PostgresValue::Varchar(s) | PostgresValue::Char(s) => {
152 JsonValue::String(s.clone())
153 }
154 PostgresValue::Uuid(u) => JsonValue::String(u.to_string()),
155 PostgresValue::Timestamp(ts) => JsonValue::String(ts.to_string()),
156 PostgresValue::TimestampTz(ts) => JsonValue::String(ts.to_rfc3339()),
157 PostgresValue::Date(d) => JsonValue::String(d.to_string()),
158 PostgresValue::Time(t) => JsonValue::String(t.to_string()),
159 PostgresValue::Json(j) | PostgresValue::Jsonb(j) => j.clone(),
160 PostgresValue::Array(arr) => {
161 JsonValue::Array(arr.iter().map(|v| v.to_json()).collect())
162 }
163 PostgresValue::Composite(map) => {
164 let obj: serde_json::Map<String, JsonValue> =
165 map.iter().map(|(k, v)| (k.clone(), v.to_json())).collect();
166 JsonValue::Object(obj)
167 }
168 PostgresValue::Bytea(bytes) => JsonValue::String(base64::encode(bytes)),
169 }
170 }
171}
172
/// Minimal standard-alphabet base64 encoder (with `=` padding).
mod base64 {
    /// The 64 output symbols, indexed by 6-bit value.
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    /// Padding character used to fill the final 4-character group.
    const PAD: char = '=';

    /// Maps a 6-bit value (`0..=63`) to its base64 symbol.
    fn symbol(sextet: u8) -> char {
        ALPHABET[usize::from(sextet)] as char
    }

    /// Encodes `input` as padded base64.
    ///
    /// Every group of up to three input bytes produces exactly four output
    /// characters; a trailing group of one or two bytes is padded with `=`.
    /// An empty input yields an empty string.
    pub fn encode(input: &[u8]) -> String {
        // Output length is known up front: 4 characters per (possibly partial)
        // 3-byte group, so reserve once instead of growing repeatedly.
        let mut encoded = String::with_capacity((input.len() + 2) / 3 * 4);

        for group in input.chunks(3) {
            match *group {
                [a, b, c] => {
                    encoded.push(symbol(a >> 2));
                    encoded.push(symbol(((a & 0x03) << 4) | (b >> 4)));
                    encoded.push(symbol(((b & 0x0f) << 2) | (c >> 6)));
                    encoded.push(symbol(c & 0x3f));
                }
                [a, b] => {
                    encoded.push(symbol(a >> 2));
                    encoded.push(symbol(((a & 0x03) << 4) | (b >> 4)));
                    // Missing third byte contributes zero bits.
                    encoded.push(symbol((b & 0x0f) << 2));
                    encoded.push(PAD);
                }
                [a] => {
                    encoded.push(symbol(a >> 2));
                    // Missing second byte contributes zero bits.
                    encoded.push(symbol((a & 0x03) << 4));
                    encoded.push(PAD);
                    encoded.push(PAD);
                }
                _ => unreachable!("chunks(3) yields between one and three bytes"),
            }
        }

        encoded
    }
}
206
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Parses `literal` as a `Decimal`, wraps it in `PostgresValue::Numeric`
    /// and returns its JSON conversion.
    fn numeric_json(literal: &str) -> JsonValue {
        let parsed = Decimal::from_str(literal).unwrap();
        PostgresValue::Numeric(parsed).to_json()
    }

    /// A fractional decimal is emitted as a JSON number with the same value.
    #[test]
    fn test_decimal_to_json_as_number() {
        let converted = numeric_json("123.45");

        assert!(converted.is_number(), "Decimal should be serialized as a number");
        assert_eq!(converted.as_f64().unwrap(), 123.45);
    }

    /// A whole-number decimal is emitted as a JSON number.
    #[test]
    fn test_decimal_integer_to_json() {
        let converted = numeric_json("100");

        assert!(
            converted.is_number(),
            "Integer decimal should be serialized as a number"
        );
        assert_eq!(converted.as_f64().unwrap(), 100.0);
    }

    /// A small fractional decimal is emitted as a JSON number.
    #[test]
    fn test_decimal_small_value_to_json() {
        let converted = numeric_json("0.00001");

        assert!(
            converted.is_number(),
            "Small decimal should be serialized as a number"
        );
        assert_eq!(converted.as_f64().unwrap(), 0.00001);
    }

    /// A negative decimal is emitted as a JSON number.
    #[test]
    fn test_decimal_negative_to_json() {
        let converted = numeric_json("-999.99");

        assert!(
            converted.is_number(),
            "Negative decimal should be serialized as a number"
        );
        assert_eq!(converted.as_f64().unwrap(), -999.99);
    }

    /// Zero is emitted as a JSON number.
    #[test]
    fn test_decimal_zero_to_json() {
        let converted = numeric_json("0");

        assert!(
            converted.is_number(),
            "Zero decimal should be serialized as a number"
        );
        assert_eq!(converted.as_f64().unwrap(), 0.0);
    }
}