Skip to main content

reddb_server/runtime/ai/
pg_wire_ask_row_encoder.rs

1//! `PgWireAskRowEncoder` — pure encoder that turns an
2//! [`AskResult`](super::ask_response_envelope::AskResult) into the
3//! single-row Postgres-wire result set that #408 exposes to psycopg /
4//! pgx / JDBC.
5//!
6//! Deep module: no I/O, no transport, no clock. Mirrors the slice-1
7//! pattern used by #395, #396, #398, #400, #401, #402, #403, #405,
8//! #406, #409, #411 — the contract is pinned in tests so the wiring
9//! slice (extended-query handler in `wire::postgres`) cannot drop or
10//! rename a column.
11//!
12//! ## Why a separate module
13//!
14//! The acceptance criteria for #408 demand that ASK over PG-wire
15//! returns a single-row result set with stable columns and stable
16//! OIDs across every PG client. Three different drivers (psycopg, pgx,
17//! JDBC) read columns by index after a `RowDescription`; renaming or
18//! reordering a column silently breaks every integration test in this
19//! issue's blocker chain. Building the row in a tested deep module
20//! keeps the wiring slice focused on "where do I hand these bytes to
21//! the PG codec" and the contract here on "are the bytes right".
22//!
23//! ## Shape pinned by tests
24//!
25//! Twelve columns, alphabetised — exact same order as the JSON keys
26//! [`AskResponseEnvelope::build`](super::ask_response_envelope::build)
27//! emits. Sharing the order means a bridge that takes the envelope
28//! JSON object and projects it column-wise stays index-aligned without
29//! re-shuffling.
30//!
31//! | # | Name              | OID                  | Format                                      |
32//! |---|-------------------|----------------------|---------------------------------------------|
33//! | 0 | `answer`          | `text` (25)          | UTF-8                                       |
34//! | 1 | `cache_hit`       | `bool` (16)          | `"t"` / `"f"`                               |
35//! | 2 | `citations`       | `jsonb` (3802)       | compact JSON, marker-ascending              |
36//! | 3 | `completion_tokens` | `int8` (20)        | decimal                                     |
37//! | 4 | `cost_usd`        | `numeric` (1700)     | decimal, `f64::to_string` form              |
38//! | 5 | `mode`            | `text` (25)          | `"strict"` / `"lenient"`                    |
39//! | 6 | `model`           | `text` (25)          | UTF-8                                       |
40//! | 7 | `prompt_tokens`   | `int8` (20)          | decimal                                     |
41//! | 8 | `provider`        | `text` (25)          | UTF-8                                       |
42//! | 9 | `retry_count`     | `int8` (20)          | decimal (0 or 1 per #395)                   |
43//! |10 | `sources_flat`    | `jsonb` (3802)       | compact JSON, RRF order preserved           |
44//! |11 | `validation`      | `jsonb` (3802)       | compact JSON `{errors, ok, warnings}`       |
45//!
46//! ## Why text format
47//!
48//! Binary format is opt-in via `Bind`'s result-column format codes.
49//! The existing PG-wire surface (`wire::postgres::types`) only emits
50//! text — see `value_to_pg_wire_bytes`. ASK rides the same codec, so
51//! every cell here is a UTF-8 byte buffer.
52//!
53//! ## Why numeric for cost_usd
54//!
//! PG `numeric` is the only PG type with arbitrary precision and no
//! representation loss across the wire. `float8` would render
//! `0.0000001` in scientific form and trip JDBC `BigDecimal` parsing.
//! `f64::to_string()` produces the canonical Rust form — plain
//! decimal digits, never exponent notation — which PG's `numeric`
//! parser accepts verbatim.
60//!
61//! ## Why jsonb (not json)
62//!
63//! psycopg ≥ 3 maps `jsonb` to native dicts; pgx exposes a
64//! `pgtype.JSONB` decoder; JDBC's PG driver routes `jsonb` to
65//! `PGobject` with `getType() == "jsonb"`. Returning `jsonb` lets
66//! every supported driver decode the column without an explicit cast.
67
68use crate::runtime::ai::ask_response_envelope::{
69    self, AskResult, Citation, SourceRow, Validation, ValidationError, ValidationWarning,
70};
71use crate::serde_json::{Map, Value};
72use crate::wire::postgres::types::PgOid;
73
/// One column in the ASK PG-wire result set.
///
/// `name` is always a `&'static str` — the column set is fixed at
/// compile time and no user-supplied string ever appears here.
///
/// The type is `Copy`, so descriptors can be duplicated out of the
/// canonical table without allocation per entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ColumnDesc {
    /// Column label as it appears in the result set.
    pub name: &'static str,
    /// Postgres type OID advertised for this column.
    pub oid: PgOid,
}
83
/// The single-row ASK PG-wire result set. The wiring slice hands
/// `columns` to the `RowDescription` codec and `cells` to the
/// `DataRow` codec (one row, then `CommandComplete`).
///
/// `columns` and `cells` are index-aligned: `cells[i]` is the value
/// for `columns[i]`.
#[derive(Debug, Clone)]
pub struct AskRow {
    /// Column descriptors, in wire order.
    pub columns: Vec<ColumnDesc>,
    /// Text-format cell payloads, one per column. The `Option` leaves
    /// room for a NULL cell, but `encode` in this module always fills
    /// every slot with `Some`.
    pub cells: Vec<Option<Vec<u8>>>,
}
92
93/// Canonical column list, in wire order. Kept private so callers can't
94/// pull out a stale copy and drift from `encode()`.
95const COLUMNS: &[ColumnDesc] = &[
96    ColumnDesc {
97        name: "answer",
98        oid: PgOid::Text,
99    },
100    ColumnDesc {
101        name: "cache_hit",
102        oid: PgOid::Bool,
103    },
104    ColumnDesc {
105        name: "citations",
106        oid: PgOid::Jsonb,
107    },
108    ColumnDesc {
109        name: "completion_tokens",
110        oid: PgOid::Int8,
111    },
112    ColumnDesc {
113        name: "cost_usd",
114        oid: PgOid::Numeric,
115    },
116    ColumnDesc {
117        name: "mode",
118        oid: PgOid::Text,
119    },
120    ColumnDesc {
121        name: "model",
122        oid: PgOid::Text,
123    },
124    ColumnDesc {
125        name: "prompt_tokens",
126        oid: PgOid::Int8,
127    },
128    ColumnDesc {
129        name: "provider",
130        oid: PgOid::Text,
131    },
132    ColumnDesc {
133        name: "retry_count",
134        oid: PgOid::Int8,
135    },
136    ColumnDesc {
137        name: "sources_flat",
138        oid: PgOid::Jsonb,
139    },
140    ColumnDesc {
141        name: "validation",
142        oid: PgOid::Jsonb,
143    },
144];
145
146/// Encode an [`AskResult`] as the single-row PG-wire result set.
147///
148/// Deterministic: re-running on byte-equal input is byte-equal output
149/// (pinned by `encode_is_deterministic_across_calls`). Required by the
150/// ASK determinism contract (#400) and by the cache-hit path (#403)
151/// where the cached PG row must equal the freshly-encoded one.
152pub fn encode(result: &AskResult) -> AskRow {
153    let cells = vec![
154        Some(result.answer.as_bytes().to_vec()),
155        Some(bool_cell(result.cache_hit)),
156        Some(citations_jsonb(&result.citations)),
157        Some(result.completion_tokens.to_string().into_bytes()),
158        Some(numeric_cell(result.cost_usd)),
159        Some(mode_cell(result.effective_mode)),
160        Some(result.model.as_bytes().to_vec()),
161        Some(result.prompt_tokens.to_string().into_bytes()),
162        Some(result.provider.as_bytes().to_vec()),
163        Some(result.retry_count.to_string().into_bytes()),
164        Some(sources_jsonb(&result.sources_flat)),
165        Some(validation_jsonb(&result.validation)),
166    ];
167    AskRow {
168        columns: COLUMNS.to_vec(),
169        cells,
170    }
171}
172
173/// Column descriptors only — the wiring slice needs these before it
174/// knows the result body (PG protocol: `RowDescription` is sent before
175/// `Execute` finishes). Exposed so the `Parse`/`Describe` codepath can
176/// answer without running the query.
177pub fn columns() -> Vec<ColumnDesc> {
178    COLUMNS.to_vec()
179}
180
/// Render a boolean as the one-byte text cell `t` or `f`.
fn bool_cell(b: bool) -> Vec<u8> {
    let flag: &[u8] = if b { b"t" } else { b"f" };
    flag.to_vec()
}
188
189fn mode_cell(m: ask_response_envelope::Mode) -> Vec<u8> {
190    match m {
191        ask_response_envelope::Mode::Strict => b"strict".to_vec(),
192        ask_response_envelope::Mode::Lenient => b"lenient".to_vec(),
193    }
194}
195
/// Render a cost value as the text form of a PG `numeric` cell.
///
/// Finite values use `f64::to_string`, which always prints plain
/// decimal digits (`0`, `0.000321`, `1.5`) and never switches to
/// exponent notation, so the output is valid `numeric` text as-is.
///
/// Non-finite values should not occur — cost is a non-negative finite
/// number per the cost-counter contract (#401). If one does sneak
/// through it is spelled the way Postgres itself prints special
/// `numeric` values (`NaN`, `Infinity`, `-Infinity`) rather than
/// Rust's `inf` / `-inf`, so the cell is still well-formed for the
/// declared column type and the anomaly stays visible to the client.
fn numeric_cell(v: f64) -> Vec<u8> {
    let text = if v.is_nan() {
        String::from("NaN")
    } else if v.is_infinite() {
        // Rust would print `inf` / `-inf` here.
        let spelled = if v.is_sign_positive() {
            "Infinity"
        } else {
            "-Infinity"
        };
        String::from(spelled)
    } else {
        v.to_string()
    };
    text.into_bytes()
}
207
208fn citations_jsonb(cites: &[Citation]) -> Vec<u8> {
209    // Marker-ascending order is the contract — same as
210    // ask_response_envelope::citations_value. Re-implemented here
211    // rather than reused so the JSON-shape tests in the envelope
212    // module and the column-shape tests here can fail
213    // independently: a wire shape regression points at one module,
214    // not the seam between two.
215    let mut sorted: Vec<Citation> = cites.to_vec();
216    sorted.sort_by_key(|c| c.marker);
217    let arr: Vec<Value> = sorted
218        .iter()
219        .map(|c| {
220            let mut o = Map::new();
221            o.insert("marker".into(), Value::Number(c.marker as f64));
222            o.insert("urn".into(), Value::String(c.urn.clone()));
223            Value::Object(o)
224        })
225        .collect();
226    Value::Array(arr).to_string_compact().into_bytes()
227}
228
229fn sources_jsonb(rows: &[SourceRow]) -> Vec<u8> {
230    let arr: Vec<Value> = rows
231        .iter()
232        .map(|r| {
233            let mut o = Map::new();
234            o.insert("payload".into(), Value::String(r.payload.clone()));
235            o.insert("urn".into(), Value::String(r.urn.clone()));
236            Value::Object(o)
237        })
238        .collect();
239    Value::Array(arr).to_string_compact().into_bytes()
240}
241
242fn validation_jsonb(v: &Validation) -> Vec<u8> {
243    let mut o = Map::new();
244    o.insert(
245        "errors".into(),
246        Value::Array(v.errors.iter().map(error_value).collect()),
247    );
248    o.insert("ok".into(), Value::Bool(v.ok));
249    o.insert(
250        "warnings".into(),
251        Value::Array(v.warnings.iter().map(warning_value).collect()),
252    );
253    Value::Object(o).to_string_compact().into_bytes()
254}
255
256fn warning_value(w: &ValidationWarning) -> Value {
257    let mut o = Map::new();
258    o.insert("detail".into(), Value::String(w.detail.clone()));
259    o.insert("kind".into(), Value::String(w.kind.clone()));
260    Value::Object(o)
261}
262
263fn error_value(e: &ValidationError) -> Value {
264    let mut o = Map::new();
265    o.insert("detail".into(), Value::String(e.detail.clone()));
266    o.insert("kind".into(), Value::String(e.kind.clone()));
267    Value::Object(o)
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::ai::ask_response_envelope::{
        AskResult, Citation, Mode, SourceRow, Validation, ValidationError, ValidationWarning,
    };

    /// Baseline result shared by every test: one source, one citation,
    /// a clean validation, strict mode, no retry, cache miss.
    fn fixture() -> AskResult {
        AskResult {
            answer: "X is 42 [^1].".into(),
            cache_hit: false,
            citations: vec![Citation {
                marker: 1,
                urn: "urn:reddb:row:1".into(),
            }],
            completion_tokens: 45,
            cost_usd: 0.000_321,
            effective_mode: Mode::Strict,
            model: "gpt-4o-mini".into(),
            prompt_tokens: 123,
            provider: "openai".into(),
            retry_count: 0,
            sources_flat: vec![SourceRow {
                urn: "urn:reddb:row:1".into(),
                payload: "{\"k\":\"v\"}".into(),
            }],
            validation: Validation {
                ok: true,
                warnings: vec![],
                errors: vec![],
            },
        }
    }

    /// View cell `idx` as UTF-8 text; panics on a NULL or non-UTF-8
    /// cell, which is itself a contract violation.
    fn cell_str(row: &AskRow, idx: usize) -> &str {
        let bytes = row.cells[idx].as_ref().unwrap();
        std::str::from_utf8(bytes).unwrap()
    }

    #[test]
    fn emits_exactly_twelve_columns() {
        let AskRow { columns, cells } = encode(&fixture());
        assert_eq!(columns.len(), 12);
        assert_eq!(cells.len(), 12);
    }

    #[test]
    fn column_order_matches_envelope_alphabetical_order() {
        let expected = vec![
            "answer",
            "cache_hit",
            "citations",
            "completion_tokens",
            "cost_usd",
            "mode",
            "model",
            "prompt_tokens",
            "provider",
            "retry_count",
            "sources_flat",
            "validation",
        ];
        let encoded = encode(&fixture());
        let actual: Vec<&str> = encoded.columns.iter().map(|col| col.name).collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn columns_helper_returns_same_descriptors_as_encode() {
        let from_encode = encode(&fixture()).columns;
        assert_eq!(columns(), from_encode);
    }

    #[test]
    fn oids_match_pg_type_d_h_canonical_values() {
        let encoded = encode(&fixture());
        let by_name: std::collections::BTreeMap<&str, PgOid> = encoded
            .columns
            .iter()
            .map(|col| (col.name, col.oid))
            .collect();
        let expected = [
            ("answer", PgOid::Text),
            ("cache_hit", PgOid::Bool),
            ("citations", PgOid::Jsonb),
            ("completion_tokens", PgOid::Int8),
            ("cost_usd", PgOid::Numeric),
            ("mode", PgOid::Text),
            ("model", PgOid::Text),
            ("prompt_tokens", PgOid::Int8),
            ("provider", PgOid::Text),
            ("retry_count", PgOid::Int8),
            ("sources_flat", PgOid::Jsonb),
            ("validation", PgOid::Jsonb),
        ];
        for (name, oid) in expected {
            assert_eq!(by_name[name], oid);
        }
    }

    #[test]
    fn answer_text_preserved_with_inline_markers() {
        let encoded = encode(&fixture());
        assert_eq!(cell_str(&encoded, 0), "X is 42 [^1].");
    }

    #[test]
    fn cache_hit_serializes_as_pg_bool_text() {
        for (flag, expected) in [(false, "f"), (true, "t")] {
            let input = AskResult {
                cache_hit: flag,
                ..fixture()
            };
            let encoded = encode(&input);
            assert_eq!(cell_str(&encoded, 1), expected);
        }
    }

    #[test]
    fn citations_jsonb_is_marker_ascending() {
        let cite = |marker, urn: &str| Citation {
            marker,
            urn: urn.into(),
        };
        let input = AskResult {
            citations: vec![cite(3, "urn:c"), cite(1, "urn:a"), cite(2, "urn:b")],
            ..fixture()
        };
        let encoded = encode(&input);
        let s = cell_str(&encoded, 2);
        // Byte offsets of each marker must grow with the marker value.
        let offsets: Vec<usize> = ["\"marker\":1", "\"marker\":2", "\"marker\":3"]
            .iter()
            .map(|needle| s.find(*needle).unwrap())
            .collect();
        assert!(offsets.windows(2).all(|w| w[0] < w[1]), "order: {s}");
    }

    #[test]
    fn empty_citations_serialize_as_empty_array_not_null() {
        let mut input = fixture();
        input.citations.clear();
        let encoded = encode(&input);
        assert_eq!(cell_str(&encoded, 2), "[]");
    }

    #[test]
    fn completion_tokens_is_decimal_text() {
        for (tokens, expected) in [(0, "0"), (1_000_000, "1000000")] {
            let input = AskResult {
                completion_tokens: tokens,
                ..fixture()
            };
            let encoded = encode(&input);
            assert_eq!(cell_str(&encoded, 3), expected);
        }
    }

    #[test]
    fn cost_usd_uses_canonical_rust_f64_form() {
        // f64::to_string yields a representation PG `numeric` accepts
        // directly — no precision loss across the wire.
        for (cost, expected) in [(0.0, "0"), (0.000_321, "0.000321"), (1.5, "1.5")] {
            let input = AskResult {
                cost_usd: cost,
                ..fixture()
            };
            let encoded = encode(&input);
            assert_eq!(cell_str(&encoded, 4), expected);
        }
    }

    #[test]
    fn mode_serialises_as_strict_or_lenient_text() {
        for (mode, expected) in [(Mode::Strict, "strict"), (Mode::Lenient, "lenient")] {
            let input = AskResult {
                effective_mode: mode,
                ..fixture()
            };
            let encoded = encode(&input);
            assert_eq!(cell_str(&encoded, 5), expected);
        }
    }

    #[test]
    fn model_and_provider_are_utf8_text() {
        let input = AskResult {
            model: "claude-sonnet-4-6".into(),
            provider: "anthropic".into(),
            ..fixture()
        };
        let encoded = encode(&input);
        assert_eq!(cell_str(&encoded, 6), "claude-sonnet-4-6");
        assert_eq!(cell_str(&encoded, 8), "anthropic");
    }

    #[test]
    fn prompt_tokens_is_decimal_text() {
        let input = AskResult {
            prompt_tokens: 4096,
            ..fixture()
        };
        let encoded = encode(&input);
        assert_eq!(cell_str(&encoded, 7), "4096");
    }

    #[test]
    fn retry_count_is_zero_or_one() {
        // #395 caps retry budget at one. Enforcement lives in the
        // validator, not here; the column just has to carry both
        // values through unchanged.
        for (retries, expected) in [(0, "0"), (1, "1")] {
            let input = AskResult {
                retry_count: retries,
                ..fixture()
            };
            let encoded = encode(&input);
            assert_eq!(cell_str(&encoded, 9), expected);
        }
    }

    #[test]
    fn sources_flat_preserves_input_order() {
        // Post-RRF order is the contract — `[^N]` indexes into this
        // array, so reordering would silently break grounding.
        let source = |urn: &str| SourceRow {
            urn: urn.into(),
            payload: "{}".into(),
        };
        let input = AskResult {
            sources_flat: vec![source("urn:z"), source("urn:a")],
            ..fixture()
        };
        let encoded = encode(&input);
        let s = cell_str(&encoded, 10);
        let first = s.find("urn:z").unwrap();
        let second = s.find("urn:a").unwrap();
        assert!(first < second, "order: {s}");
    }

    #[test]
    fn empty_sources_flat_serializes_as_empty_array() {
        let mut input = fixture();
        input.sources_flat.clear();
        let encoded = encode(&input);
        assert_eq!(cell_str(&encoded, 10), "[]");
    }

    #[test]
    fn validation_jsonb_carries_ok_false_with_errors() {
        let input = AskResult {
            validation: Validation {
                ok: false,
                warnings: vec![],
                errors: vec![ValidationError {
                    kind: "out_of_range".into(),
                    detail: "marker 5 > sources_count 2".into(),
                }],
            },
            ..fixture()
        };
        let encoded = encode(&input);
        let s = cell_str(&encoded, 11);
        for needle in [
            "\"ok\":false",
            "\"kind\":\"out_of_range\"",
            "marker 5 > sources_count 2",
        ] {
            assert!(s.contains(needle), "got {s}");
        }
    }

    #[test]
    fn validation_jsonb_with_warnings_only_keeps_ok_true() {
        let input = AskResult {
            validation: Validation {
                ok: true,
                warnings: vec![ValidationWarning {
                    kind: "mode_fallback".into(),
                    detail: "provider does not support citations".into(),
                }],
                errors: vec![],
            },
            ..fixture()
        };
        let encoded = encode(&input);
        let s = cell_str(&encoded, 11);
        for needle in ["\"ok\":true", "\"kind\":\"mode_fallback\""] {
            assert!(s.contains(needle), "got {s}");
        }
    }

    #[test]
    fn validation_empty_arrays_are_present_not_null() {
        let encoded = encode(&fixture());
        let s = cell_str(&encoded, 11);
        for needle in ["\"errors\":[]", "\"warnings\":[]"] {
            assert!(s.contains(needle), "got {s}");
        }
    }

    #[test]
    fn every_cell_is_some_no_nulls() {
        // PG-wire `DataRow` distinguishes NULL (length -1) from empty
        // string (length 0). The ASK row never emits NULL — empty
        // arrays serialize as `[]`, empty strings as `""`. The wiring
        // slice can rely on this invariant when streaming cells.
        let encoded = encode(&fixture());
        assert!(encoded.cells.iter().all(Option::is_some));
    }

    #[test]
    fn encode_is_deterministic_across_calls() {
        let input = fixture();
        let (first, second) = (encode(&input), encode(&input));
        assert_eq!(first.columns, second.columns);
        assert_eq!(first.cells, second.cells);
    }

    #[test]
    fn cells_index_aligns_with_columns_index() {
        // The wiring slice walks `columns` and `cells` in lock-step.
        // Pin the pairing so reordering one without the other trips
        // here first.
        let encoded = encode(&fixture());
        for (i, col) in encoded.columns.iter().enumerate() {
            let bytes = encoded.cells[i].as_ref().expect("no nulls").as_slice();
            match col.name {
                "answer" => assert_eq!(bytes, b"X is 42 [^1]."),
                "cache_hit" => assert_eq!(bytes, b"f"),
                "citations" => assert!(bytes.starts_with(b"[")),
                "completion_tokens" => assert_eq!(bytes, b"45"),
                "cost_usd" => assert_eq!(bytes, b"0.000321"),
                "mode" => assert_eq!(bytes, b"strict"),
                "model" => assert_eq!(bytes, b"gpt-4o-mini"),
                "prompt_tokens" => assert_eq!(bytes, b"123"),
                "provider" => assert_eq!(bytes, b"openai"),
                "retry_count" => assert_eq!(bytes, b"0"),
                "sources_flat" => assert!(bytes.starts_with(b"[")),
                "validation" => assert!(bytes.starts_with(b"{")),
                other => panic!("unexpected column {other}"),
            }
        }
    }

    #[test]
    fn answer_with_multibyte_utf8_round_trips_byte_for_byte() {
        let text = "Café — résumé 中文 [^1]";
        let input = AskResult {
            answer: text.into(),
            ..fixture()
        };
        let encoded = encode(&input);
        assert_eq!(encoded.cells[0].as_deref(), Some(text.as_bytes()));
    }

    #[test]
    fn jsonb_outputs_are_compact_not_pretty() {
        // PG drivers route `jsonb` through their own parser; whitespace
        // is irrelevant for correctness but matters for wire-size
        // budgets and audit-row equality (#402). Pin compact form.
        let encoded = encode(&fixture());
        let jsonb_columns = [2usize, 10, 11];
        for idx in jsonb_columns {
            let s = cell_str(&encoded, idx);
            assert!(!s.contains("\n"), "col {idx} not compact: {s}");
            assert!(!s.contains(": "), "col {idx} pretty-spaced: {s}");
        }
    }

    #[test]
    fn columns_helper_is_callable_before_query_runs() {
        // The PG `Describe` codepath needs column descriptors before
        // the query executes — pin that `columns()` does not require
        // an `AskResult` instance.
        let described = columns();
        assert_eq!(described.len(), 12);
        assert_eq!(described[0].name, "answer");
    }
}