// reddb_server/runtime/ai/pg_wire_ask_row_encoder.rs

1//! `PgWireAskRowEncoder` — pure encoder that turns an
2//! [`AskResult`](super::ask_response_envelope::AskResult) into the
3//! single-row Postgres-wire result set that #408 exposes to psycopg /
4//! pgx / JDBC.
5//!
6//! Deep module: no I/O, no transport, no clock. Mirrors the slice-1
7//! pattern used by #395, #396, #398, #400, #401, #402, #403, #405,
8//! #406, #409, #411 — the contract is pinned in tests so the wiring
9//! slice (extended-query handler in `wire::postgres`) cannot drop or
10//! rename a column.
11//!
12//! ## Why a separate module
13//!
14//! The acceptance criteria for #408 demand that ASK over PG-wire
15//! returns a single-row result set with stable columns and stable
16//! OIDs across every PG client. Three different drivers (psycopg, pgx,
17//! JDBC) read columns by index after a `RowDescription`; renaming or
18//! reordering a column silently breaks every integration test in this
19//! issue's blocker chain. Building the row in a tested deep module
20//! keeps the wiring slice focused on "where do I hand these bytes to
21//! the PG codec" and the contract here on "are the bytes right".
22//!
23//! ## Shape pinned by tests
24//!
25//! Twelve columns, alphabetised — exact same order as the JSON keys
26//! [`AskResponseEnvelope::build`](super::ask_response_envelope::build)
27//! emits. Sharing the order means a bridge that takes the envelope
28//! JSON object and projects it column-wise stays index-aligned without
29//! re-shuffling.
30//!
31//! | # | Name              | OID                  | Format                                      |
32//! |---|-------------------|----------------------|---------------------------------------------|
33//! | 0 | `answer`          | `text` (25)          | UTF-8                                       |
34//! | 1 | `cache_hit`       | `bool` (16)          | `"t"` / `"f"`                               |
35//! | 2 | `citations`       | `jsonb` (3802)       | compact JSON, marker-ascending              |
36//! | 3 | `completion_tokens` | `int8` (20)        | decimal                                     |
37//! | 4 | `cost_usd`        | `numeric` (1700)     | decimal, `f64::to_string` form              |
38//! | 5 | `mode`            | `text` (25)          | `"strict"` / `"lenient"`                    |
39//! | 6 | `model`           | `text` (25)          | UTF-8                                       |
40//! | 7 | `prompt_tokens`   | `int8` (20)          | decimal                                     |
41//! | 8 | `provider`        | `text` (25)          | UTF-8                                       |
42//! | 9 | `retry_count`     | `int8` (20)          | decimal (0 or 1 per #395)                   |
43//! |10 | `sources_flat`    | `jsonb` (3802)       | compact JSON, RRF order preserved           |
44//! |11 | `validation`      | `jsonb` (3802)       | compact JSON `{errors, ok, warnings}`       |
45//!
46//! ## Why text format
47//!
48//! Binary format is opt-in via `Bind`'s result-column format codes.
49//! The existing PG-wire surface (`wire::postgres::types`) only emits
50//! text — see `value_to_pg_wire_bytes`. ASK rides the same codec, so
51//! every cell here is a UTF-8 byte buffer.
52//!
53//! ## Why numeric for cost_usd
54//!
55//! PG `numeric` is the only PG type with arbitrary precision and no
56//! representation loss across the wire. `float8` would round
57//! `0.0000001` to scientific form and trip JDBC `BigDecimal` parsing.
58//! `f64::to_string()` produces the canonical Rust form which PG's
59//! `numeric` parser accepts verbatim.
60//!
61//! ## Why jsonb (not json)
62//!
63//! psycopg ≥ 3 maps `jsonb` to native dicts; pgx exposes a
64//! `pgtype.JSONB` decoder; JDBC's PG driver routes `jsonb` to
65//! `PGobject` with `getType() == "jsonb"`. Returning `jsonb` lets
66//! every supported driver decode the column without an explicit cast.
67
68use crate::runtime::ai::ask_response_envelope::{
69    self, AskResult, Citation, SourceRow, Validation, ValidationError, ValidationWarning,
70};
71use crate::serde_json::{Map, Value};
72use crate::wire::postgres::types::PgOid;
73
74/// One column in the ASK PG-wire result set.
75///
76/// `name` is always a `&'static str` — the column set is fixed at
77/// compile time and no user-supplied string ever appears here.
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub struct ColumnDesc {
80    pub name: &'static str,
81    pub oid: PgOid,
82}
83
84/// The single-row ASK PG-wire result set. The wiring slice hands
85/// `columns` to the `RowDescription` codec and `cells` to the
86/// `DataRow` codec (one row, then `CommandComplete`).
87#[derive(Debug, Clone)]
88pub struct AskRow {
89    pub columns: Vec<ColumnDesc>,
90    pub cells: Vec<Option<Vec<u8>>>,
91}
92
93/// Canonical column list, in wire order. Kept private so callers can't
94/// pull out a stale copy and drift from `encode()`.
95const COLUMNS: &[ColumnDesc] = &[
96    ColumnDesc { name: "answer", oid: PgOid::Text },
97    ColumnDesc { name: "cache_hit", oid: PgOid::Bool },
98    ColumnDesc { name: "citations", oid: PgOid::Jsonb },
99    ColumnDesc { name: "completion_tokens", oid: PgOid::Int8 },
100    ColumnDesc { name: "cost_usd", oid: PgOid::Numeric },
101    ColumnDesc { name: "mode", oid: PgOid::Text },
102    ColumnDesc { name: "model", oid: PgOid::Text },
103    ColumnDesc { name: "prompt_tokens", oid: PgOid::Int8 },
104    ColumnDesc { name: "provider", oid: PgOid::Text },
105    ColumnDesc { name: "retry_count", oid: PgOid::Int8 },
106    ColumnDesc { name: "sources_flat", oid: PgOid::Jsonb },
107    ColumnDesc { name: "validation", oid: PgOid::Jsonb },
108];
109
110/// Encode an [`AskResult`] as the single-row PG-wire result set.
111///
112/// Deterministic: re-running on byte-equal input is byte-equal output
113/// (pinned by `encode_is_deterministic_across_calls`). Required by the
114/// ASK determinism contract (#400) and by the cache-hit path (#403)
115/// where the cached PG row must equal the freshly-encoded one.
116pub fn encode(result: &AskResult) -> AskRow {
117    let cells = vec![
118        Some(result.answer.as_bytes().to_vec()),
119        Some(bool_cell(result.cache_hit)),
120        Some(citations_jsonb(&result.citations)),
121        Some(result.completion_tokens.to_string().into_bytes()),
122        Some(numeric_cell(result.cost_usd)),
123        Some(mode_cell(result.effective_mode)),
124        Some(result.model.as_bytes().to_vec()),
125        Some(result.prompt_tokens.to_string().into_bytes()),
126        Some(result.provider.as_bytes().to_vec()),
127        Some(result.retry_count.to_string().into_bytes()),
128        Some(sources_jsonb(&result.sources_flat)),
129        Some(validation_jsonb(&result.validation)),
130    ];
131    AskRow {
132        columns: COLUMNS.to_vec(),
133        cells,
134    }
135}
136
137/// Column descriptors only — the wiring slice needs these before it
138/// knows the result body (PG protocol: `RowDescription` is sent before
139/// `Execute` finishes). Exposed so the `Parse`/`Describe` codepath can
140/// answer without running the query.
141pub fn columns() -> Vec<ColumnDesc> {
142    COLUMNS.to_vec()
143}
144
/// PG text form of a boolean: the single byte `t` or `f`.
fn bool_cell(b: bool) -> Vec<u8> {
    vec![if b { b't' } else { b'f' }]
}
148
149fn mode_cell(m: ask_response_envelope::Mode) -> Vec<u8> {
150    match m {
151        ask_response_envelope::Mode::Strict => b"strict".to_vec(),
152        ask_response_envelope::Mode::Lenient => b"lenient".to_vec(),
153    }
154}
155
/// Render `cost_usd` as the text form of a PG `numeric` cell.
///
/// Uses `f64`'s `Display` output, which is plain positional decimal
/// notation and never switches to an exponent: `0.0` becomes `0`,
/// `0.000_321` becomes `0.000321`, `1e-7` becomes `0.0000001`, and
/// very large magnitudes are written out digit by digit.
///
/// Non-finite input is a caller bug: cost is always a non-negative
/// finite number per the cost-counter contract (#401). `Display`
/// would render such values as `NaN` / `inf` / `-inf`, and those
/// spellings are not guaranteed to be rejected further down the
/// line (PG `numeric` itself has `NaN` and infinity as special
/// values), so the invariant is checked here in debug builds instead
/// of relying on a later parse error.
fn numeric_cell(v: f64) -> Vec<u8> {
    debug_assert!(v.is_finite(), "cost_usd must be finite, got {v}");
    v.to_string().into_bytes()
}
167
168fn citations_jsonb(cites: &[Citation]) -> Vec<u8> {
169    // Marker-ascending order is the contract — same as
170    // ask_response_envelope::citations_value. Re-implemented here
171    // rather than reused so the JSON-shape tests in the envelope
172    // module and the column-shape tests here can fail
173    // independently: a wire shape regression points at one module,
174    // not the seam between two.
175    let mut sorted: Vec<Citation> = cites.to_vec();
176    sorted.sort_by_key(|c| c.marker);
177    let arr: Vec<Value> = sorted
178        .iter()
179        .map(|c| {
180            let mut o = Map::new();
181            o.insert("marker".into(), Value::Number(c.marker as f64));
182            o.insert("urn".into(), Value::String(c.urn.clone()));
183            Value::Object(o)
184        })
185        .collect();
186    Value::Array(arr).to_string_compact().into_bytes()
187}
188
189fn sources_jsonb(rows: &[SourceRow]) -> Vec<u8> {
190    let arr: Vec<Value> = rows
191        .iter()
192        .map(|r| {
193            let mut o = Map::new();
194            o.insert("payload".into(), Value::String(r.payload.clone()));
195            o.insert("urn".into(), Value::String(r.urn.clone()));
196            Value::Object(o)
197        })
198        .collect();
199    Value::Array(arr).to_string_compact().into_bytes()
200}
201
202fn validation_jsonb(v: &Validation) -> Vec<u8> {
203    let mut o = Map::new();
204    o.insert(
205        "errors".into(),
206        Value::Array(v.errors.iter().map(error_value).collect()),
207    );
208    o.insert("ok".into(), Value::Bool(v.ok));
209    o.insert(
210        "warnings".into(),
211        Value::Array(v.warnings.iter().map(warning_value).collect()),
212    );
213    Value::Object(o).to_string_compact().into_bytes()
214}
215
216fn warning_value(w: &ValidationWarning) -> Value {
217    let mut o = Map::new();
218    o.insert("detail".into(), Value::String(w.detail.clone()));
219    o.insert("kind".into(), Value::String(w.kind.clone()));
220    Value::Object(o)
221}
222
223fn error_value(e: &ValidationError) -> Value {
224    let mut o = Map::new();
225    o.insert("detail".into(), Value::String(e.detail.clone()));
226    o.insert("kind".into(), Value::String(e.kind.clone()));
227    Value::Object(o)
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::ai::ask_response_envelope::{
        AskResult, Citation, Mode, SourceRow, Validation, ValidationError, ValidationWarning,
    };

    // Wire positions of the twelve columns. The tests address cells by
    // these indices on purpose: PG clients read columns by position.
    const ANSWER: usize = 0;
    const CACHE_HIT: usize = 1;
    const CITATIONS: usize = 2;
    const COMPLETION_TOKENS: usize = 3;
    const COST_USD: usize = 4;
    const MODE: usize = 5;
    const MODEL: usize = 6;
    const PROMPT_TOKENS: usize = 7;
    const PROVIDER: usize = 8;
    const RETRY_COUNT: usize = 9;
    const SOURCES_FLAT: usize = 10;
    const VALIDATION: usize = 11;

    /// Baseline result that individual tests tweak one field at a time.
    fn fixture() -> AskResult {
        AskResult {
            answer: "X is 42 [^1].".into(),
            cache_hit: false,
            citations: vec![Citation {
                marker: 1,
                urn: "urn:reddb:row:1".into(),
            }],
            completion_tokens: 45,
            cost_usd: 0.000_321,
            effective_mode: Mode::Strict,
            model: "gpt-4o-mini".into(),
            prompt_tokens: 123,
            provider: "openai".into(),
            retry_count: 0,
            sources_flat: vec![SourceRow {
                urn: "urn:reddb:row:1".into(),
                payload: "{\"k\":\"v\"}".into(),
            }],
            validation: Validation {
                ok: true,
                warnings: vec![],
                errors: vec![],
            },
        }
    }

    /// Borrow cell `idx` of `row` as UTF-8 text; panics on NULL or
    /// non-UTF-8 content.
    fn cell_str(row: &AskRow, idx: usize) -> &str {
        let bytes = row.cells[idx].as_deref().expect("cell must not be NULL");
        std::str::from_utf8(bytes).expect("cell must be UTF-8")
    }

    /// Encode `result` and return cell `idx` as an owned string.
    fn encoded_cell(result: &AskResult, idx: usize) -> String {
        cell_str(&encode(result), idx).to_owned()
    }

    #[test]
    fn emits_exactly_twelve_columns() {
        let AskRow { columns, cells } = encode(&fixture());
        assert_eq!(columns.len(), 12);
        assert_eq!(cells.len(), 12);
    }

    #[test]
    fn column_order_matches_envelope_alphabetical_order() {
        let expected = [
            "answer",
            "cache_hit",
            "citations",
            "completion_tokens",
            "cost_usd",
            "mode",
            "model",
            "prompt_tokens",
            "provider",
            "retry_count",
            "sources_flat",
            "validation",
        ];
        let row = encode(&fixture());
        let actual: Vec<&str> = row.columns.iter().map(|c| c.name).collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn columns_helper_returns_same_descriptors_as_encode() {
        let encoded = encode(&fixture());
        assert_eq!(columns(), encoded.columns);
    }

    #[test]
    fn oids_match_pg_type_d_h_canonical_values() {
        let expected = [
            ("answer", PgOid::Text),
            ("cache_hit", PgOid::Bool),
            ("citations", PgOid::Jsonb),
            ("completion_tokens", PgOid::Int8),
            ("cost_usd", PgOid::Numeric),
            ("mode", PgOid::Text),
            ("model", PgOid::Text),
            ("prompt_tokens", PgOid::Int8),
            ("provider", PgOid::Text),
            ("retry_count", PgOid::Int8),
            ("sources_flat", PgOid::Jsonb),
            ("validation", PgOid::Jsonb),
        ];
        let row = encode(&fixture());
        for (name, oid) in expected {
            let described = row
                .columns
                .iter()
                .find(|c| c.name == name)
                .unwrap_or_else(|| panic!("missing column {name}"));
            assert_eq!(described.oid, oid, "column {name}");
        }
    }

    #[test]
    fn answer_text_preserved_with_inline_markers() {
        assert_eq!(encoded_cell(&fixture(), ANSWER), "X is 42 [^1].");
    }

    #[test]
    fn cache_hit_serializes_as_pg_bool_text() {
        let miss = AskResult { cache_hit: false, ..fixture() };
        assert_eq!(encoded_cell(&miss, CACHE_HIT), "f");

        let hit = AskResult { cache_hit: true, ..fixture() };
        assert_eq!(encoded_cell(&hit, CACHE_HIT), "t");
    }

    #[test]
    fn citations_jsonb_is_marker_ascending() {
        let shuffled = AskResult {
            citations: vec![
                Citation { marker: 3, urn: "urn:c".into() },
                Citation { marker: 1, urn: "urn:a".into() },
                Citation { marker: 2, urn: "urn:b".into() },
            ],
            ..fixture()
        };
        let json = encoded_cell(&shuffled, CITATIONS);
        // Byte offsets of markers 1, 2, 3 must be strictly increasing.
        let offsets: Vec<usize> = (1..=3)
            .map(|m| json.find(&format!("\"marker\":{m}")).unwrap())
            .collect();
        assert!(offsets.windows(2).all(|w| w[0] < w[1]), "order: {json}");
    }

    #[test]
    fn empty_citations_serialize_as_empty_array_not_null() {
        let uncited = AskResult { citations: Vec::new(), ..fixture() };
        assert_eq!(encoded_cell(&uncited, CITATIONS), "[]");
    }

    #[test]
    fn completion_tokens_is_decimal_text() {
        for (tokens, text) in [(0, "0"), (1_000_000, "1000000")] {
            let r = AskResult { completion_tokens: tokens, ..fixture() };
            assert_eq!(encoded_cell(&r, COMPLETION_TOKENS), text);
        }
    }

    #[test]
    fn cost_usd_uses_canonical_rust_f64_form() {
        // The `f64::to_string` form is what goes on the wire for the
        // `numeric` column; pin its exact spelling for typical costs.
        for (cost, text) in [(0.0, "0"), (0.000_321, "0.000321"), (1.5, "1.5")] {
            let r = AskResult { cost_usd: cost, ..fixture() };
            assert_eq!(encoded_cell(&r, COST_USD), text);
        }
    }

    #[test]
    fn mode_serialises_as_strict_or_lenient_text() {
        for (mode, text) in [(Mode::Strict, "strict"), (Mode::Lenient, "lenient")] {
            let r = AskResult { effective_mode: mode, ..fixture() };
            assert_eq!(encoded_cell(&r, MODE), text);
        }
    }

    #[test]
    fn model_and_provider_are_utf8_text() {
        let r = AskResult {
            model: "claude-sonnet-4-6".into(),
            provider: "anthropic".into(),
            ..fixture()
        };
        let row = encode(&r);
        assert_eq!(cell_str(&row, MODEL), "claude-sonnet-4-6");
        assert_eq!(cell_str(&row, PROVIDER), "anthropic");
    }

    #[test]
    fn prompt_tokens_is_decimal_text() {
        let r = AskResult { prompt_tokens: 4096, ..fixture() };
        assert_eq!(encoded_cell(&r, PROMPT_TOKENS), "4096");
    }

    #[test]
    fn retry_count_is_zero_or_one() {
        // #395 caps the retry budget at one. Enforcement lives in the
        // validator, not here; the column only has to carry both
        // values faithfully.
        for (retries, text) in [(0, "0"), (1, "1")] {
            let r = AskResult { retry_count: retries, ..fixture() };
            assert_eq!(encoded_cell(&r, RETRY_COUNT), text);
        }
    }

    #[test]
    fn sources_flat_preserves_input_order() {
        // Post-RRF order is the contract: `[^N]` indexes into this
        // array, so a reorder would silently break grounding.
        let r = AskResult {
            sources_flat: vec![
                SourceRow { urn: "urn:z".into(), payload: "{}".into() },
                SourceRow { urn: "urn:a".into(), payload: "{}".into() },
            ],
            ..fixture()
        };
        let json = encoded_cell(&r, SOURCES_FLAT);
        let first = json.find("urn:z").unwrap();
        let second = json.find("urn:a").unwrap();
        assert!(first < second, "order: {json}");
    }

    #[test]
    fn empty_sources_flat_serializes_as_empty_array() {
        let r = AskResult { sources_flat: Vec::new(), ..fixture() };
        assert_eq!(encoded_cell(&r, SOURCES_FLAT), "[]");
    }

    #[test]
    fn validation_jsonb_carries_ok_false_with_errors() {
        let failing = AskResult {
            validation: Validation {
                ok: false,
                warnings: vec![],
                errors: vec![ValidationError {
                    kind: "out_of_range".into(),
                    detail: "marker 5 > sources_count 2".into(),
                }],
            },
            ..fixture()
        };
        let json = encoded_cell(&failing, VALIDATION);
        for needle in [
            "\"ok\":false",
            "\"kind\":\"out_of_range\"",
            "marker 5 > sources_count 2",
        ] {
            assert!(json.contains(needle), "got {json}");
        }
    }

    #[test]
    fn validation_jsonb_with_warnings_only_keeps_ok_true() {
        let warned = AskResult {
            validation: Validation {
                ok: true,
                warnings: vec![ValidationWarning {
                    kind: "mode_fallback".into(),
                    detail: "provider does not support citations".into(),
                }],
                errors: vec![],
            },
            ..fixture()
        };
        let json = encoded_cell(&warned, VALIDATION);
        for needle in ["\"ok\":true", "\"kind\":\"mode_fallback\""] {
            assert!(json.contains(needle), "got {json}");
        }
    }

    #[test]
    fn validation_empty_arrays_are_present_not_null() {
        let json = encoded_cell(&fixture(), VALIDATION);
        for needle in ["\"errors\":[]", "\"warnings\":[]"] {
            assert!(json.contains(needle), "got {json}");
        }
    }

    #[test]
    fn every_cell_is_some_no_nulls() {
        // A PG-wire `DataRow` tells NULL (length -1) apart from the
        // empty string (length 0). The ASK row never emits NULL: empty
        // arrays are `[]` and empty strings are `""`. The wiring slice
        // may rely on this when streaming cells.
        let row = encode(&fixture());
        assert!(row.cells.iter().all(Option::is_some));
    }

    #[test]
    fn encode_is_deterministic_across_calls() {
        let input = fixture();
        let (first, second) = (encode(&input), encode(&input));
        assert_eq!(first.columns, second.columns);
        assert_eq!(first.cells, second.cells);
    }

    #[test]
    fn cells_index_aligns_with_columns_index() {
        // The wiring slice walks `columns` and `cells` in lock-step.
        // Pin that pairing so reordering one list without the other
        // fails here first.
        let row = encode(&fixture());
        assert!(row.cells.len() >= row.columns.len());
        for (col, cell) in row.columns.iter().zip(&row.cells) {
            let bytes = cell.as_deref().expect("no nulls");
            match col.name {
                "answer" => assert_eq!(bytes, b"X is 42 [^1]."),
                "cache_hit" => assert_eq!(bytes, b"f"),
                "citations" => assert!(bytes.starts_with(b"[")),
                "completion_tokens" => assert_eq!(bytes, b"45"),
                "cost_usd" => assert_eq!(bytes, b"0.000321"),
                "mode" => assert_eq!(bytes, b"strict"),
                "model" => assert_eq!(bytes, b"gpt-4o-mini"),
                "prompt_tokens" => assert_eq!(bytes, b"123"),
                "provider" => assert_eq!(bytes, b"openai"),
                "retry_count" => assert_eq!(bytes, b"0"),
                "sources_flat" => assert!(bytes.starts_with(b"[")),
                "validation" => assert!(bytes.starts_with(b"{")),
                other => panic!("unexpected column {other}"),
            }
        }
    }

    #[test]
    fn answer_with_multibyte_utf8_round_trips_byte_for_byte() {
        let text = "Café — résumé 中文 [^1]";
        let r = AskResult { answer: text.into(), ..fixture() };
        let row = encode(&r);
        assert_eq!(row.cells[ANSWER].as_deref().unwrap(), text.as_bytes());
    }

    #[test]
    fn jsonb_outputs_are_compact_not_pretty() {
        // Drivers parse `jsonb` themselves, so whitespace does not
        // affect correctness, but it does affect wire-size budgets and
        // audit-row equality (#402). Pin the compact form.
        let row = encode(&fixture());
        for idx in [CITATIONS, SOURCES_FLAT, VALIDATION] {
            let json = cell_str(&row, idx);
            assert!(!json.contains('\n'), "col {idx} not compact: {json}");
            assert!(!json.contains(": "), "col {idx} pretty-spaced: {json}");
        }
    }

    #[test]
    fn columns_helper_is_callable_before_query_runs() {
        // The PG `Describe` codepath needs descriptors before the
        // query executes; pin that `columns()` works without any
        // `AskResult` in hand.
        let described = columns();
        assert_eq!(described.len(), 12);
        assert_eq!(described[0].name, "answer");
    }
}