Skip to main content

pond/
sessions.rs

1//! The session datasets (spec.md#datasets): the three Lance tables, the
2//! `Store` facade, ingest validation, and `search_text` extraction.
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    path::Path,
7    sync::Arc,
8};
9
10use anyhow::{Context, Result};
11use async_stream::try_stream;
12use chrono::{DateTime, TimeZone, Utc};
13use lance::Dataset;
14use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
15use lance::deps::arrow_array::{
16    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
17    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
18    UInt64Array, new_null_array,
19};
20use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
21use lance_file::version::LanceFileVersion;
22use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
23use serde::{Deserialize, Serialize, de::DeserializeOwned};
24use serde_json::Value;
25use tokio_stream::{Stream, StreamExt};
26
27use crate::{
28    config, embed,
29    substrate::{
30        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
31        OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
32        TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
33    },
34    wire::{
35        FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
36        SessionFrom,
37    },
38};
39use url::Url;
40
41#[derive(Debug)]
42pub struct Store {
43    handle: Handle,
44}
45
46#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
47pub struct LanceArchiveCounts {
48    pub sessions: usize,
49    pub messages: usize,
50    pub parts: usize,
51}
52
53#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
54pub struct LanceArchiveVersions {
55    pub sessions: u64,
56    pub messages: u64,
57    pub parts: u64,
58}
59
60#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
61pub struct LanceArchiveExport {
62    pub rows: LanceArchiveCounts,
63    pub source_versions: LanceArchiveVersions,
64}
65
66#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
67pub struct LanceArchiveImport {
68    pub rows: LanceArchiveCounts,
69    pub inserted: LanceArchiveCounts,
70}
71
72#[derive(Debug, Clone, Default)]
73pub struct IndexIntents {
74    pub sessions: Vec<IndexIntent>,
75    pub messages: Vec<IndexIntent>,
76    pub parts: Vec<IndexIntent>,
77}
78
79impl IndexIntents {
80    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
81        [
82            (Table::Sessions, &self.sessions),
83            (Table::Messages, &self.messages),
84            (Table::Parts, &self.parts),
85        ]
86    }
87}
88
/// One message still waiting for an embedding.
///
/// It holds the message's primary key plus the `search_text` to embed. The
/// vector is later stored on this same `messages` row, so no denormalized
/// filter columns are carried (spec.md#session-embed-from-canonical).
#[derive(Clone, Debug, PartialEq)]
pub struct PendingMessage {
    /// Owning session id (first half of the primary key).
    pub session_id: String,
    /// Message id (second half of the primary key).
    pub id: String,
    /// Text that will be fed to the embedder.
    pub search_text: String,
}
98
/// The result of embedding one message: its primary key plus the vector.
///
/// `pond embed` writes batches of these into `messages.vector`, keyed on
/// `(session_id, id)`.
#[derive(Clone, Debug, PartialEq)]
pub struct EmbeddedMessage {
    /// Owning session id (first half of the primary key).
    pub session_id: String,
    /// Message id (second half of the primary key).
    pub id: String,
    /// Embedding vector to store on the row.
    pub vector: Vec<f32>,
}
107
108/// Message metadata used to hydrate search hits after retriever ranking.
109#[derive(Debug, Clone, PartialEq)]
110pub struct MessageMeta {
111    pub message_id: String,
112    pub session_id: String,
113    pub role: String,
114    pub project: String,
115    pub source_agent: String,
116    pub timestamp: DateTime<Utc>,
117    pub search_text: String,
118}
119
/// Composite primary key of a `messages` row.
///
/// The derived ordering compares `session_id` first, then `message_id`.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MessageKey {
    /// Owning session id.
    pub session_id: String,
    /// Message id within that session.
    pub message_id: String,
}
125
/// Whether an upserted row was newly inserted or matched an existing row.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpsertStatus {
    /// The row did not exist before this write.
    Inserted,
    /// A row with the same key already existed.
    Matched,
}
131
132/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
133/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
134/// results so the CLI can render compaction-skipped (under writer contention)
135/// distinctly from index-build failure (real problem).
136#[derive(Debug, Default)]
137pub struct OptimizeOutcome {
138    pub tables: Vec<TableOptimizeOutcome>,
139}
140
141impl OptimizeOutcome {
142    /// True if any table's indices phase reported a non-conflict failure.
143    /// `SkippedConflict` is expected under contention and does not count.
144    pub fn any_indices_failed(&self) -> bool {
145        self.tables.iter().any(|t| t.indices.is_failed())
146    }
147
148    /// Treat any `Failed` phase as an error. Tests that don't run under
149    /// contention use this to keep their existing `.await?` style: a real
150    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
151    pub fn into_result(self) -> Result<Self> {
152        for table in &self.tables {
153            if let PhaseOutcome::Failed(error) = &table.indices {
154                anyhow::bail!(
155                    "indices phase failed on {}: {error:#}",
156                    table.table.as_str()
157                );
158            }
159            if let PhaseOutcome::Failed(error) = &table.compaction {
160                anyhow::bail!(
161                    "compaction phase failed on {}: {error:#}",
162                    table.table.as_str()
163                );
164            }
165        }
166        Ok(self)
167    }
168}
169
170/// What `pond status` reports: where the data lives, total rows per table,
171/// and a per-(adapter, project) breakdown built from one `messages` scan.
172#[derive(Debug, Clone)]
173pub struct CorpusStats {
174    pub data_url: Url,
175    pub totals: RowTotals,
176    /// One entry per adapter present in the corpus. When `include_subagents`
177    /// is false (the CLI default), sub-agent rows (`source_agent` containing
178    /// `/`) are filtered out so only the main-agent sessions appear. When
179    /// true, each distinct `source_agent` (e.g. `claude-code/general-purpose`)
180    /// gets its own entry. Always in alphabetical order; the CLI re-sorts
181    /// this into registry order at render time so the tree matches the
182    /// discovery picker.
183    pub adapters: Vec<AdapterStats>,
184    /// Whether the rollup includes sub-agent sessions. The renderer prints a
185    /// hint about `--include-subagents` when this is false so users know the
186    /// `totals` row above counts sessions that aren't broken down below.
187    pub include_subagents: bool,
188}
189
/// Total row count of each session dataset table.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RowTotals {
    /// Rows in `sessions`.
    pub sessions: u64,
    /// Rows in `messages`.
    pub messages: u64,
    /// Rows in `parts`.
    pub parts: u64,
}
196
/// Embedding coverage shown by `pond status` and `pond embed`.
///
/// - `total` counts the `messages` rows that carry `search_text`, i.e. the
///   rows eligible to embed. Rows without `search_text` never get a vector.
/// - `embedded` is the subset of those that already carry a vector under
///   the current [`embed::model_id()`].
///
/// The outstanding backlog is `total - embedded`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EmbeddingProgress {
    /// Eligible rows that already hold a current-model vector.
    pub embedded: usize,
    /// Rows eligible for embedding.
    pub total: usize,
    /// Model identifier the counts refer to (presumably
    /// `embed::model_id()` - confirm at the construction site).
    pub model: &'static str,
}
208
209#[derive(Debug, Clone)]
210pub struct AdapterStats {
211    /// Either the main-agent name (`claude-code`) when sub-agents are filtered
212    /// out, or the full `source_agent` (`claude-code/general-purpose`) when
213    /// `include_subagents` is on.
214    pub adapter: String,
215    pub sessions: u64,
216    pub messages: u64,
217    /// Projects under this adapter, sorted by message count desc, then by
218    /// project name asc.
219    pub projects: Vec<ProjectStats>,
220}
221
/// Rollup for one project beneath an adapter in [`AdapterStats::projects`].
#[derive(Clone, Debug)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Distinct sessions in this project.
    pub sessions: u64,
    /// Messages in this project.
    pub messages: u64,
}
228
/// Running tally for one rollup group: a message counter plus the set of
/// distinct session ids seen. Presumably feeds the `sessions`/`messages`
/// fields of the stats rollups - confirm at the use site.
#[derive(Default)]
struct GroupAccumulator {
    /// Messages counted so far.
    messages: u64,
    /// Distinct session ids counted so far.
    session_ids: HashSet<String>,
}
234
235#[derive(Debug, Clone, Copy)]
236pub struct MessageWrite<'a> {
237    pub message: &'a Message,
238    pub parts: &'a [Part],
239    pub search_text: Option<&'a str>,
240}
241
242impl Store {
243    /// Open against a local filesystem URL or a remote one for which the
244    /// caller has no extra options to pass (env vars suffice). CLI verbs
245    /// that load `[storage]` from config should call
246    /// [`Store::open_with_options`] instead so the same options flow into
247    /// every dataset open and write.
248    pub async fn open(location: &Url) -> Result<Self> {
249        Ok(Self {
250            handle: Handle::open(location).await?,
251        })
252    }
253
254    /// Open with object-store options (S3 creds, region, endpoint, ...)
255    /// threaded through Lance verbatim. Keys are the standard `object_store`
256    /// config names; pond does not parse them. Empty options + default caps
257    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
258    /// config block via [`crate::substrate::RuntimeCaps`].
259    pub async fn open_with_options(
260        location: &Url,
261        storage_options: std::collections::HashMap<String, String>,
262        caps: crate::substrate::RuntimeCaps,
263    ) -> Result<Self> {
264        Ok(Self {
265            handle: Handle::open_with_options(location, storage_options, caps).await?,
266        })
267    }
268
269    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
270    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
271    /// through [`Store::open_with_options`] so the production policy is
272    /// applied; tests get the backend-aware local-FS defaults.
273    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
274        let url = config::url_for_path(path)?;
275        Self::open_with_options(
276            &url,
277            std::collections::HashMap::new(),
278            crate::substrate::RuntimeCaps::default(),
279        )
280        .await
281    }
282
283    /// Export clean, index-free Lance datasets into `dest`.
284    ///
285    /// This rewrites the visible rows of each table instead of copying the
286    /// dataset roots. The resulting manifests therefore contain no references
287    /// to the source store's `_indices`, while `messages.vector` and
288    /// `messages.embedding_model` remain ordinary data columns and are
289    /// preserved.
290    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
291        std::fs::create_dir_all(dest)
292            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
293        let (sessions, sessions_version) = self
294            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
295            .await?;
296        let (messages, messages_version) = self
297            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
298            .await?;
299        let (parts, parts_version) = self
300            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
301            .await?;
302        Ok(LanceArchiveExport {
303            rows: LanceArchiveCounts {
304                sessions,
305                messages,
306                parts,
307            },
308            source_versions: LanceArchiveVersions {
309                sessions: sessions_version,
310                messages: messages_version,
311                parts: parts_version,
312            },
313        })
314    }
315
316    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
317        let sessions_dataset =
318            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
319        let messages_dataset =
320            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
321        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
322        let (sessions, sessions_inserted) = self
323            .import_clean_table(Table::Sessions, sessions_dataset)
324            .await?;
325        let (messages, messages_inserted) = self
326            .import_clean_table(Table::Messages, messages_dataset)
327            .await?;
328        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
329        Ok(LanceArchiveImport {
330            rows: LanceArchiveCounts {
331                sessions,
332                messages,
333                parts,
334            },
335            inserted: LanceArchiveCounts {
336                sessions: sessions_inserted,
337                messages: messages_inserted,
338                parts: parts_inserted,
339            },
340        })
341    }
342
343    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
344        let dataset = self.handle.dataset(table).await?;
345        let source_version = dataset.version_id();
346        let schema = export_schema(table);
347        let mut scan = dataset.scan();
348        // The default scan projects blob columns as descriptor structs
349        // (position/size into the source's blob storage) - meaningless in an
350        // archive and unwritable at V2_1. `AllBinary` materializes the bytes
351        // so the rewritten table is self-contained.
352        scan.blob_handling(lance::datatypes::BlobHandling::AllBinary);
353        let mut stream = scan
354            .try_into_stream()
355            .await
356            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
357        let dest_uri = dest
358            .to_str()
359            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
360
361        let mut rows = 0usize;
362        let mut wrote = false;
363        while let Some(batch) = stream.next().await {
364            let batch = batch
365                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
366            rows += batch.num_rows();
367            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
368            let mut params = write_params_for_create();
369            if wrote {
370                params.mode = WriteMode::Append;
371            }
372            Dataset::write(reader, dest_uri, Some(params))
373                .await
374                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
375            wrote = true;
376        }
377
378        if !wrote {
379            let batch = RecordBatch::new_empty(schema.clone());
380            let reader = RecordBatchIterator::new([Ok(batch)], schema);
381            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
382                .await
383                .with_context(|| {
384                    format!("failed to write empty {} archive table", table.as_str())
385                })?;
386        }
387        Ok((rows, source_version))
388    }
389
390    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
391        // Force the destination table into existence up front: an empty
392        // archive table yields zero batches, so merge_insert alone would
393        // leave a lazily-created table (parts) missing on the destination.
394        let _ = self.handle.dataset(table).await?;
395        let mut scan = dataset.scan();
396        // Mirror of the export side: materialize blob bytes, not descriptor
397        // structs - merge_insert writes them into the destination's schema.
398        scan.blob_handling(lance::datatypes::BlobHandling::AllBinary);
399        let mut stream = scan
400            .try_into_stream()
401            .await
402            .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
403        let mut rows = 0usize;
404        let mut inserted = 0usize;
405        while let Some(batch) = stream.next().await {
406            let batch = batch
407                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
408            let row_count = batch.num_rows();
409            rows += row_count;
410            inserted += self
411                .handle
412                .merge_insert(table, batch, row_count)
413                .await
414                .with_context(|| format!("failed to import {} archive table", table.as_str()))?
415                as usize;
416        }
417        Ok((rows, inserted))
418    }
419
420    /// Flat write path. Per-row insert/match truth is not synthesized here -
421    /// honest outcomes come from the pre-existence scan on
422    /// [`Self::upsert_session_batch`]; the CLI sync and wire ingest paths use
423    /// that, so these helpers only need to surface write failure.
424    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<()> {
425        if sessions.is_empty() {
426            return Ok(());
427        }
428        let batches = sessions_batches(sessions)?;
429        merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
430        Ok(())
431    }
432
433    /// Batched write path used by the adapter ingest loop and by the wire
434    /// handler's final flush. Receives N completed substreams from the
435    /// validator and:
436    ///
437    ///   1. Runs the immutable-fields check (spec.md#protocol) against the stored row
438    ///      per session, sequentially. Sessions that fail produce one Error
439    ///      outcome and are excluded from the write batch.
440    ///   2. Deduplicates in-batch at the substream level: when two substreams
441    ///      in the same batch share a `session_id` (Claude Code's subagent
442    ///      files reuse their parent's id), the first occurrence wins. The
443    ///      second is either *merged* (same `source_agent` + `project`:
444    ///      messages/parts append, no duplicate rows) or *rejected*
445    ///      (different `project` - the subagent-vs-parent case). Row-level
446    ///      duplicates that slip past here are caught downstream by Lance's
447    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
448    ///      (invariant 17): this layer's job is preserving substream merge
449    ///      semantics, not policing the PK uniqueness Lance handles itself.
450    ///   3. Builds one combined `RecordBatch` per table (sessions, messages,
451    ///      parts) across every valid substream.
452    ///   4. Fires the three `merge_insert` calls in parallel via
453    ///      `tokio::try_join!`. Cross-table mutex on `CachedDataset` is
454    ///      independent, so these proceed concurrently.
455    ///   5. Composes per-session [`RowOutcome`]s in original substream order.
456    async fn upsert_session_batch(
457        &self,
458        substreams: Vec<CompletedSubstream>,
459    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
460        if substreams.is_empty() {
461            return Ok((Vec::new(), BatchCounts::default()));
462        }
463
464        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
465        let mut counts = BatchCounts::default();
466
467        // In-batch dedup. First occurrence of each session_id wins; later
468        // occurrences either merge or get rejected. Iteration order preserves
469        // original substream order so outcomes index correctly.
470        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
471        let mut by_session_id: std::collections::HashMap<String, usize> =
472            std::collections::HashMap::with_capacity(substreams.len());
473        for substream in substreams {
474            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
475                let existing = &merged[existing_idx];
476                if existing.session.source_agent != substream.session.source_agent
477                    || existing.session.project != substream.session.project
478                {
479                    // Subagent-vs-parent class. The first occurrence's
480                    // metadata stays authoritative; this substream is
481                    // rejected on the same immutable-field axis as the
482                    // storage-side check.
483                    let reason = if existing.session.source_agent != substream.session.source_agent
484                    {
485                        IngestError::ImmutableField {
486                            field: "source_agent",
487                            session_id: substream.session.id.clone(),
488                            stored: existing.session.source_agent.clone(),
489                            attempted: substream.session.source_agent.clone(),
490                        }
491                    } else {
492                        IngestError::ImmutableField {
493                            field: "project",
494                            session_id: substream.session.id.clone(),
495                            stored: (*existing.session.project).clone(),
496                            attempted: (*substream.session.project).clone(),
497                        }
498                    };
499                    let field = match &reason {
500                        IngestError::ImmutableField { field, .. } => Some(*field),
501                    };
502                    let reason_key = match field {
503                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
504                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
505                        _ => DROP_REASON_UNCATEGORIZED,
506                    };
507                    outcomes.extend(error_outcomes_for_substream(
508                        substream.session_index,
509                        &substream.session,
510                        &substream.messages,
511                        reason.to_string(),
512                        field,
513                        reason_key,
514                    ));
515                    continue;
516                }
517                // Same session, same metadata: merge messages. Dedup message
518                // ids defensively (within one batch, the validator's seen
519                // sets are per-substream so cross-substream dups can happen
520                // legally if both files re-emit the same row).
521                let existing = &mut merged[existing_idx];
522                let mut seen: std::collections::HashSet<String> = existing
523                    .messages
524                    .iter()
525                    .map(|m| m.message.id().to_owned())
526                    .collect();
527                for msg in substream.messages {
528                    if seen.insert(msg.message.id().to_owned()) {
529                        existing.messages.push(msg);
530                    }
531                }
532                continue;
533            }
534            by_session_id.insert(substream.session.id.clone(), merged.len());
535            merged.push(substream);
536        }
537
538        // Pre-existence sweep: one scan per table keyed on the batch's
539        // session_ids, capped at the substream count. Replaces the prior
540        // N-sequential `find_session` calls and gives us honest per-row
541        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
542        let session_id_values: Vec<ScalarValue> = merged
543            .iter()
544            .map(|substream| ScalarValue::String(substream.session.id.clone()))
545            .collect();
546        let existing_sessions: std::collections::HashMap<String, Session> =
547            if session_id_values.is_empty() {
548                std::collections::HashMap::new()
549            } else {
550                let batch = self
551                    .handle
552                    .scan_batch(
553                        Table::Sessions,
554                        Some(&Predicate::In("id", session_id_values.clone())),
555                        &[],
556                    )
557                    .await?;
558                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
559                for row in 0..batch.num_rows() {
560                    let session = session_from_batch(&batch, row)?;
561                    map.insert(session.id.clone(), session);
562                }
563                map
564            };
565        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
566            HashSet::new()
567        } else {
568            let batch = self
569                .handle
570                .scan_batch(
571                    Table::Messages,
572                    Some(&Predicate::In("session_id", session_id_values.clone())),
573                    &["session_id", "id"],
574                )
575                .await?;
576            let mut set = HashSet::with_capacity(batch.num_rows());
577            for row in 0..batch.num_rows() {
578                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
579                let mid = string(&batch, "id", row)?.context("message id is null")?;
580                set.insert((sid, mid));
581            }
582            set
583        };
584        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
585            HashSet::new()
586        } else {
587            let batch = self
588                .handle
589                .scan_batch(
590                    Table::Parts,
591                    Some(&Predicate::In("session_id", session_id_values)),
592                    &["session_id", "message_id", "id"],
593                )
594                .await?;
595            let mut set = HashSet::with_capacity(batch.num_rows());
596            for row in 0..batch.num_rows() {
597                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
598                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
599                let pid = string(&batch, "id", row)?.context("part id is null")?;
600                set.insert((sid, mid, pid));
601            }
602            set
603        };
604
605        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
606        for substream in merged {
607            if let Some(existing) = existing_sessions.get(&substream.session.id)
608                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
609            {
610                let field = match &failure {
611                    IngestError::ImmutableField { field, .. } => Some(*field),
612                };
613                let reason_key = match field {
614                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
615                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
616                    _ => DROP_REASON_UNCATEGORIZED,
617                };
618                outcomes.extend(error_outcomes_for_substream(
619                    substream.session_index,
620                    &substream.session,
621                    &substream.messages,
622                    failure.to_string(),
623                    field,
624                    reason_key,
625                ));
626                continue;
627            }
628            writeable.push(substream);
629        }
630
631        if writeable.is_empty() {
632            outcomes.sort_by_key(|outcome| outcome.index);
633            return Ok((outcomes, counts));
634        }
635
636        let sessions_owned: Vec<Session> = writeable
637            .iter()
638            .map(|substream| substream.session.clone())
639            .collect();
640        let message_rows: Vec<MessageBatchRow<'_>> = writeable
641            .iter()
642            .flat_map(|substream| {
643                substream.messages.iter().map(|buffered| MessageBatchRow {
644                    message: &buffered.message,
645                    source_agent: &substream.session.source_agent,
646                    project: &substream.session.project,
647                    search_text: buffered.search_text.as_deref(),
648                })
649            })
650            .collect();
651        let part_rows: Vec<Part> = writeable
652            .iter()
653            .flat_map(|substream| {
654                substream.messages.iter().flat_map(|buffered| {
655                    buffered
656                        .parts
657                        .iter()
658                        .map(|buffered_part| buffered_part.part.clone())
659                })
660            })
661            .collect();
662
663        let session_batches = sessions_batches(&sessions_owned)?;
664        let message_batches = messages_batches(&message_rows)?;
665        let part_batches = parts_batches(&part_rows)?;
666
667        // Merge_insert returns a batch-level inserted count which we cross-
668        // check against our pre-existence sets, but for per-row truth we
669        // attribute through the sets themselves (next loop). Under
670        // single-writer the two agree exactly; under a hostile concurrent
671        // writer the sets are authoritative for THIS request's wire shape -
672        // matched-no-op (spec.md#adapter-integrity-additive-sync) makes the
673        // distinction informational, not behavioral.
674        let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
675            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
676            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
677            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
678        )?;
679
680        for substream in &writeable {
681            outcomes.extend(success_outcomes_for_substream(
682                substream.session_index,
683                &substream.session,
684                &substream.messages,
685                &existing_sessions,
686                &existing_message_pks,
687                &existing_part_pks,
688                &mut counts,
689            ));
690        }
691
692        outcomes.sort_by_key(|outcome| outcome.index);
693        Ok((outcomes, counts))
694    }
695
696    pub async fn upsert_messages(
697        &self,
698        session: &Session,
699        messages: &[MessageWrite<'_>],
700    ) -> Result<()> {
701        if messages.is_empty() {
702            return Ok(());
703        }
704
705        let rows = messages
706            .iter()
707            .map(|write| MessageBatchRow {
708                message: write.message,
709                source_agent: &session.source_agent,
710                project: &session.project,
711                search_text: write.search_text,
712            })
713            .collect::<Vec<_>>();
714        let batches = messages_batches(&rows)?;
715        merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
716        Ok(())
717    }
718
719    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<()> {
720        if parts.is_empty() {
721            return Ok(());
722        }
723        let batches = parts_batches(parts)?;
724        merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
725        Ok(())
726    }
727
728    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
729        let Some(session) = self.find_session(session_id).await? else {
730            return Ok(None);
731        };
732        let messages = self.messages_for_session(session_id).await?;
733        Ok(Some(SessionWithMessages { session, messages }))
734    }
735
736    /// Every session id currently in the store, unsorted.
737    pub async fn session_ids(&self) -> Result<Vec<String>> {
738        let batch = self
739            .handle
740            .scan_batch(Table::Sessions, None, &["id"])
741            .await?;
742        let mut ids = Vec::with_capacity(batch.num_rows());
743        for row in 0..batch.num_rows() {
744            if let Some(id) = string(&batch, "id", row)? {
745                ids.push(id);
746            }
747        }
748        Ok(ids)
749    }
750
751    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
752        let batch = self
753            .handle
754            .scan_batch(
755                Table::Sessions,
756                Some(&Predicate::Eq(
757                    "parent_session_id",
758                    parent_session_id.into(),
759                )),
760                &[
761                    "id",
762                    "parent_session_id",
763                    "parent_message_id",
764                    "source_agent",
765                    "created_at",
766                    "project",
767                    "options",
768                ],
769            )
770            .await?;
771        let mut sessions = Vec::with_capacity(batch.num_rows());
772        for row in 0..batch.num_rows() {
773            sessions.push(session_from_batch(&batch, row)?);
774        }
775        sessions.sort_by(|left, right| left.id.cmp(&right.id));
776        Ok(sessions)
777    }
778
779    /// `session_id -> wall-clock time of the Lance manifest version that
780    /// last wrote the row` for the per-session staleness skip
781    /// (spec.md#adapter-integrity-event-ordering). Reads Lance's `_row_last_updated_at_version` system
782    /// column (available because pond enables stable row ids per spec.md#lance-table-creation-stable-row-ids)
783    /// and joins it against `Dataset::versions()` for commit timestamps.
784    pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
785        use lance::deps::arrow_array::UInt64Array;
786
787        let dataset = self.handle.dataset(Table::Sessions).await?;
788        let version_list = dataset.versions().await?;
789        let versions: HashMap<u64, DateTime<Utc>> = version_list
790            .iter()
791            .map(|v| (v.version, v.timestamp))
792            .collect();
793        // `Dataset::cleanup_old_versions` (and the auto_cleanup hook) drops
794        // pruned versions from the manifest list, leaving rows whose
795        // `_row_last_updated_at_version` points at a version that no longer
796        // resolves. Those rows are still real and were ingested at some time
797        // <= the oldest still-visible version's commit timestamp - so falling
798        // back to that bound preserves a sound `mtime <= ingested` upper edge
799        // and keeps the staleness skip working after cleanup.
800        let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
801
802        let scanner = self
803            .handle
804            .scan(
805                Table::Sessions,
806                ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
807            )
808            .await?;
809        let mut stream = scanner.try_into_stream().await?;
810        let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
811        while let Some(batch) = stream.next().await {
812            let batch = batch?;
813            let version_array = batch
814                .column_by_name("_row_last_updated_at_version")
815                .context("missing _row_last_updated_at_version column")?
816                .as_any()
817                .downcast_ref::<UInt64Array>()
818                .context("_row_last_updated_at_version is not UInt64")?;
819            for row in 0..batch.num_rows() {
820                let Some(id) = string(&batch, "id", row)? else {
821                    continue;
822                };
823                if version_array.is_null(row) {
824                    continue;
825                }
826                let version = version_array.value(row);
827                let ts = versions.get(&version).copied().or(oldest_visible_ts);
828                if let Some(ts) = ts {
829                    out.insert(id, ts);
830                }
831            }
832        }
833        Ok(out)
834    }
835
    /// Whole-session view for `pond_get` session mode (spec.md#protocol).
    ///
    /// Row selection follows `params.mode`:
    /// - Conversational keeps only messages whose `search_text` is set. `text`
    ///   is populated and `content` is left empty.
    /// - Complete and Verbatim scan every message and carry both `search_text`
    ///   (as `text`) and `content`.
    ///
    /// Rows are ordered by `(timestamp, id)`.
    ///
    /// Parts depend on the mode as well. Conversational and Complete attach the
    /// summary parts from `summary_parts_for_messages`. Verbatim attaches every
    /// part in full from `parts_for_messages` (blobs included).
    ///
    /// Paging:
    /// - `after_id` is an exclusive lower bound (a message id).
    /// - `SessionFrom::Start` pages forward from the anchor.
    /// - `SessionFrom::End` takes the newest messages but still returns them
    ///   oldest-first.
    ///
    /// Either way the page is bounded by `limit` and `budget_bytes` and never
    /// cuts mid-message. `messages_remaining` counts the messages after the
    /// anchor that were not emitted.
    ///
    /// Returns `GetLookup::NotFound` when the session does not exist. Returns
    /// `GetLookup::UnknownAfterId` when `after_id` matches no message in the
    /// mode's row selection.
    pub async fn session_view(
        &self,
        session_id: &str,
        params: SessionViewParams<'_>,
    ) -> Result<GetLookup<SessionPage>> {
        let Some(session) = self.find_session(session_id).await? else {
            return Ok(GetLookup::NotFound);
        };

        // Both arms produce the same `ScanRow` shape so the paging below is
        // mode-agnostic.
        let mut rows = match params.mode {
            ResponseMode::Conversational => self
                .scan_conversational_messages(session_id)
                .await?
                .into_iter()
                .map(|row| ScanRow {
                    id: row.message_id,
                    role: row.role,
                    timestamp: row.timestamp,
                    text: Some(row.text.into_inner()),
                    content: None,
                })
                .collect(),
            ResponseMode::Complete | ResponseMode::Verbatim => {
                self.scan_all_messages(session_id).await?
            }
        };
        // Chronological order, with the message id as a deterministic tiebreak.
        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));

        let start_at = match params.after_id {
            // Append-only stream: a real anchor never vanishes, so an unknown
            // `after_id` is a stale/mistyped client cursor, not "start over".
            Some(after) => match rows.iter().position(|row| row.id == after) {
                Some(idx) => idx + 1,
                None => return Ok(GetLookup::UnknownAfterId),
            },
            None => 0,
        };
        let remaining = rows.get(start_at..).unwrap_or(&[]);
        // NOTE(review): in both arms below the byte budget measures only
        // `text` (search_text) bytes. `content`, which Complete/Verbatim also
        // return, is not counted. Confirm that is intended.
        let (emitted, messages_remaining) = match params.session_from {
            SessionFrom::Start => {
                let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
                    row.text.as_deref().map_or(0, str::len)
                });
                (&remaining[..n], remaining.len() - n)
            }
            // Tail: the newest messages that fit `limit` and the byte budget,
            // dropping oldest first; the newest is always kept and the page
            // stays chronological so the agent reads the flow forward.
            SessionFrom::End => {
                let mut bytes = 0usize;
                // `remaining[start..]` is the page taken so far; it grows
                // backwards from the end one row per iteration.
                let mut start = remaining.len();
                for row in remaining.iter().rev() {
                    // `remaining.len() - start` = rows already taken.
                    // NOTE(review): with `limit == 0` this breaks before the
                    // newest row is taken, so the page is empty.
                    if remaining.len() - start >= params.limit {
                        break;
                    }
                    let size = row.text.as_deref().map_or(0, str::len);
                    // The budget only applies once one row is on the page, so
                    // an oversized newest message still pages.
                    if start < remaining.len() && bytes + size > params.budget_bytes {
                        break;
                    }
                    bytes += size;
                    start -= 1;
                }
                // Everything before `start` is left for a later page.
                (&remaining[start..], start)
            }
        };
        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();

        // Conversational/Complete only summarize parts; Verbatim inlines every
        // part (blobs included).
        let mut parts_by_message = match params.mode {
            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
            ResponseMode::Conversational | ResponseMode::Complete => {
                self.summary_parts_for_messages(session_id, &ids).await?
            }
        };
        // Each emitted row moves its parts out of the map. A message with no
        // stored parts gets an empty list.
        let messages = emitted
            .iter()
            .map(|row| RetrievedMessage {
                id: row.id.clone(),
                role: row.role,
                timestamp: row.timestamp,
                text: row.text.clone(),
                content: row.content.clone(),
                parts: parts_by_message
                    .remove(&(session_id.to_owned(), row.id.clone()))
                    .unwrap_or_default(),
            })
            .collect();

        Ok(GetLookup::Found(SessionPage {
            session,
            messages,
            messages_remaining,
        }))
    }
937
938    /// Message-scope retrieval for `pond_get` message mode (spec.md#protocol):
939    /// the target with its full parts (paginated by `after_id` over part
940    /// ordinals, then budget) plus up to `2*context_depth` siblings around it.
941    /// `None` when no stored message carries `message_id`. Sibling parts are
942    /// carried for summarizing; the target's parts ride `target_parts`.
943    pub async fn message_view(
944        &self,
945        message_id: &str,
946        params: MessageViewParams<'_>,
947    ) -> Result<GetLookup<MessagePage>> {
948        let Some(session_id) = self.session_id_for_message(message_id).await? else {
949            return Ok(GetLookup::NotFound);
950        };
951        let Some(session) = self.find_session(&session_id).await? else {
952            return Ok(GetLookup::NotFound);
953        };
954        let mut rows = self.scan_all_messages(&session_id).await?;
955        // spec.md#protocol: context siblings follow the response mode, and the
956        // default is the conversational view - in carrier-heavy sessions the
957        // system/tool rows would otherwise fill the whole +-depth window and
958        // push the actual conversation out of it. The target stays regardless
959        // of its own role: the caller asked for that message.
960        if matches!(params.mode, ResponseMode::Conversational) {
961            rows.retain(|row| row.text.is_some() || row.id == message_id);
962        }
963        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
964        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
965            return Ok(GetLookup::NotFound);
966        };
967
968        let start = target_pos.saturating_sub(params.context_depth);
969        let end = (target_pos + params.context_depth + 1).min(rows.len());
970        let window = &rows[start..end];
971        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
972        // The target's full parts (blobs included) ride the response; siblings
973        // are only summarized, but they share this one window scan.
974        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
975
976        let all_parts = parts_by_message
977            .remove(&(session_id.clone(), message_id.to_owned()))
978            .unwrap_or_default();
979        let start_part = match params.after_id {
980            // Exclusive over ordinals: parts are ordinal-sorted, so the first
981            // part past the anchor's ordinal is the page start. An anchor absent
982            // from the target's parts is a stale/mistyped client cursor.
983            Some(after) => match all_parts.iter().find(|part| part.id == after) {
984                Some(anchor) => all_parts
985                    .iter()
986                    .position(|part| part.ordinal > anchor.ordinal)
987                    .unwrap_or(all_parts.len()),
988                None => return Ok(GetLookup::UnknownAfterId),
989            },
990            None => 0,
991        };
992        let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
993        let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
994            serde_json::to_string(part).map_or(0, |json| json.len())
995        });
996        let target_parts = remaining_parts[..part_count].to_vec();
997        let target_parts_remaining = remaining_parts.len() - part_count;
998
999        let target_row = &rows[target_pos];
1000        let target = RetrievedMessage {
1001            id: target_row.id.clone(),
1002            role: target_row.role,
1003            timestamp: target_row.timestamp,
1004            text: target_row.text.clone(),
1005            content: target_row.content.clone(),
1006            // Target structure is carried in full by `target_parts`.
1007            parts: Vec::new(),
1008        };
1009        let siblings = window
1010            .iter()
1011            .enumerate()
1012            .filter(|(idx, _)| start + idx != target_pos)
1013            .map(|(_, row)| RetrievedMessage {
1014                id: row.id.clone(),
1015                role: row.role,
1016                timestamp: row.timestamp,
1017                text: row.text.clone(),
1018                content: row.content.clone(),
1019                parts: parts_by_message
1020                    .get(&(session_id.clone(), row.id.clone()))
1021                    .cloned()
1022                    .unwrap_or_default(),
1023            })
1024            .collect();
1025
1026        Ok(GetLookup::Found(MessagePage {
1027            session,
1028            target,
1029            target_parts,
1030            target_parts_remaining,
1031            siblings,
1032        }))
1033    }
1034
1035    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1036        let batch = self
1037            .handle
1038            .scan_batch(
1039                Table::Messages,
1040                Some(&Predicate::Eq("session_id", session_id.into())),
1041                &["id", "timestamp", "role", "search_text", "content"],
1042            )
1043            .await?;
1044        let mut rows = Vec::with_capacity(batch.num_rows());
1045        for row in 0..batch.num_rows() {
1046            let id = string(&batch, "id", row)?.context("message id is null")?;
1047            let role =
1048                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1049            let timestamp = datetime(&batch, "timestamp", row)?;
1050            rows.push(ScanRow {
1051                id,
1052                role,
1053                timestamp,
1054                text: string(&batch, "search_text", row)?,
1055                content: string(&batch, "content", row)?,
1056            });
1057        }
1058        Ok(rows)
1059    }
1060
1061    /// Conversational scan over one session: rows ordered by
1062    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1063    /// read seam (spec.md#search-prefilter-pushdown).
1064    pub async fn scan_conversational_messages(
1065        &self,
1066        session_id: &str,
1067    ) -> Result<Vec<ConversationalRow>> {
1068        let filter = Predicate::And(vec![
1069            Predicate::Eq("session_id", session_id.into()),
1070            Predicate::IsNotNull("search_text"),
1071        ]);
1072        let batch = self
1073            .handle
1074            .scan_batch(
1075                Table::Messages,
1076                Some(&filter),
1077                &["id", "timestamp", "role", "search_text"],
1078            )
1079            .await?;
1080
1081        let mut rows = Vec::with_capacity(batch.num_rows());
1082        for row in 0..batch.num_rows() {
1083            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1084            let role =
1085                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1086            let timestamp = datetime(&batch, "timestamp", row)?;
1087            let text_str = string(&batch, "search_text", row)?.context(
1088                "search_text null after IsNotNull pushdown - storage invariant violated",
1089            )?;
1090            rows.push(ConversationalRow {
1091                session_id: session_id.to_owned(),
1092                message_id,
1093                role,
1094                timestamp,
1095                text: SearchText(text_str),
1096            });
1097        }
1098        rows.sort_by(|a, b| {
1099            a.timestamp
1100                .cmp(&b.timestamp)
1101                .then_with(|| a.message_id.cmp(&b.message_id))
1102        });
1103        Ok(rows)
1104    }
1105
1106    /// Locate the session id for a stored message. Cheap when only the routing
1107    /// hint is needed - callers that need the messages use `scan_all_messages`.
1108    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1109        let batch = self
1110            .handle
1111            .scan_batch(
1112                Table::Messages,
1113                Some(&Predicate::Eq("id", message_id.into())),
1114                &["session_id"],
1115            )
1116            .await?;
1117        if batch.num_rows() == 0 {
1118            return Ok(None);
1119        }
1120        string(&batch, "session_id", 0)
1121    }
1122
1123    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1124        self.handle.row_counts().await
1125    }
1126
1127    /// A point-in-time `Arc<Dataset>` for `table`, for registering as a
1128    /// DataFusion `LanceTableProvider` in `pond_sql_query`. Goes through the
1129    /// handle's freshness gate, so each query sees a current snapshot.
1130    pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1131        Ok(Arc::new(self.handle.dataset(table).await?))
1132    }
1133
1134    /// Write a `pond_sql_query` export artifact.
1135    pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1136        self.handle.export_write(name, bytes).await
1137    }
1138
1139    /// Read a `pond_sql_query` export artifact back.
1140    pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1141        self.handle.export_read(name).await
1142    }
1143
1144    /// Local filesystem path of an export artifact on `file://` installs.
1145    pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1146        self.handle.export_local_path(name)
1147    }
1148
1149    /// Compute the per-adapter / per-project rollup that drives
1150    /// `pond status`. One scan over `messages` projecting the three
1151    /// columns the rollup keys on (`source_agent`, `project`, `session_id`),
1152    /// aggregated in-memory. Bounded by the cross product of adapters and
1153    /// projects, which stays small on real corpora.
1154    pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1155        let scanner = self
1156            .handle
1157            .scan(
1158                Table::Messages,
1159                ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1160            )
1161            .await?;
1162        let mut stream = scanner.try_into_stream().await?;
1163        let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1164        while let Some(batch) = stream.next().await {
1165            let batch = batch?;
1166            for row in 0..batch.num_rows() {
1167                let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1168                let project = string(&batch, "project", row)?.unwrap_or_default();
1169                let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1170                let is_subagent = source_agent.contains('/');
1171                if is_subagent && !include_subagents {
1172                    continue;
1173                }
1174                let entry = groups.entry((source_agent, project)).or_default();
1175                entry.messages += 1;
1176                entry.session_ids.insert(session_id);
1177            }
1178        }
1179
1180        let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1181        let totals = RowTotals {
1182            sessions: totals_sessions as u64,
1183            messages: totals_messages as u64,
1184            parts: totals_parts as u64,
1185        };
1186
1187        let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1188        for ((adapter, project), acc) in groups {
1189            by_adapter.entry(adapter).or_default().push(ProjectStats {
1190                project,
1191                sessions: acc.session_ids.len() as u64,
1192                messages: acc.messages,
1193            });
1194        }
1195
1196        let mut adapters = Vec::with_capacity(by_adapter.len());
1197        for (adapter, mut projects) in by_adapter {
1198            projects.sort_by(|a, b| {
1199                b.messages
1200                    .cmp(&a.messages)
1201                    .then_with(|| a.project.cmp(&b.project))
1202            });
1203            let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1204            let messages: u64 = projects.iter().map(|p| p.messages).sum();
1205            adapters.push(AdapterStats {
1206                adapter,
1207                sessions,
1208                messages,
1209                projects,
1210            });
1211        }
1212
1213        Ok(CorpusStats {
1214            data_url: self.handle.location().clone(),
1215            totals,
1216            adapters,
1217            include_subagents,
1218        })
1219    }
1220
1221    /// Write a batch of embeddings into `messages`: set `vector` and
1222    /// `embedding_model` on each row by `(session_id, id)`
1223    /// (spec.md#session-embed-from-canonical). The column update goes through the
1224    /// write seam and lands as a new manifest version (`append-only`).
1225    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1226        if rows.is_empty() {
1227            return Ok(());
1228        }
1229        let batch = embedding_update_batch(rows)?;
1230        self.handle
1231            .merge_update(Table::Messages, batch, rows.len())
1232            .await?;
1233        Ok(())
1234    }
1235
1236    /// Stream the backlog of messages needing embedding: rows with `search_text`
1237    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1238    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1239        try_stream! {
1240            let filter = Predicate::And(vec![
1241                Predicate::IsNull("vector"),
1242                Predicate::IsNotNull("search_text"),
1243            ]);
1244            let projection: &[&str] = &["session_id", "id", "search_text"];
1245            let scanner = self
1246                .handle
1247                .scan(
1248                    Table::Messages,
1249                    ScanOpts::with_predicate_and_projection(&filter, projection),
1250                )
1251                .await?;
1252            let mut batches = scanner
1253                .try_into_stream()
1254                .await
1255                .context("failed to open messages stream")?;
1256            while let Some(batch) = batches.next().await {
1257                let batch = batch?;
1258                for row in 0..batch.num_rows() {
1259                    yield PendingMessage {
1260                        session_id: string(&batch, "session_id", row)?
1261                            .context("session_id is null")?,
1262                        id: string(&batch, "id", row)?.context("message id is null")?,
1263                        search_text: string(&batch, "search_text", row)?
1264                            .context("search_text is null")?,
1265                    };
1266                }
1267            }
1268        }
1269    }
1270
1271    /// Stream messages that are either never embedded or stale under the
1272    /// current model. `pond embed --force` feeds this to the same unconditional
1273    /// merge_update as the normal backlog; the filter makes that semantically
1274    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1275    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1276        try_stream! {
1277            let filter = Predicate::And(vec![
1278                Predicate::IsNotNull("search_text"),
1279                Predicate::Or(vec![
1280                    Predicate::IsNull("vector"),
1281                    Predicate::Ne("embedding_model", embed::model_id().into()),
1282                ]),
1283            ]);
1284            let projection: &[&str] = &["session_id", "id", "search_text"];
1285            let scanner = self
1286                .handle
1287                .scan(
1288                    Table::Messages,
1289                    ScanOpts::with_predicate_and_projection(&filter, projection),
1290                )
1291                .await?;
1292            let mut batches = scanner
1293                .try_into_stream()
1294                .await
1295                .context("failed to open pending-or-stale messages stream")?;
1296            while let Some(batch) = batches.next().await {
1297                let batch = batch?;
1298                for row in 0..batch.num_rows() {
1299                    yield PendingMessage {
1300                        session_id: string(&batch, "session_id", row)?
1301                            .context("session_id is null")?,
1302                        id: string(&batch, "id", row)?.context("message id is null")?,
1303                        search_text: string(&batch, "search_text", row)?
1304                            .context("search_text is null")?,
1305                    };
1306                }
1307            }
1308        }
1309    }
1310
1311    /// BM25 full-text retriever over `messages.search_text`.
1312    pub async fn fts_search(
1313        &self,
1314        query: &str,
1315        limit: usize,
1316        filter: &Predicate,
1317    ) -> Result<Vec<(MessageKey, f32)>> {
1318        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1319        scanner.full_text_search(
1320            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1321        )?;
1322        // Lance ships an autoprojection that silently appends `_score` to FTS
1323        // output when the projection omits it. That behavior is going away;
1324        // we opt into the future explicit-projection contract here so the
1325        // scanner stops emitting a per-call deprecation warning, and we list
1326        // `_score` ourselves since the loop below reads it.
1327        scanner.disable_scoring_autoprojection();
1328        scanner.project(&["session_id", "id", "_score"])?;
1329        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1330        let batch = scanner.try_into_batch().await?;
1331        let mut hits = Vec::with_capacity(batch.num_rows());
1332        for row in 0..batch.num_rows() {
1333            let key = MessageKey {
1334                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1335                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1336            };
1337            hits.push((key, float32(&batch, "_score", row)?));
1338        }
1339        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1340        // order, which varies between runs and across calls with different pool
1341        // sizes (the hybrid arm's `pool=100` and FTS-only's `limit=20` produce
1342        // different orderings at the same tied score). Without an explicit
1343        // tiebreak the downstream RRF dedup-rank for a tied target session can
1344        // flip session-to-session, making fusion outcomes nondeterministic.
1345        // Sort by `score desc`, then `(session_id, message_id)` asc.
1346        hits.sort_by(|left, right| {
1347            right
1348                .1
1349                .partial_cmp(&left.1)
1350                .unwrap_or(std::cmp::Ordering::Equal)
1351                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1352                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1353        });
1354        Ok(hits)
1355    }
1356
1357    /// Count of searchable messages (non-null `search_text`) inside the
1358    /// caller's filter scope - the universe a search actually ran over.
1359    /// Powers the response's absence honesty (spec.md#search): "no relevant
1360    /// hits" only means something relative to how many messages were
1361    /// searchable at all, and 0 tells the caller their filters excluded
1362    /// everything before retrieval even started.
1363    pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
1364        let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
1365        let dataset = self.handle.dataset(Table::Messages).await?;
1366        dataset
1367            .count_rows(Some(scope.to_lance()))
1368            .await
1369            .map_err(Into::into)
1370    }
1371
1372    /// Whether any `messages` row carries a vector (spec.md#search) - the
1373    /// signal that flips search from FTS-only to hybrid. The single-active-
1374    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
1375    /// vector belongs to the current model.
1376    pub async fn has_embeddings(&self) -> Result<bool> {
1377        let scope = Predicate::IsNotNull("vector");
1378        let mut scanner = self
1379            .handle
1380            .scan(
1381                Table::Messages,
1382                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1383            )
1384            .await?;
1385        scanner.limit(Some(1), None)?;
1386        let batch = scanner.try_into_batch().await?;
1387        Ok(batch.num_rows() > 0)
1388    }
1389
1390    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
1391    /// scalar predicate (spec.md#search-prefilter-pushdown). Combines the caller's
1392    /// filter with `vector IS NOT NULL` to exclude un-embedded rows from the
1393    /// scan; the brute-force kNN path requires this (the IVF_PQ path would
1394    /// skip them anyway). The single-active-model invariant lets pond drop
1395    /// the per-row model filter: every non-null vector belongs to the current
1396    /// model.
1397    pub async fn vector_search(
1398        &self,
1399        query: &[f32],
1400        limit: usize,
1401        filter: &Predicate,
1402        search: Option<&config::SearchConfig>,
1403    ) -> Result<Vec<(MessageKey, f32)>> {
1404        let scope = embedded_scope(filter);
1405        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1406        let key = Float32Array::from(query.to_vec());
1407        scanner.nearest("vector", &key, limit)?;
1408        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1409            scanner.nprobes(nprobes);
1410        }
1411        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1412            scanner.refine(refine_factor);
1413        }
1414        // Mirror the explicit-projection contract from `fts_search`: opt out
1415        // of `_distance` autoprojection and list it ourselves since the loop
1416        // below reads it.
1417        scanner.disable_scoring_autoprojection();
1418        scanner.project(&["session_id", "id", "_distance"])?;
1419        let batch = scanner.try_into_batch().await?;
1420        let mut hits = Vec::with_capacity(batch.num_rows());
1421        for row in 0..batch.num_rows() {
1422            let key = MessageKey {
1423                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1424                message_id: string(&batch, "id", row)?.context("message id is null")?,
1425            };
1426            hits.push((key, float32(&batch, "_distance", row)?));
1427        }
1428        // Stable secondary sort: same reasoning as `fts_search` - IVF_PQ can
1429        // emit hits with effectively identical `_distance` in fragment-dependent
1430        // order, which makes RRF dedup-ranks nondeterministic for tied
1431        // neighbors. Sort by distance asc (smaller = more similar), then by
1432        // `(session_id, message_id)` asc.
1433        hits.sort_by(|left, right| {
1434            left.1
1435                .partial_cmp(&right.1)
1436                .unwrap_or(std::cmp::Ordering::Equal)
1437                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1438                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1439        });
1440        Ok(hits)
1441    }
1442
1443    /// The DataFusion plan string for a filtered vector scan - the
1444    /// `search-prefilter-pushdown` regression guard reads it.
1445    pub async fn explain_vector_plan(
1446        &self,
1447        query: &[f32],
1448        limit: usize,
1449        filter: &Predicate,
1450        search: Option<&config::SearchConfig>,
1451    ) -> Result<String> {
1452        let scope = embedded_scope(filter);
1453        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1454        let key = Float32Array::from(query.to_vec());
1455        scanner.nearest("vector", &key, limit)?;
1456        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1457            scanner.nprobes(nprobes);
1458        }
1459        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1460            scanner.refine(refine_factor);
1461        }
1462        scanner
1463            .explain_plan(true)
1464            .await
1465            .context("explain_plan failed")
1466    }
1467
1468    pub async fn explain_fts_plan(
1469        &self,
1470        query: &str,
1471        limit: usize,
1472        filter: &Predicate,
1473    ) -> Result<String> {
1474        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1475        scanner.full_text_search(
1476            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1477        )?;
1478        scanner.project(&["session_id", "id"])?;
1479        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1480        scanner
1481            .explain_plan(true)
1482            .await
1483            .context("explain_plan failed")
1484    }
1485
1486    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
1487    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1488        if keys.is_empty() {
1489            return Ok(Vec::new());
1490        }
1491        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1492        let session_ids = keys
1493            .iter()
1494            .map(|key| key.session_id.clone())
1495            .collect::<Vec<_>>();
1496        let message_ids = keys
1497            .iter()
1498            .map(|key| key.message_id.clone())
1499            .collect::<Vec<_>>();
1500        let predicate = Predicate::And(vec![
1501            in_predicate("session_id", &session_ids),
1502            in_predicate("id", &message_ids),
1503        ]);
1504        let batch = self
1505            .handle
1506            .scan_batch(
1507                Table::Messages,
1508                Some(&predicate),
1509                &[
1510                    "id",
1511                    "session_id",
1512                    "role",
1513                    "project",
1514                    "source_agent",
1515                    "timestamp",
1516                    "search_text",
1517                ],
1518            )
1519            .await?;
1520        let mut metas = Vec::with_capacity(batch.num_rows());
1521        for row in 0..batch.num_rows() {
1522            let message_id = string(&batch, "id", row)?.context("id is null")?;
1523            let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1524            if !wanted.contains(&MessageKey {
1525                session_id: session_id.clone(),
1526                message_id: message_id.clone(),
1527            }) {
1528                continue;
1529            }
1530            metas.push(MessageMeta {
1531                message_id,
1532                session_id,
1533                role: string(&batch, "role", row)?.context("role is null")?,
1534                project: string(&batch, "project", row)?.context("project is null")?,
1535                source_agent: string(&batch, "source_agent", row)?
1536                    .context("source_agent is null")?,
1537                timestamp: datetime(&batch, "timestamp", row)?,
1538                search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1539            });
1540        }
1541        Ok(metas)
1542    }
1543
1544    /// Total message count per session, for search session summaries.
1545    pub async fn session_message_counts(
1546        &self,
1547        session_ids: &[String],
1548    ) -> Result<BTreeMap<String, usize>> {
1549        if session_ids.is_empty() {
1550            return Ok(BTreeMap::new());
1551        }
1552        let dataset = self.handle.dataset(Table::Messages).await?;
1553        let mut tasks = tokio::task::JoinSet::new();
1554        for session_id in session_ids {
1555            let dataset = dataset.clone();
1556            let session_id = session_id.clone();
1557            tasks.spawn(async move {
1558                let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1559                let count = dataset.count_rows(Some(filter)).await?;
1560                anyhow::Ok((session_id, count))
1561            });
1562        }
1563        let mut counts = BTreeMap::new();
1564        while let Some(joined) = tasks.join_next().await {
1565            let (session_id, count) = joined.context("session count task panicked")??;
1566            counts.insert(session_id, count);
1567        }
1568        Ok(counts)
1569    }
1570
1571    /// Rows appended to `messages` since the FTS index was last optimized.
1572    /// A missing index reports the whole table; the query is manifest-only.
1573    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1574        self.handle
1575            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1576            .await
1577    }
1578
1579    /// Rows added or rewritten in `messages` since the IVF_PQ vector index
1580    /// was last optimized. Below
1581    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
1582    /// must read [`embedding_progress`](Self::embedding_progress) too and
1583    /// distinguish "index not built yet" from "index trails data".
1584    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1585        self.handle
1586            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1587            .await
1588    }
1589
1590    /// Embedding coverage: how many `messages` rows carry a vector and how
1591    /// many are still eligible. Drives the `pond status` embeddings line and
1592    /// the `pond embed` progress bar's known total. `embedded` reads the
1593    /// `vector IS NOT NULL` count directly - the single-active-model invariant
1594    /// (see `MESSAGE_SCALAR_INDICES`) means there is no need to scope by the
1595    /// `embedding_model` column.
1596    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1597        let dataset = self.handle.dataset(Table::Messages).await?;
1598        let embedded = dataset
1599            .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1600            .await?;
1601        let total = dataset
1602            .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1603            .await?;
1604        Ok(EmbeddingProgress {
1605            embedded,
1606            total,
1607            model: embed::model_id(),
1608        })
1609    }
1610
1611    /// Count rows whose `embedding_model` is not the currently configured
1612    /// model AND whose `vector` is still populated - the signal `pond embed`
1613    /// uses to detect a model swap and require `--force`.
1614    pub async fn stale_embedding_count(&self) -> Result<usize> {
1615        let dataset = self.handle.dataset(Table::Messages).await?;
1616        dataset
1617            .count_rows(Some(
1618                Predicate::And(vec![
1619                    Predicate::IsNotNull("vector"),
1620                    Predicate::Ne("embedding_model", embed::model_id().into()),
1621                ])
1622                .to_lance(),
1623            ))
1624            .await
1625            .map_err(Into::into)
1626    }
1627
1628    /// Run the per-table maintenance cycle (compact + indices) across every
1629    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
1630    /// and compaction commit independently, so a hot writer that starves
1631    /// compaction on one table does not abort the index work the operator
1632    /// asked for on other tables (or even on the same table).
1633    pub async fn optimize_indices(
1634        &self,
1635        progress: Option<OptimizeProgressFn>,
1636        maintenance: &MaintenancePolicy,
1637    ) -> Result<OptimizeOutcome> {
1638        let intents = pond_index_intents();
1639        let mut tables = Vec::with_capacity(3);
1640        for (table, intents) in intents.all() {
1641            let outcome = self
1642                .handle
1643                .optimize_table(table, intents, progress.as_ref(), maintenance)
1644                .await;
1645            tables.push(outcome);
1646        }
1647        Ok(OptimizeOutcome { tables })
1648    }
1649
1650    /// Fold trailing fragments into existing indices across every table,
1651    /// without running compaction. Used by `pond embed`'s tail so newly
1652    /// written vectors land in the FTS / IVF_PQ / btree / bitmap indices
1653    /// without paying the compaction retry budget while embed itself may
1654    /// still be writing in a sibling process.
1655    pub async fn build_indices_only(
1656        &self,
1657        progress: Option<OptimizeProgressFn>,
1658    ) -> Result<OptimizeOutcome> {
1659        let policy = pond_index_intents();
1660        let mut tables = Vec::with_capacity(3);
1661        for (table, intents) in policy.all() {
1662            let indices = self
1663                .handle
1664                .optimize_table_indices_only(table, intents, progress.as_ref())
1665                .await;
1666            tables.push(TableOptimizeOutcome {
1667                table,
1668                indices,
1669                compaction: PhaseOutcome::NotAttempted,
1670            });
1671        }
1672        Ok(OptimizeOutcome { tables })
1673    }
1674
1675    #[cfg(test)]
1676    async fn optimize_indices_with_vector_threshold(
1677        &self,
1678        vector_threshold: usize,
1679    ) -> Result<OptimizeOutcome> {
1680        let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1681        let policy = MaintenancePolicy::always_compact();
1682        let mut tables = Vec::with_capacity(3);
1683        for (table, intents) in intents.all() {
1684            let outcome = self
1685                .handle
1686                .optimize_table(table, intents, None, &policy)
1687                .await;
1688            tables.push(outcome);
1689        }
1690        Ok(OptimizeOutcome { tables })
1691    }
1692
1693    pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1694        let policy = pond_index_intents();
1695        let mut matched = false;
1696        for (table, intents) in policy.all() {
1697            for intent in intents {
1698                if intent_name.is_none_or(|name| name == intent.name) {
1699                    matched = true;
1700                    self.handle.rebuild_index(table, intent).await?;
1701                }
1702            }
1703        }
1704        if let Some(name) = intent_name
1705            && !matched
1706        {
1707            anyhow::bail!("unknown index intent {name:?}");
1708        }
1709        Ok(())
1710    }
1711
1712    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1713        let policy = pond_index_intents();
1714        let mut statuses = Vec::new();
1715        for (table, intents) in policy.all() {
1716            statuses.extend(self.handle.index_status(table, intents).await?);
1717        }
1718        Ok(statuses)
1719    }
1720
1721    /// Drop the IVF_PQ index on `messages.vector`. Used by `pond embed
1722    /// --force` before re-bootstrapping under a different model. Silent
1723    /// when the index does not exist.
1724    pub async fn drop_vector_index(&self) -> Result<()> {
1725        match self
1726            .handle
1727            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1728            .await
1729        {
1730            Ok(()) => Ok(()),
1731            Err(error) => {
1732                let msg = error.to_string();
1733                if msg.contains("not found") || msg.contains("does not exist") {
1734                    Ok(())
1735                } else {
1736                    Err(error)
1737                }
1738            }
1739        }
1740    }
1741
1742    /// On-disk byte totals per dataset, sized through Lance's object store
1743    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
1744    pub async fn table_sizes(&self) -> Result<TableSizes> {
1745        self.handle.table_sizes().await
1746    }
1747
1748    pub async fn initialized(&self) -> Result<bool> {
1749        self.handle.initialized().await
1750    }
1751
1752    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1753        let batch = self
1754            .handle
1755            .scan_batch(
1756                Table::Sessions,
1757                Some(&Predicate::Eq("id", session_id.into())),
1758                &[],
1759            )
1760            .await?;
1761        if batch.num_rows() == 0 {
1762            Ok(None)
1763        } else {
1764            Ok(Some(session_from_batch(&batch, 0)?))
1765        }
1766    }
1767
1768    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1769        let batch = self
1770            .handle
1771            .scan_batch(
1772                Table::Messages,
1773                Some(&Predicate::Eq("session_id", session_id.into())),
1774                &[
1775                    "session_id",
1776                    "id",
1777                    "timestamp",
1778                    "role",
1779                    "content",
1780                    "options",
1781                ],
1782            )
1783            .await?;
1784        let mut messages = Vec::with_capacity(batch.num_rows());
1785        for row in 0..batch.num_rows() {
1786            messages.push(message_from_batch(&batch, row)?);
1787        }
1788        messages.sort_by(|left, right| {
1789            left.timestamp()
1790                .cmp(&right.timestamp())
1791                .then_with(|| left.id().cmp(right.id()))
1792        });
1793
1794        let message_ids = messages
1795            .iter()
1796            .map(|message| message.id().to_owned())
1797            .collect::<Vec<_>>();
1798        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1799
1800        Ok(messages
1801            .into_iter()
1802            .map(|message| {
1803                let key = (message.session_id().to_owned(), message.id().to_owned());
1804                let parts = parts_by_message.remove(&key).unwrap_or_default();
1805                MessageWithParts { message, parts }
1806            })
1807            .collect())
1808    }
1809
1810    /// Every part of these messages, full fidelity (file blobs included). The
1811    /// canonical read primitive - restore/export, verbatim mode, and the
1812    /// message-mode target all need the complete set.
1813    pub async fn parts_for_messages(
1814        &self,
1815        session_id: &str,
1816        message_ids: &[String],
1817    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1818        self.scan_parts(session_id, message_ids, None).await
1819    }
1820
1821    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
1822    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
1823    /// nothing. For the summary-only reads (conversational/complete session
1824    /// views, search hits) - it never feeds restore/export.
1825    pub async fn summary_parts_for_messages(
1826        &self,
1827        session_id: &str,
1828        message_ids: &[String],
1829    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1830        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1831            .await
1832    }
1833
    /// Shared scan behind `parts_for_messages` and `summary_parts_for_messages`.
    ///
    /// Selects `parts` rows of `session_id` whose `message_id` is in
    /// `message_ids`. With `part_types = Some(..)` a `type IN (..)` clause is
    /// added; `None` keeps every type. Empty `message_ids` returns an empty map
    /// without touching storage.
    ///
    /// The `data` blob column is not projected. For rows whose `type` is
    /// `"file"`, blobs are fetched afterwards by row address and decoded with
    /// `file_data_from_blob`.
    ///
    /// Returns parts grouped by `(session_id, message_id)`, each group sorted by
    /// `ordinal`. The sort is stable, so equal ordinals keep scan order.
    ///
    /// NOTE(review): `dataset` is obtained in a separate call before the
    /// scanner. If `Handle` can hand out different dataset versions across the
    /// two calls (e.g. a compaction commits in between), the scanned row
    /// addresses might not be valid against `dataset`. Confirm the handle pins
    /// one version.
    async fn scan_parts(
        &self,
        session_id: &str,
        message_ids: &[String],
        part_types: Option<&[&str]>,
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        if message_ids.is_empty() {
            return Ok(BTreeMap::new());
        }
        let mut clauses = vec![
            Predicate::Eq("session_id", session_id.into()),
            in_predicate("message_id", message_ids),
        ];
        if let Some(types) = part_types {
            clauses.push(Predicate::In(
                "type",
                types.iter().map(|&t| t.into()).collect(),
            ));
        }
        let predicate = Predicate::And(clauses);
        // Opened here only for the blob fetch below (`take_blobs_by_addresses`
        // is called on this `Arc`).
        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
        let mut scanner = self
            .handle
            .scan(
                Table::Parts,
                ScanOpts::with_predicate_and_projection(
                    &predicate,
                    &[
                        "session_id",
                        "message_id",
                        "id",
                        "ordinal",
                        "type",
                        "provenance",
                        "variant_data",
                        "options",
                    ],
                ),
            )
            .await?;
        // Adds the `_rowaddr` column read just below.
        scanner.with_row_address();
        let batch = scanner.try_into_batch().await.context("scan failed")?;
        let row_addresses = uint64(&batch, "_rowaddr")?;
        // Batch row index -> decoded file payload, consumed by `part_from_batch`.
        let mut file_payloads = BTreeMap::<usize, FileData>::new();
        // (batch row, row address, raw `variant_data`) for every `file` row.
        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
        for row in 0..batch.num_rows() {
            if string(&batch, "type", row)?.as_deref() == Some("file") {
                let variant_data =
                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
                file_rows.push((row, row_addresses.value(row), variant_data));
            }
        }
        if !file_rows.is_empty() {
            let addresses = file_rows
                .iter()
                .map(|(_, address, _)| *address)
                .collect::<Vec<_>>();
            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
            // NOTE(review): `zip` pairs blobs with file rows positionally and
            // stops at the shorter side. This assumes one blob per address, in
            // address order; otherwise a file row reaches `part_from_batch`
            // with `None` or a mismatched payload. Confirm against Lance.
            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
                // Legacy blob (lance-encoding:blob): payload is bytes; the
                // url variant stored its URL as UTF-8 bytes, recovered via
                // `file_data_from_blob`'s `data_kind = "url"` branch.
                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
                file_payloads.insert(row, payload);
            }
        }
        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
        for row in 0..batch.num_rows() {
            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
            parts_by_message
                .entry((part.session_id.clone(), part.message_id.clone()))
                .or_default()
                .push(part);
        }
        for parts in parts_by_message.values_mut() {
            parts.sort_by_key(|part| part.ordinal);
        }
        Ok(parts_by_message)
    }
1913}
1914
/// One ingest record: a session, a message, or a part.
///
/// Serialized with serde adjacent tagging, so in JSON an event looks like
/// `{"kind": "session" | "message" | "part", "data": <payload>}` (variant
/// names in snake_case).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// A `wire::Session` record.
    Session(Session),
    /// A `wire::Message` record.
    Message(Message),
    /// A `wire::Part` record.
    Part(Part),
}
1922
/// Aggregate accounting for an ingest pass (CLI sync, adapter-driven).
/// The wire layer (`pond_ingest`) instead returns per-row results; the
/// aggregate is derived from those at the wire boundary.
///
/// Fields are bucketed by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape is set by spec.md#adapter-integrity-event-ordering.
///
/// Counters are folded in by [`IngestSummary::add_batch`] (CLI batched flush)
/// or [`IngestSummary::add_outcomes`] (wire single-row path), and combined
/// across sources with [`IngestSummary::merge`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows actually written to Lance, summed across all three tables.
    /// Use the per-table fields below for user-facing counts; this stays
    /// for `accepted()` and existing wire callers.
    pub inserted: usize,
    /// Rows that already existed (merge_insert no-op match), summed across
    /// all three tables.
    pub matched: usize,
    /// Session rows inserted this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted this pass (total - includes tool calls,
    /// tool results, and other non-searchable messages).
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` whose `search_text` is non-null
    /// (eligible for FTS + semantic indexing). The user-facing "messages"
    /// count in `pond sync` / `pond status` reads this field.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted this pass.
    pub parts_inserted: usize,
    /// Session rows already present (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows already present (merge_insert matched), total.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` with `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows already present.
    pub parts_matched: usize,
    /// Events the validator dropped under per-event-drop policy (ordering
    /// violation, orphan part, mismatched parent, adapter parse failure,
    /// duplicate-id collision, ...). Counted by event, not by session: a
    /// session with one bad part stays in this bucket as 1, not as "the
    /// whole substream." Per spec.md#adapter-integrity-dedup, adapters
    /// SHOULD dedupe their own emissions upstream when source replay is
    /// expected; the validator's in-batch HashSet is a safety net, not a
    /// feature adapters may rely on. If this bucket grows on a clean adapter,
    /// inspect `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions whose Session-level invariants (immutable `source_agent` /
    /// `project` against the stored row) failed at flush time and
    /// whose substream got rejected wholesale. Always small relative to
    /// `inserted`; if not, there's a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter couldn't decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    ///
    /// NOTE(review): this description overlaps `skipped_empty`, which also
    /// claims empty `.jsonl` and unextractable headers. Confirm which
    /// counter each case actually increments.
    pub skipped_files: usize,
    /// Files that produced no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `-vv` (debug) verbosity.
    pub skipped_empty: usize,
    /// Sessions short-circuited via the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): the file's `mtime` was at
    /// or before the wall-clock time pond last wrote that session's row, so
    /// re-decode was bypassed.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries were exhausted (commit
    /// conflicts, transient IO that didn't recover). Hard zero on healthy
    /// runs.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys for the combined `dropped_events +
    /// dropped_sessions` populations. Keys are `&'static str` (see the
    /// `DROP_REASON_*` constants) so consumers can match by identity.
    /// Empty on a clean run. Used by `pond sync` to print the top reasons
    /// and by `benches/ingest_bench.rs` to bucket Partial drops by cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1998
/// Stable reason keys for the `IngestSummary::drop_reasons` histogram and
/// the per-row `RowError::reason_key`. `&'static str` so consumers can
/// match by identity rather than prose. Adding a new variant: pick a short
/// snake_case identifier, route it from the validator/adapter, and update
/// the per-row outcome docs in `docs/spec.md#adapter-integrity-event-ordering`.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Session-level rejection: the incoming `project` differs from the stored
/// row, which the summary treats as immutable.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Session-level rejection: the incoming `source_agent` differs from the
/// stored row, which the summary treats as immutable.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback bucket for an `Error` outcome that carries no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
2015
/// Honest per-table outcome of one batched flush. Built from `merge_insert`'s
/// returned counts together with the pre-existence sets captured by
/// `upsert_session_batch`. Folded into a per-sync summary via
/// [`IngestSummary::add_batch`]. spec.md#adapter-integrity-additive-sync: matched
/// is a no-op write, so the inserted/matched split is informational - we still
/// surface it because both `pond sync` and `pond_ingest` clients reconcile
/// against "which rows landed this call."
///
/// Each field feeds the same-named [`IngestSummary`] counter, scoped to a
/// single flush.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows newly inserted by this flush.
    pub sessions_inserted: usize,
    /// Session rows that already existed (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` whose `search_text` is non-null.
    pub messages_inserted_searchable: usize,
    /// Message rows that already existed, searchable or not.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` whose `search_text` is non-null.
    pub messages_matched_searchable: usize,
    /// Part rows newly inserted by this flush.
    pub parts_inserted: usize,
    /// Part rows that already existed.
    pub parts_matched: usize,
}
2034
2035impl IngestSummary {
2036    pub fn accepted(&self) -> usize {
2037        self.inserted + self.matched
2038    }
2039
2040    /// Sole writer of the per-table counters on the CLI batched flush path.
2041    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
2042    /// both for the same rows would double-count.
2043    pub fn add_batch(&mut self, counts: &BatchCounts) {
2044        self.sessions_inserted += counts.sessions_inserted;
2045        self.sessions_matched += counts.sessions_matched;
2046        self.messages_inserted_total += counts.messages_inserted_total;
2047        self.messages_inserted_searchable += counts.messages_inserted_searchable;
2048        self.messages_matched_total += counts.messages_matched_total;
2049        self.messages_matched_searchable += counts.messages_matched_searchable;
2050        self.parts_inserted += counts.parts_inserted;
2051        self.parts_matched += counts.parts_matched;
2052        self.inserted +=
2053            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2054        self.matched +=
2055            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2056    }
2057
2058    /// Sum every counter from `other` into `self`. Used by the multi-source
2059    /// `pond sync` loop so adding a new field to this struct doesn't silently
2060    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
2061    pub fn merge(&mut self, other: &Self) {
2062        self.inserted += other.inserted;
2063        self.matched += other.matched;
2064        self.sessions_inserted += other.sessions_inserted;
2065        self.messages_inserted_total += other.messages_inserted_total;
2066        self.messages_inserted_searchable += other.messages_inserted_searchable;
2067        self.parts_inserted += other.parts_inserted;
2068        self.sessions_matched += other.sessions_matched;
2069        self.messages_matched_total += other.messages_matched_total;
2070        self.messages_matched_searchable += other.messages_matched_searchable;
2071        self.parts_matched += other.parts_matched;
2072        self.dropped_events += other.dropped_events;
2073        self.dropped_sessions += other.dropped_sessions;
2074        self.skipped_files += other.skipped_files;
2075        self.skipped_empty += other.skipped_empty;
2076        self.skipped_fresh += other.skipped_fresh;
2077        self.storage_errors += other.storage_errors;
2078        self.truncated_values += other.truncated_values;
2079        for (key, value) in &other.drop_reasons {
2080            *self.drop_reasons.entry(key).or_insert(0) += value;
2081        }
2082    }
2083
2084    /// Same dispatch as [`Self::add_outcomes`] but ignores
2085    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
2086    /// via [`Self::add_batch`] and uses this method to attribute per-row
2087    /// `Error` outcomes from the same flush.
2088    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2089        for outcome in outcomes {
2090            if !matches!(outcome.status, OutcomeStatus::Error) {
2091                continue;
2092            }
2093            if outcome.kind == "session" {
2094                self.dropped_sessions += 1;
2095            } else {
2096                self.dropped_events += 1;
2097            }
2098            let reason = outcome
2099                .error
2100                .as_ref()
2101                .and_then(|error| error.reason_key)
2102                .unwrap_or(DROP_REASON_UNCATEGORIZED);
2103            *self.drop_reasons.entry(reason).or_insert(0) += 1;
2104        }
2105    }
2106
2107    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2108        for outcome in outcomes {
2109            match outcome.status {
2110                OutcomeStatus::Inserted => {
2111                    self.inserted += 1;
2112                    match outcome.kind {
2113                        "session" => self.sessions_inserted += 1,
2114                        "message" => {
2115                            self.messages_inserted_total += 1;
2116                            if outcome.searchable {
2117                                self.messages_inserted_searchable += 1;
2118                            }
2119                        }
2120                        "part" => self.parts_inserted += 1,
2121                        _ => {}
2122                    }
2123                }
2124                OutcomeStatus::Matched => {
2125                    self.matched += 1;
2126                    match outcome.kind {
2127                        "session" => self.sessions_matched += 1,
2128                        "message" => {
2129                            self.messages_matched_total += 1;
2130                            if outcome.searchable {
2131                                self.messages_matched_searchable += 1;
2132                            }
2133                        }
2134                        "part" => self.parts_matched += 1,
2135                        _ => {}
2136                    }
2137                }
2138                OutcomeStatus::Error => {
2139                    // Session-level rejection: exactly one session-kind Error
2140                    // outcome (see `error_outcomes_for_substream`). Per-event
2141                    // drop: one Error per message/part. The two populations
2142                    // are counted separately so the operator can tell a
2143                    // structural reject from a row-level skip.
2144                    if outcome.kind == "session" {
2145                        self.dropped_sessions += 1;
2146                    } else {
2147                        self.dropped_events += 1;
2148                    }
2149                    let reason = outcome
2150                        .error
2151                        .as_ref()
2152                        .and_then(|e| e.reason_key)
2153                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
2154                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
2155                }
2156            }
2157        }
2158    }
2159}
2160
/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol), shaped
/// so the wire layer can pack it directly into [`crate::wire::IngestResult`]
/// entries. `index` points back into the request's `events` array.
///
/// Normally there is one outcome per input event. The exception is a
/// session-level rejection: it reports a single `Error` on the Session event
/// and no per-row outcomes for the messages/parts it discarded.
///
/// Within one substream, outcomes follow input order. Rejected events are
/// reported when they are pushed, but accepted ones only when the batch is
/// flushed. NOTE(review): outcomes from separate calls are therefore not
/// globally sorted, so consumers should key on `index`.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Position of the originating event in the request's `events` array.
    pub index: usize,
    /// Row kind; the constructors in this file use `"session"`,
    /// `"message"` or `"part"`.
    pub kind: &'static str,
    /// Primary key as JSON. Sessions use the id string, messages use
    /// `[session_id, message_id]`, and parts use
    /// `[session_id, message_id, part_id]`.
    pub pk: Value,
    pub status: OutcomeStatus,
    /// Set for `Error` outcomes and `None` for `Inserted`/`Matched`, as
    /// built by this file's helpers.
    pub error: Option<RowError>,
    /// True iff `kind == "message"` AND the underlying row carries
    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
    /// so the CLI can show "searchable" message deltas distinct from raw
    /// inserts. Always false for session/part rows.
    pub searchable: bool,
}

/// What happened to one input row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row's key was not stored before the write; it was inserted.
    Inserted,
    /// The row's key was already stored; the `merge_insert` treated it as a
    /// no-op.
    Matched,
    /// The event was rejected; details are in [`RowOutcome::error`].
    Error,
}

/// Structured per-row error body. Mirrors the wire shape so the handler
/// can pass it straight through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the rejection.
    pub message: String,
    /// Offending input field when one can be named (e.g. `source_agent`,
    /// `session_id`, `message_id`).
    pub field: Option<&'static str>,
    /// Short prose qualifier. The constructors in this file set it only to
    /// `"immutable"` (immutable-field rejections); elsewhere it is `None`.
    pub reason: Option<&'static str>,
    /// Stable key for histogramming - see `DROP_REASON_*` constants. The
    /// `reason` field above is human-prose; `reason_key` is the machine
    /// bucket. `None` means uncategorized; consumers attribute to
    /// `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
2194    /// `reason` field above is human-prose; `reason_key` is the machine
2195    /// bucket. `None` means uncategorized; consumers attribute to
2196    /// `DROP_REASON_UNCATEGORIZED`.
2197    pub reason_key: Option<&'static str>,
2198}
2199
2200/// Buffered session events tagged with their input array index, so the
2201/// per-row outcomes can be re-attributed once `merge_insert` returns its
2202/// per-row Inserted/Matched stats.
2203#[derive(Debug)]
2204struct BufferedSession {
2205    index: usize,
2206    session: Session,
2207}
2208
2209#[derive(Debug)]
2210struct BufferedMessage {
2211    index: usize,
2212    message: Message,
2213    parts: Vec<BufferedPart>,
2214    search_text: Option<String>,
2215}
2216
2217#[derive(Debug)]
2218struct BufferedPart {
2219    index: usize,
2220    part: Part,
2221}
2222
2223/// State machine that turns the `events: Vec<IngestEvent>` array into a
2224/// flat `Vec<RowOutcome>` matching the array's index space. Buffers a whole
2225/// session substream so `merge_insert` runs once per substream (three
2226/// batches: sessions, messages, parts). A validation error on a single event
2227/// drops *that event* (one [`OutcomeStatus::Error`] outcome) and the substream
2228/// continues; only Session-level invariants (immutable source_agent / project
2229/// on re-write) drop the whole substream (spec.md#adapter-integrity-event-ordering).
2230///
2231/// Writes are batched at flush time. As complete substreams arrive (a new
2232/// `Session` event closes out the current one), they accumulate in
2233/// `completed` rather than each one calling `merge_insert` immediately.
2234/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
2235/// at which point one batched 3-parallel-merge-insert covers all pending
2236/// substreams. This is the load-bearing perf change: per-substream commit
2237/// overhead dominated the ingest profile (see `benches/ingest_bench.rs`),
2238/// and amortizing it across N sessions cuts wall time materially.
2239#[derive(Debug, Default)]
2240pub struct IngestValidator {
2241    session: Option<BufferedSession>,
2242    current_message: Option<BufferedMessage>,
2243    current_parts: Vec<BufferedPart>,
2244    messages: Vec<BufferedMessage>,
2245    /// Message ids already buffered in the current substream. Duplicate ids
2246    /// drop the offending event in-line rather than failing the whole batch
2247    /// downstream.
2248    seen_message_ids: HashSet<String>,
2249    /// `(message_id, part_id)` keys already buffered in the current
2250    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
2251    seen_part_keys: HashSet<(String, String)>,
2252    /// Substreams whose end-of-stream boundary has been observed but whose
2253    /// rows haven't been written yet. Flushed in batched mode by
2254    /// [`Self::flush`].
2255    completed: Vec<CompletedSubstream>,
2256}
2257
2258/// One closed substream ready for the batched flush path.
2259#[derive(Debug)]
2260struct CompletedSubstream {
2261    session_index: usize,
2262    session: Session,
2263    messages: Vec<BufferedMessage>,
2264}
2265
2266/// Ingest host provenance (`options.pond`, spec.md#model-pond-options),
2267/// computed once per process. An audit fact - "the process that inserted this
2268/// row" - not identity. Fallible lookups are omitted, never synthesized as
2269/// placeholders.
2270fn ingest_host_stamp() -> Option<&'static Value> {
2271    static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
2272    STAMP
2273        .get_or_init(|| {
2274            let mut host = serde_json::Map::new();
2275            if let Ok(username) = whoami::username() {
2276                host.insert("username".to_owned(), username.into());
2277            }
2278            if let Ok(hostname) = whoami::hostname() {
2279                host.insert("hostname".to_owned(), hostname.into());
2280            }
2281            if let Ok(devicename) = whoami::devicename() {
2282                host.insert("device_name".to_owned(), devicename.into());
2283            }
2284            (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
2285        })
2286        .as_ref()
2287}
2288
2289impl IngestValidator {
2290    /// Drive one input event through the validator. Returns the per-row
2291    /// outcomes the event triggered: empty when the event is just buffered,
2292    /// or N entries when a session substream just flushed (success or
2293    /// failure). `Err` is reserved for catastrophic storage failures that
2294    /// should fail the whole `pond_ingest` request.
2295    pub async fn push(
2296        &mut self,
2297        store: &Store,
2298        index: usize,
2299        event: IngestEvent,
2300    ) -> Result<Vec<RowOutcome>> {
2301        match event {
2302            IngestEvent::Session(session) => self.push_session(store, index, session).await,
2303            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2304            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2305        }
2306    }
2307
2308    /// Final flush at end-of-batch. Closes the in-flight substream and
2309    /// drains the pending-flush buffer. Returns the per-row outcomes (for
2310    /// the wire layer) alongside the honest per-table counts (for
2311    /// `IngestSummary::add_batch`).
2312    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2313        self.close_current_substream();
2314        self.flush(store).await
2315    }
2316
2317    /// Drain every completed substream into batched 3-parallel-merge_insert
2318    /// writes. Caller invokes this periodically (every N completed
2319    /// substreams) to keep memory bounded; in adapter-driven sync that
2320    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
2321    /// in-flight substream stays buffered - close it explicitly via
2322    /// [`Self::finish`] or by feeding the next Session event.
2323    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2324        if self.completed.is_empty() {
2325            return Ok((Vec::new(), BatchCounts::default()));
2326        }
2327        let completed = std::mem::take(&mut self.completed);
2328        store.upsert_session_batch(completed).await
2329    }
2330
2331    /// Number of fully-buffered substreams awaiting batched write. Used by
2332    /// the adapter caller to decide when to call [`Self::flush`].
2333    pub fn pending_substreams(&self) -> usize {
2334        self.completed.len()
2335    }
2336
2337    async fn push_session(
2338        &mut self,
2339        _store: &Store,
2340        index: usize,
2341        mut session: Session,
2342    ) -> Result<Vec<RowOutcome>> {
2343        // Close out the current substream (if any) - move it to the pending
2344        // buffer instead of writing immediately. The actual write happens
2345        // when the caller invokes `flush` / `finish`.
2346        self.close_current_substream();
2347
2348        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
2349        // if empty after trim. A Session event with empty source_agent is
2350        // dropped on the spot - the substream that would follow has nothing
2351        // to anchor on, so subsequent message/part events will also drop.
2352        let trimmed = session.source_agent.trim();
2353        if trimmed.is_empty() {
2354            return Ok(vec![RowOutcome {
2355                index,
2356                kind: "session",
2357                pk: Value::String(session.id.clone()),
2358                status: OutcomeStatus::Error,
2359                error: Some(RowError {
2360                    message: format!("session {} has empty source_agent after trim", session.id),
2361                    field: Some("source_agent"),
2362                    reason: None,
2363                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2364                }),
2365                searchable: false,
2366            }]);
2367        }
2368        if trimmed.len() != session.source_agent.len() {
2369            session.source_agent = trimmed.to_owned();
2370        }
2371
2372        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2373            return Ok(vec![RowOutcome {
2374                index,
2375                kind: "session",
2376                pk: Value::String(session.id.clone()),
2377                status: OutcomeStatus::Error,
2378                error: Some(RowError {
2379                    message: format!(
2380                        "session {} has parent_message_id without parent_session_id",
2381                        session.id,
2382                    ),
2383                    field: Some("parent_message_id"),
2384                    reason: None,
2385                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2386                }),
2387                searchable: false,
2388            }]);
2389        }
2390
2391        self.seen_message_ids.clear();
2392        self.seen_part_keys.clear();
2393        self.session = Some(BufferedSession { index, session });
2394        Ok(Vec::new())
2395    }
2396
2397    fn close_current_substream(&mut self) {
2398        self.flush_current_message();
2399        let Some(BufferedSession {
2400            index: session_index,
2401            session,
2402        }) = self.session.take()
2403        else {
2404            return;
2405        };
2406        let messages = std::mem::take(&mut self.messages);
2407        self.seen_message_ids.clear();
2408        self.seen_part_keys.clear();
2409        self.completed.push(CompletedSubstream {
2410            session_index,
2411            session,
2412            messages,
2413        });
2414    }
2415
2416    fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
2417        let pk = Value::Array(vec![
2418            Value::String(message.session_id().to_owned()),
2419            Value::String(message.id().to_owned()),
2420        ]);
2421        let Some(session) = &self.session else {
2422            return vec![error_outcome(
2423                index,
2424                "message",
2425                pk,
2426                "first event in a session stream must be Session",
2427                None,
2428                DROP_REASON_MESSAGE_BEFORE_SESSION,
2429            )];
2430        };
2431        if message.session_id() != session.session.id {
2432            let msg = format!(
2433                "message {} references session {}, expected {}",
2434                message.id(),
2435                message.session_id(),
2436                session.session.id
2437            );
2438            return vec![error_outcome(
2439                index,
2440                "message",
2441                pk,
2442                &msg,
2443                Some("session_id"),
2444                DROP_REASON_MESSAGE_SESSION_MISMATCH,
2445            )];
2446        }
2447        if !self.seen_message_ids.insert(message.id().to_owned()) {
2448            // Keep same-substream duplicate ids visible in `dropped_events`;
2449            // adapters are expected to dedupe upstream (see claude-code's
2450            // per-file `seen_uuids`), so a hit here is worth investigating.
2451            let msg = format!("duplicate message id {} in session substream", message.id());
2452            return vec![error_outcome(
2453                index,
2454                "message",
2455                pk,
2456                &msg,
2457                None,
2458                DROP_REASON_DUPLICATE_MESSAGE_ID,
2459            )];
2460        }
2461        // `options.pond` is core-owned (spec.md#model-pond-options): stripped
2462        // and restamped at ingest so neither adapters nor wire clients can
2463        // spoof provenance. Matched rows are merge_insert no-ops, so re-ingest
2464        // never restamps stored rows.
2465        match ingest_host_stamp() {
2466            Some(stamp) => {
2467                message
2468                    .options_mut()
2469                    .insert("pond".to_owned(), stamp.clone());
2470            }
2471            None => {
2472                message.options_mut().remove("pond");
2473            }
2474        }
2475        self.flush_current_message();
2476        self.current_message = Some(BufferedMessage {
2477            index,
2478            message,
2479            parts: Vec::new(),
2480            search_text: None,
2481        });
2482        Vec::new()
2483    }
2484
2485    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2486        let pk = Value::Array(vec![
2487            Value::String(part.session_id.clone()),
2488            Value::String(part.message_id.clone()),
2489            Value::String(part.id.clone()),
2490        ]);
2491        let Some(current) = &self.current_message else {
2492            return vec![error_outcome(
2493                index,
2494                "part",
2495                pk,
2496                "part event appeared before a message",
2497                None,
2498                DROP_REASON_PART_BEFORE_MESSAGE,
2499            )];
2500        };
2501        if part.session_id != current.message.session_id() {
2502            let msg = format!(
2503                "part {} references session {}, expected {}",
2504                part.id,
2505                part.session_id,
2506                current.message.session_id()
2507            );
2508            return vec![error_outcome(
2509                index,
2510                "part",
2511                pk,
2512                &msg,
2513                Some("session_id"),
2514                DROP_REASON_PART_MESSAGE_MISMATCH,
2515            )];
2516        }
2517        if part.message_id != current.message.id() {
2518            let msg = format!(
2519                "part {} references message {}, expected {}",
2520                part.id,
2521                part.message_id,
2522                current.message.id()
2523            );
2524            return vec![error_outcome(
2525                index,
2526                "part",
2527                pk,
2528                &msg,
2529                Some("message_id"),
2530                DROP_REASON_PART_MESSAGE_MISMATCH,
2531            )];
2532        }
2533        let part_key = (part.message_id.clone(), part.id.clone());
2534        if !self.seen_part_keys.insert(part_key) {
2535            let msg = format!(
2536                "duplicate part id {} for message {} in session substream",
2537                part.id, part.message_id
2538            );
2539            return vec![error_outcome(
2540                index,
2541                "part",
2542                pk,
2543                &msg,
2544                None,
2545                DROP_REASON_DUPLICATE_PART_KEY,
2546            )];
2547        }
2548        self.current_parts.push(BufferedPart { index, part });
2549        Vec::new()
2550    }
2551
2552    fn flush_current_message(&mut self) {
2553        let Some(mut buffered) = self.current_message.take() else {
2554            return;
2555        };
2556        let parts = std::mem::take(&mut self.current_parts);
2557        let mut canonical_parts = Vec::with_capacity(parts.len());
2558        for part in &parts {
2559            canonical_parts.push(part.part.clone());
2560        }
2561        buffered.search_text = search_text(&buffered.message, &canonical_parts);
2562        buffered.parts = parts;
2563        self.messages.push(buffered);
2564    }
2565}
2566
2567fn error_outcome(
2568    index: usize,
2569    kind: &'static str,
2570    pk: Value,
2571    message: &str,
2572    field: Option<&'static str>,
2573    reason_key: &'static str,
2574) -> RowOutcome {
2575    RowOutcome {
2576        index,
2577        kind,
2578        pk,
2579        status: OutcomeStatus::Error,
2580        error: Some(RowError {
2581            message: message.to_owned(),
2582            field,
2583            reason: None,
2584            reason_key: Some(reason_key),
2585        }),
2586        searchable: false,
2587    }
2588}
2589
2590/// Session-level rejection (immutable `source_agent` / `project` violation):
2591/// emit exactly one Error outcome on the Session row. The buffered messages
2592/// and parts of this substream are *not* surfaced as per-row errors - their
2593/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
2594fn error_outcomes_for_substream(
2595    session_index: usize,
2596    session: &Session,
2597    _messages: &[BufferedMessage],
2598    message: impl Into<String>,
2599    field: Option<&'static str>,
2600    reason_key: &'static str,
2601) -> Vec<RowOutcome> {
2602    let reason = field.map(|_| "immutable");
2603    vec![RowOutcome {
2604        index: session_index,
2605        kind: "session",
2606        pk: Value::String(session.id.clone()),
2607        status: OutcomeStatus::Error,
2608        error: Some(RowError {
2609            message: message.into(),
2610            field,
2611            reason,
2612            reason_key: Some(reason_key),
2613        }),
2614        searchable: false,
2615    }]
2616}
2617
2618/// Batched-path success helper. Each row's Inserted/Matched status is read
2619/// from the pre-existence sets captured by `upsert_session_batch` before its
2620/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
2621/// Also accumulates the per-table totals into `counts` so the CLI summary
2622/// gets the same truth without re-walking the outcomes.
2623fn success_outcomes_for_substream(
2624    session_index: usize,
2625    session: &Session,
2626    messages: &[BufferedMessage],
2627    existing_sessions: &std::collections::HashMap<String, Session>,
2628    existing_message_pks: &HashSet<(String, String)>,
2629    existing_part_pks: &HashSet<(String, String, String)>,
2630    counts: &mut BatchCounts,
2631) -> Vec<RowOutcome> {
2632    let session_was_present = existing_sessions.contains_key(&session.id);
2633    let session_status = if session_was_present {
2634        counts.sessions_matched += 1;
2635        UpsertStatus::Matched
2636    } else {
2637        counts.sessions_inserted += 1;
2638        UpsertStatus::Inserted
2639    };
2640
2641    let mut outcomes = Vec::with_capacity(1 + messages.len());
2642    outcomes.push(success_outcome(
2643        session_index,
2644        "session",
2645        Value::String(session.id.clone()),
2646        session_status,
2647        false,
2648    ));
2649    for buffered in messages {
2650        let key = (
2651            buffered.message.session_id().to_owned(),
2652            buffered.message.id().to_owned(),
2653        );
2654        let searchable = buffered.search_text.is_some();
2655        let message_status = if existing_message_pks.contains(&key) {
2656            counts.messages_matched_total += 1;
2657            if searchable {
2658                counts.messages_matched_searchable += 1;
2659            }
2660            UpsertStatus::Matched
2661        } else {
2662            counts.messages_inserted_total += 1;
2663            if searchable {
2664                counts.messages_inserted_searchable += 1;
2665            }
2666            UpsertStatus::Inserted
2667        };
2668        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2669        outcomes.push(success_outcome(
2670            buffered.index,
2671            "message",
2672            pk,
2673            message_status,
2674            searchable,
2675        ));
2676        for part in &buffered.parts {
2677            let part_key = (
2678                part.part.session_id.clone(),
2679                part.part.message_id.clone(),
2680                part.part.id.clone(),
2681            );
2682            let part_status = if existing_part_pks.contains(&part_key) {
2683                counts.parts_matched += 1;
2684                UpsertStatus::Matched
2685            } else {
2686                counts.parts_inserted += 1;
2687                UpsertStatus::Inserted
2688            };
2689            let part_pk = Value::Array(vec![
2690                Value::String(part_key.0),
2691                Value::String(part_key.1),
2692                Value::String(part_key.2),
2693            ]);
2694            outcomes.push(success_outcome(
2695                part.index,
2696                "part",
2697                part_pk,
2698                part_status,
2699                false,
2700            ));
2701        }
2702    }
2703    outcomes
2704}
2705
2706fn success_outcome(
2707    index: usize,
2708    kind: &'static str,
2709    pk: Value,
2710    status: UpsertStatus,
2711    searchable: bool,
2712) -> RowOutcome {
2713    let status = match status {
2714        UpsertStatus::Inserted => OutcomeStatus::Inserted,
2715        UpsertStatus::Matched => OutcomeStatus::Matched,
2716    };
2717    RowOutcome {
2718        index,
2719        kind,
2720        pk,
2721        status,
2722        error: None,
2723        searchable,
2724    }
2725}
2726
2727#[derive(Debug, Clone, PartialEq, Eq)]
2728enum IngestError {
2729    /// spec.md#protocol: `Session.source_agent` and `Session.project` are
2730    /// immutable post-first-write because the denormalized copies on
2731    /// `messages` were stamped from the prior Session at first ingest.
2732    /// A re-write that changes either would silently desync.
2733    ImmutableField {
2734        field: &'static str,
2735        session_id: String,
2736        stored: String,
2737        attempted: String,
2738    },
2739}
2740
2741impl std::fmt::Display for IngestError {
2742    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2743        match self {
2744            Self::ImmutableField {
2745                field,
2746                session_id,
2747                stored,
2748                attempted,
2749            } => write!(
2750                formatter,
2751                "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
2752            ),
2753        }
2754    }
2755}
2756
2757impl std::error::Error for IngestError {}
2758
2759/// Compare an incoming Session row against the stored row on the two
2760/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
2761/// counts a NULL-vs-non-NULL change as a mismatch.
2762fn ensure_immutable_match(
2763    existing: &Session,
2764    incoming: &Session,
2765) -> std::result::Result<(), IngestError> {
2766    if existing.source_agent != incoming.source_agent {
2767        return Err(IngestError::ImmutableField {
2768            field: "source_agent",
2769            session_id: incoming.id.clone(),
2770            stored: existing.source_agent.clone(),
2771            attempted: incoming.source_agent.clone(),
2772        });
2773    }
2774    if existing.project != incoming.project {
2775        return Err(IngestError::ImmutableField {
2776            field: "project",
2777            session_id: incoming.id.clone(),
2778            stored: (*existing.project).clone(),
2779            attempted: (*incoming.project).clone(),
2780        });
2781    }
2782    Ok(())
2783}
2784
2785pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2786    use crate::wire::Provenance;
2787    let mut chunks: Vec<String> = Vec::new();
2788    for part in parts {
2789        // spec.md#search: only conversational parts contribute to the indexed
2790        // text; harness-injected scaffolding is excluded from search.
2791        if part.provenance != Provenance::Conversational {
2792            continue;
2793        }
2794        match (message.role(), &part.kind) {
2795            (Role::User | Role::Assistant, PartKind::Text { text }) => {
2796                if let Some(text) = text {
2797                    chunks.push(text.to_string());
2798                }
2799            }
2800            (
2801                Role::User | Role::Assistant,
2802                PartKind::File {
2803                    media_type,
2804                    file_name,
2805                    data,
2806                },
2807            ) => {
2808                if let Some(file_name) = file_name {
2809                    chunks.push(file_name.clone());
2810                }
2811                if let Some(media_type) = media_type {
2812                    chunks.push(media_type.clone());
2813                }
2814                if let FileData::Url(uri) = data {
2815                    chunks.push(uri.clone());
2816                }
2817            }
2818            (
2819                Role::System | Role::Tool,
2820                PartKind::Text { .. }
2821                | PartKind::Reasoning { .. }
2822                | PartKind::File { .. }
2823                | PartKind::ToolCall { .. }
2824                | PartKind::ToolResult { .. }
2825                | PartKind::ToolApprovalRequest { .. }
2826                | PartKind::ToolApprovalResponse { .. },
2827            )
2828            | (
2829                Role::User | Role::Assistant,
2830                PartKind::Reasoning { .. }
2831                | PartKind::ToolCall { .. }
2832                | PartKind::ToolResult { .. }
2833                | PartKind::ToolApprovalRequest { .. }
2834                | PartKind::ToolApprovalResponse { .. },
2835            ) => {}
2836        }
2837    }
2838
2839    let text = chunks
2840        .into_iter()
2841        .filter(|chunk| !chunk.trim().is_empty())
2842        .collect::<Vec<_>>()
2843        .join("\n");
2844    if text.is_empty() { None } else { Some(text) }
2845}
2846
2847/// Non-empty conversational text (spec.md#search).
2848#[derive(Debug, Clone, PartialEq, Eq)]
2849pub struct SearchText(String);
2850
2851impl SearchText {
2852    pub fn as_str(&self) -> &str {
2853        &self.0
2854    }
2855
2856    pub fn into_inner(self) -> String {
2857        self.0
2858    }
2859}
2860
2861impl AsRef<str> for SearchText {
2862    fn as_ref(&self) -> &str {
2863        &self.0
2864    }
2865}
2866
2867#[derive(Debug, Clone, PartialEq)]
2868pub struct MessageWithParts {
2869    pub message: Message,
2870    pub parts: Vec<Part>,
2871}
2872
2873#[derive(Debug, Clone, PartialEq)]
2874pub struct SessionWithMessages {
2875    pub session: Session,
2876    pub messages: Vec<MessageWithParts>,
2877}
2878
2879#[derive(Debug, Clone)]
2880pub struct SessionViewParams<'a> {
2881    pub mode: ResponseMode,
2882    pub after_id: Option<&'a str>,
2883    pub limit: usize,
2884    pub budget_bytes: usize,
2885    pub session_from: SessionFrom,
2886}
2887
2888#[derive(Debug, Clone)]
2889pub struct MessageViewParams<'a> {
2890    pub context_depth: usize,
2891    /// Which siblings fill the context window: conversational (default)
2892    /// keeps the window on the human/model exchange; complete/verbatim
2893    /// include system/tool carriers.
2894    pub mode: ResponseMode,
2895    pub after_id: Option<&'a str>,
2896    pub limit: usize,
2897    pub budget_bytes: usize,
2898}
2899
2900/// Outcome of a `pond_get` lookup. Separates a missing target (the handler
2901/// maps it to `not_found`) from a stale/unknown `after_id` (mapped to
2902/// `validation_failed`): the message/part stream is append-only, so an anchor
2903/// that was ever valid never disappears - an unknown one is always a client
2904/// error, never a reason to silently restart the page.
2905#[derive(Debug, Clone, PartialEq)]
2906pub enum GetLookup<T> {
2907    NotFound,
2908    UnknownAfterId,
2909    Found(T),
2910}
2911
2912/// Canonical retrieval result for `pond_get` session mode: the stored session
2913/// plus the page of messages (each with its `Part`s) and a remaining count.
2914/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
2915#[derive(Debug, Clone, PartialEq)]
2916pub struct SessionPage {
2917    pub session: Session,
2918    pub messages: Vec<RetrievedMessage>,
2919    pub messages_remaining: usize,
2920}
2921
2922/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
2923/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
2924/// their parts so the handler can summarize them.
2925#[derive(Debug, Clone, PartialEq)]
2926pub struct MessagePage {
2927    pub session: Session,
2928    pub target: RetrievedMessage,
2929    pub target_parts: Vec<Part>,
2930    pub target_parts_remaining: usize,
2931    pub siblings: Vec<RetrievedMessage>,
2932}
2933
2934#[derive(Debug, Clone, PartialEq)]
2935pub struct RetrievedMessage {
2936    pub id: String,
2937    pub role: Role,
2938    pub timestamp: DateTime<Utc>,
2939    pub text: Option<String>,
2940    pub content: Option<String>,
2941    pub parts: Vec<Part>,
2942}
2943
/// Module-internal row read back from a `messages` scan. It has the same
/// fields as [`RetrievedMessage`] except `parts`.
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}

/// One row of the conversational scan. `text` is non-empty by
/// `IsNotNull("search_text")` pushdown (spec.md#search).
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    pub session_id: String,
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: SearchText,
}
2963
/// How many leading `items` belong on the current page.
///
/// The page is capped by `limit` (clamped to `1..=1000`) and by
/// `budget_bytes`, with each item weighed by `size`. The first item is always
/// taken, so a single oversize item never blocks its own page. After that the
/// page ends at the first item that would push the running total past the
/// budget. Returns 0 only when `items` is empty.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = &items[..items.len().min(limit.clamp(1, 1000))];
    let mut total = 0usize;
    window
        .iter()
        .enumerate()
        .find_map(|(index, item)| {
            let candidate = total.saturating_add(size(item));
            // Index 0 is exempt from the budget: every page holds at least one item.
            if index > 0 && candidate > budget_bytes {
                Some(index)
            } else {
                total = candidate;
                None
            }
        })
        .unwrap_or(window.len())
}
2982
2983fn role_from_str(value: &str) -> Result<Role> {
2984    match value {
2985        "system" => Ok(Role::System),
2986        "user" => Ok(Role::User),
2987        "assistant" => Ok(Role::Assistant),
2988        "tool" => Ok(Role::Tool),
2989        other => anyhow::bail!("unknown message role {other}"),
2990    }
2991}
2992
/// Scalar indexes on `messages` (spec.md#datasets), as
/// `(column, index type, index name)`: BTREE for high-cardinality and range
/// columns, BITMAP for low-cardinality columns. There is no index on
/// `embedding_model`: pond's invariant is one active model at a time
/// (a model swap goes through `pond embed --force` which drops the IVF_PQ,
/// clears stale rows, and re-bootstraps), so `embedding_model` is never a
/// query-time predicate - the only embedding-state filter is `vector IS NOT
/// NULL`. `id` lookups are rare and full-scan.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];

/// Scalar indexes on `parts`, as `(column, index type, index name)`.
/// `session_id` and `message_id` each get their own single-column BTREE (not
/// a composite index). Together they form the hot-path lookup key for
/// `parts_for_messages`, which hydrates parts on every `get` and every
/// grouped search.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];

/// Scalar index on `sessions`, as `(column, index type, index name)`: `id` is
/// filtered by `find_session` on every `get` and every grouped search.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
3039
3040fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
3041    Predicate::In(
3042        column,
3043        values.iter().cloned().map(ScalarValue::String).collect(),
3044    )
3045}
3046
3047/// Combine the caller's filter with `vector IS NOT NULL` so the kNN scanner
3048/// never sees a null-vector row. Under the single-active-model invariant,
3049/// `vector IS NOT NULL` is equivalent to "row is currently embedded under
3050/// the configured model" - no per-row `embedding_model` filter needed.
3051fn embedded_scope(filter: &Predicate) -> Predicate {
3052    Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3053}
3054
// Bare logical table names: the lance-namespace Directory impl owns the
// `.lance` directory suffix (spec.md#lance-chokepoints-catalog), so no
// consumer ever reconstructs a `.lance` path from these.
pub(crate) const SESSIONS: &str = "sessions";
pub(crate) const MESSAGES: &str = "messages";
pub(crate) const PARTS: &str = "parts";

/// FTS index name on `messages.search_text`. Kept stable so status reporting
/// and index creation refer to the same index.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// IVF_PQ index name on `messages.vector` (spec.md#search). Kept stable so the
/// activation check and index creation refer to the same index.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";

/// IVF_PQ tuning constants (spec.md#search):
/// - num_bits = 8 (256 centroids per PQ subspace; needs >= 256 vectors)
/// - sub_vectors = embedding_dim / 8 (8-float PQ subspaces)
/// - max_iters = 15 (kmeans cap)
/// - cosine metric (e5 vectors are L2-normalized)
///
/// NOTE(review): `sub_vectors` uses integer division, so an embedding dim
/// that is not a multiple of 8 (or is below 8) gives an uneven or zero split.
/// Confirm configured dims are multiples of 8.
const IVF_PQ_NUM_BITS: u8 = 8;
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
const IVF_PQ_MAX_ITERS: usize = 15;

/// FTS tokenizer constants (spec.md#search-language-neutral-index): character
/// n-grams of length `[3, 5]`. 4- and 5-grams discriminate well; the minimum
/// of 3 keeps 3-character tokens (`FTS`, `OCC`) searchable.
const FTS_NGRAM_MIN: u32 = 3;
const FTS_NGRAM_MAX: u32 = 5;
3084
/// Pond's production IndexIntents: the per-table intent set
/// `Store::open_with_options` registers with the substrate. Uses the
/// production IVF_PQ activation threshold (`VECTOR_INDEX_ACTIVATION_ROWS`).
pub fn pond_index_intents() -> IndexIntents {
    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
}
3090
3091/// Same as [`pond_index_intents`] but with an overridable IVF_PQ activation
3092/// threshold. Used by tests that need to exercise the activation boundary
3093/// without writing 100k vectors.
3094pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3095    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3096    messages.push(IndexIntent {
3097        name: MESSAGES_FTS_INDEX,
3098        column: "search_text",
3099        trigger: IndexTrigger::OnAnyRows,
3100        params: IndexParamsKind::InvertedFtsNgram {
3101            min: FTS_NGRAM_MIN,
3102            max: FTS_NGRAM_MAX,
3103        },
3104    });
3105    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3106        messages.push(IndexIntent {
3107            name,
3108            column,
3109            trigger: IndexTrigger::OnAnyRows,
3110            params: IndexParamsKind::Scalar(kind.clone()),
3111        });
3112    }
3113    messages.push(IndexIntent {
3114        name: MESSAGES_VECTOR_INDEX,
3115        column: "vector",
3116        trigger: IndexTrigger::OnNonNullCount {
3117            column: "vector",
3118            threshold: vector_threshold,
3119        },
3120        params: IndexParamsKind::IvfPqCosine {
3121            sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3122            num_bits: IVF_PQ_NUM_BITS,
3123            max_iters: IVF_PQ_MAX_ITERS,
3124        },
3125    });
3126    let parts = PARTS_SCALAR_INDICES
3127        .iter()
3128        .map(|(column, kind, name)| IndexIntent {
3129            name,
3130            column,
3131            trigger: IndexTrigger::OnAnyRows,
3132            params: IndexParamsKind::Scalar(kind.clone()),
3133        })
3134        .collect();
3135    let sessions = SESSIONS_SCALAR_INDICES
3136        .iter()
3137        .map(|(column, kind, name)| IndexIntent {
3138            name,
3139            column,
3140            trigger: IndexTrigger::OnAnyRows,
3141            params: IndexParamsKind::Scalar(kind.clone()),
3142        })
3143        .collect();
3144    IndexIntents {
3145        sessions,
3146        messages,
3147        parts,
3148    }
3149}
3150
/// Default width of the `messages.vector` embedding column (spec.md#search):
/// matches [`embed::DEFAULT_MODEL_ID`] (`intfloat/multilingual-e5-small`,
/// 384). Used when `[embeddings].dim` is absent.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector dimension, seeded once at startup from `[embeddings].dim`
/// via [`init_embedding_dim`]. It is a `OnceLock` (not a `const`) so a temporary
/// config file can select a model with a different dimension (e.g. a 768-dim
/// model) for an experiment without touching every site. While uninitialized,
/// readers get [`DEFAULT_EMBEDDING_DIM`], which keeps unit tests config-free.
///
/// Anything that reads `embedding_dim()` before the seed is installed (e.g.
/// schemas built via `embedding_vector_type`) sees the default. The later
/// seed does not retroactively change values already computed.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
3162
3163/// The active embedding dimension. Returns whatever [`init_embedding_dim`]
3164/// installed, or [`DEFAULT_EMBEDDING_DIM`] when nothing has installed one.
3165pub fn embedding_dim() -> usize {
3166    EMBEDDING_DIM_RUNTIME
3167        .get()
3168        .copied()
3169        .unwrap_or(DEFAULT_EMBEDDING_DIM)
3170}
3171
3172/// Seed [`embedding_dim`] from config. First call wins.
3173pub fn init_embedding_dim(dim: usize) {
3174    EMBEDDING_DIM_RUNTIME.get_or_init(|| dim);
3175}
3176
3177/// Initial-`CREATE` write params for the namespace-mediated path. The
3178/// substrate seam stamps in `session`, `mode`, and `store_params`.
3179/// `auto_cleanup` is short; long-term recovery is `pond export` snapshots
3180/// plus deferred Lance tags (spec.md#session-durable-copy). `skip_auto_cleanup`
3181/// suppresses the per-commit hook so cleanup stays operator-driven via
3182/// `pond index optimize` (one LIST per command instead of per write).
3183pub(crate) fn write_params_for_create() -> WriteParams {
3184    WriteParams {
3185        data_storage_version: Some(LanceFileVersion::V2_1),
3186        enable_v2_manifest_paths: true,
3187        enable_stable_row_ids: true,
3188        auto_cleanup: Some(AutoCleanupParams {
3189            interval: 20,
3190            older_than: chrono::TimeDelta::days(1),
3191        }),
3192        skip_auto_cleanup: true,
3193        ..WriteParams::default()
3194    }
3195}
3196
3197fn export_schema(table: Table) -> Arc<Schema> {
3198    match table {
3199        Table::Sessions => session_schema(),
3200        Table::Messages => message_schema(),
3201        Table::Parts => part_schema(),
3202    }
3203}
3204
3205fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3206    let expected = export_schema(table);
3207    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3208    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3209    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3210    if actual_names != expected_names {
3211        anyhow::bail!(
3212            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3213            table.as_str(),
3214        );
3215    }
3216    Ok(())
3217}
3218
3219async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3220    let source_uri = source
3221        .to_str()
3222        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3223    let dataset = Dataset::open(source_uri)
3224        .await
3225        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3226    ensure_schema_matches_archive(&dataset, table)?;
3227    Ok(dataset)
3228}
3229
/// Arrow schema of the `sessions` table. Field order is positional: it must
/// match the column order `sessions_chunk` builds.
pub(crate) fn session_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("id", DataType::Utf8, false),
        Field::new("parent_session_id", DataType::Utf8, true),
        Field::new("parent_message_id", DataType::Utf8, true),
        Field::new("source_agent", DataType::Utf8, false),
        // Microsecond precision, explicit UTC timezone.
        Field::new(
            "created_at",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new("project", DataType::Utf8, false),
        json_field("options", false),
    ]))
}
3245
/// Arrow schema of the `messages` table. Field order is positional: it must
/// match the column order `messages_chunk` builds. The `vector` width follows
/// the runtime `embedding_dim()`.
pub(crate) fn message_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
            false,
        ),
        Field::new("role", DataType::Utf8, false),
        Field::new("source_agent", DataType::Utf8, false),
        Field::new("project", DataType::Utf8, false),
        Field::new("content", DataType::Utf8, true),
        Field::new("search_text", DataType::Utf8, true),
        // The message's derived embedding (spec.md#session-embed-from-canonical):
        // both null until `pond embed` fills them, set together thereafter.
        Field::new("vector", embedding_vector_type(), true),
        Field::new("embedding_model", DataType::Utf8, true),
        json_field("options", false),
    ]))
}
3267
/// Arrow schema of the `parts` table, keyed by
/// `(session_id, message_id, id)`. Field order is positional: it must match
/// the column order `parts_chunk` builds.
pub(crate) fn part_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("message_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new("ordinal", DataType::Int32, false),
        Field::new("type", DataType::Utf8, false),
        // spec.md#model-part-provenance: conversation vs harness-injected; search
        // reads this column to exclude injected scaffolding.
        Field::new("provenance", DataType::Utf8, false),
        json_field("variant_data", false),
        // Only `File` parts populate `data`; it is null for every other kind.
        legacy_blob_field("data", true),
        json_field("options", false),
    ]))
}
3283
3284pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3285    let arrays = schema
3286        .fields()
3287        .iter()
3288        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3289        .collect();
3290    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3291}
3292
3293pub(crate) fn empty_reader(
3294    schema: Arc<Schema>,
3295) -> Result<
3296    RecordBatchIterator<
3297        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3298    >,
3299> {
3300    let batch = empty_batch(schema.clone())?;
3301    Ok(RecordBatchIterator::new(
3302        vec![Ok(batch)].into_iter(),
3303        schema,
3304    ))
3305}
3306
/// One borrowed input row for `messages_batches`: the message plus the
/// per-row values it does not carry itself.
pub(crate) struct MessageBatchRow<'a> {
    pub message: &'a Message,
    /// Written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Written to the `project` column.
    pub project: &'a str,
    /// Written to the nullable `search_text` column.
    pub search_text: Option<&'a str>,
}
3313
3314// Lance v7.0.0-beta.16's IVF_PQ build path (`rust/lance/src/index/vector/utils.rs`
3315// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
3316// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
3317// BFloat16 as a future-supported embedding type; until the Rust IVF_PQ build
3318// path catches up, store as Float16 (half-precision, also 2 bytes/element).
3319fn embedding_vector_type() -> DataType {
3320    DataType::FixedSizeList(
3321        Arc::new(Field::new("item", DataType::Float16, true)),
3322        embedding_dim() as i32,
3323    )
3324}
3325
/// The partial-schema source for the embedding column update: the `messages`
/// primary key plus the two columns `pond embed` fills. The field definitions
/// match `message_schema` exactly so Lance accepts it as a subset upsert.
/// Column order must match the arrays built by `embedding_update_batch`.
fn embedding_update_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        primary_field("session_id", DataType::Utf8, false),
        primary_field("id", DataType::Utf8, false),
        Field::new("vector", embedding_vector_type(), true),
        Field::new("embedding_model", DataType::Utf8, true),
    ]))
}
3337
3338/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
3339/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
3340pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3341    let dim = embedding_dim();
3342    let mut flat = Vec::with_capacity(rows.len() * dim);
3343    for row in rows {
3344        if row.vector.len() != dim {
3345            anyhow::bail!(
3346                "embedding for message {} has dim {}, expected {dim}",
3347                row.id,
3348                row.vector.len(),
3349            );
3350        }
3351        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3352    }
3353    let values = Float16Array::from(flat);
3354    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3355    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3356        .context("failed to build embedding vector column")?;
3357
3358    RecordBatch::try_new(
3359        embedding_update_schema(),
3360        vec![
3361            Arc::new(StringArray::from(
3362                rows.iter()
3363                    .map(|row| row.session_id.as_str())
3364                    .collect::<Vec<_>>(),
3365            )),
3366            Arc::new(StringArray::from(
3367                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3368            )),
3369            Arc::new(vectors),
3370            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3371        ],
3372    )
3373    .context("failed to build embedding update batch")
3374}
3375
/// The runtime backstop against Arrow's 2 GiB `i32` offset wall (1 GiB, half
/// the wall). A flush batch is split before the running total of its text
/// columns would exceed this. A single cell at or above it is rejected rather
/// than left to panic inside `StringArray::from`
/// (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;
3381
3382/// Contiguous row ranges whose summed text-column byte cost each stays within
3383/// `COLUMN_BYTE_BUDGET`. Budgeting the all-column total bounds every individual
3384/// column too, since no single column's total can exceed it. `cells[i]` is row
3385/// `i`'s byte cost summed across every text column.
3386fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
3387    let mut chunks = Vec::new();
3388    let mut start = 0usize;
3389    let mut running = 0usize;
3390    for (index, &row) in cells.iter().enumerate() {
3391        if running + row > COLUMN_BYTE_BUDGET && index > start {
3392            chunks.push(start..index);
3393            start = index;
3394            running = 0;
3395        }
3396        running += row;
3397    }
3398    if start < cells.len() {
3399        chunks.push(start..cells.len());
3400    }
3401    chunks
3402}
3403
3404fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3405    if bytes >= COLUMN_BYTE_BUDGET {
3406        anyhow::bail!(
3407            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3408             overflow Arrow's i32 offset buffer"
3409        );
3410    }
3411    Ok(())
3412}
3413
3414async fn merge_insert_chunks(
3415    handle: &Handle,
3416    table: Table,
3417    batches: Vec<RecordBatch>,
3418) -> Result<u64> {
3419    let mut inserted = 0u64;
3420    for batch in batches {
3421        let rows = batch.num_rows();
3422        inserted += handle.merge_insert(table, batch, rows).await?;
3423    }
3424    Ok(inserted)
3425}
3426
3427pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3428    let options = sessions
3429        .iter()
3430        .map(|session| json_bytes(&session.options))
3431        .collect::<Result<Vec<_>>>()?;
3432    let mut cells = Vec::with_capacity(sessions.len());
3433    for (session, encoded) in sessions.iter().zip(&options) {
3434        let columns = [
3435            session.id.len(),
3436            session.parent_session_id.as_deref().map_or(0, str::len),
3437            session.parent_message_id.as_deref().map_or(0, str::len),
3438            session.source_agent.len(),
3439            session.project.as_str().len(),
3440            encoded.len(),
3441        ];
3442        for bytes in columns {
3443            guard_cell("sessions", &session.id, bytes)?;
3444        }
3445        cells.push(columns.iter().sum());
3446    }
3447    chunk_ranges(&cells)
3448        .into_iter()
3449        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3450        .collect()
3451}
3452
3453fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3454    let schema = session_schema();
3455    RecordBatch::try_new(
3456        schema.clone(),
3457        vec![
3458            Arc::new(StringArray::from(
3459                sessions
3460                    .iter()
3461                    .map(|session| session.id.as_str())
3462                    .collect::<Vec<_>>(),
3463            )),
3464            Arc::new(StringArray::from(
3465                sessions
3466                    .iter()
3467                    .map(|session| session.parent_session_id.as_deref())
3468                    .collect::<Vec<_>>(),
3469            )),
3470            Arc::new(StringArray::from(
3471                sessions
3472                    .iter()
3473                    .map(|session| session.parent_message_id.as_deref())
3474                    .collect::<Vec<_>>(),
3475            )),
3476            Arc::new(StringArray::from(
3477                sessions
3478                    .iter()
3479                    .map(|session| session.source_agent.as_str())
3480                    .collect::<Vec<_>>(),
3481            )),
3482            Arc::new(
3483                TimestampMicrosecondArray::from(
3484                    sessions
3485                        .iter()
3486                        .map(|session| micros(session.created_at))
3487                        .collect::<Vec<_>>(),
3488                )
3489                .with_timezone("UTC"),
3490            ),
3491            Arc::new(StringArray::from(
3492                sessions
3493                    .iter()
3494                    .map(|session| session.project.as_str())
3495                    .collect::<Vec<_>>(),
3496            )),
3497            Arc::new(LargeBinaryArray::from_iter_values(
3498                options.iter().map(Vec::as_slice),
3499            )),
3500        ],
3501    )
3502    .context("failed to build session batch")
3503}
3504
3505pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3506    let options = rows
3507        .iter()
3508        .map(|row| json_bytes(row.message.options()))
3509        .collect::<Result<Vec<_>>>()?;
3510    let mut cells = Vec::with_capacity(rows.len());
3511    for (row, encoded) in rows.iter().zip(&options) {
3512        let columns = [
3513            row.message.session_id().len(),
3514            row.message.id().len(),
3515            row.message.role().as_str().len(),
3516            row.source_agent.len(),
3517            row.project.len(),
3518            row.message.system_content().map_or(0, str::len),
3519            row.search_text.map_or(0, str::len),
3520            encoded.len(),
3521        ];
3522        for bytes in columns {
3523            guard_cell("messages", row.message.id(), bytes)?;
3524        }
3525        cells.push(columns.iter().sum());
3526    }
3527    chunk_ranges(&cells)
3528        .into_iter()
3529        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3530        .collect()
3531}
3532
3533fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3534    let schema = message_schema();
3535    RecordBatch::try_new(
3536        schema.clone(),
3537        vec![
3538            Arc::new(StringArray::from(
3539                rows.iter()
3540                    .map(|row| row.message.session_id())
3541                    .collect::<Vec<_>>(),
3542            )),
3543            Arc::new(StringArray::from(
3544                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3545            )),
3546            Arc::new(
3547                TimestampMicrosecondArray::from(
3548                    rows.iter()
3549                        .map(|row| micros(row.message.timestamp()))
3550                        .collect::<Vec<_>>(),
3551                )
3552                .with_timezone("UTC"),
3553            ),
3554            Arc::new(StringArray::from(
3555                rows.iter()
3556                    .map(|row| row.message.role().as_str())
3557                    .collect::<Vec<_>>(),
3558            )),
3559            Arc::new(StringArray::from(
3560                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3561            )),
3562            Arc::new(StringArray::from(
3563                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3564            )),
3565            Arc::new(StringArray::from(
3566                rows.iter()
3567                    .map(|row| row.message.system_content())
3568                    .collect::<Vec<_>>(),
3569            )),
3570            Arc::new(StringArray::from(
3571                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3572            )),
3573            // `vector` / `embedding_model` are written null at ingest; every
3574            // message starts un-embedded and `pond embed` fills them later
3575            // (spec.md#session-embed-from-canonical).
3576            new_null_array(&embedding_vector_type(), rows.len()),
3577            new_null_array(&DataType::Utf8, rows.len()),
3578            Arc::new(LargeBinaryArray::from_iter_values(
3579                options.iter().map(Vec::as_slice),
3580            )),
3581        ],
3582    )
3583    .context("failed to build message batch")
3584}
3585
3586pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3587    let variant_data = parts
3588        .iter()
3589        .map(|part| part_variant_json(&part.kind))
3590        .collect::<Result<Vec<_>>>()?;
3591    let options = parts
3592        .iter()
3593        .map(|part| json_bytes(&part.options))
3594        .collect::<Result<Vec<_>>>()?;
3595    let mut cells = Vec::with_capacity(parts.len());
3596    // The blob column is a BinaryArray, exempt from the text-column bound
3597    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
3598    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3599        let columns = [
3600            part.session_id.len(),
3601            part.message_id.len(),
3602            part.id.len(),
3603            part.kind.type_name().len(),
3604            part.provenance.as_str().len(),
3605            variant.len(),
3606            encoded.len(),
3607        ];
3608        for bytes in columns {
3609            guard_cell("parts", &part.id, bytes)?;
3610        }
3611        cells.push(columns.iter().sum());
3612    }
3613    chunk_ranges(&cells)
3614        .into_iter()
3615        .map(|range| {
3616            parts_chunk(
3617                &parts[range.clone()],
3618                &variant_data[range.clone()],
3619                &options[range],
3620            )
3621        })
3622        .collect()
3623}
3624
3625fn parts_chunk(
3626    parts: &[Part],
3627    variant_data: &[Vec<u8>],
3628    options: &[Vec<u8>],
3629) -> Result<RecordBatch> {
3630    let schema = part_schema();
3631    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
3632    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
3633    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
3634    let blob_payloads: Vec<Option<&[u8]>> = parts
3635        .iter()
3636        .map(|part| match &part.kind {
3637            PartKind::File { data, .. } => Some(match data {
3638                FileData::String(value) => value.as_bytes(),
3639                FileData::Bytes(value) => value.as_slice(),
3640                FileData::Url(value) => value.as_bytes(),
3641            }),
3642            PartKind::Text { .. }
3643            | PartKind::Reasoning { .. }
3644            | PartKind::ToolCall { .. }
3645            | PartKind::ToolResult { .. }
3646            | PartKind::ToolApprovalRequest { .. }
3647            | PartKind::ToolApprovalResponse { .. } => None,
3648        })
3649        .collect();
3650    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3651
3652    RecordBatch::try_new(
3653        schema.clone(),
3654        vec![
3655            Arc::new(StringArray::from(
3656                parts
3657                    .iter()
3658                    .map(|part| part.session_id.as_str())
3659                    .collect::<Vec<_>>(),
3660            )),
3661            Arc::new(StringArray::from(
3662                parts
3663                    .iter()
3664                    .map(|part| part.message_id.as_str())
3665                    .collect::<Vec<_>>(),
3666            )),
3667            Arc::new(StringArray::from(
3668                parts
3669                    .iter()
3670                    .map(|part| part.id.as_str())
3671                    .collect::<Vec<_>>(),
3672            )),
3673            Arc::new(Int32Array::from(
3674                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3675            )),
3676            Arc::new(StringArray::from(
3677                parts
3678                    .iter()
3679                    .map(|part| part.kind.type_name())
3680                    .collect::<Vec<_>>(),
3681            )),
3682            Arc::new(StringArray::from(
3683                parts
3684                    .iter()
3685                    .map(|part| part.provenance.as_str())
3686                    .collect::<Vec<_>>(),
3687            )),
3688            Arc::new(LargeBinaryArray::from_iter_values(
3689                variant_data.iter().map(Vec::as_slice),
3690            )),
3691            Arc::new(blob_array),
3692            Arc::new(LargeBinaryArray::from_iter_values(
3693                options.iter().map(Vec::as_slice),
3694            )),
3695        ],
3696    )
3697    .context("failed to build parts batch")
3698}
3699
3700pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3701    Ok(Session {
3702        id: string(batch, "id", row)?.context("session id is null")?,
3703        parent_session_id: string(batch, "parent_session_id", row)?,
3704        parent_message_id: string(batch, "parent_message_id", row)?,
3705        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3706        created_at: datetime(batch, "created_at", row)?,
3707        project: crate::adapter::Extracted::from_stored(
3708            string(batch, "project", row)?.context("project is null")?,
3709        ),
3710        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3711    })
3712}
3713
3714pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3715    let id = string(batch, "id", row)?.context("message id is null")?;
3716    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3717    let timestamp = datetime(batch, "timestamp", row)?;
3718    let options =
3719        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3720
3721    match string(batch, "role", row)?
3722        .context("message role is null")?
3723        .as_str()
3724    {
3725        "system" => Ok(Message::System {
3726            id,
3727            session_id,
3728            timestamp,
3729            // `content` is nullable in the schema; preserve the distinction
3730            // between "no content row stored" (`None`) and "empty string
3731            // stored" (`Some(extracted_empty)`). The value originally
3732            // came from a `Source` extraction at ingest time; rewrap via
3733            // the storage-internal `from_stored` so the type-system seal
3734            // for adapters stays intact.
3735            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3736            options,
3737        }),
3738        "user" => Ok(Message::User {
3739            id,
3740            session_id,
3741            timestamp,
3742            options,
3743        }),
3744        "assistant" => Ok(Message::Assistant {
3745            id,
3746            session_id,
3747            timestamp,
3748            options,
3749        }),
3750        "tool" => Ok(Message::Tool {
3751            id,
3752            session_id,
3753            timestamp,
3754            options,
3755        }),
3756        other => anyhow::bail!("unknown message role {other}"),
3757    }
3758}
3759
3760pub(crate) fn part_from_batch(
3761    batch: &RecordBatch,
3762    row: usize,
3763    file_data: Option<FileData>,
3764) -> Result<Part> {
3765    let type_name = string(batch, "type", row)?.context("part type is null")?;
3766    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3767    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3768    Ok(Part {
3769        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3770        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3771        id: string(batch, "id", row)?.context("part id is null")?,
3772        ordinal: int32(batch, "ordinal", row)?,
3773        provenance: provenance_from_str(&provenance)?,
3774        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3775        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3776    })
3777}
3778
3779fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3780    match value {
3781        "conversational" => Ok(crate::wire::Provenance::Conversational),
3782        "injected" => Ok(crate::wire::Provenance::Injected),
3783        other => anyhow::bail!("unknown part provenance {other}"),
3784    }
3785}
3786
3787fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3788    let kind = file_data_kind(variant_data)?;
3789    match kind.as_str() {
3790        "string" => {
3791            let text = std::str::from_utf8(bytes)
3792                .context("file string payload is not UTF-8")?
3793                .to_owned();
3794            Ok(FileData::String(text))
3795        }
3796        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3797        "url" => Ok(FileData::Url(
3798            std::str::from_utf8(bytes)
3799                .context("file URL payload is not UTF-8")?
3800                .to_owned(),
3801        )),
3802        other => anyhow::bail!("unknown file data_kind {other}"),
3803    }
3804}
3805
3806fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3807    let value = json_parse::<Value>(variant_data)?;
3808    value
3809        .get("data_kind")
3810        .and_then(Value::as_str)
3811        .map(str::to_owned)
3812        .context("file part variant_data missing data_kind")
3813}
3814
3815fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3816    batch
3817        .column_by_name(name)
3818        .with_context(|| format!("missing column {name}"))?
3819        .as_any()
3820        .downcast_ref::<UInt64Array>()
3821        .with_context(|| format!("column {name} is not UInt64"))
3822}
3823
3824pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3825    let array = batch
3826        .column_by_name(name)
3827        .with_context(|| format!("missing column {name}"))?
3828        .as_any()
3829        .downcast_ref::<StringArray>()
3830        .with_context(|| format!("column {name} is not Utf8"))?;
3831    if array.is_null(row) {
3832        Ok(None)
3833    } else {
3834        Ok(Some(array.value(row).to_owned()))
3835    }
3836}
3837
3838fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3839    // Lance can return a `lance.json` column either as raw JSONB bytes
3840    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
3841    // LargeUtf8), depending on the read path. Handle both.
3842    let column = batch
3843        .column_by_name(name)
3844        .with_context(|| format!("missing column {name}"))?;
3845    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3846        return if array.is_null(row) {
3847            Ok(None)
3848        } else {
3849            Ok(Some(
3850                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3851            ))
3852        };
3853    }
3854    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3855        return if array.is_null(row) {
3856            Ok(None)
3857        } else {
3858            Ok(Some(array.value(row).as_bytes().to_vec()))
3859        };
3860    }
3861    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3862        return if array.is_null(row) {
3863            Ok(None)
3864        } else {
3865            Ok(Some(array.value(row).as_bytes().to_vec()))
3866        };
3867    }
3868    anyhow::bail!("column {name} is not a JSON-compatible array")
3869}
3870
3871fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3872    let array = batch
3873        .column_by_name(name)
3874        .with_context(|| format!("missing column {name}"))?
3875        .as_any()
3876        .downcast_ref::<Int32Array>()
3877        .with_context(|| format!("column {name} is not Int32"))?;
3878    Ok(array.value(row))
3879}
3880
3881pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3882    let array = batch
3883        .column_by_name(name)
3884        .with_context(|| format!("missing column {name}"))?
3885        .as_any()
3886        .downcast_ref::<Float32Array>()
3887        .with_context(|| format!("column {name} is not Float32"))?;
3888    Ok(array.value(row))
3889}
3890
3891pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3892    let array = batch
3893        .column_by_name(name)
3894        .with_context(|| format!("missing column {name}"))?
3895        .as_any()
3896        .downcast_ref::<TimestampMicrosecondArray>()
3897        .with_context(|| format!("column {name} is not timestamp_micros"))?;
3898    Utc.timestamp_micros(array.value(row))
3899        .single()
3900        .context("timestamp is out of range")
3901}
3902
3903fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3904    Field::new(name, data_type, nullable).with_metadata(
3905        [(
3906            "lance-schema:unenforced-primary-key".to_owned(),
3907            "true".to_owned(),
3908        )]
3909        .into(),
3910    )
3911}
3912
3913// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
3914// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
3915// is marked unstable in Lance docs (`format/file/versioning.md`) and at
3916// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
3917// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
3918// allocated only one column_info, so the decoder's second `expect_next()`
3919// fires `"there were more fields in the schema than provided column
3920// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
3921// handles correctly (covered by Lance's own `test_compact_blob_columns`).
3922fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3923    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3924        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3925            .into_iter()
3926            .collect(),
3927    )
3928}
3929
3930fn json_field(name: &str, nullable: bool) -> Field {
3931    lance_arrow::json::json_field(name, nullable)
3932}
3933
3934fn micros(timestamp: DateTime<Utc>) -> i64 {
3935    timestamp.timestamp_micros()
3936}
3937
3938fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3939    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
3940    // matches the `lance.json` extension contract. Lance's compact path
3941    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
3942    // applies `decode_json -> encode_json` on this column; with proper JSONB
3943    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
3944    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
3945    // `try_into_dfstream`; compact still goes through the adapter).
3946    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3947    lance_arrow::json::encode_json(&text)
3948        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3949}
3950
3951fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3952    serde_json::from_slice(value).context("failed to parse JSON field")
3953}
3954
3955fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3956    if let PartKind::File {
3957        media_type,
3958        file_name,
3959        data,
3960    } = kind
3961    {
3962        let data_kind = match data {
3963            FileData::String(_) => "string",
3964            FileData::Bytes(_) => "bytes",
3965            FileData::Url(_) => "url",
3966        };
3967        return json_bytes(&serde_json::json!({
3968            "media_type": media_type,
3969            "file_name": file_name,
3970            "data_kind": data_kind,
3971        }));
3972    }
3973    let value = serde_json::to_value(kind)?;
3974    let mut object = value
3975        .as_object()
3976        .cloned()
3977        .context("part variant did not serialize to an object")?;
3978    object.remove("type");
3979    json_bytes(&object)
3980}
3981
3982fn part_kind_from_json(
3983    type_name: &str,
3984    variant_data: &[u8],
3985    file_data: Option<FileData>,
3986) -> Result<PartKind> {
3987    let mut value = json_parse::<Value>(variant_data)?;
3988    let object = value
3989        .as_object_mut()
3990        .context("part variant data is not an object")?;
3991    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3992    if let Some(data) = file_data {
3993        object.remove("data_kind");
3994        object.insert("data".to_owned(), serde_json::to_value(data)?);
3995    }
3996    serde_json::from_value(value).context("failed to parse part kind")
3997}
3998
3999#[cfg(test)]
4000mod tests {
4001    #![allow(clippy::expect_used, clippy::unwrap_used)]
4002
4003    use super::*;
4004    use crate::{
4005        adapter::Extracted,
4006        handlers::ingest_events,
4007        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
4008    };
4009    use chrono::Utc;
4010    use serde_json::json;
4011    use tempfile::TempDir;
4012
4013    fn synthetic_session(id: &str) -> Session {
4014        Session {
4015            id: id.to_owned(),
4016            parent_session_id: None,
4017            parent_message_id: None,
4018            source_agent: "claude-code".to_owned(),
4019            created_at: Utc::now(),
4020            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
4021            options: ProviderOptions::new(),
4022        }
4023    }
4024
4025    #[test]
4026    fn search_text_excludes_injected_parts() {
4027        use crate::wire::Provenance;
4028        let message = Message::User {
4029            id: "m1".to_owned(),
4030            session_id: "s1".to_owned(),
4031            timestamp: Utc::now(),
4032            options: ProviderOptions::new(),
4033        };
4034        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4035            session_id: "s1".to_owned(),
4036            id: id.to_owned(),
4037            message_id: "m1".to_owned(),
4038            ordinal: 0,
4039            provenance,
4040            options: ProviderOptions::new(),
4041            kind: PartKind::Text {
4042                text: Some(Extracted::from_test_value(text.to_owned())),
4043            },
4044        };
4045
4046        // A conversational part contributes; an injected one is excluded
4047        // (spec.md#search).
4048        let conversational = search_text(
4049            &message,
4050            &[text_part(
4051                "p1",
4052                "real human prompt",
4053                Provenance::Conversational,
4054            )],
4055        );
4056        assert_eq!(conversational.as_deref(), Some("real human prompt"));
4057
4058        let injected = search_text(
4059            &message,
4060            &[text_part(
4061                "p2",
4062                "<task-notification>...</task-notification>",
4063                Provenance::Injected,
4064            )],
4065        );
4066        assert!(
4067            injected.is_none(),
4068            "a message whose only part is injected has null search_text"
4069        );
4070    }
4071
4072    #[test]
4073    fn chunk_ranges_splits_on_byte_budget() {
4074        assert!(chunk_ranges(&[]).is_empty());
4075        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4076
4077        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4078        assert_eq!(
4079            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4080            vec![0..1, 1..2, 2..3],
4081        );
4082
4083        // An oversized single row gets its own chunk, never an infinite loop.
4084        assert_eq!(
4085            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4086            vec![0..1, 1..2, 2..3],
4087        );
4088    }
4089
4090    #[tokio::test]
4091    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4092        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
4093        // Message is dropped on the spot, with one Error outcome surfaced. The
4094        // rest of the substream continues normally - subsequent valid messages
4095        // and parts get written.
4096        let temp = TempDir::new()?;
4097        let store = Store::open_local(temp.path()).await?;
4098        let session = synthetic_session("ordering");
4099        let orphan_part = Part {
4100            session_id: session.id.clone(),
4101            id: "orphan-part".to_owned(),
4102            message_id: "missing-message".to_owned(),
4103            ordinal: 0,
4104            provenance: crate::wire::Provenance::Conversational,
4105            options: ProviderOptions::new(),
4106            kind: PartKind::Text {
4107                text: Some(Extracted::from_test_value("orphan".to_owned())),
4108            },
4109        };
4110        let valid_message = Message::User {
4111            id: "valid-message".to_owned(),
4112            session_id: session.id.clone(),
4113            timestamp: Utc::now(),
4114            options: ProviderOptions::new(),
4115        };
4116        let valid_part = Part {
4117            session_id: session.id.clone(),
4118            id: "valid-part".to_owned(),
4119            message_id: valid_message.id().to_owned(),
4120            ordinal: 0,
4121            provenance: crate::wire::Provenance::Conversational,
4122            options: ProviderOptions::new(),
4123            kind: PartKind::Text {
4124                text: Some(Extracted::from_test_value("kept".to_owned())),
4125            },
4126        };
4127
4128        let mut validator = IngestValidator::default();
4129        validator
4130            .push(&store, 0, IngestEvent::Session(session.clone()))
4131            .await?;
4132        let part_outcomes = validator
4133            .push(&store, 1, IngestEvent::Part(orphan_part))
4134            .await?;
4135        assert_eq!(part_outcomes.len(), 1);
4136        assert_eq!(part_outcomes[0].kind, "part");
4137        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4138        assert!(
4139            part_outcomes[0]
4140                .error
4141                .as_ref()
4142                .map(|e| e.message.contains("part event appeared before a message"))
4143                .unwrap_or(false),
4144            "error message must explain the ordering violation: {part_outcomes:?}"
4145        );
4146        validator
4147            .push(&store, 2, IngestEvent::Message(valid_message))
4148            .await?;
4149        validator
4150            .push(&store, 3, IngestEvent::Part(valid_part))
4151            .await?;
4152        validator.finish(&store).await?;
4153
4154        let (sessions, messages, parts) = store.row_counts().await?;
4155        assert_eq!(sessions, 1, "session committed despite the orphan part");
4156        assert_eq!(messages, 1, "valid message committed");
4157        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4158
4159        Ok(())
4160    }
4161
4162    #[tokio::test]
4163    async fn initialized_flips_only_after_first_ingest() -> anyhow::Result<()> {
4164        // `open` eagerly creates sessions/messages but `parts` is lazy, so a
4165        // configured-but-never-synced store reports uninitialized - the signal
4166        // `pond status`/`pond storage` use to render an empty state instead of
4167        // erroring on the first parts describe.
4168        let temp = TempDir::new()?;
4169        let store = Store::open_local(temp.path()).await?;
4170        assert!(
4171            !store.initialized().await?,
4172            "fresh store has no parts table"
4173        );
4174
4175        let session = synthetic_session("initialized-probe");
4176        let message = Message::User {
4177            id: "message-1".to_owned(),
4178            session_id: session.id.clone(),
4179            timestamp: Utc::now(),
4180            options: ProviderOptions::new(),
4181        };
4182        let part = Part {
4183            session_id: session.id.clone(),
4184            id: "part-1".to_owned(),
4185            message_id: message.id().to_owned(),
4186            ordinal: 0,
4187            provenance: crate::wire::Provenance::Conversational,
4188            options: ProviderOptions::new(),
4189            kind: PartKind::Text {
4190                text: Some(Extracted::from_test_value("hello".to_owned())),
4191            },
4192        };
4193        let mut validator = IngestValidator::default();
4194        validator
4195            .push(&store, 0, IngestEvent::Session(session))
4196            .await?;
4197        validator
4198            .push(&store, 1, IngestEvent::Message(message))
4199            .await?;
4200        validator.push(&store, 2, IngestEvent::Part(part)).await?;
4201        validator.finish(&store).await?;
4202
4203        assert!(store.initialized().await?, "ingest creates the parts table");
4204        Ok(())
4205    }
4206
4207    #[tokio::test]
4208    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4209        // Per-event drop: a duplicate message id within a substream drops the
4210        // *duplicate* and surfaces an Error outcome for it. The first wins; the
4211        // session still commits.
4212        let temp = TempDir::new()?;
4213        let store = Store::open_local(temp.path()).await?;
4214        let session = synthetic_session("duplicate-message");
4215        let first = Message::User {
4216            id: "message-1".to_owned(),
4217            session_id: session.id.clone(),
4218            timestamp: Utc::now(),
4219            options: ProviderOptions::new(),
4220        };
4221        let second = Message::Assistant {
4222            id: "message-1".to_owned(),
4223            session_id: session.id.clone(),
4224            timestamp: Utc::now(),
4225            options: ProviderOptions::new(),
4226        };
4227
4228        let mut validator = IngestValidator::default();
4229        validator
4230            .push(&store, 0, IngestEvent::Session(session.clone()))
4231            .await?;
4232        validator
4233            .push(&store, 1, IngestEvent::Message(first))
4234            .await?;
4235        let dup_outcomes = validator
4236            .push(&store, 2, IngestEvent::Message(second))
4237            .await?;
4238        assert_eq!(dup_outcomes.len(), 1);
4239        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4240        assert!(
4241            dup_outcomes[0]
4242                .error
4243                .as_ref()
4244                .map(|e| e.message.contains("duplicate message id message-1"))
4245                .unwrap_or(false),
4246            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4247        );
4248
4249        validator.finish(&store).await?;
4250        let (sessions, messages, _) = store.row_counts().await?;
4251        assert_eq!(sessions, 1, "session committed");
4252        assert_eq!(messages, 1, "only the first message committed");
4253
4254        Ok(())
4255    }
4256
4257    #[tokio::test]
4258    async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
4259    -> anyhow::Result<()> {
4260        // spec.md#model-pond-options: `options.pond` is core-owned. A stored
4261        // message carries the process's host stamp (when resolvable) and never
4262        // a client-supplied value; session and part options stay untouched.
4263        let temp = TempDir::new()?;
4264        let store = Store::open_local(temp.path()).await?;
4265        let session = synthetic_session("host-provenance");
4266        let mut spoofed = ProviderOptions::new();
4267        spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
4268        let message = Message::User {
4269            id: "message-1".to_owned(),
4270            session_id: session.id.clone(),
4271            timestamp: Utc::now(),
4272            options: spoofed,
4273        };
4274        let part = Part {
4275            session_id: session.id.clone(),
4276            id: "part-1".to_owned(),
4277            message_id: "message-1".to_owned(),
4278            ordinal: 0,
4279            provenance: crate::wire::Provenance::Conversational,
4280            options: ProviderOptions::new(),
4281            kind: PartKind::Text {
4282                text: Some(Extracted::from_test_value("hello".to_owned())),
4283            },
4284        };
4285
4286        let mut validator = IngestValidator::default();
4287        validator
4288            .push(&store, 0, IngestEvent::Session(session.clone()))
4289            .await?;
4290        validator
4291            .push(&store, 1, IngestEvent::Message(message))
4292            .await?;
4293        validator.push(&store, 2, IngestEvent::Part(part)).await?;
4294        validator.finish(&store).await?;
4295
4296        let stored = store
4297            .get_session(&session.id)
4298            .await?
4299            .expect("ingested session is readable");
4300        assert!(
4301            !stored.session.options.contains_key("pond"),
4302            "session rows are not stamped (attribution derives from messages)"
4303        );
4304        let stored_message = &stored.messages[0].message;
4305        match ingest_host_stamp() {
4306            Some(stamp) => {
4307                assert_eq!(
4308                    stored_message.options().get("pond"),
4309                    Some(stamp),
4310                    "stored message carries the real stamp, never the spoof"
4311                );
4312                let host = stamp
4313                    .pointer("/ingest/host")
4314                    .and_then(Value::as_object)
4315                    .expect("stamp shape is {ingest: {host: {..}}}");
4316                assert!(!host.is_empty(), "an all-empty stamp must be None instead");
4317                assert!(
4318                    host.values()
4319                        .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
4320                    "stamp fields are omitted when unavailable, never empty: {host:?}"
4321                );
4322            }
4323            None => assert!(
4324                stored_message.options().get("pond").is_none(),
4325                "with no resolvable stamp the spoofed key is still stripped"
4326            ),
4327        }
4328        assert!(
4329            !stored.messages[0].parts[0].options.contains_key("pond"),
4330            "part rows are not stamped (covered by their message's stamp)"
4331        );
4332
4333        Ok(())
4334    }
4335
4336    /// Regression: compact_files on `parts` with the blob column tripped a
4337    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
4338    /// batches give compact fragments to merge; every `FileData` variant
4339    /// exercises the blob round-trip. All-File batches sidestep a debug-only
4340    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
4341    /// write batch mixes null + valid rows in the blob column - benign in
4342    /// release, irrelevant to this regression's scope.
4343    #[tokio::test(flavor = "multi_thread")]
4344    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4345        use crate::wire::{FileData, PartKind, Provenance};
4346        let temp = TempDir::new()?;
4347        let store = Store::open_local(temp.path()).await?;
4348
4349        let session = synthetic_session("compact-blob");
4350        store
4351            .upsert_sessions(std::slice::from_ref(&session))
4352            .await?;
4353
4354        let make_part = |idx: usize, kind: PartKind| Part {
4355            session_id: session.id.clone(),
4356            message_id: format!("msg-{idx}"),
4357            id: format!("part-{idx}"),
4358            ordinal: 0,
4359            provenance: Provenance::Conversational,
4360            options: ProviderOptions::new(),
4361            kind,
4362        };
4363
4364        let batch_a = vec![
4365            make_part(
4366                0,
4367                PartKind::File {
4368                    media_type: Some("text/plain".to_owned()),
4369                    file_name: Some("a.txt".to_owned()),
4370                    data: FileData::Bytes(b"alpha".to_vec()),
4371                },
4372            ),
4373            make_part(
4374                1,
4375                PartKind::File {
4376                    media_type: Some("text/plain".to_owned()),
4377                    file_name: Some("b.txt".to_owned()),
4378                    data: FileData::String("beta".to_owned()),
4379                },
4380            ),
4381        ];
4382        store.upsert_parts(&batch_a).await?;
4383
4384        let batch_b = vec![
4385            make_part(
4386                2,
4387                PartKind::File {
4388                    media_type: Some("application/octet-stream".to_owned()),
4389                    file_name: None,
4390                    data: FileData::Url("https://example.com/file".to_owned()),
4391                },
4392            ),
4393            make_part(
4394                3,
4395                PartKind::File {
4396                    media_type: Some("image/png".to_owned()),
4397                    file_name: Some("c.png".to_owned()),
4398                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4399                },
4400            ),
4401        ];
4402        store.upsert_parts(&batch_b).await?;
4403
4404        store
4405            .optimize_indices(None, &MaintenancePolicy::always_compact())
4406            .await?
4407            .into_result()?;
4408
4409        Ok(())
4410    }
4411
4412    #[tokio::test]
4413    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4414        let temp = TempDir::new()?;
4415        let store = Store::open_local(temp.path()).await?;
4416        let session = synthetic_session("blob");
4417        let message = Message::User {
4418            id: "message-1".to_owned(),
4419            session_id: session.id.clone(),
4420            timestamp: Utc::now(),
4421            options: ProviderOptions::new(),
4422        };
4423        let part = Part {
4424            session_id: session.id.clone(),
4425            id: "part-1".to_owned(),
4426            message_id: message.id().to_owned(),
4427            ordinal: 0,
4428            provenance: crate::wire::Provenance::Conversational,
4429            options: ProviderOptions::new(),
4430            kind: PartKind::File {
4431                media_type: Some("text/plain".to_owned()),
4432                file_name: Some("payload.txt".to_owned()),
4433                data: FileData::Bytes(b"pond".to_vec()),
4434            },
4435        };
4436
4437        let mut validator = IngestValidator::default();
4438        validator
4439            .push(&store, 0, IngestEvent::Session(session.clone()))
4440            .await?;
4441        validator
4442            .push(&store, 1, IngestEvent::Message(message.clone()))
4443            .await?;
4444        validator
4445            .push(&store, 2, IngestEvent::Part(part.clone()))
4446            .await?;
4447        validator.finish(&store).await?;
4448
4449        let stored = store
4450            .get_session(&session.id)
4451            .await?
4452            .expect("session should exist");
4453        let stored_part = &stored.messages[0].parts[0];
4454        assert_eq!(stored_part, &part);
4455
4456        Ok(())
4457    }
4458
4459    //
4460    // `Session.source_agent` and `Session.project` are immutable
4461    // post-first-write because `messages` denormalizes them at first
4462    // ingest; a silent overwrite would desync the denormalized
4463    // copies. pond core's `IngestValidator` probes the existing session
4464    // before the merge_insert and emits a per-row `validation_failed`
4465    // outcome with the typed field name when either changes. Other Session
4466    // fields (options, parent_session_id, created_at, parent_message_id)
4467    // re-write idempotently via merge_insert.
4468
4469    fn base_session() -> Session {
4470        Session {
4471            id: "01HXY00000000001".to_owned(),
4472            parent_session_id: None,
4473            parent_message_id: None,
4474            source_agent: "claude-code".to_owned(),
4475            created_at: Utc::now(),
4476            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4477            options: ProviderOptions::new(),
4478        }
4479    }
4480
4481    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4482        outcomes
4483            .iter()
4484            .filter(|outcome| outcome.status == target)
4485            .count()
4486    }
4487
4488    #[tokio::test(flavor = "multi_thread")]
4489    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4490    -> anyhow::Result<()> {
4491        let temp = TempDir::new()?;
4492        let store = Store::open_local(temp.path()).await?;
4493
4494        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4495        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4496
4497        let mut again = base_session();
4498        again.options.insert("title".to_owned(), json!("renamed"));
4499        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4500        assert_eq!(
4501            count_status(&second, OutcomeStatus::Error),
4502            0,
4503            "options is mutable; the re-ingest must not surface an error: {second:?}",
4504        );
4505        assert_eq!(
4506            count_status(&second, OutcomeStatus::Matched),
4507            1,
4508            "unchanged immutable fields must match-insert via merge_insert",
4509        );
4510
4511        Ok(())
4512    }
4513
4514    #[tokio::test(flavor = "multi_thread")]
4515    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4516        let temp = TempDir::new()?;
4517        let store = Store::open_local(temp.path()).await?;
4518
4519        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4520        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4521
4522        let mut tampered = base_session();
4523        tampered.source_agent = "codex-cli".to_owned();
4524        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4525        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4526        let err_row = second
4527            .iter()
4528            .find(|outcome| outcome.status == OutcomeStatus::Error)
4529            .expect("error outcome present");
4530        let err = err_row.error.as_ref().expect("error body present");
4531        assert_eq!(err.field, Some("source_agent"));
4532        assert_eq!(err.reason, Some("immutable"));
4533
4534        // The stored row stayed on the original adapter - no silent rewrite.
4535        let stored = store
4536            .get_session(&base_session().id)
4537            .await?
4538            .expect("session row survives the rejected re-ingest");
4539        assert_eq!(stored.session.source_agent, "claude-code");
4540
4541        Ok(())
4542    }
4543
4544    #[tokio::test(flavor = "multi_thread")]
4545    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4546        let temp = TempDir::new()?;
4547        let store = Store::open_local(temp.path()).await?;
4548
4549        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4550        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4551
4552        let mut tampered = base_session();
4553        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4554        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4555        let err_row = second
4556            .iter()
4557            .find(|outcome| outcome.status == OutcomeStatus::Error)
4558            .expect("project change must surface an error outcome");
4559        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4560
4561        let stored = store
4562            .get_session(&base_session().id)
4563            .await?
4564            .expect("session row survives");
4565        assert_eq!(
4566            stored.session.project.as_str(),
4567            "/home/me/proj",
4568            "stored project must remain the original",
4569        );
4570
4571        Ok(())
4572    }
4573
    /// Runs two ingest passes over the same session id.
    ///
    /// - The first pass inserts the session plus two messages.
    /// - The second pass re-sends the session with three new messages.
    ///
    /// The test checks that `BatchCounts` and the per-row outcomes both
    /// attribute the second pass as "session Matched, messages/parts Inserted".
    #[tokio::test(flavor = "multi_thread")]
    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
        // Regression guard: re-ingesting an existing session with NEW
        // messages must surface as sessions_inserted=0, messages_inserted_*>0
        // on `BatchCounts`, and per-row outcomes must mark the new message
        // rows `Inserted` while the session row is `Matched`. The prior
        // implementation derived all per-row statuses from the batch-level
        // session inserted count, which silently flipped the new messages
        // into `Matched` (visible as "up to date" in the CLI bar tail).
        use crate::wire::Provenance;
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = base_session();

        // Builds one conversational text part (ordinal 0) in `session` for
        // `message_id`.
        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
            session_id: session.id.clone(),
            id: part_id.to_owned(),
            message_id: message_id.to_owned(),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value(body.to_owned())),
            },
        };
        // Builds a user message in `session`, stamped with the current time.
        let user_message = |id: &str| Message::User {
            id: id.to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        // First pass: 2 messages land fresh.
        // The second argument to `push` is the event's position within this
        // pass; 0 is the session event.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(user_message("m1")))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Message(user_message("m2")))
            .await?;
        validator
            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
            .await?;
        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
        assert_eq!(first_counts.sessions_inserted, 1);
        assert_eq!(first_counts.messages_inserted_total, 2);
        assert_eq!(first_counts.messages_inserted_searchable, 2);

        // Second pass: same session id, 3 NEW messages.
        // This uses a fresh validator. Position 0 is the session again. For
        // each new message, the message goes at position 2*idx+1 and its part
        // at 2*idx+2 (parts p3..p5).
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
            let pid = format!("p{}", idx + 3);
            validator
                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
                .await?;
            validator
                .push(
                    &store,
                    idx * 2 + 2,
                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
                )
                .await?;
        }
        let (second_outcomes, second_counts) = validator.finish(&store).await?;

        // Batch-level attribution: the session counts as Matched, while every
        // message and part counts as Inserted.
        assert_eq!(
            second_counts.sessions_inserted, 0,
            "existing session row must report as Matched, not Inserted",
        );
        assert_eq!(second_counts.sessions_matched, 1);
        assert_eq!(
            second_counts.messages_inserted_total, 3,
            "the three NEW messages must register as Inserted in BatchCounts",
        );
        assert_eq!(
            second_counts.messages_inserted_searchable, 3,
            "all three new messages carry conversational text -> searchable",
        );
        assert_eq!(second_counts.messages_matched_total, 0);
        assert_eq!(second_counts.parts_inserted, 3);
        assert_eq!(second_counts.parts_matched, 0);

        // Per-row outcomes mirror the BatchCounts shape: the session row is
        // Matched, every new message + part row is Inserted.
        let session_outcome = second_outcomes
            .iter()
            .find(|outcome| outcome.kind == "session")
            .expect("session-row outcome present");
        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
        for outcome in &second_outcomes {
            if outcome.kind == "message" || outcome.kind == "part" {
                assert_eq!(
                    outcome.status,
                    OutcomeStatus::Inserted,
                    "new row must be Inserted, got: {outcome:?}",
                );
            }
        }
        Ok(())
    }
4683
4684    /// Ingest `count` synthetic messages spread across a handful of sessions
4685    /// and projects, each with conversational `search_text`. Returns the store
4686    /// and the message keys in `msg-{i}` order; every `vector` starts null.
4687    async fn store_with_messages(
4688        temp: &TempDir,
4689        count: usize,
4690    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4691        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4692    }
4693
4694    /// Same as [`store_with_messages`] but tests optimize with a custom
4695    /// IVF_PQ activation threshold.
4696    async fn store_with_messages_at_threshold(
4697        temp: &TempDir,
4698        count: usize,
4699        _vector_threshold: usize,
4700    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4701        let store = Store::open_local(temp.path()).await?;
4702        let sessions = 8.min(count.max(1));
4703        let mut events = Vec::new();
4704        for s in 0..sessions {
4705            events.push(IngestEvent::Session(Session {
4706                id: format!("session-{s}"),
4707                parent_session_id: None,
4708                parent_message_id: None,
4709                source_agent: "claude-code".to_owned(),
4710                created_at: Utc::now(),
4711                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4712                options: ProviderOptions::new(),
4713            }));
4714            for i in (s..count).step_by(sessions) {
4715                let message_id = format!("msg-{i}");
4716                events.push(IngestEvent::Message(Message::User {
4717                    id: message_id.clone(),
4718                    session_id: format!("session-{s}"),
4719                    timestamp: Utc::now(),
4720                    options: ProviderOptions::new(),
4721                }));
4722                events.push(IngestEvent::Part(Part {
4723                    session_id: format!("session-{s}"),
4724                    id: format!("{message_id}-part"),
4725                    message_id,
4726                    ordinal: 0,
4727                    provenance: crate::wire::Provenance::Conversational,
4728                    options: ProviderOptions::new(),
4729                    kind: PartKind::Text {
4730                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4731                    },
4732                }));
4733            }
4734        }
4735        ingest_events(&store, events).await?;
4736        let keys = (0..count)
4737            .map(|i| MessageKey {
4738                session_id: format!("session-{}", i % sessions),
4739                message_id: format!("msg-{i}"),
4740            })
4741            .collect();
4742        Ok((store, keys))
4743    }
4744
4745    /// A deterministic pseudo-random vector of the production dimension.
4746    fn synthetic_vector(seed: usize) -> Vec<f32> {
4747        let mut state = (seed as u64)
4748            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4749            .wrapping_add(1);
4750        (0..embedding_dim())
4751            .map(|_| {
4752                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4753                #[allow(clippy::cast_precision_loss)]
4754                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4755                unit - 1.0
4756            })
4757            .collect()
4758    }
4759
4760    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
4761    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4762        keys.iter()
4763            .enumerate()
4764            .map(|(seed, key)| EmbeddedMessage {
4765                session_id: key.session_id.clone(),
4766                id: key.message_id.clone(),
4767                vector: synthetic_vector(seed),
4768            })
4769            .collect()
4770    }
4771
4772    fn embedding_update_batch_with_model(
4773        rows: &[EmbeddedMessage],
4774        model: &str,
4775    ) -> Result<RecordBatch> {
4776        let mut batch = embedding_update_batch(rows)?;
4777        let columns = batch
4778            .columns()
4779            .iter()
4780            .take(3)
4781            .cloned()
4782            .chain(std::iter::once(
4783                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4784            ))
4785            .collect::<Vec<_>>();
4786        batch = RecordBatch::try_new(batch.schema(), columns)?;
4787        Ok(batch)
4788    }
4789
4790    #[tokio::test]
4791    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4792        let temp = TempDir::new()?;
4793        // 4 messages cycle session-0..session-3, so `session-3` is a real
4794        // partition. Scalar-index pushdown is volume-independent: the planner
4795        // emits `ScalarIndexQuery` whenever the index exists.
4796        let (store, keys) = store_with_messages(&temp, 4).await?;
4797        store.write_embeddings(&embedded(&keys)).await?;
4798        store
4799            .optimize_indices(None, &MaintenancePolicy::always_compact())
4800            .await?
4801            .into_result()?;
4802
4803        let query = vec![0.01_f32; embedding_dim()];
4804        let plan = store
4805            .explain_vector_plan(
4806                &query,
4807                10,
4808                &Predicate::Eq("session_id", "session-3".into()),
4809                None,
4810            )
4811            .await?;
4812
4813        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
4814        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
4815        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
4816        // and unrelated.)
4817        assert!(
4818            plan.contains("ScalarIndexQuery"),
4819            "expected a ScalarIndexQuery node in the plan:\n{plan}",
4820        );
4821        let predicate_postfiltered = plan
4822            .lines()
4823            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4824        assert!(
4825            !predicate_postfiltered,
4826            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4827        );
4828        Ok(())
4829    }
4830
4831    #[tokio::test]
4832    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4833        let temp = TempDir::new()?;
4834        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4835
4836        // First batch: 255 vectors, one below threshold. Optimize does not
4837        // create the IVF_PQ because the trigger is not met.
4838        store.write_embeddings(&embedded(&keys[..255])).await?;
4839        store
4840            .optimize_indices_with_vector_threshold(256)
4841            .await?
4842            .into_result()?;
4843        assert!(
4844            !store
4845                .handle
4846                .messages_index_names()
4847                .await?
4848                .iter()
4849                .any(|name| name == MESSAGES_VECTOR_INDEX),
4850            "IVF_PQ must not exist below the activation threshold",
4851        );
4852
4853        // Next batch: one more vector. Total reaches 256; optimize creates
4854        // the IVF_PQ.
4855        store.write_embeddings(&embedded(&keys[255..256])).await?;
4856        store
4857            .optimize_indices_with_vector_threshold(256)
4858            .await?
4859            .into_result()?;
4860        assert!(
4861            store
4862                .handle
4863                .messages_index_names()
4864                .await?
4865                .iter()
4866                .any(|name| name == MESSAGES_VECTOR_INDEX),
4867            "optimize must create the IVF_PQ once the threshold is crossed",
4868        );
4869
4870        // The remaining 44 rows stay un-embedded; the IVF_PQ trains over the
4871        // non-null subset and a planted vector is retrievable.
4872        let hits = store
4873            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4874            .await?;
4875        assert!(
4876            hits.iter().any(|(key, _)| key == &keys[0]),
4877            "an embedded row is retrievable via the index",
4878        );
4879        Ok(())
4880    }
4881
4882    #[tokio::test]
4883    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4884    {
4885        let temp = TempDir::new()?;
4886        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4887        let old_rows = embedded(&keys);
4888        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4889        store
4890            .handle
4891            .merge_update(Table::Messages, old_batch, old_rows.len())
4892            .await?;
4893        store
4894            .optimize_indices_with_vector_threshold(256)
4895            .await?
4896            .into_result()?;
4897        assert!(
4898            store
4899                .handle
4900                .messages_index_names()
4901                .await?
4902                .iter()
4903                .any(|name| name == MESSAGES_VECTOR_INDEX),
4904            "IVF_PQ must exist before a model swap",
4905        );
4906        assert_eq!(store.stale_embedding_count().await?, keys.len());
4907
4908        store.drop_vector_index().await?;
4909        let mut pending = Vec::new();
4910        let stream = store.pending_or_stale_messages();
4911        tokio::pin!(stream);
4912        while let Some(row) = stream.next().await {
4913            pending.push(row?);
4914        }
4915        assert_eq!(
4916            pending.len(),
4917            keys.len(),
4918            "force stream should see stale rows"
4919        );
4920        store.write_embeddings(&embedded(&keys)).await?;
4921        assert_eq!(store.stale_embedding_count().await?, 0);
4922        store
4923            .optimize_indices_with_vector_threshold(256)
4924            .await?
4925            .into_result()?;
4926        assert!(
4927            store
4928                .handle
4929                .messages_index_names()
4930                .await?
4931                .iter()
4932                .any(|name| name == MESSAGES_VECTOR_INDEX),
4933            "optimize must rebuild IVF_PQ after force re-embed",
4934        );
4935
4936        let stream = store.pending_or_stale_messages();
4937        tokio::pin!(stream);
4938        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4939        Ok(())
4940    }
4941
4942    #[tokio::test]
4943    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4944        // Regression: `_row_last_updated_at_version` can point at a Lance
4945        // manifest version that `cleanup_old_versions` or the auto_cleanup
4946        // hook has since dropped from `Dataset::versions()`. The old code
4947        // silently dropped any session whose row-version was not in the
4948        // visible list, collapsing the staleness-skip map down to recent
4949        // commits and forcing `pond sync` to re-touch every file. The fix
4950        // falls back to the oldest still-visible commit timestamp - a
4951        // sound upper bound on the row's true ingest time.
4952        let temp = TempDir::new()?;
4953        let (store, _keys) = store_with_messages(&temp, 4).await?;
4954
4955        // Produce several distinct manifest versions on `sessions` so the
4956        // older ones become eligible for cleanup.
4957        for tag in 0..3 {
4958            let extra = synthetic_session(&format!("extra-{tag}"));
4959            store.upsert_sessions(&[extra]).await?;
4960        }
4961
4962        // Prune everything older than ~now, leaving only the latest manifest.
4963        // `delete_unverified=None` and `error_if_tagged=Some(false)` mirror
4964        // Lance's auto-cleanup hook semantics. The chrono 0-duration is fine:
4965        // Lance's `delete_unverified` floor still protects in-flight files.
4966        let dataset = store.handle.dataset(Table::Sessions).await?;
4967        dataset
4968            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4969            .await
4970            .context("cleanup_old_versions failed")?;
4971
4972        let map = store.session_last_ingested_at().await?;
4973        let session_count = store.row_counts().await?.0;
4974        assert!(
4975            map.len() >= session_count,
4976            "watermark map ({}) must still cover every session ({}) after \
4977             version cleanup; an empty fallback regresses pond sync to a \
4978             full re-scan",
4979            map.len(),
4980            session_count,
4981        );
4982        Ok(())
4983    }
4984
4985    #[tokio::test]
4986    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4987        let temp = TempDir::new()?;
4988        let (store, keys) = store_with_messages(&temp, 10).await?;
4989
4990        let before = store.embedding_progress().await?;
4991        assert_eq!(before.embedded, 0);
4992        assert_eq!(before.total, 10);
4993        assert_eq!(before.model, crate::embed::model_id());
4994
4995        store.write_embeddings(&embedded(&keys[..4])).await?;
4996        let partial = store.embedding_progress().await?;
4997        assert_eq!(partial.embedded, 4);
4998        assert_eq!(partial.total, 10);
4999
5000        store.write_embeddings(&embedded(&keys[4..])).await?;
5001        let full = store.embedding_progress().await?;
5002        assert_eq!(full.embedded, 10);
5003        assert_eq!(full.total, 10);
5004        Ok(())
5005    }
5006}