Skip to main content

reddb_server/wire/postgres/
types.rs

1//! PostgreSQL type OID mapping (Phase 3.1 PG parity).
2//!
3//! PG clients identify column types by OID (see `pg_catalog.pg_type`).
4//! RedDB's `Value` enum is richer than PG's canonical set, so we collapse
5//! domain-specific variants (`Email`, `Phone`, `Money`, ...) onto their
6//! closest PG equivalent — TEXT, NUMERIC, etc. This keeps generic clients
7//! working; clients that need the fine-grained types call the native
8//! gRPC path.
9//!
10//! Reference: PostgreSQL source `src/include/catalog/pg_type_d.h`.
11
12use crate::storage::schema::Value;
13
14/// A subset of PG type OIDs that cover every case we need to encode.
15#[allow(dead_code)]
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum PgOid {
18    Bool = 16,
19    Bytea = 17,
20    Int8 = 20,
21    Int2 = 21,
22    Int4 = 23,
23    Text = 25,
24    Oid = 26,
25    Json = 114,
26    Float4 = 700,
27    Float8 = 701,
28    Unknown = 705,
29    Varchar = 1043,
30    Date = 1082,
31    Time = 1083,
32    Timestamp = 1114,
33    TimestampTz = 1184,
34    Numeric = 1700,
35    Uuid = 2950,
36    Jsonb = 3802,
37    /// RedDB-reserved synthetic vector OID. PostgreSQL extension OIDs are
38    /// cluster-local; RedDB uses a stable high value for wire clients that
39    /// want to bind vector parameters explicitly.
40    Vector = 38000,
41}
42
43impl PgOid {
44    pub fn as_u32(self) -> u32 {
45        self as u32
46    }
47
48    pub fn from_u32(oid: u32) -> Self {
49        match oid {
50            16 => PgOid::Bool,
51            17 => PgOid::Bytea,
52            20 => PgOid::Int8,
53            21 => PgOid::Int2,
54            23 => PgOid::Int4,
55            25 => PgOid::Text,
56            26 => PgOid::Oid,
57            114 => PgOid::Json,
58            700 => PgOid::Float4,
59            701 => PgOid::Float8,
60            705 => PgOid::Unknown,
61            1043 => PgOid::Varchar,
62            1082 => PgOid::Date,
63            1083 => PgOid::Time,
64            1114 => PgOid::Timestamp,
65            1184 => PgOid::TimestampTz,
66            1700 => PgOid::Numeric,
67            2950 => PgOid::Uuid,
68            3802 => PgOid::Jsonb,
69            38000 => PgOid::Vector,
70            _ => PgOid::Unknown,
71        }
72    }
73
74    /// Preferred type OID for a runtime `Value`. Used by
75    /// `RowDescription` to tell the client what each column is.
76    pub fn from_value(value: &Value) -> Self {
77        match value {
78            Value::Null => PgOid::Text,
79            Value::Boolean(_) => PgOid::Bool,
80            Value::Integer(_) => PgOid::Int8,
81            Value::UnsignedInteger(_) => PgOid::Int8,
82            Value::BigInt(_) => PgOid::Int8,
83            Value::Float(_) => PgOid::Float8,
84            Value::Text(_) => PgOid::Text,
85            Value::Blob(_) => PgOid::Bytea,
86            Value::Json(_) => PgOid::Jsonb,
87            Value::Uuid(_) => PgOid::Uuid,
88            Value::Date(_) => PgOid::Date,
89            Value::Timestamp(_) => PgOid::TimestampTz,
90            Value::TimestampMs(_) => PgOid::TimestampTz,
91            Value::Vector(_) => PgOid::Vector,
92            // Domain / richer types collapse to TEXT so psql can render them.
93            _ => PgOid::Text,
94        }
95    }
96}
97
98pub fn pg_param_to_value(
99    oid: u32,
100    format_code: i16,
101    bytes: Option<&[u8]>,
102) -> Result<Value, String> {
103    let Some(bytes) = bytes else {
104        return Ok(Value::Null);
105    };
106    match format_code {
107        0 => pg_text_param_to_value(PgOid::from_u32(oid), bytes),
108        1 => pg_binary_param_to_value(PgOid::from_u32(oid), bytes),
109        other => Err(format!("unsupported PG parameter format code {other}")),
110    }
111}
112
113fn pg_text_param_to_value(oid: PgOid, bytes: &[u8]) -> Result<Value, String> {
114    let text = std::str::from_utf8(bytes).map_err(|e| format!("invalid UTF-8 parameter: {e}"))?;
115    match oid {
116        PgOid::Bool => match text.to_ascii_lowercase().as_str() {
117            "t" | "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
118            "f" | "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
119            _ => Err(format!("invalid bool parameter {text:?}")),
120        },
121        PgOid::Int2 | PgOid::Int4 | PgOid::Int8 | PgOid::Oid => text
122            .parse::<i64>()
123            .map(Value::Integer)
124            .map_err(|e| format!("invalid integer parameter {text:?}: {e}")),
125        PgOid::Float4 | PgOid::Float8 | PgOid::Numeric => text
126            .parse::<f64>()
127            .map(Value::Float)
128            .map_err(|e| format!("invalid float parameter {text:?}: {e}")),
129        PgOid::Bytea => parse_bytea_text(text).map(Value::Blob),
130        PgOid::Json | PgOid::Jsonb => Ok(Value::Json(bytes.to_vec())),
131        PgOid::Timestamp | PgOid::TimestampTz => text
132            .parse::<i64>()
133            .map(Value::Timestamp)
134            .or_else(|_| Ok(Value::Text(std::sync::Arc::from(text)))),
135        PgOid::Uuid => parse_uuid_text(text).map(Value::Uuid),
136        PgOid::Vector => parse_vector_text(text).map(Value::Vector),
137        PgOid::Text | PgOid::Varchar | PgOid::Unknown | PgOid::Date | PgOid::Time => {
138            Ok(Value::Text(std::sync::Arc::from(text)))
139        }
140    }
141}
142
143fn pg_binary_param_to_value(oid: PgOid, bytes: &[u8]) -> Result<Value, String> {
144    match oid {
145        PgOid::Bool if bytes.len() == 1 => Ok(Value::Boolean(bytes[0] != 0)),
146        PgOid::Int2 if bytes.len() == 2 => {
147            Ok(Value::Integer(
148                i16::from_be_bytes([bytes[0], bytes[1]]) as i64
149            ))
150        }
151        PgOid::Int4 | PgOid::Oid if bytes.len() == 4 => {
152            Ok(Value::Integer(
153                i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as i64,
154            ))
155        }
156        PgOid::Int8 if bytes.len() == 8 => Ok(Value::Integer(i64::from_be_bytes([
157            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
158        ]))),
159        PgOid::Float4 if bytes.len() == 4 => {
160            Ok(Value::Float(
161                f32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as f64,
162            ))
163        }
164        PgOid::Float8 if bytes.len() == 8 => Ok(Value::Float(f64::from_be_bytes([
165            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
166        ]))),
167        PgOid::Bytea => Ok(Value::Blob(bytes.to_vec())),
168        PgOid::Json | PgOid::Jsonb => Ok(Value::Json(bytes.to_vec())),
169        PgOid::Uuid if bytes.len() == 16 => {
170            let mut out = [0u8; 16];
171            out.copy_from_slice(bytes);
172            Ok(Value::Uuid(out))
173        }
174        PgOid::Timestamp | PgOid::TimestampTz if bytes.len() == 8 => Ok(Value::Timestamp(
175            pg_microseconds_to_unix_seconds(i64::from_be_bytes([
176                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
177            ])),
178        )),
179        PgOid::Vector => parse_vector_binary(bytes).map(Value::Vector),
180        _ => Err(format!(
181            "unsupported binary parameter for OID {} with {} bytes",
182            oid.as_u32(),
183            bytes.len()
184        )),
185    }
186}
187
188fn parse_bytea_text(text: &str) -> Result<Vec<u8>, String> {
189    let Some(hex) = text.strip_prefix("\\x") else {
190        return Ok(text.as_bytes().to_vec());
191    };
192    if hex.len() % 2 != 0 {
193        return Err("invalid bytea hex length".to_string());
194    }
195    (0..hex.len())
196        .step_by(2)
197        .map(|idx| {
198            u8::from_str_radix(&hex[idx..idx + 2], 16)
199                .map_err(|e| format!("invalid bytea hex: {e}"))
200        })
201        .collect()
202}
203
204fn parse_vector_text(text: &str) -> Result<Vec<f32>, String> {
205    let parsed: crate::json::Value =
206        crate::json::from_str(text).map_err(|e| format!("invalid vector parameter: {e}"))?;
207    let crate::json::Value::Array(items) = parsed else {
208        return Err("invalid vector parameter: expected JSON number array".to_string());
209    };
210    items
211        .iter()
212        .map(|item| {
213            item.as_f64().map(|value| value as f32).ok_or_else(|| {
214                "invalid vector parameter: array must contain only numbers".to_string()
215            })
216        })
217        .collect()
218}
219
220fn parse_vector_binary(bytes: &[u8]) -> Result<Vec<f32>, String> {
221    if bytes.len() < 4 {
222        return Err("invalid binary vector parameter: payload too short".to_string());
223    }
224    let dims = i16::from_be_bytes([bytes[0], bytes[1]]);
225    if dims < 0 {
226        return Err("invalid binary vector parameter: negative dimension".to_string());
227    }
228    let dims = dims as usize;
229    let expected = 4 + dims * 4;
230    if bytes.len() != expected {
231        return Err(format!(
232            "invalid binary vector parameter: expected {expected} bytes, got {}",
233            bytes.len()
234        ));
235    }
236    (0..dims)
237        .map(|idx| {
238            let off = 4 + idx * 4;
239            Ok(f32::from_be_bytes([
240                bytes[off],
241                bytes[off + 1],
242                bytes[off + 2],
243                bytes[off + 3],
244            ]))
245        })
246        .collect()
247}
248
249fn parse_uuid_text(text: &str) -> Result<[u8; 16], String> {
250    let compact = text.replace('-', "");
251    if compact.len() != 32 {
252        return Err(format!("invalid uuid parameter {text:?}"));
253    }
254    let bytes = (0..compact.len())
255        .step_by(2)
256        .map(|idx| {
257            u8::from_str_radix(&compact[idx..idx + 2], 16)
258                .map_err(|e| format!("invalid uuid parameter {text:?}: {e}"))
259        })
260        .collect::<Result<Vec<_>, _>>()?;
261    let mut out = [0u8; 16];
262    out.copy_from_slice(&bytes);
263    Ok(out)
264}
265
266fn pg_microseconds_to_unix_seconds(pg_micros: i64) -> i64 {
267    // PostgreSQL binary timestamps are microseconds since 2000-01-01.
268    const PG_UNIX_EPOCH_OFFSET_SECONDS: i64 = 946_684_800;
269    PG_UNIX_EPOCH_OFFSET_SECONDS + pg_micros / 1_000_000
270}
271
272/// Encode a `Value` as the UTF-8 text representation PG's text-mode
273/// protocol expects. Binary format is opt-in via a flag in the client's
274/// `Bind` message — we don't advertise binary support yet, so simple
275/// text encoding is sufficient for every supported client.
276///
277/// Returns `None` for `Value::Null` (the caller emits a `-1` length).
278pub fn value_to_pg_wire_bytes(value: &Value) -> Option<Vec<u8>> {
279    match value {
280        Value::Null => None,
281        Value::Boolean(b) => Some((if *b { "t" } else { "f" }).as_bytes().to_vec()),
282        Value::Integer(n) => Some(n.to_string().into_bytes()),
283        Value::UnsignedInteger(n) => Some(n.to_string().into_bytes()),
284        Value::BigInt(n) => Some(n.to_string().into_bytes()),
285        Value::Float(f) => Some(f.to_string().into_bytes()),
286        Value::Text(s) => Some(s.as_bytes().to_vec()),
287        Value::Blob(b) => {
288            // PG bytea text format: `\xHEX...`. Two chars per byte.
289            let mut out = Vec::with_capacity(2 + b.len() * 2);
290            out.extend_from_slice(b"\\x");
291            for byte in b {
292                out.extend_from_slice(format!("{byte:02x}").as_bytes());
293            }
294            Some(out)
295        }
296        Value::Json(bytes) => Some(bytes.clone()),
297        // Everything else renders via Display — catches Uuid, Date,
298        // Timestamp, Email, Phone, Money, GeoPoint, etc. PG clients see
299        // these as TEXT columns (OID 25) and can render them verbatim.
300        other => Some(other.to_string().into_bytes()),
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use super::*;
307
308    #[test]
309    fn pg_text_params_decode_common_oids() {
310        assert_eq!(
311            pg_param_to_value(PgOid::Bool.as_u32(), 0, Some(b"t")).unwrap(),
312            Value::Boolean(true)
313        );
314        assert_eq!(
315            pg_param_to_value(PgOid::Int4.as_u32(), 0, Some(b"42")).unwrap(),
316            Value::Integer(42)
317        );
318        assert_eq!(
319            pg_param_to_value(PgOid::Float8.as_u32(), 0, Some(b"1.5")).unwrap(),
320            Value::Float(1.5)
321        );
322        assert_eq!(
323            pg_param_to_value(PgOid::Bytea.as_u32(), 0, Some(br"\xdeadbeef")).unwrap(),
324            Value::Blob(vec![0xde, 0xad, 0xbe, 0xef])
325        );
326        assert_eq!(
327            pg_param_to_value(PgOid::Jsonb.as_u32(), 0, Some(br#"{"a":1}"#)).unwrap(),
328            Value::Json(br#"{"a":1}"#.to_vec())
329        );
330        assert_eq!(
331            pg_param_to_value(
332                PgOid::Uuid.as_u32(),
333                0,
334                Some(b"00112233-4455-6677-8899-aabbccddeeff")
335            )
336            .unwrap(),
337            Value::Uuid([
338                0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd,
339                0xee, 0xff,
340            ])
341        );
342    }
343
344    #[test]
345    fn pg_binary_params_decode_numeric_and_uuid_oids() {
346        assert_eq!(
347            pg_param_to_value(PgOid::Int2.as_u32(), 1, Some(&7i16.to_be_bytes())).unwrap(),
348            Value::Integer(7)
349        );
350        assert_eq!(
351            pg_param_to_value(PgOid::Int8.as_u32(), 1, Some(&42i64.to_be_bytes())).unwrap(),
352            Value::Integer(42)
353        );
354        assert_eq!(
355            pg_param_to_value(PgOid::Float4.as_u32(), 1, Some(&1.5f32.to_be_bytes())).unwrap(),
356            Value::Float(1.5)
357        );
358        assert_eq!(
359            pg_param_to_value(PgOid::Uuid.as_u32(), 1, Some(&[0x11; 16])).unwrap(),
360            Value::Uuid([0x11; 16])
361        );
362        let mut vector = Vec::new();
363        vector.extend_from_slice(&2i16.to_be_bytes());
364        vector.extend_from_slice(&0i16.to_be_bytes());
365        vector.extend_from_slice(&1.0f32.to_be_bytes());
366        vector.extend_from_slice(&(-0.5f32).to_be_bytes());
367        assert_eq!(
368            pg_param_to_value(PgOid::Vector.as_u32(), 1, Some(&vector)).unwrap(),
369            Value::Vector(vec![1.0, -0.5])
370        );
371    }
372
373    #[test]
374    fn pg_null_param_decodes_to_value_null() {
375        assert_eq!(
376            pg_param_to_value(PgOid::Text.as_u32(), 0, None).unwrap(),
377            Value::Null
378        );
379    }
380
381    #[test]
382    fn pg_vector_text_param_decodes_json_array() {
383        assert_eq!(
384            pg_param_to_value(PgOid::Vector.as_u32(), 0, Some(b"[1.0, -0.5]")).unwrap(),
385            Value::Vector(vec![1.0, -0.5])
386        );
387    }
388}