Skip to main content

pond/adapter/
extract.rs

1//! Adapter seam primitives: `Source`, `Extracted<T>`, and the `extract_*`
2//! family.
3//!
4//! These types are the load-bearing contract for pond's adapter ecosystem
5//! (CLAUDE.md "Adapter seam"). They make ONE thing impossible to express:
6//! synthesizing a value the source data did not carry. The schema fields
7//! that hold "did the source say this?" data are typed as
8//! `Option<Extracted<T>>`, and the only way to construct an
9//! `Extracted<T>` is to extract it from a `Source` via one of the
10//! `extract_*` functions in this module. Adapters cannot produce a
11//! sentinel ("unknown", "function", "") through any combination of trait
12//! methods, conversions, or struct literals - the seal is module-private.
13//!
14//! Transport-agnostic by design: `Source` is a tiny trait that adapter
15//! authors implement for their own row type, regardless of how the row
16//! arrived (JSONL file, HTTP response body, WebSocket frame, queue
17//! payload, database row). pond ships `impl Source for serde_json::Value`
18//! for JSON-flavored adapters; the trait carries no transport assumptions.
19//!
20//! See spec.md#model-no-synthesis, spec.md#model-schema-honesty, and spec.md#model-lossless-projection for the underlying principles.
21
22use std::sync::atomic::{AtomicU64, Ordering};
23
24use serde_json::Value;
25
/// Marker wrapper for a value that originated in real source data.
///
/// The single field is private and the type offers no `From<T>`, no
/// `Default`, and no consuming accessor, so code outside this module can
/// read the inner value (through `Deref` or `as_ref`) but cannot build a
/// new instance from a bare `T`.
///
/// Every construction goes through the module-private `wrap` function.
/// Within this file `wrap` is reached from the `extract_*` helpers, from
/// the crate-internal `from_stored` decode constructor, from the
/// test-only `from_test_value`, and from the serde `Deserialize` impl.
/// Adapter code is expected to use the `extract_*` helpers, which require
/// a `Source`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Extracted<T>(T);
39
40impl<T> Extracted<T> {
41    /// Borrow the inner value.
42    #[allow(clippy::should_implement_trait)]
43    pub fn as_ref(&self) -> &T {
44        &self.0
45    }
46
47    /// Test-only constructor. Lets unit-test code build Part / Message
48    /// literals directly without routing through a `Source` (which would
49    /// force every test to stand up a synthetic JSON row). Compiled out of
50    /// release builds via `#[cfg(test)]`, so production adapter code
51    /// genuinely cannot reach this even from sibling modules.
52    #[cfg(test)]
53    pub fn from_test_value(value: T) -> Self {
54        wrap(value)
55    }
56
57    /// Crate-internal constructor for the storage decode path. Lance
58    /// stores values that originally came from a `Source` extraction; on
59    /// read-back we need to rewrap them. NOT for adapter use - adapter
60    /// code MUST go through `extract_*`. The `pub(crate)` visibility
61    /// limits this to pond's own infrastructure (notably the
62    /// `message_from_batch` / `part_from_batch` decoders in
63    /// `src/sessions.rs`). Code review checks that no adapter module
64    /// reaches for this.
65    pub(crate) fn from_stored(value: T) -> Self {
66        wrap(value)
67    }
68}
69
70impl<T> std::ops::Deref for Extracted<T> {
71    type Target = T;
72
73    fn deref(&self) -> &T {
74        &self.0
75    }
76}
77
78/// Module-private constructor. The single place in pond's codebase that
79/// can wrap a raw `T` into `Extracted<T>`. Visibility is bounded to this
80/// module so adapter code (which lives in sibling modules under
81/// `src/adapter/`) cannot reach it - that is the seal that makes the
82/// "no synthesized values" invariant compile-enforced.
83fn wrap<T>(value: T) -> Extracted<T> {
84    Extracted(value)
85}
86
/// Upper bound, in bytes, on any single text value that leaves the seam.
///
/// Arrow `Utf8` columns address with an `i32` offset buffer, so no stored
/// text value may approach `i32::MAX`. Every extracted value is capped at
/// this size; an oversized leaf is truncated to a head-preserving marker
/// (spec.md#adapter-bounded-values).
pub(crate) const LEAF_CAP: usize = 10 * 1024 * 1024;

/// Process-wide tally of values that were truncated at the seam.
static TRUNCATED_VALUES: AtomicU64 = AtomicU64::new(0);

/// Current number of truncations recorded since process start.
///
/// The load is `Relaxed`: the value is a plain counter and is not used to
/// order any other memory access in this file.
pub(crate) fn truncated_values_count() -> u64 {
    TRUNCATED_VALUES.load(Ordering::Relaxed)
}
98
99fn record_truncation(original_bytes: usize) {
100    TRUNCATED_VALUES.fetch_add(1, Ordering::Relaxed);
101    // Per-occurrence detail at debug; the operator-facing signal is the
102    // `truncated_values` count in the sync summary, always visible.
103    tracing::debug!(
104        original_bytes,
105        cap_bytes = LEAF_CAP,
106        "value exceeded the seam leaf cap; truncated to a marked sentinel"
107    );
108}
109
/// Render the suffix appended to a truncated value. It records the
/// value's original size, e.g. `<pond:truncated 123 bytes>`.
fn truncation_marker(original_bytes: usize) -> String {
    format!("<pond:truncated {} bytes>", original_bytes)
}
113
114/// Truncate an over-cap value to its head-preserving sentinel: a UTF-8 prefix
115/// of `head` plus the `<pond:truncated N bytes>` marker, the whole staying
116/// within `LEAF_CAP`. `original` is the value's true byte length, which may
117/// exceed `head.len()` when the caller streamed and discarded the tail. The
118/// sole definition of the truncation shape (spec.md#adapter-bounded-values). Caller
119/// guarantees `head.len() > LEAF_CAP`.
120pub(crate) fn truncate_to_marker(head: &[u8], original: usize) -> String {
121    let marker = truncation_marker(original);
122    let mut end = LEAF_CAP.saturating_sub(marker.len());
123    while end > 0 && head[end] & 0xC0 == 0x80 {
124        end -= 1;
125    }
126    let mut capped = String::from_utf8_lossy(&head[..end]).into_owned();
127    capped.push_str(&marker);
128    record_truncation(original);
129    capped
130}
131
132pub(crate) fn bound_str(s: &mut String) -> bool {
133    if s.len() <= LEAF_CAP {
134        return false;
135    }
136    *s = truncate_to_marker(s.as_bytes(), s.len());
137    true
138}
139
140pub(crate) fn bound_value(value: &mut Value) {
141    match value {
142        Value::String(s) => {
143            bound_str(s);
144        }
145        Value::Array(items) => items.iter_mut().for_each(bound_value),
146        Value::Object(map) => map.values_mut().for_each(bound_value),
147        Value::Null | Value::Bool(_) | Value::Number(_) => {}
148    }
149}
150
151pub fn extract_raw_record(row: &Value) -> Value {
152    let mut bounded = row.clone();
153    bound_value(&mut bounded);
154    bounded
155}
156
157//
158// Round-tripping `Extracted<T>` through serde is required because
159// `PartKind::Text { text: Option<Extracted<String>> }` (and friends) are
160// serialized to/from JSON for the wire protocol and for Lance's
161// `variant_data` column. Encoding is just the inner value; decoding
162// rewraps the inner value via `wrap`. From an adapter author's
163// perspective serde Deserialize is part of pond, not part of the seam
164// they get to call from their adapter code - so this does not loosen the
165// non-synthesis guarantee for adapters. (Storage decoding happens in
166// `src/sessions.rs::part_from_batch`, not in any adapter.)
167
168impl<T: serde::Serialize> serde::Serialize for Extracted<T> {
169    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
170        self.0.serialize(serializer)
171    }
172}
173
174impl<'de, T: serde::Deserialize<'de>> serde::Deserialize<'de> for Extracted<T> {
175    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
176        T::deserialize(deserializer).map(wrap)
177    }
178}
179
180/// One row of source data, abstracted across transports. Adapter authors
181/// implement this for whatever type carries one event from their source:
182///
183/// - JSON-flavored adapters (claude-code, codex-cli, ...): pond ships
184///   `impl Source for serde_json::Value` below, so the adapter just
185///   reuses it.
186/// - Struct-flavored adapters (nanoclaw API responses, managed-agent
187///   stream frames, database rows): the adapter defines a row struct and
188///   `impl Source for MyRow` with whatever lookup makes sense for that
189///   format.
190///
191/// The trait is intentionally minimal - just the four primitives
192/// `extract_*` needs - so it ports cleanly to non-JSON sources.
193pub trait Source {
194    /// String field by key. `None` when the field is absent OR present
195    /// but not a string. Adapters that need to distinguish "missing" from
196    /// "wrong type" should branch on `value_field` first.
197    fn str_field(&self, key: &str) -> Option<&str>;
198
199    /// Boolean field by key. `None` for missing-or-wrong-type, same as
200    /// `str_field`.
201    fn bool_field(&self, key: &str) -> Option<bool>;
202
203    /// Raw structured value by key (when the field is a nested object,
204    /// array, or any non-primitive shape pond stores as `Value`). `None`
205    /// for missing.
206    fn value_field(&self, key: &str) -> Option<&Value>;
207
208    /// Nested source view by key. Lets `extract_*` walk into sub-objects
209    /// without the adapter manually unwrapping intermediate layers.
210    /// `None` when the key is absent or not an object.
211    fn nested(&self, key: &str) -> Option<&dyn Source>;
212
213    /// The source itself viewed as a primitive string, when it IS one
214    /// (e.g. `Value::String("hi")`). `None` for object / array / number
215    /// shapes. Used by adapters when the "row" they hold is itself the
216    /// string they want to extract (claude-code's
217    /// `("user", Value::String(text))` shape).
218    fn as_str(&self) -> Option<&str> {
219        None
220    }
221
222    /// Lossless string encoding of the whole source. Returned by
223    /// `extract_compact_repr` when an adapter wants to preserve a row's
224    /// bytes via a faithful encoding fallback (e.g. an unknown
225    /// `assistant_part` subtype that we still want to keep as text).
226    /// This is NOT synthesis (the encoding preserves what was there),
227    /// but going through the seam means storage-layer code can rely on
228    /// every stored value having flowed through a `Source`. For JSON
229    /// sources the default impl is compact `serde_json::to_string`.
230    fn compact_repr(&self) -> String;
231}
232
233//
234// These are the ONLY public producers of `Option<Extracted<T>>`. Anything
235// else that needs to put a value into a schema field has to go through
236// one of these (directly or via composition), which means it has to come
237// from a `Source` - i.e. from real source data.
238
239/// Extract a `String` field. `None` when the source did not carry it.
240pub fn extract_str(source: &dyn Source, key: &str) -> Option<Extracted<String>> {
241    source.str_field(key).map(|s| {
242        let mut owned = s.to_owned();
243        bound_str(&mut owned);
244        wrap(owned)
245    })
246}
247
248/// Extract a `bool` field. `None` when the source did not carry it.
249pub fn extract_bool(source: &dyn Source, key: &str) -> Option<Extracted<bool>> {
250    source.bool_field(key).map(wrap)
251}
252
253/// Extract a structured `Value` field. `None` when the source did not
254/// carry it. Used for fields like `params` / `result` where pond stores
255/// the raw JSON value alongside the canonical schema slots.
256pub fn extract_value(source: &dyn Source, key: &str) -> Option<Extracted<Value>> {
257    source.value_field(key).cloned().map(|mut value| {
258        bound_value(&mut value);
259        wrap(value)
260    })
261}
262
263/// The source itself, viewed as a primitive string. `None` when the
264/// source is not a primitive string shape. Composes with patterns where
265/// the "row" the adapter holds is the string (e.g. claude-code's
266/// `("user", Value::String(text))` content shape).
267pub fn extract_self_str(source: &dyn Source) -> Option<Extracted<String>> {
268    source.as_str().map(|s| {
269        let mut owned = s.to_owned();
270        bound_str(&mut owned);
271        wrap(owned)
272    })
273}
274
275/// Lossless compact-string encoding of the whole source. Always succeeds
276/// (returns an `Extracted<String>`, not an `Option`) because every
277/// `Source` knows how to serialize itself. Used for "preserve the row
278/// bytes when we don't have a richer canonical shape" cases - this is
279/// NOT synthesis: the encoded string is a faithful representation of
280/// data the source actually carried.
281pub fn extract_compact_repr(source: &dyn Source) -> Extracted<String> {
282    let mut repr = source.compact_repr();
283    bound_str(&mut repr);
284    wrap(repr)
285}
286
287//
288// The default implementation for JSON-flavored adapters. Sits at the
289// seam, not inside a specific adapter, so claude-code, codex-cli, and
290// any future JSON-based adapter share one implementation.
291
292impl Source for Value {
293    fn str_field(&self, key: &str) -> Option<&str> {
294        self.get(key).and_then(Value::as_str)
295    }
296
297    fn bool_field(&self, key: &str) -> Option<bool> {
298        self.get(key).and_then(Value::as_bool)
299    }
300
301    fn value_field(&self, key: &str) -> Option<&Value> {
302        self.get(key)
303    }
304
305    fn nested(&self, key: &str) -> Option<&dyn Source> {
306        self.get(key).map(|v| v as &dyn Source)
307    }
308
309    fn as_str(&self) -> Option<&str> {
310        Value::as_str(self)
311    }
312
313    fn compact_repr(&self) -> String {
314        // `to_string` on `Value` produces compact JSON. Falls back to an
315        // empty object encoding only if serialization itself fails, which
316        // is impossible for `Value` (it's a closed enum that always
317        // serializes), but the unwrap_or_default keeps the trait method
318        // total.
319        serde_json::to_string(self).unwrap_or_else(|_| "{}".to_owned())
320    }
321}
322
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use serde_json::json;

    /// A present string field is returned wrapped and is readable through
    /// both `Deref` and `as_ref`.
    #[test]
    fn extract_str_pulls_present_field_and_wraps() {
        let source_row = json!({"name": "Edit", "input": {"file": "/tmp/foo"}});
        let name = extract_str(&source_row, "name").expect("name is present");
        assert_eq!(&*name, "Edit", "Deref exposes the inner string");
        assert_eq!(name.as_ref(), "Edit", "as_ref does too");
    }

    /// A key the row does not carry yields `None`.
    #[test]
    fn extract_str_returns_none_when_field_absent() {
        let source_row = json!({"name": "Edit"});
        let missing = extract_str(&source_row, "missing");
        assert!(missing.is_none());
    }

    /// A key that holds a non-string value also yields `None`.
    #[test]
    fn extract_str_returns_none_when_field_is_not_a_string() {
        let source_row = json!({"name": 42});
        let wrong_type = extract_str(&source_row, "name");
        assert!(
            wrong_type.is_none(),
            "wrong-type fields surface as absence; adapters that care should branch on value_field first",
        );
    }

    /// Booleans and structured values come back unchanged.
    #[test]
    fn extract_bool_and_extract_value_round_trip() {
        let source_row = json!({
            "is_error": true,
            "input": {"k": "v"},
        });

        let flag = extract_bool(&source_row, "is_error").expect("present");
        assert!(*flag);

        let input = extract_value(&source_row, "input").expect("present");
        assert_eq!(&*input, &json!({"k": "v"}));
    }

    /// Serializing emits the bare inner value; deserializing rewraps it.
    #[test]
    fn extracted_serde_round_trip_preserves_value() {
        let original = extract_str(&json!({"k": "hello"}), "k").unwrap();

        let wire = serde_json::to_string(&original).unwrap();
        assert_eq!(wire, "\"hello\"");

        let restored: Extracted<String> = serde_json::from_str(&wire).unwrap();
        assert_eq!(&*restored, "hello");
    }

    /// `nested` exposes a sub-object as a `Source` of its own.
    #[test]
    fn source_impl_for_value_walks_nested_objects() {
        let source_row = json!({"message": {"role": "user", "content": "hi"}});
        let message = source_row.nested("message").expect("message is an object");
        assert_eq!(message.str_field("role"), Some("user"));
        assert_eq!(message.str_field("content"), Some("hi"));
        assert!(source_row.nested("missing").is_none());
    }

    /// Oversized strings are capped wherever they sit in the tree, and
    /// small strings next to them are left alone.
    #[test]
    fn bound_value_caps_every_position_and_spares_good_leaves() {
        let oversize = "x".repeat(LEAF_CAP + 100);
        let mut tree = json!({
            "first": oversize,
            "good_a": "ok",
            "middle": oversize,
            "good_b": "ok",
            "nested": {"deep": oversize, "kept": "ok"},
            "list": ["ok", oversize, "ok"],
            "last": oversize,
        });
        bound_value(&mut tree);

        // Tail of the marker that every truncated leaf must end with.
        let marker_tail = format!("{} bytes>", LEAF_CAP + 100);
        let is_capped = |leaf: &Value| {
            let text = leaf.as_str().expect("string leaf");
            text.len() <= LEAF_CAP && text.ends_with(&marker_tail)
        };
        let is_intact = |leaf: &Value| leaf.as_str() == Some("ok");

        for leaf in [&tree["first"], &tree["middle"], &tree["last"]] {
            assert!(is_capped(leaf));
        }
        assert!(is_capped(&tree["nested"]["deep"]));
        assert!(is_capped(&tree["list"][1]));

        assert!(is_intact(&tree["good_a"]));
        assert!(is_intact(&tree["good_b"]));
        assert!(is_intact(&tree["nested"]["kept"]));
        assert!(is_intact(&tree["list"][0]));
        assert!(is_intact(&tree["list"][2]));
    }
}
414}