Skip to main content

pond/
sessions.rs

1//! The session datasets (spec.md#datasets): the three Lance tables, the
2//! `Store` facade, ingest validation, and `search_text` extraction.
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    path::Path,
7    sync::Arc,
8};
9
10use anyhow::{Context, Result};
11use arc_swap::ArcSwapOption;
12use async_stream::try_stream;
13use chrono::{DateTime, TimeZone, Utc};
14use lance::Dataset;
15use lance::dataset::{AutoCleanupParams, ProjectionRequest, WriteMode, WriteParams};
16use lance::deps::arrow_array::{
17    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
18    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
19    UInt64Array, new_null_array,
20};
21use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
22use lance::deps::datafusion::physical_plan::SendableRecordBatchStream;
23use lance::index::DatasetIndexExt;
24use lance_file::version::LanceFileVersion;
25use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
26use serde::{Deserialize, Serialize, de::DeserializeOwned};
27use serde_json::Value;
28use tokio_stream::{Stream, StreamExt};
29
30use crate::{
31    config, embed,
32    rowmap::{RowMetaEntry, RowMetaMap, RowMetaSet, discover_chain},
33    substrate::{
34        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
35        OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
36        TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
37    },
38    wire::{FileData, Message, Part, PartKind, Role, SUMMARY_PART_TYPES, Session, SessionFrom},
39};
40use url::Url;
41
/// Facade over the session datasets: a substrate [`Handle`] paired with an
/// optional, atomically swappable row meta map.
#[derive(Debug)]
pub struct Store {
    /// Substrate handle; the methods in this file route their dataset opens,
    /// scans and writes through it.
    handle: Handle,
    /// Resident per-message meta map for index-only hit resolution and in-memory
    /// hydration (see [`crate::rowmap`]). `None` until [`Store::ensure_rowmap`]
    /// builds it (local tests, pre-prewarm), where the arms fall back to a
    /// data-projection scan and hydration to `take_rows`. `ArcSwap` so a
    /// version-bump rebuild swaps it under concurrent searches.
    rowmap: ArcSwapOption<RowMetaSet>,
}
52
/// Row counts for the three session tables, one field per table. Carried as
/// both the `rows` and `inserted` figures of an archive export/import and of a
/// store-to-store copy.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Row count for the `sessions` table.
    pub sessions: usize,
    /// Row count for the `messages` table.
    pub messages: usize,
    /// Row count for the `parts` table.
    pub parts: usize,
}
59
/// Dataset version id of each source table, as read via `version_id()` when
/// that table was exported.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the `sessions` dataset.
    pub sessions: u64,
    /// Version id of the `messages` dataset.
    pub messages: u64,
    /// Version id of the `parts` dataset.
    pub parts: u64,
}
66
/// Receipt returned by [`Store::export_clean_lance_datasets`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Rows written to the archive, per table.
    pub rows: LanceArchiveCounts,
    /// Version id each source dataset was at when it was exported.
    pub source_versions: LanceArchiveVersions,
}
72
/// Receipt returned by [`Store::import_clean_lance_datasets`] and
/// [`Store::copy_delta_from`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Rows read from the source (scanned or transferred), per table.
    pub rows: LanceArchiveCounts,
    /// Rows the destination actually gained, per table; can be lower than
    /// `rows` when the merge path skips rows that are already present.
    pub inserted: LanceArchiveCounts,
}
78
/// Per-table slice of a store-to-store copy plan (spec.md#session-durable-copy):
/// the source sessions whose rows in this table are to be **appended** and the
/// ones whose rows are to be **merged**.
///
/// The split is decided table by table from what the destination already
/// holds, because the three tables are written by separate commits and an
/// interrupted copy can leave them out of step (for instance the small
/// `sessions` table committed but `messages` not).
#[derive(Clone, Debug, Default)]
pub struct TablePlan {
    /// Sessions with **zero** rows for this table on the destination. Nothing
    /// can collide, so they take the append fast path (no merge join, no
    /// target probe; bandwidth-bound).
    pub append: Vec<String>,
    /// Sessions the destination already holds *some* rows for while the source
    /// holds more; merged so the rows already there are deduplicated
    /// (`WhenMatched::DoNothing`).
    pub merge: Vec<String>,
}

impl TablePlan {
    /// True when no session is scheduled for either append or merge.
    pub fn is_empty(&self) -> bool {
        [&self.append, &self.merge]
            .iter()
            .all(|ids| ids.is_empty())
    }
}
101
102/// A store-to-store `pond copy` plan, decided per table (see [`TablePlan`]).
103/// `source_sessions` is the full source session count, kept so the caller can
104/// tell "destination already up to date" (empty plan, non-empty source) from
105/// "empty source", and so each table can recognize a from-empty/resumed run
106/// (`append.len() == source_sessions`) and skip the per-session `IN` filter.
107#[derive(Debug, Clone, Default)]
108pub struct DeltaPlan {
109    pub sessions: TablePlan,
110    pub messages: TablePlan,
111    pub parts: TablePlan,
112    pub source_sessions: usize,
113}
114
115impl DeltaPlan {
116    pub fn is_empty(&self) -> bool {
117        self.sessions.is_empty() && self.messages.is_empty() && self.parts.is_empty()
118    }
119
120    /// Sessions whose own row is absent on the destination - the "new" count for
121    /// the plan receipt. Sessions never grow in row count (one immutable row
122    /// each), so the `sessions` table only ever appends.
123    pub fn new_sessions(&self) -> usize {
124        self.sessions.append.len()
125    }
126
127    /// Distinct sessions touched by the copy across all three tables - the
128    /// figure the progress bar totals against.
129    pub fn total(&self) -> usize {
130        let mut seen = std::collections::HashSet::new();
131        for plan in [&self.sessions, &self.messages, &self.parts] {
132            seen.extend(plan.append.iter());
133            seen.extend(plan.merge.iter());
134        }
135        seen.len()
136    }
137}
138
139#[derive(Debug, Clone, Default)]
140pub struct IndexIntents {
141    pub sessions: Vec<IndexIntent>,
142    pub messages: Vec<IndexIntent>,
143    pub parts: Vec<IndexIntent>,
144}
145
146impl IndexIntents {
147    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
148        [
149            (Table::Sessions, &self.sessions),
150            (Table::Messages, &self.messages),
151            (Table::Parts, &self.parts),
152        ]
153    }
154}
155
/// A message awaiting embedding: its primary key plus the `search_text` to
/// embed. The vector lives on the same `messages` row, so no denormalized
/// filter columns are needed (spec.md#session-embed-from-canonical).
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Session the message belongs to (first half of the primary key).
    pub session_id: String,
    /// Message id (second half of the primary key).
    pub id: String,
    /// Text to feed to the embedder.
    pub search_text: String,
}
165
/// One embedded message: a primary key and the vector to store. `pond optimize`
/// writes a batch of these into `messages.vector` keyed on `(session_id, id)`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Session the message belongs to (first half of the key).
    pub session_id: String,
    /// Message id (second half of the key).
    pub id: String,
    /// Embedding to store for the message.
    pub vector: Vec<f32>,
}
174
/// Message metadata used to hydrate search hits after retriever ranking.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Id of the message.
    pub message_id: String,
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Message role, carried as text.
    /// NOTE(review): presumably the wire `Role` rendered to a string - confirm.
    pub role: String,
    /// Project label carried with the message.
    pub project: String,
    /// Source agent label carried with the message.
    pub source_agent: String,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// The message's `search_text`.
    pub search_text: String,
}
186
/// Composite key naming one message: the owning session id plus the message
/// id. The derived ordering compares `session_id` first, then `message_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
}
192
/// One retrieval-arm hit. `rowid` is `Some` when the row meta map (or its
/// take_rows miss-fallback) resolved a stable row id, which lets hydration
/// `take_rows` the exact row instead of re-finding it with an `IN`-predicate
/// scan; `None` on the no-map fallback path (local tests, pre-prewarm).
#[derive(Debug, Clone, PartialEq)]
pub struct SearchHit {
    /// Stable row id of the hit, when one was resolved.
    pub rowid: Option<u64>,
    /// Key of the message that was hit.
    pub key: MessageKey,
    /// Score assigned by the retrieval arm.
    /// NOTE(review): scale and direction are not established here - confirm
    /// against the arm that produced the hit.
    pub score: f32,
}
203
/// Per-row outcome of an upsert.
/// NOTE(review): variant meanings below are read off the names - confirm
/// against the upsert path that produces them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// Presumably: the row was absent and has been written.
    Inserted,
    /// Presumably: a row with the same key was already present.
    Matched,
}
209
210/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
211/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
212/// results so the CLI can render compaction-skipped (under writer contention)
213/// distinctly from index-build failure (real problem).
214#[derive(Debug, Default)]
215pub struct OptimizeOutcome {
216    pub tables: Vec<TableOptimizeOutcome>,
217}
218
219impl OptimizeOutcome {
220    /// True if any table's indices phase reported a non-conflict failure.
221    /// `SkippedConflict` is expected under contention and does not count.
222    pub fn any_indices_failed(&self) -> bool {
223        self.tables.iter().any(|t| t.indices.is_failed())
224    }
225
226    /// Treat any `Failed` phase as an error. Tests that don't run under
227    /// contention use this to keep their existing `.await?` style: a real
228    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
229    pub fn into_result(self) -> Result<Self> {
230        for table in &self.tables {
231            if let PhaseOutcome::Failed(error) = &table.indices {
232                anyhow::bail!(
233                    "indices phase failed on {}: {error:#}",
234                    table.table.as_str()
235                );
236            }
237            if let PhaseOutcome::Failed(error) = &table.compaction {
238                anyhow::bail!(
239                    "compaction phase failed on {}: {error:#}",
240                    table.table.as_str()
241                );
242            }
243        }
244        Ok(self)
245    }
246}
247
/// Row totals for the three session tables, one field per table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Total for the `sessions` table.
    pub sessions: u64,
    /// Total for the `messages` table.
    pub messages: u64,
    /// Total for the `parts` table.
    pub parts: u64,
}
254
/// Embedding coverage for `pond status` / `pond optimize`. `total` is the count of
/// `messages` rows that carry `search_text` (i.e. are eligible to embed); rows
/// without `search_text` produce no vector. `embedded` is the subset of those
/// already carrying a vector under the current [`embed::model_id()`]. `backlog`
/// is the authoritative count still owed an embedding (`total - embedded` by
/// construction), read live from the dataset rather than derived by subtracting
/// the FTS `num_docs`, which over-counts deleted-but-unpurged docs and would
/// otherwise report a phantom backlog that never clears.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Eligible rows that already carry a vector for the current model.
    pub embedded: usize,
    /// Rows eligible for embedding (those with `search_text`).
    pub total: usize,
    /// Rows still owed an embedding.
    pub backlog: usize,
    /// Identifier of the embedding model these counts refer to.
    /// NOTE(review): presumably the value of `embed::model_id()` - confirm.
    pub model: &'static str,
}
270
/// Borrowed bundle describing one message to write: the message itself, a
/// slice of parts, and an optional `search_text`.
/// NOTE(review): the write path that consumes this is outside this view;
/// `parts` presumably holds the parts belonging to `message` - confirm.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message being written.
    pub message: &'a Message,
    /// Parts supplied alongside the message.
    pub parts: &'a [Part],
    /// Search text for the message, if it has any.
    pub search_text: Option<&'a str>,
}
277
278impl Store {
279    /// Open against a local filesystem URL or a remote one for which the
280    /// caller has no extra options to pass (env vars suffice). CLI verbs
281    /// that load `[storage]` from config should call
282    /// [`Store::open_with_options`] instead so the same options flow into
283    /// every dataset open and write.
284    pub async fn open(location: &Url) -> Result<Self> {
285        Ok(Self {
286            handle: Handle::open(location).await?,
287            rowmap: ArcSwapOption::empty(),
288        })
289    }
290
291    /// Live byte size of the shared Lance session caches (index + metadata).
292    /// Diagnostic only - walks the caches.
293    pub fn lance_cache_bytes(&self) -> u64 {
294        self.handle.lance_cache_bytes()
295    }
296
297    /// Open with object-store options (S3 creds, region, endpoint, ...)
298    /// threaded through Lance verbatim. Keys are the standard `object_store`
299    /// config names; pond does not parse them. Empty options + default caps
300    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
301    /// config block via [`crate::substrate::RuntimeCaps`].
302    pub async fn open_with_options(
303        location: &Url,
304        storage_options: std::collections::HashMap<String, String>,
305        caps: crate::substrate::RuntimeCaps,
306    ) -> Result<Self> {
307        Ok(Self {
308            handle: Handle::open_with_options(location, storage_options, caps).await?,
309            rowmap: ArcSwapOption::empty(),
310        })
311    }
312
313    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
314    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
315    /// through [`Store::open_with_options`] so the production policy is
316    /// applied; tests get the backend-aware local-FS defaults.
317    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
318        let url = config::url_for_path(path)?;
319        Self::open_with_options(
320            &url,
321            std::collections::HashMap::new(),
322            crate::substrate::RuntimeCaps::default(),
323        )
324        .await
325    }
326
327    /// Export clean, index-free Lance datasets into `dest`.
328    ///
329    /// This rewrites the visible rows of each table instead of copying the
330    /// dataset roots. The resulting manifests therefore contain no references
331    /// to the source store's `_indices`, while `messages.vector` and
332    /// `messages.embedding_model` remain ordinary data columns and are
333    /// preserved.
334    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
335        std::fs::create_dir_all(dest)
336            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
337        let (sessions, sessions_version) = self
338            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
339            .await?;
340        let (messages, messages_version) = self
341            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
342            .await?;
343        let (parts, parts_version) = self
344            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
345            .await?;
346        Ok(LanceArchiveExport {
347            rows: LanceArchiveCounts {
348                sessions,
349                messages,
350                parts,
351            },
352            source_versions: LanceArchiveVersions {
353                sessions: sessions_version,
354                messages: messages_version,
355                parts: parts_version,
356            },
357        })
358    }
359
360    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
361        let sessions_dataset =
362            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
363        let messages_dataset =
364            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
365        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
366        let (sessions, sessions_inserted) = self
367            .import_clean_table(Table::Sessions, sessions_dataset)
368            .await?;
369        let (messages, messages_inserted) = self
370            .import_clean_table(Table::Messages, messages_dataset)
371            .await?;
372        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
373        Ok(LanceArchiveImport {
374            rows: LanceArchiveCounts {
375                sessions,
376                messages,
377                parts,
378            },
379            inserted: LanceArchiveCounts {
380                sessions: sessions_inserted,
381                messages: messages_inserted,
382                parts: parts_inserted,
383            },
384        })
385    }
386
387    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
388        let dataset = self.handle.dataset(table).await?;
389        let source_version = dataset.version_id();
390        let schema = export_schema(table);
391        let mut scan = dataset.scan();
392        // The default scan projects blob columns as descriptor structs
393        // (position/size into the source's blob storage) - meaningless in an
394        // archive and unwritable at V2_1. `AllBinary` materializes the bytes
395        // so the rewritten table is self-contained.
396        scan.blob_handling(lance::datatypes::BlobHandling::AllBinary);
397        let mut stream = scan
398            .try_into_stream()
399            .await
400            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
401        let dest_uri = dest
402            .to_str()
403            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
404
405        let mut rows = 0usize;
406        let mut wrote = false;
407        while let Some(batch) = stream.next().await {
408            let batch = batch
409                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
410            rows += batch.num_rows();
411            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
412            let mut params = write_params_for_create();
413            if wrote {
414                params.mode = WriteMode::Append;
415            }
416            Dataset::write(reader, dest_uri, Some(params))
417                .await
418                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
419            wrote = true;
420        }
421
422        if !wrote {
423            let batch = RecordBatch::new_empty(schema.clone());
424            let reader = RecordBatchIterator::new([Ok(batch)], schema);
425            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
426                .await
427                .with_context(|| {
428                    format!("failed to write empty {} archive table", table.as_str())
429                })?;
430        }
431        Ok((rows, source_version))
432    }
433
434    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
435        // Force the destination table into existence up front: an empty
436        // archive table yields zero batches, so merge_insert alone would
437        // leave a lazily-created table (parts) missing on the destination.
438        let _ = self.handle.dataset(table).await?;
439        self.merge_scanner(table, dataset.scan(), "archive import")
440            .await
441    }
442
443    /// Stream a prepared source `scanner` into this store's `table` via
444    /// `merge_insert_stats`, materializing blob bytes (not descriptor structs)
445    /// so the merge writes them into the destination's own schema. Shared by
446    /// the archive-restore and store-to-store copy paths; `context` names the
447    /// caller in error messages. Returns (rows scanned, rows inserted).
448    async fn merge_scanner(
449        &self,
450        table: Table,
451        mut scanner: lance::dataset::scanner::Scanner,
452        context: &'static str,
453    ) -> Result<(usize, usize)> {
454        scanner.blob_handling(lance::datatypes::BlobHandling::AllBinary);
455        let mut stream = scanner
456            .try_into_stream()
457            .await
458            .with_context(|| format!("failed to scan {} for {context}", table.as_str()))?;
459        let mut rows = 0usize;
460        let mut inserted = 0usize;
461        while let Some(batch) = stream.next().await {
462            let batch = batch
463                .with_context(|| format!("failed to read {} {context} batch", table.as_str()))?;
464            let row_count = batch.num_rows();
465            rows += row_count;
466            let stats = self
467                .handle
468                .merge_insert_stats(table, batch, row_count)
469                .await
470                .with_context(|| format!("failed to merge {} during {context}", table.as_str()))?;
471            inserted += (stats.num_inserted_rows + stats.num_updated_rows) as usize;
472        }
473        Ok((rows, inserted))
474    }
475
476    /// Per-session message count - the data-intrinsic freshness key for
477    /// incremental `pond copy`. pond is append-only (merge is
478    /// `WhenMatched::DoNothing`; no edits or deletes), so this count rises iff a
479    /// session gained messages, catching growth a `MAX(timestamp)` key would
480    /// miss when a new message shares the session's latest timestamp. The count
481    /// is source-authored and survives the copy unchanged, so it compares
482    /// soundly across two stores with independent clocks
483    /// (spec.md#session-durable-copy). Projects only
484    /// the one column it counts; resolves the `session_id` array once per batch
485    /// and allocates a key only on a session's first row. Distinct from
486    /// `session_message_counts`, which counts a supplied id list one query each;
487    /// this counts every session in a single scan.
488    pub async fn all_session_message_counts(&self) -> Result<HashMap<String, usize>> {
489        self.all_session_row_counts(Table::Messages).await
490    }
491
492    pub async fn all_session_part_counts(&self) -> Result<HashMap<String, usize>> {
493        self.all_session_row_counts(Table::Parts).await
494    }
495
496    /// Count rows per `session_id` across one table in a single scan, projecting
497    /// only the `session_id` column and allocating a key on a session's first
498    /// row. Both `messages` and `parts` lead their primary key with `session_id`
499    /// (`lance-table-creation-session-scoped-pk`).
500    async fn all_session_row_counts(&self, table: Table) -> Result<HashMap<String, usize>> {
501        let scanner = self
502            .handle
503            .scan(table, ScanOpts::project_only(&["session_id"]))
504            .await?;
505        let mut stream = scanner.try_into_stream().await?;
506        let mut out: HashMap<String, usize> = HashMap::new();
507        while let Some(batch) = stream.next().await {
508            let batch = batch?;
509            let session_ids = batch
510                .column_by_name("session_id")
511                .context("scan projection dropped the session_id column")?
512                .as_any()
513                .downcast_ref::<StringArray>()
514                .context("session_id column is not Utf8")?;
515            for row in 0..batch.num_rows() {
516                if session_ids.is_null(row) {
517                    continue;
518                }
519                let session_id = session_ids.value(row);
520                if let Some(count) = out.get_mut(session_id) {
521                    *count += 1;
522                } else {
523                    out.insert(session_id.to_owned(), 1);
524                }
525            }
526        }
527        Ok(out)
528    }
529
530    /// Plan an incremental store-to-store copy into `self` from `source`,
531    /// deciding **per table** whether each source session's rows can be appended
532    /// (the destination has none, so they cannot collide) or must be merged (the
533    /// destination has some, source has more). Reads both id-sets plus
534    /// per-session message and part counts. Parts have their own data-derived
535    /// signal so a part added under an existing message routes through merge
536    /// instead of relying on the closing verify to catch it
537    /// (spec.md#session-movement-complete).
538    pub async fn plan_incremental_from(&self, source: &Store) -> Result<DeltaPlan> {
539        let (
540            source_ids,
541            dest_ids,
542            source_msg_counts,
543            dest_msg_counts,
544            source_part_counts,
545            dest_part_counts,
546        ) = tokio::try_join!(
547            source.collect_ids(Table::Sessions),
548            self.collect_ids(Table::Sessions),
549            source.all_session_message_counts(),
550            self.all_session_message_counts(),
551            source.all_session_part_counts(),
552            self.all_session_part_counts(),
553        )?;
554        let source_sessions = source_ids.len();
555        let mut plan = DeltaPlan {
556            source_sessions,
557            ..DeltaPlan::default()
558        };
559        for id in &source_ids {
560            // The `sessions` table holds one immutable row per session, so it
561            // only ever appends an absent id - a present row is identical.
562            if !dest_ids.contains(id) {
563                plan.sessions.append.push(id.clone());
564            }
565            let source_msgs = source_msg_counts.get(id).copied().unwrap_or(0);
566            let dest_msgs = dest_msg_counts.get(id).copied().unwrap_or(0);
567            if dest_msgs == 0 {
568                if source_msgs > 0 {
569                    plan.messages.append.push(id.clone());
570                }
571            } else if source_msgs > dest_msgs {
572                plan.messages.merge.push(id.clone());
573            }
574            let source_parts = source_part_counts.get(id).copied().unwrap_or(0);
575            let dest_parts = dest_part_counts.get(id).copied().unwrap_or(0);
576            if dest_parts == 0 {
577                if source_parts > 0 {
578                    plan.parts.append.push(id.clone());
579                }
580            } else if source_parts > dest_parts {
581                plan.parts.merge.push(id.clone());
582            }
583        }
584        Ok(plan)
585    }
586
587    /// Copy the planned delta from `source` into `self`, streaming the source
588    /// scan straight into the destination - no local staging copy. Each table
589    /// picks its primitive per session from its [`TablePlan`]
590    /// (spec.md#session-durable-copy): **append** the sessions whose rows are
591    /// absent here (cannot collide; one commit per scan, bandwidth-bound) then
592    /// **merge** the partially-present ones (`WhenMatched::DoNothing` dedups the
593    /// rows already there). Append-only storage is what makes the append safe: a
594    /// re-run re-plans from current destination state, so an
595    /// interrupted-then-resumed copy never double-appends (landed rows are no
596    /// longer absent).
597    pub async fn copy_delta_from(
598        &self,
599        source: &Store,
600        plan: &DeltaPlan,
601    ) -> Result<LanceArchiveImport> {
602        // The three tables are independent Lance datasets with separate write
603        // locks, so copy them concurrently - mirrors the ingest path's
604        // three-table `try_join!` (see `upsert_session_batch`).
605        let ((sessions, sessions_inserted), (messages, messages_inserted), (parts, parts_inserted)) =
606            tokio::try_join!(
607                self.copy_table(
608                    source,
609                    Table::Sessions,
610                    "id",
611                    &plan.sessions,
612                    plan.source_sessions,
613                ),
614                self.copy_table(
615                    source,
616                    Table::Messages,
617                    "session_id",
618                    &plan.messages,
619                    plan.source_sessions,
620                ),
621                self.copy_table(
622                    source,
623                    Table::Parts,
624                    "session_id",
625                    &plan.parts,
626                    plan.source_sessions,
627                ),
628            )?;
629        Ok(LanceArchiveImport {
630            rows: LanceArchiveCounts {
631                sessions,
632                messages,
633                parts,
634            },
635            inserted: LanceArchiveCounts {
636                sessions: sessions_inserted,
637                messages: messages_inserted,
638                parts: parts_inserted,
639            },
640        })
641    }
642
643    /// Copy one table's slice of the plan: append the sessions whose rows are
644    /// absent here, then merge the ones whose rows are partially present.
645    /// Sequential within a table (one write lock); `copy_delta_from` runs the
646    /// three tables in parallel. Returns (rows transferred, rows inserted) -
647    /// equal for the append portion, which never dedups.
648    async fn copy_table(
649        &self,
650        source: &Store,
651        table: Table,
652        key_column: &'static str,
653        table_plan: &TablePlan,
654        source_sessions: usize,
655    ) -> Result<(usize, usize)> {
656        // Force the destination table into existence up front so a lazily
657        // created table (parts) is never left missing when its slice is empty -
658        // same reason as the archive import path.
659        let _ = self.handle.dataset(table).await?;
660
661        let appended = self
662            .append_sessions(
663                source,
664                table,
665                key_column,
666                &table_plan.append,
667                source_sessions,
668            )
669            .await?;
670
671        // Sessions with rows already on the destination take the merge path to
672        // skip the rows already there. Chunk the `IN` list so each chunk pushes
673        // down to the key column's btree index.
674        let mut merged_rows = 0usize;
675        let mut merged_inserted = 0usize;
676        for chunk in table_plan.merge.chunks(COPY_SESSION_IN_CHUNK) {
677            let predicate = in_predicate(key_column, chunk);
678            let scanner = source
679                .handle
680                .scan(
681                    table,
682                    ScanOpts {
683                        predicate: Some(&predicate),
684                        projection: None,
685                    },
686                )
687                .await?;
688            let (r, i) = self.merge_scanner(table, scanner, "copy").await?;
689            merged_rows += r;
690            merged_inserted += i;
691        }
692
693        Ok((appended + merged_rows, appended + merged_inserted))
694    }
695
696    /// Append one table's slice for the listed sessions. A from-empty or resumed
697    /// copy (`session_ids.len() == source_sessions`: every session's rows for
698    /// this table are absent on the destination) scans the source wholesale
699    /// under one commit; a partial copy chunks the `IN` predicate (btree-pushed)
700    /// but still commits once per chunk, not per scan batch. Returns rows
701    /// appended.
702    async fn append_sessions(
703        &self,
704        source: &Store,
705        table: Table,
706        key_column: &'static str,
707        session_ids: &[String],
708        source_sessions: usize,
709    ) -> Result<usize> {
710        if session_ids.is_empty() {
711            return Ok(0);
712        }
713        if session_ids.len() == source_sessions {
714            return self.append_scanner(source, table, None).await;
715        }
716        let mut rows = 0usize;
717        for chunk in session_ids.chunks(COPY_SESSION_IN_CHUNK) {
718            let predicate = in_predicate(key_column, chunk);
719            rows += self.append_scanner(source, table, Some(&predicate)).await?;
720        }
721        Ok(rows)
722    }
723
724    /// Append a prepared source scan into this store's `table` via
725    /// `Handle::append_stream`, materializing blob bytes (`AllBinary`) so the
726    /// write is self-contained. The closure is a *factory*: `append_stream`
727    /// rebuilds the one-shot scan stream on each OCC attempt. Returns rows
728    /// appended.
729    async fn append_scanner(
730        &self,
731        source: &Store,
732        table: Table,
733        predicate: Option<&Predicate>,
734    ) -> Result<usize> {
735        let make_source = || async {
736            let mut scanner = source
737                .handle
738                .scan(
739                    table,
740                    ScanOpts {
741                        predicate,
742                        projection: None,
743                    },
744                )
745                .await?;
746            scanner.blob_handling(lance::datatypes::BlobHandling::AllBinary);
747            let stream: SendableRecordBatchStream = scanner
748                .try_into_stream()
749                .await
750                .with_context(|| format!("failed to scan {} for copy", table.as_str()))?
751                .into();
752            Ok(stream)
753        };
754        let stats = self.handle.append_stream(table, make_source).await?;
755        Ok(stats.rows as usize)
756    }
757
758    /// Append source rows whose `filter_column` is in `values`. Absent rows
759    /// can't collide, so append is safe where the count-based plan would merge
760    /// (spec.md#session-durable-copy). Drives the `copy_bench` append-vs-merge
761    /// regression guard.
762    pub async fn append_absent_rows(
763        &self,
764        source: &Store,
765        table: Table,
766        filter_column: &'static str,
767        values: &[String],
768    ) -> Result<usize> {
769        if values.is_empty() {
770            return Ok(0);
771        }
772        let _ = self.handle.dataset(table).await?;
773        let mut rows = 0usize;
774        for chunk in values.chunks(COPY_SESSION_IN_CHUNK) {
775            let predicate = in_predicate(filter_column, chunk);
776            rows += self.append_scanner(source, table, Some(&predicate)).await?;
777        }
778        Ok(rows)
779    }
780
781    /// Flat write path. Per-row insert/match truth is not synthesized here -
782    /// honest outcomes come from the pre-existence scan on
783    /// [`Self::upsert_session_batch`]; the CLI sync and wire ingest paths use
784    /// that, so these helpers only need to surface write failure.
785    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<()> {
786        if sessions.is_empty() {
787            return Ok(());
788        }
789        let batches = sessions_batches(sessions)?;
790        merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
791        Ok(())
792    }
793
794    /// Batched write path used by the adapter ingest loop and by the wire
795    /// handler's final flush. Receives N completed substreams from the
796    /// validator and:
797    ///
798    ///   1. Runs the immutable-fields check (spec.md#protocol) against the stored row
799    ///      per session, sequentially. Sessions that fail produce one Error
800    ///      outcome and are excluded from the write batch.
801    ///   2. Deduplicates in-batch at the substream level: when two substreams
802    ///      in the same batch share a `session_id` (Claude Code's subagent
803    ///      files reuse their parent's id), the first occurrence wins. The
804    ///      second is either *merged* (same `source_agent` + `project`:
805    ///      messages/parts append, no duplicate rows) or *rejected*
806    ///      (different `project` - the subagent-vs-parent case). Row-level
807    ///      duplicates that slip past here are caught downstream by Lance's
808    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
809    ///      (invariant 17): this layer's job is preserving substream merge
810    ///      semantics, not policing the PK uniqueness Lance handles itself.
811    ///   3. Builds one combined `RecordBatch` per table (sessions, messages,
812    ///      parts) across every valid substream.
813    ///   4. Commits messages + parts first, then sessions. The session row is
814    ///      the freshness-bearing row; writing it last makes a partial
815    ///      non-atomic flush re-ingest and heal (spec.md#session-movement-complete).
816    ///   5. Composes per-session [`RowOutcome`]s in original substream order.
817    async fn upsert_session_batch(
818        &self,
819        substreams: Vec<CompletedSubstream>,
820    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
821        if substreams.is_empty() {
822            return Ok((Vec::new(), BatchCounts::default()));
823        }
824
825        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
826        let mut counts = BatchCounts::default();
827
828        // In-batch dedup. First occurrence of each session_id wins; later
829        // occurrences either merge or get rejected. Iteration order preserves
830        // original substream order so outcomes index correctly.
831        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
832        let mut by_session_id: std::collections::HashMap<String, usize> =
833            std::collections::HashMap::with_capacity(substreams.len());
834        for substream in substreams {
835            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
836                let existing = &merged[existing_idx];
837                if existing.session.source_agent != substream.session.source_agent
838                    || existing.session.project != substream.session.project
839                {
840                    // Subagent-vs-parent class. The first occurrence's
841                    // metadata stays authoritative; this substream is
842                    // rejected on the same immutable-field axis as the
843                    // storage-side check.
844                    let reason = if existing.session.source_agent != substream.session.source_agent
845                    {
846                        IngestError::ImmutableField {
847                            field: "source_agent",
848                            session_id: substream.session.id.clone(),
849                            stored: existing.session.source_agent.clone(),
850                            attempted: substream.session.source_agent.clone(),
851                        }
852                    } else {
853                        IngestError::ImmutableField {
854                            field: "project",
855                            session_id: substream.session.id.clone(),
856                            stored: (*existing.session.project).clone(),
857                            attempted: (*substream.session.project).clone(),
858                        }
859                    };
860                    let field = match &reason {
861                        IngestError::ImmutableField { field, .. } => Some(*field),
862                    };
863                    let reason_key = match field {
864                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
865                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
866                        _ => DROP_REASON_UNCATEGORIZED,
867                    };
868                    outcomes.extend(error_outcomes_for_substream(
869                        substream.session_index,
870                        &substream.session,
871                        &substream.messages,
872                        reason.to_string(),
873                        field,
874                        reason_key,
875                    ));
876                    continue;
877                }
878                // Same session, same metadata: merge messages. Dedup message
879                // ids defensively (within one batch, the validator's seen
880                // sets are per-substream so cross-substream dups can happen
881                // legally if both files re-emit the same row).
882                let existing = &mut merged[existing_idx];
883                let mut seen: std::collections::HashSet<String> = existing
884                    .messages
885                    .iter()
886                    .map(|m| m.message.id().to_owned())
887                    .collect();
888                for msg in substream.messages {
889                    if seen.insert(msg.message.id().to_owned()) {
890                        existing.messages.push(msg);
891                    }
892                }
893                continue;
894            }
895            by_session_id.insert(substream.session.id.clone(), merged.len());
896            merged.push(substream);
897        }
898
899        // Pre-existence sweep: one scan per table keyed on the batch's
900        // session_ids, capped at the substream count. Replaces the prior
901        // N-sequential `find_session` calls and gives us honest per-row
902        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
903        let session_id_values: Vec<ScalarValue> = merged
904            .iter()
905            .map(|substream| ScalarValue::String(substream.session.id.clone()))
906            .collect();
907        let existing_sessions: std::collections::HashMap<String, Session> =
908            if session_id_values.is_empty() {
909                std::collections::HashMap::new()
910            } else {
911                let batch = self
912                    .handle
913                    .scan_batch(
914                        Table::Sessions,
915                        Some(&Predicate::In("id", session_id_values.clone())),
916                        &[],
917                    )
918                    .await?;
919                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
920                for row in 0..batch.num_rows() {
921                    let session = session_from_batch(&batch, row)?;
922                    map.insert(session.id.clone(), session);
923                }
924                map
925            };
926        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
927            HashSet::new()
928        } else {
929            let batch = self
930                .handle
931                .scan_batch(
932                    Table::Messages,
933                    Some(&Predicate::In("session_id", session_id_values.clone())),
934                    &["session_id", "id"],
935                )
936                .await?;
937            let mut set = HashSet::with_capacity(batch.num_rows());
938            for row in 0..batch.num_rows() {
939                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
940                let mid = string(&batch, "id", row)?.context("message id is null")?;
941                set.insert((sid, mid));
942            }
943            set
944        };
945        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
946            HashSet::new()
947        } else {
948            let batch = self
949                .handle
950                .scan_batch(
951                    Table::Parts,
952                    Some(&Predicate::In("session_id", session_id_values)),
953                    &["session_id", "message_id", "id"],
954                )
955                .await?;
956            let mut set = HashSet::with_capacity(batch.num_rows());
957            for row in 0..batch.num_rows() {
958                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
959                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
960                let pid = string(&batch, "id", row)?.context("part id is null")?;
961                set.insert((sid, mid, pid));
962            }
963            set
964        };
965
966        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
967        for substream in merged {
968            if let Some(existing) = existing_sessions.get(&substream.session.id)
969                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
970            {
971                let field = match &failure {
972                    IngestError::ImmutableField { field, .. } => Some(*field),
973                };
974                let reason_key = match field {
975                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
976                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
977                    _ => DROP_REASON_UNCATEGORIZED,
978                };
979                outcomes.extend(error_outcomes_for_substream(
980                    substream.session_index,
981                    &substream.session,
982                    &substream.messages,
983                    failure.to_string(),
984                    field,
985                    reason_key,
986                ));
987                continue;
988            }
989            writeable.push(substream);
990        }
991
992        if writeable.is_empty() {
993            outcomes.sort_by_key(|outcome| outcome.index);
994            return Ok((outcomes, counts));
995        }
996
997        let sessions_owned: Vec<Session> = writeable
998            .iter()
999            .map(|substream| substream.session.clone())
1000            .collect();
1001        // Append only rows absent from the pre-existence sweep; present rows are
1002        // merge no-ops (spec.md#adapter-integrity-additive-sync). `seen_*` carries
1003        // the in-batch dedup floor (spec.md#adapter-integrity-dedup).
1004        let mut seen_messages: HashSet<(String, String)> = HashSet::new();
1005        let message_rows: Vec<MessageBatchRow<'_>> = writeable
1006            .iter()
1007            .flat_map(|substream| {
1008                substream.messages.iter().map(|buffered| MessageBatchRow {
1009                    message: &buffered.message,
1010                    source_agent: &substream.session.source_agent,
1011                    project: &substream.session.project,
1012                    search_text: buffered.search_text.as_deref(),
1013                })
1014            })
1015            .filter(|row| {
1016                let key = (
1017                    row.message.session_id().to_owned(),
1018                    row.message.id().to_owned(),
1019                );
1020                !existing_message_pks.contains(&key) && seen_messages.insert(key)
1021            })
1022            .collect();
1023        let mut seen_parts: HashSet<(String, String, String)> = HashSet::new();
1024        let part_rows: Vec<Part> = writeable
1025            .iter()
1026            .flat_map(|substream| {
1027                substream.messages.iter().flat_map(|buffered| {
1028                    buffered
1029                        .parts
1030                        .iter()
1031                        .map(|buffered_part| buffered_part.part.clone())
1032                })
1033            })
1034            .filter(|part| {
1035                let key = (
1036                    part.session_id.clone(),
1037                    part.message_id.clone(),
1038                    part.id.clone(),
1039                );
1040                !existing_part_pks.contains(&key) && seen_parts.insert(key)
1041            })
1042            .collect();
1043
1044        let session_batches = sessions_batches(&sessions_owned)?;
1045        let message_batches = messages_batches(&message_rows)?;
1046        let part_batches = parts_batches(&part_rows)?;
1047
1048        let (_messages_appended, _parts_appended) = tokio::try_join!(
1049            self.handle.append_batches(Table::Messages, message_batches),
1050            self.handle.append_batches(Table::Parts, part_batches),
1051        )?;
1052        let _sessions_inserted =
1053            merge_insert_chunks(&self.handle, Table::Sessions, session_batches).await?;
1054
1055        for substream in &writeable {
1056            outcomes.extend(success_outcomes_for_substream(
1057                substream.session_index,
1058                &substream.session,
1059                &substream.messages,
1060                &existing_sessions,
1061                &existing_message_pks,
1062                &existing_part_pks,
1063                &mut counts,
1064            ));
1065        }
1066
1067        outcomes.sort_by_key(|outcome| outcome.index);
1068        Ok((outcomes, counts))
1069    }
1070
1071    pub async fn upsert_messages(
1072        &self,
1073        session: &Session,
1074        messages: &[MessageWrite<'_>],
1075    ) -> Result<()> {
1076        if messages.is_empty() {
1077            return Ok(());
1078        }
1079
1080        let rows = messages
1081            .iter()
1082            .map(|write| MessageBatchRow {
1083                message: write.message,
1084                source_agent: &session.source_agent,
1085                project: &session.project,
1086                search_text: write.search_text,
1087            })
1088            .collect::<Vec<_>>();
1089        let batches = messages_batches(&rows)?;
1090        merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
1091        Ok(())
1092    }
1093
1094    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<()> {
1095        if parts.is_empty() {
1096            return Ok(());
1097        }
1098        let batches = parts_batches(parts)?;
1099        merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
1100        Ok(())
1101    }
1102
1103    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
1104        let Some(session) = self.find_session(session_id).await? else {
1105            return Ok(None);
1106        };
1107        let messages = self.messages_for_session(session_id).await?;
1108        Ok(Some(SessionWithMessages { session, messages }))
1109    }
1110
1111    /// Every session id currently in the store, unsorted.
1112    pub async fn session_ids(&self) -> Result<Vec<String>> {
1113        let batch = self
1114            .handle
1115            .scan_batch(Table::Sessions, None, &["id"])
1116            .await?;
1117        let mut ids = Vec::with_capacity(batch.num_rows());
1118        for row in 0..batch.num_rows() {
1119            if let Some(id) = string(&batch, "id", row)? {
1120                ids.push(id);
1121            }
1122        }
1123        Ok(ids)
1124    }
1125
1126    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
1127        let batch = self
1128            .handle
1129            .scan_batch(
1130                Table::Sessions,
1131                Some(&Predicate::Eq(
1132                    "parent_session_id",
1133                    parent_session_id.into(),
1134                )),
1135                &[
1136                    "id",
1137                    "parent_session_id",
1138                    "parent_message_id",
1139                    "source_agent",
1140                    "created_at",
1141                    "project",
1142                    "options",
1143                ],
1144            )
1145            .await?;
1146        let mut sessions = Vec::with_capacity(batch.num_rows());
1147        for row in 0..batch.num_rows() {
1148            sessions.push(session_from_batch(&batch, row)?);
1149        }
1150        sessions.sort_by(|left, right| left.id.cmp(&right.id));
1151        Ok(sessions)
1152    }
1153
1154    /// `session_id -> last durable message id` for the sync freshness gate.
1155    /// Scans stored message data only, never Lance version history:
1156    /// `Dataset::versions()` is remote-manifest-bound on object stores, and a
1157    /// write timestamp can exist even when a non-atomic ingest did not commit
1158    /// the messages (spec.md#session-movement-complete).
1159    ///
1160    /// Only emits a key when the session row is ALSO durable. `upsert_session_batch`
1161    /// commits messages+parts before the session row, so a partial flush can leave
1162    /// a session whose messages are stored but whose session row is not; keying on
1163    /// messages alone would report it fresh and orphan the missing row. Intersecting
1164    /// with the sessions id-set forces a re-ingest that heals it
1165    /// (spec.md#session-movement-complete).
1166    pub async fn session_last_message_ids(&self) -> Result<HashMap<String, String>> {
1167        let (session_ids, latest) = tokio::try_join!(self.collect_ids(Table::Sessions), async {
1168            let scanner = self
1169                .handle
1170                .scan(
1171                    Table::Messages,
1172                    ScanOpts::project_only(&["session_id", "id", "timestamp"]),
1173                )
1174                .await?;
1175            let mut stream = scanner.try_into_stream().await?;
1176            let mut latest: HashMap<String, (DateTime<Utc>, String)> = HashMap::new();
1177            while let Some(batch) = stream.next().await {
1178                let batch = batch?;
1179                let session_ids = batch
1180                    .column_by_name("session_id")
1181                    .context("scan projection dropped the session_id column")?
1182                    .as_any()
1183                    .downcast_ref::<StringArray>()
1184                    .context("session_id column is not Utf8")?;
1185                for row in 0..batch.num_rows() {
1186                    if session_ids.is_null(row) {
1187                        continue;
1188                    }
1189                    let session_id = session_ids.value(row);
1190                    let Some(id) = string(&batch, "id", row)? else {
1191                        continue;
1192                    };
1193                    let timestamp = datetime(&batch, "timestamp", row)?;
1194                    match latest.get_mut(session_id) {
1195                        Some((stored_ts, stored_id))
1196                            if timestamp > *stored_ts
1197                                || (timestamp == *stored_ts
1198                                    && id.as_str() > stored_id.as_str()) =>
1199                        {
1200                            *stored_ts = timestamp;
1201                            *stored_id = id;
1202                        }
1203                        None => {
1204                            latest.insert(session_id.to_owned(), (timestamp, id));
1205                        }
1206                        _ => {}
1207                    }
1208                }
1209            }
1210            Ok::<_, anyhow::Error>(latest)
1211        })?;
1212        Ok(latest
1213            .into_iter()
1214            .filter(|(session_id, _)| session_ids.contains(session_id))
1215            .map(|(session_id, (_, message_id))| (session_id, message_id))
1216            .collect())
1217    }
1218
1219    /// Whole-session view for `pond_get` session scope (spec.md#protocol).
1220    /// Always the conversational view (`search_text IS NOT NULL`) with one-line
1221    /// part summaries - full part bodies are reached by `message_id` scope, not
1222    /// here. The page is the window selected by the anchors (`after_message_id`
1223    /// pages forward, `before_message_id` pages backward) or, with neither,
1224    /// `session_from` (start/end); it is bounded by `limit` and a byte budget,
1225    /// never cutting mid-message. `before_remaining`/`after_remaining` drive the
1226    /// bidirectional page markers.
1227    pub async fn session_view(
1228        &self,
1229        session_id: &str,
1230        params: SessionViewParams<'_>,
1231    ) -> Result<GetLookup<SessionPage>> {
1232        let Some(session) = self.find_session(session_id).await? else {
1233            return Ok(GetLookup::NotFound);
1234        };
1235        let mut rows: Vec<ScanRow> = self
1236            .scan_conversational_messages(session_id)
1237            .await?
1238            .into_iter()
1239            .map(|row| ScanRow {
1240                id: row.message_id,
1241                role: row.role,
1242                timestamp: row.timestamp,
1243                text: Some(row.text.into_inner()),
1244                content: None,
1245            })
1246            .collect();
1247        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
1248
1249        let size = |row: &ScanRow| row.text.as_deref().map_or(0, str::len);
1250        let total = rows.len();
1251        // Append-only stream: a real anchor never vanishes, so an unknown
1252        // anchor is a stale/mistyped client cursor, not "start over".
1253        let (win_start, win_end) = match (params.after_message_id, params.before_message_id) {
1254            (Some(after), _) => {
1255                let pos = match rows.iter().position(|row| row.id == after) {
1256                    Some(idx) => idx + 1,
1257                    None => return Ok(GetLookup::UnknownAnchor),
1258                };
1259                let n = page_by(&rows[pos..], params.limit, params.budget_bytes, size);
1260                (pos, pos + n)
1261            }
1262            (None, Some(before)) => {
1263                let pos = match rows.iter().position(|row| row.id == before) {
1264                    Some(idx) => idx,
1265                    None => return Ok(GetLookup::UnknownAnchor),
1266                };
1267                let n = page_tail(&rows[..pos], params.limit, params.budget_bytes, size);
1268                (pos - n, pos)
1269            }
1270            (None, None) => match params.session_from {
1271                SessionFrom::Start => (0, page_by(&rows, params.limit, params.budget_bytes, size)),
1272                SessionFrom::End => {
1273                    let n = page_tail(&rows, params.limit, params.budget_bytes, size);
1274                    (total - n, total)
1275                }
1276            },
1277        };
1278        let emitted = &rows[win_start..win_end];
1279        let before_remaining = win_start;
1280        let after_remaining = total - win_end;
1281        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
1282
1283        let mut parts_by_message = self.summary_parts_for_messages(session_id, &ids).await?;
1284        let messages = emitted
1285            .iter()
1286            .map(|row| RetrievedMessage {
1287                id: row.id.clone(),
1288                role: row.role,
1289                timestamp: row.timestamp,
1290                text: row.text.clone(),
1291                content: row.content.clone(),
1292                parts: parts_by_message
1293                    .remove(&(session_id.to_owned(), row.id.clone()))
1294                    .unwrap_or_default(),
1295            })
1296            .collect();
1297
1298        Ok(GetLookup::Found(SessionPage {
1299            session,
1300            messages,
1301            before_remaining,
1302            after_remaining,
1303        }))
1304    }
1305
1306    /// Message-scope retrieval for `pond_get` message scope (spec.md#protocol):
1307    /// the target with its full parts (budget-bounded) plus `context_before`
1308    /// conversational siblings before and `context_after` after it. `NotFound`
1309    /// when no stored message carries `message_id`. Sibling parts are carried
1310    /// for summarizing; the target's parts ride `target_parts`.
1311    pub async fn message_view(
1312        &self,
1313        message_id: &str,
1314        params: MessageViewParams,
1315    ) -> Result<GetLookup<MessagePage>> {
1316        let Some(session_id) = self.session_id_for_message(message_id).await? else {
1317            return Ok(GetLookup::NotFound);
1318        };
1319        let Some(session) = self.find_session(&session_id).await? else {
1320            return Ok(GetLookup::NotFound);
1321        };
1322        let mut rows = self.scan_all_messages(&session_id).await?;
1323        // Siblings are always the conversational view: in carrier-heavy sessions
1324        // the system/tool rows would otherwise fill the whole window and push
1325        // the actual conversation out of it. The target stays regardless of its
1326        // own role - the caller asked for that message.
1327        rows.retain(|row| row.text.is_some() || row.id == message_id);
1328        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
1329        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
1330            return Ok(GetLookup::NotFound);
1331        };
1332
1333        let start = target_pos.saturating_sub(params.context_before);
1334        let end = (target_pos + params.context_after + 1).min(rows.len());
1335        let window = &rows[start..end];
1336        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
1337        // The target's full parts (blobs included) ride the response; siblings
1338        // are only summarized, but they share this one window scan.
1339        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
1340
1341        let all_parts = parts_by_message
1342            .remove(&(session_id.clone(), message_id.to_owned()))
1343            .unwrap_or_default();
1344        // Target parts are budget-bounded (no per-part pagination cursor). The
1345        // 1000 cap is the page_by hard ceiling; the budget is the real bound.
1346        let part_count = page_by(&all_parts, 1000, params.budget_bytes, |part| {
1347            serde_json::to_string(part).map_or(0, |json| json.len())
1348        });
1349        let target_parts = all_parts[..part_count].to_vec();
1350        let target_parts_remaining = all_parts.len() - part_count;
1351
1352        let target_row = &rows[target_pos];
1353        let target = RetrievedMessage {
1354            id: target_row.id.clone(),
1355            role: target_row.role,
1356            timestamp: target_row.timestamp,
1357            text: target_row.text.clone(),
1358            content: target_row.content.clone(),
1359            // Target structure is carried in full by `target_parts`.
1360            parts: Vec::new(),
1361        };
1362        let siblings = window
1363            .iter()
1364            .enumerate()
1365            .filter(|(idx, _)| start + idx != target_pos)
1366            .map(|(_, row)| RetrievedMessage {
1367                id: row.id.clone(),
1368                role: row.role,
1369                timestamp: row.timestamp,
1370                text: row.text.clone(),
1371                content: row.content.clone(),
1372                parts: parts_by_message
1373                    .get(&(session_id.clone(), row.id.clone()))
1374                    .cloned()
1375                    .unwrap_or_default(),
1376            })
1377            .collect();
1378
1379        Ok(GetLookup::Found(MessagePage {
1380            session,
1381            target,
1382            target_parts,
1383            target_parts_remaining,
1384            siblings,
1385        }))
1386    }
1387
1388    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1389        let batch = self
1390            .handle
1391            .scan_batch(
1392                Table::Messages,
1393                Some(&Predicate::Eq("session_id", session_id.into())),
1394                &["id", "timestamp", "role", "search_text", "content"],
1395            )
1396            .await?;
1397        let mut rows = Vec::with_capacity(batch.num_rows());
1398        for row in 0..batch.num_rows() {
1399            let id = string(&batch, "id", row)?.context("message id is null")?;
1400            let role =
1401                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1402            let timestamp = datetime(&batch, "timestamp", row)?;
1403            rows.push(ScanRow {
1404                id,
1405                role,
1406                timestamp,
1407                text: string(&batch, "search_text", row)?,
1408                content: string(&batch, "content", row)?,
1409            });
1410        }
1411        Ok(rows)
1412    }
1413
1414    /// Conversational scan over one session: rows ordered by
1415    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1416    /// read seam (spec.md#search-prefilter-pushdown).
1417    pub async fn scan_conversational_messages(
1418        &self,
1419        session_id: &str,
1420    ) -> Result<Vec<ConversationalRow>> {
1421        let filter = Predicate::And(vec![
1422            Predicate::Eq("session_id", session_id.into()),
1423            Predicate::IsNotNull("search_text"),
1424        ]);
1425        let batch = self
1426            .handle
1427            .scan_batch(
1428                Table::Messages,
1429                Some(&filter),
1430                &["id", "timestamp", "role", "search_text"],
1431            )
1432            .await?;
1433
1434        let mut rows = Vec::with_capacity(batch.num_rows());
1435        for row in 0..batch.num_rows() {
1436            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1437            let role =
1438                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1439            let timestamp = datetime(&batch, "timestamp", row)?;
1440            let text_str = string(&batch, "search_text", row)?.context(
1441                "search_text null after IsNotNull pushdown - storage invariant violated",
1442            )?;
1443            rows.push(ConversationalRow {
1444                session_id: session_id.to_owned(),
1445                message_id,
1446                role,
1447                timestamp,
1448                text: SearchText(text_str),
1449            });
1450        }
1451        rows.sort_by(|a, b| {
1452            a.timestamp
1453                .cmp(&b.timestamp)
1454                .then_with(|| a.message_id.cmp(&b.message_id))
1455        });
1456        Ok(rows)
1457    }
1458
1459    /// Locate the session id for a stored message. Cheap when only the routing
1460    /// hint is needed - callers that need the messages use `scan_all_messages`.
1461    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1462        let batch = self
1463            .handle
1464            .scan_batch(
1465                Table::Messages,
1466                Some(&Predicate::Eq("id", message_id.into())),
1467                &["session_id"],
1468            )
1469            .await?;
1470        if batch.num_rows() == 0 {
1471            return Ok(None);
1472        }
1473        string(&batch, "session_id", 0)
1474    }
1475
1476    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1477        self.handle.row_counts().await
1478    }
1479
1480    /// The primary-key (`id`) set for `table`. Powers storage verification
1481    /// (`pond copy --verify-only` and copy's closing check).
1482    pub async fn collect_ids(&self, table: Table) -> Result<std::collections::HashSet<String>> {
1483        self.handle.collect_ids(table).await
1484    }
1485
1486    /// Stream `table`'s `id` column and return `(rows_scanned, rows whose id is
1487    /// absent from `present`)`. Streaming means the scanned side is never
1488    /// materialized into a set, so a verify holds only the `present` (other)
1489    /// side per table instead of both.
1490    pub async fn id_diff_against(
1491        &self,
1492        table: Table,
1493        present: &std::collections::HashSet<String>,
1494    ) -> Result<(usize, usize)> {
1495        let scanner = self
1496            .handle
1497            .scan(table, ScanOpts::project_only(&["id"]))
1498            .await?;
1499        let mut stream = scanner.try_into_stream().await?;
1500        let (mut rows, mut absent) = (0usize, 0usize);
1501        while let Some(batch) = stream.next().await {
1502            let batch = batch?;
1503            let ids = batch
1504                .column_by_name("id")
1505                .context("scan projection dropped the id column")?
1506                .as_any()
1507                .downcast_ref::<StringArray>()
1508                .context("id column is not Utf8")?;
1509            for id in ids.iter().flatten() {
1510                rows += 1;
1511                if !present.contains(id) {
1512                    absent += 1;
1513                }
1514            }
1515        }
1516        Ok((rows, absent))
1517    }
1518
1519    /// A point-in-time `Arc<Dataset>` for `table`, for registering as a
1520    /// DataFusion `LanceTableProvider` in `pond_sql_query`. Goes through the
1521    /// handle's freshness gate, so each query sees a current snapshot.
1522    pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1523        Ok(Arc::new(self.handle.dataset(table).await?))
1524    }
1525
1526    /// Page the heavy search indices in from storage so the first user query
1527    /// after process start never eats the 175-442 s cold S3 load
1528    /// (spec.md#search). Vector via `prewarm_index` (loads the IVF_SQ partition
1529    /// storage). FTS is warmed with one synthetic query: Lance's full FTS
1530    /// `prewarm_index` loads *every* token's posting list, which resident-sets
1531    /// the whole inverted index (~1.7 GiB on the 2M-row corpus) and blows the
1532    /// server RAM budget - so we settle the term dictionary + a hot token
1533    /// instead and let real queries page their own postings. Best effort: a
1534    /// missing index (IVF_SQ below activation, or no FTS yet on an empty store)
1535    /// is logged, not fatal.
1536    pub async fn prewarm(&self, cache_dir: &Path) -> Result<()> {
1537        let messages = self.dataset(Table::Messages).await?;
1538        if let Err(error) = messages.prewarm_index(MESSAGES_VECTOR_INDEX).await {
1539            tracing::debug!(%error, "vector index prewarm skipped");
1540        }
1541        // Best-effort: on failure `rowmap` stays empty and the arms fall back to
1542        // the data-take path, so search still works (slower on a remote store).
1543        if let Err(error) = self.ensure_rowmap(cache_dir).await {
1544            tracing::warn!(%error, "rowmap build skipped; arms fall back to data-take resolution");
1545        }
1546        // Warm the FTS posting lists; the rowmap build above touched only the
1547        // data columns.
1548        if let Err(error) = self
1549            .fts_search("pond", 1, &Predicate::And(Vec::new()))
1550            .await
1551        {
1552            tracing::debug!(%error, "fts index prewarm skipped");
1553        }
1554        Ok(())
1555    }
1556
1557    /// Stable filesystem-safe cache key: same store URL -> same key, so sibling
1558    /// pond processes share one map file and distinct stores never collide.
1559    fn store_key(&self) -> String {
1560        let digest = blake3::hash(self.handle.location().as_str().as_bytes());
1561        digest.to_hex()[..16].to_owned()
1562    }
1563
    /// Max delta segments a chain may carry before it is compacted into a fresh
    /// base. `extend_rowmap_coordinated` layers a new delta only while the
    /// existing chain holds fewer than this many; at the cap it merges the
    /// existing segments plus the pending delta into a new base instead.
    const MAX_ROWMAP_DELTAS: usize = 16;
1566
    /// Columns the resident meta map is built from. The full scan
    /// (`collect_row_metas`) and the delta scan (`collect_row_metas_delta`) MUST
    /// project the same set in the same order - both feed [`row_meta_entry`],
    /// so a column added to one only would silently corrupt delta hydration.
    /// `_rowid` is not listed here; both scans request it separately via
    /// `with_row_id()`.
    const ROW_META_COLUMNS: [&str; 7] = [
        "session_id",
        "id",
        "role",
        "project",
        "source_agent",
        "timestamp",
        "search_text",
    ];
1580
1581    /// Install the resident meta map covering the current `messages` version.
1582    /// Idempotent - a chain already at that version is kept. On a version bump
1583    /// it layers a delta segment (scanning only the new fragments), compacts the
1584    /// deltas locally once they pile up, and full-rebuilds the base only on a
1585    /// store compaction - all under a build `flock` so N local processes don't
1586    /// rescan the store at once.
1587    pub async fn ensure_rowmap(&self, cache_dir: &Path) -> Result<()> {
1588        let version = self.messages_version().await?;
1589        if let Some(current) = self.rowmap.load_full()
1590            && current.version() == version
1591        {
1592            return Ok(());
1593        }
1594        std::fs::create_dir_all(cache_dir)
1595            .with_context(|| format!("create cache dir {}", cache_dir.display()))?;
1596        let store_key = self.store_key();
1597
1598        // A sibling may already have published a chain at this version; install
1599        // it without rebuilding.
1600        if let Some(chain) = discover_chain(cache_dir, &store_key)
1601            && chain.version() == version
1602            && let Ok(set) = RowMetaSet::open(&chain)
1603        {
1604            self.rowmap.store(Some(Arc::new(set)));
1605            Self::sweep_stale_rowmaps(cache_dir, &store_key, chain.base_version);
1606            return Ok(());
1607        }
1608        if let Some(set) = self
1609            .extend_rowmap_coordinated(cache_dir, &store_key, version)
1610            .await?
1611        {
1612            self.rowmap.store(Some(Arc::new(set)));
1613        }
1614        Ok(())
1615    }
1616
1617    /// Extend the chain to `version` under the build `flock` (spec: lock the
1618    /// build only; atomic rename already prevents corruption). `None` when
1619    /// another local process holds the lock - this caller keeps its current map
1620    /// (or the take_rows fallback) until a later refresh opens what the winner
1621    /// published.
1622    async fn extend_rowmap_coordinated(
1623        &self,
1624        cache_dir: &Path,
1625        store_key: &str,
1626        version: u64,
1627    ) -> Result<Option<RowMetaSet>> {
1628        let lock_path = cache_dir.join(format!("rowmetamap-{store_key}.lock"));
1629        let lock = std::fs::File::create(&lock_path)
1630            .with_context(|| format!("create rowmap build lock {}", lock_path.display()))?;
1631        match lock.try_lock() {
1632            Ok(()) => {}
1633            Err(std::fs::TryLockError::WouldBlock) => return Ok(None),
1634            Err(std::fs::TryLockError::Error(error)) => {
1635                return Err(error).context("lock rowmap build");
1636            }
1637        }
1638
1639        // Re-check after acquiring: a sibling may have published `version`. An
1640        // open failure here (older MAGIC after an upgrade, or corruption) falls
1641        // through to the purge+rebuild below rather than erroring.
1642        if let Some(chain) = discover_chain(cache_dir, store_key)
1643            && chain.version() == version
1644            && let Ok(set) = RowMetaSet::open(&chain)
1645        {
1646            return Ok(Some(set));
1647        }
1648
1649        // Holding the lock makes us the only builder, so every build temp is a
1650        // dead orphan from a crashed build - clear them before writing ours.
1651        Self::sweep_orphan_temps(cache_dir, store_key);
1652
1653        // Validate any existing chain opens; an unreadable segment (an older
1654        // MAGIC after a pond upgrade, or a corrupt file) is purged so the build
1655        // below is a clean full rebuild instead of erroring forever or appending
1656        // a fresh delta onto an unreadable base.
1657        let mut chain = discover_chain(cache_dir, store_key);
1658        if let Some(existing) = &chain
1659            && let Err(error) = RowMetaSet::open(existing)
1660        {
1661            tracing::warn!(%error, store = store_key, "rowmap unreadable; purging and rebuilding");
1662            Self::purge_rowmaps(cache_dir, store_key);
1663            chain = None;
1664        }
1665        // A pure-append delta scan (None on store compaction) decides the path.
1666        let delta = match &chain {
1667            Some(existing) => self.collect_row_metas_delta(existing.version()).await?,
1668            None => None,
1669        };
1670
1671        let base_version = match (&chain, delta) {
1672            // Pure append with room: layer a new delta segment.
1673            (Some(existing), Some(entries)) if existing.deltas.len() < Self::MAX_ROWMAP_DELTAS => {
1674                let path = RowMetaMap::delta_path(cache_dir, store_key, version);
1675                RowMetaMap::build(&path, version, entries)?;
1676                existing.base_version
1677            }
1678            // Pure append but the deltas are full: compact the existing segments
1679            // (read locally from their mmaps) plus this delta into a fresh base -
1680            // no full store re-read.
1681            (Some(existing), Some(entries)) => {
1682                let mut merged = RowMetaSet::open(existing)?.merged_entries();
1683                merged.extend(entries);
1684                let path = RowMetaMap::path_for(cache_dir, store_key, version);
1685                RowMetaMap::build(&path, version, merged)?;
1686                version
1687            }
1688            // No chain, or store compaction since the base: full scan -> base.
1689            _ => {
1690                let entries = self.collect_row_metas().await?;
1691                let path = RowMetaMap::path_for(cache_dir, store_key, version);
1692                RowMetaMap::build(&path, version, entries)?;
1693                version
1694            }
1695        };
1696
1697        let chain =
1698            discover_chain(cache_dir, store_key).context("rowmap chain missing after build")?;
1699        let set = RowMetaSet::open(&chain)?;
1700        Self::sweep_stale_rowmaps(cache_dir, store_key, base_version);
1701        Ok(Some(set))
1702    }
1703
1704    /// Remove this store's segment files (`-v{V}` bases, `-d{V}` deltas) for
1705    /// versions strictly older than `keep` (best-effort). A newer file belongs
1706    /// to a sibling that advanced past us; unlinking a superseded file is safe
1707    /// even if a sibling has it mapped - Unix keeps the inode alive until unmap.
1708    fn sweep_stale_rowmaps(cache_dir: &Path, store_key: &str, keep: u64) {
1709        let prefix = format!("rowmetamap-{store_key}-");
1710        let Ok(entries) = std::fs::read_dir(cache_dir) else {
1711            return;
1712        };
1713        for entry in entries.flatten() {
1714            let name = entry.file_name();
1715            let Some(rest) = name
1716                .to_str()
1717                .and_then(|name| name.strip_prefix(&prefix))
1718                .and_then(|rest| rest.strip_suffix(".rmm"))
1719            else {
1720                continue;
1721            };
1722            let version = rest
1723                .strip_prefix('v')
1724                .or_else(|| rest.strip_prefix('d'))
1725                .and_then(|digits| digits.parse::<u64>().ok());
1726            if let Some(version) = version
1727                && version < keep
1728            {
1729                let _ = std::fs::remove_file(entry.path());
1730            }
1731        }
1732    }
1733
1734    /// Remove every segment file (`-v{V}` / `-d{V}`) for this store regardless of
1735    /// version - used when a discovered chain is unreadable (older MAGIC after an
1736    /// upgrade, or corruption) so the next build starts clean. Sound under the
1737    /// build lock; POSIX keeps any inode a sibling still has mapped alive.
1738    fn purge_rowmaps(cache_dir: &Path, store_key: &str) {
1739        let prefix = format!("rowmetamap-{store_key}-");
1740        let Ok(entries) = std::fs::read_dir(cache_dir) else {
1741            return;
1742        };
1743        for entry in entries.flatten() {
1744            if let Some(name) = entry.file_name().to_str()
1745                && name.starts_with(&prefix)
1746                && name.ends_with(".rmm")
1747            {
1748                let _ = std::fs::remove_file(entry.path());
1749            }
1750        }
1751    }
1752
1753    /// Remove abandoned build temp files (`*.tmp-*`) for this store. Best-effort,
1754    /// and only sound under the build lock - the holder is the sole builder, so
1755    /// any temp present is a crashed-build orphan, not a live write.
1756    fn sweep_orphan_temps(cache_dir: &Path, store_key: &str) {
1757        let prefix = format!("rowmetamap-{store_key}-");
1758        let Ok(entries) = std::fs::read_dir(cache_dir) else {
1759            return;
1760        };
1761        for entry in entries.flatten() {
1762            let name = entry.file_name();
1763            let Some(name) = name.to_str() else { continue };
1764            if name.starts_with(&prefix) && name.contains(".tmp-") {
1765                let _ = std::fs::remove_file(entry.path());
1766            }
1767        }
1768    }
1769
1770    #[cfg(test)]
1771    pub(crate) fn rowmap_delta_count(&self) -> Option<usize> {
1772        self.rowmap.load_full().map(|set| set.delta_count())
1773    }
1774
    /// The currently-installed resident meta map, if any, as a shared `Arc`.
    /// `pond sync` reads it (via [`RowmapOracle`]) as the freshness oracle (max
    /// timestamp per session); `None` - no map installed yet - falls back to
    /// re-reading every source.
    pub fn rowmap_snapshot(&self) -> Option<Arc<RowMetaSet>> {
        self.rowmap.load_full()
    }
1781
1782    /// Resolve index-only `(row_id, score)` hits to keys via the map; row ids the
1783    /// map lacks (appended since build) fall back to one `take_rows` batch. The
1784    /// caller re-sorts, so the misses appended at the end carry no order meaning.
1785    async fn resolve_rowid_hits(
1786        &self,
1787        map: &RowMetaSet,
1788        hits: Vec<(u64, f32)>,
1789    ) -> Result<Vec<SearchHit>> {
1790        let mut resolved = Vec::with_capacity(hits.len());
1791        let mut misses: Vec<(u64, f32)> = Vec::new();
1792        for (rowid, score) in hits {
1793            match map.lookup(rowid) {
1794                Some((session_id, message_id)) => resolved.push(SearchHit {
1795                    rowid: Some(rowid),
1796                    key: MessageKey {
1797                        session_id: session_id.to_owned(),
1798                        message_id: message_id.to_owned(),
1799                    },
1800                    score,
1801                }),
1802                None => misses.push((rowid, score)),
1803            }
1804        }
1805        // A miss still knows its rowid; carry it so hydration can take_rows it
1806        // alongside the hits the map resolved.
1807        if !misses.is_empty() {
1808            let rowids: Vec<u64> = misses.iter().map(|(rowid, _)| *rowid).collect();
1809            let keys = self.message_keys_by_rowids(&rowids).await?;
1810            for ((rowid, score), key) in misses.into_iter().zip(keys) {
1811                resolved.push(SearchHit {
1812                    rowid: Some(rowid),
1813                    key,
1814                    score,
1815                });
1816            }
1817        }
1818        Ok(resolved)
1819    }
1820
1821    /// Resolve stable row ids to `(session_id, id)` via `take_rows`, which
1822    /// returns rows in `rowids` order - the caller's `zip` relies on that.
1823    async fn message_keys_by_rowids(&self, rowids: &[u64]) -> Result<Vec<MessageKey>> {
1824        let dataset = self.handle.dataset(Table::Messages).await?;
1825        let projection = ProjectionRequest::from_columns(["session_id", "id"], dataset.schema());
1826        let batch = dataset.take_rows(rowids, projection).await?;
1827        let mut keys = Vec::with_capacity(batch.num_rows());
1828        for row in 0..batch.num_rows() {
1829            keys.push(MessageKey {
1830                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1831                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1832            });
1833        }
1834        Ok(keys)
1835    }
1836
1837    /// Write a `pond_sql_query` export artifact.
1838    pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1839        self.handle.export_write(name, bytes).await
1840    }
1841
1842    /// Read a `pond_sql_query` export artifact back.
1843    pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1844        self.handle.export_read(name).await
1845    }
1846
    /// Local filesystem path of the export artifact `name` on `file://`
    /// installs, as reported by the handle.
    /// NOTE(review): presumably `None` for stores that are not on the local
    /// filesystem - confirm against `Handle::export_local_path`.
    pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
        self.handle.export_local_path(name)
    }
1851
1852    /// Distinct adapter names present in the corpus, sorted. Scans only the
1853    /// `source_agent` column of the small `sessions` table, so `pond status`
1854    /// gets its adapter count without touching the 2M-row `messages` table.
1855    /// `include_subagents=false` drops `source_agent` values containing `/`
1856    /// (e.g. `claude-code/general-purpose`).
1857    pub async fn adapter_names(&self, include_subagents: bool) -> Result<Vec<String>> {
1858        let scanner = self
1859            .handle
1860            .scan(Table::Sessions, ScanOpts::project_only(&["source_agent"]))
1861            .await?;
1862        let mut stream = scanner.try_into_stream().await?;
1863        let mut names: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
1864        while let Some(batch) = stream.next().await {
1865            let batch = batch?;
1866            for row in 0..batch.num_rows() {
1867                let agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1868                if !include_subagents && agent.contains('/') {
1869                    continue;
1870                }
1871                names.insert(agent);
1872            }
1873        }
1874        Ok(names.into_iter().collect())
1875    }
1876
1877    /// Write a batch of embeddings into `messages`: set `vector` and
1878    /// `embedding_model` on each row by `(session_id, id)`
1879    /// (spec.md#session-embed-from-canonical). The column update goes through the
1880    /// write seam and lands as a new manifest version (`append-only`).
1881    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1882        if rows.is_empty() {
1883            return Ok(());
1884        }
1885        let batch = embedding_update_batch(rows)?;
1886        self.handle
1887            .merge_update(Table::Messages, batch, rows.len())
1888            .await?;
1889        Ok(())
1890    }
1891
1892    /// Stream the backlog of messages needing embedding: rows with `search_text`
1893    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1894    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1895        try_stream! {
1896            let filter = Predicate::And(vec![
1897                Predicate::IsNull("vector"),
1898                Predicate::IsNotNull("search_text"),
1899            ]);
1900            let projection: &[&str] = &["session_id", "id", "search_text"];
1901            let scanner = self
1902                .handle
1903                .scan(
1904                    Table::Messages,
1905                    ScanOpts::with_predicate_and_projection(&filter, projection),
1906                )
1907                .await?;
1908            let mut batches = scanner
1909                .try_into_stream()
1910                .await
1911                .context("failed to open messages stream")?;
1912            while let Some(batch) = batches.next().await {
1913                let batch = batch?;
1914                for row in 0..batch.num_rows() {
1915                    yield PendingMessage {
1916                        session_id: string(&batch, "session_id", row)?
1917                            .context("session_id is null")?,
1918                        id: string(&batch, "id", row)?.context("message id is null")?,
1919                        search_text: string(&batch, "search_text", row)?
1920                            .context("search_text is null")?,
1921                    };
1922                }
1923            }
1924        }
1925    }
1926
1927    /// Stream messages that are either never embedded or stale under the
1928    /// current model. `pond optimize --force-embed` feeds this to the same unconditional
1929    /// merge_update as the normal backlog; the filter makes that semantically
1930    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1931    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1932        try_stream! {
1933            let filter = Predicate::And(vec![
1934                Predicate::IsNotNull("search_text"),
1935                Predicate::Or(vec![
1936                    Predicate::IsNull("vector"),
1937                    Predicate::Ne("embedding_model", embed::model_id().into()),
1938                ]),
1939            ]);
1940            let projection: &[&str] = &["session_id", "id", "search_text"];
1941            let scanner = self
1942                .handle
1943                .scan(
1944                    Table::Messages,
1945                    ScanOpts::with_predicate_and_projection(&filter, projection),
1946                )
1947                .await?;
1948            let mut batches = scanner
1949                .try_into_stream()
1950                .await
1951                .context("failed to open pending-or-stale messages stream")?;
1952            while let Some(batch) = batches.next().await {
1953                let batch = batch?;
1954                for row in 0..batch.num_rows() {
1955                    yield PendingMessage {
1956                        session_id: string(&batch, "session_id", row)?
1957                            .context("session_id is null")?,
1958                        id: string(&batch, "id", row)?.context("message id is null")?,
1959                        search_text: string(&batch, "search_text", row)?
1960                            .context("search_text is null")?,
1961                    };
1962                }
1963            }
1964        }
1965    }
1966
1967    /// BM25 full-text retriever over `messages.search_text`. With the row meta map
1968    /// loaded the scan is index-only (no data columns -> no `TakeExec`, no
1969    /// scattered GETs) and hits resolve through the map; otherwise it falls back
1970    /// to `fts_search_keys` so search works before prewarm.
1971    pub async fn fts_search(
1972        &self,
1973        query: &str,
1974        limit: usize,
1975        filter: &Predicate,
1976    ) -> Result<Vec<SearchHit>> {
1977        let mut hits = if let Some(map) = self.rowmap.load_full() {
1978            let rowid_hits = self.fts_search_rowids(query, limit, filter).await?;
1979            self.resolve_rowid_hits(&map, rowid_hits).await?
1980        } else {
1981            self.fts_search_keys(query, limit, filter).await?
1982        };
1983        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1984        // order, which varies between runs and across calls with different pool
1985        // sizes. Without an explicit tiebreak the downstream session grouping and
1986        // rank for a tied target can flip session-to-session, making results
1987        // nondeterministic. Sort by `score desc`, then `(session_id, message_id)` asc.
1988        hits.sort_by(|left, right| {
1989            right
1990                .score
1991                .partial_cmp(&left.score)
1992                .unwrap_or(std::cmp::Ordering::Equal)
1993                .then_with(|| left.key.session_id.cmp(&right.key.session_id))
1994                .then_with(|| left.key.message_id.cmp(&right.key.message_id))
1995        });
1996        Ok(hits)
1997    }
1998
1999    /// Shared FTS-scan setup: scope filter, the `search_text` full-text query,
2000    /// `fast_search` when indexed, and `limit`. Callers set only their projection.
2001    async fn fts_scanner(
2002        &self,
2003        query: &str,
2004        limit: usize,
2005        filter: &Predicate,
2006    ) -> Result<lance::dataset::scanner::Scanner> {
2007        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
2008        scanner.full_text_search(
2009            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
2010        )?;
2011        if self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2012            scanner.fast_search();
2013        }
2014        // Lance ships an autoprojection that silently appends `_score` to FTS
2015        // output when the projection omits it. That behavior is going away;
2016        // we opt into the future explicit-projection contract here so the
2017        // scanner stops emitting a per-call deprecation warning, and each caller
2018        // lists `_score` in its own projection.
2019        scanner.disable_scoring_autoprojection();
2020        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
2021        Ok(scanner)
2022    }
2023
2024    /// No-map FTS fallback: project the key columns plus `_score` directly,
2025    /// taking the `TakeExec` cost. Unsorted; `fts_search` applies the sort.
2026    async fn fts_search_keys(
2027        &self,
2028        query: &str,
2029        limit: usize,
2030        filter: &Predicate,
2031    ) -> Result<Vec<SearchHit>> {
2032        let mut scanner = self.fts_scanner(query, limit, filter).await?;
2033        scanner.project(&["session_id", "id", "_score"])?;
2034        let batch = scanner.try_into_batch().await?;
2035        let mut hits = Vec::with_capacity(batch.num_rows());
2036        for row in 0..batch.num_rows() {
2037            let key = MessageKey {
2038                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
2039                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
2040            };
2041            hits.push(SearchHit {
2042                rowid: None,
2043                key,
2044                score: float32(&batch, "_score", row)?,
2045            });
2046        }
2047        Ok(hits)
2048    }
2049
2050    /// Current `messages` dataset version - the key a `RowMetaMap` is built
2051    /// against (pond's stable row ids keep a built map valid until this advances).
2052    pub async fn messages_version(&self) -> Result<u64> {
2053        Ok(self
2054            .handle
2055            .dataset(Table::Messages)
2056            .await?
2057            .version()
2058            .version)
2059    }
2060
2061    /// Scan the hydration columns with row ids into a `Vec`, the input to
2062    /// `RowMetaMap::build`. One large sequential scan (few big reads), unlike the
2063    /// scattered per-hit take it replaces; `search_text` dominates the bytes.
2064    pub async fn collect_row_metas(&self) -> Result<Vec<RowMetaEntry>> {
2065        let mut scanner = self.handle.scanner(Table::Messages, None).await?;
2066        scanner.with_row_id();
2067        scanner.project(&Self::ROW_META_COLUMNS)?;
2068        let mut stream = scanner.try_into_stream().await?;
2069        let mut out = Vec::new();
2070        while let Some(batch) = stream.next().await {
2071            let batch = batch?;
2072            let rowids = uint64(&batch, "_rowid")?;
2073            for row in 0..batch.num_rows() {
2074                out.push(row_meta_entry(&batch, rowids.value(row), row)?);
2075            }
2076        }
2077        Ok(out)
2078    }
2079
2080    /// Row metas for the fragments appended since `base_version` - the input to
2081    /// a delta segment. `None` when a fragment present at `base_version` is gone
2082    /// now (store compaction / deletion): the chain can't be extended by append,
2083    /// so the caller rebuilds the base from a full scan instead.
2084    async fn collect_row_metas_delta(
2085        &self,
2086        base_version: u64,
2087    ) -> Result<Option<Vec<RowMetaEntry>>> {
2088        let dataset = self.handle.dataset(Table::Messages).await?;
2089        let old = dataset.checkout_version(base_version).await?;
2090        let old_ids: HashSet<u64> = old.get_fragments().iter().map(|f| f.id() as u64).collect();
2091        let current = dataset.get_fragments();
2092        let current_ids: HashSet<u64> = current.iter().map(|f| f.id() as u64).collect();
2093        if !old_ids.is_subset(&current_ids) {
2094            return Ok(None);
2095        }
2096        let new_fragments: Vec<_> = current
2097            .iter()
2098            .filter(|fragment| !old_ids.contains(&(fragment.id() as u64)))
2099            .map(|fragment| fragment.metadata().clone())
2100            .collect();
2101        if new_fragments.is_empty() {
2102            return Ok(Some(Vec::new()));
2103        }
2104        let mut scanner = dataset.scan();
2105        scanner.with_fragments(new_fragments);
2106        scanner.with_row_id();
2107        scanner.project(&Self::ROW_META_COLUMNS)?;
2108        let mut stream = scanner.try_into_stream().await?;
2109        let mut out = Vec::new();
2110        while let Some(batch) = stream.next().await {
2111            let batch = batch?;
2112            let rowids = uint64(&batch, "_rowid")?;
2113            for row in 0..batch.num_rows() {
2114                out.push(row_meta_entry(&batch, rowids.value(row), row)?);
2115            }
2116        }
2117        Ok(Some(out))
2118    }
2119
2120    /// Index-only FTS retriever: `_rowid` + `_score` only, so Lance inserts no
2121    /// `TakeExec` and issues no scattered GETs. `fts_search` resolves the row ids.
2122    async fn fts_search_rowids(
2123        &self,
2124        query: &str,
2125        limit: usize,
2126        filter: &Predicate,
2127    ) -> Result<Vec<(u64, f32)>> {
2128        let mut scanner = self.fts_scanner(query, limit, filter).await?;
2129        scanner.with_row_id();
2130        scanner.project(&["_score"])?;
2131        let batch = scanner.try_into_batch().await?;
2132        let rowids = uint64(&batch, "_rowid")?;
2133        let mut hits = Vec::with_capacity(batch.num_rows());
2134        for row in 0..batch.num_rows() {
2135            hits.push((rowids.value(row), float32(&batch, "_score", row)?));
2136        }
2137        Ok(hits)
2138    }
2139
2140    /// Count of searchable messages (non-null `search_text`) inside the
2141    /// caller's filter scope - the universe a search actually ran over.
2142    /// Powers the response's absence honesty (spec.md#search): "no relevant
2143    /// hits" only means something relative to how many messages were
2144    /// searchable at all, and 0 tells the caller their filters excluded
2145    /// everything before retrieval even started.
2146    pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
2147        // Unfiltered: the FTS index already counts non-null search_text rows
2148        // (`num_docs`), and fast_search only searches those indexed docs - so
2149        // num_docs is exactly the universe a search ran over. Reading it avoids
2150        // the ~133 MB `IsNotNull(search_text)` column scan Lance pays per query
2151        // (no per-column null metadata). Filtered scopes fall back to the scan.
2152        if matches!(filter, Predicate::And(clauses) if clauses.is_empty())
2153            && let Some(count) = self.fts_num_docs().await?
2154        {
2155            return Ok(count);
2156        }
2157        let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
2158        let dataset = self.handle.dataset(Table::Messages).await?;
2159        let count = dataset.count_rows(Some(scope.to_lance())).await?;
2160        Ok(count)
2161    }
2162
2163    /// Non-null `search_text` count read from the FTS index's `num_docs`
2164    /// statistic (summed across delta segments). `None` when the FTS index is
2165    /// absent (empty store) so the caller falls back to the count scan.
2166    async fn fts_num_docs(&self) -> Result<Option<usize>> {
2167        if !self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2168            return Ok(None);
2169        }
2170        let dataset = self.handle.dataset(Table::Messages).await?;
2171        let json = dataset.index_statistics(MESSAGES_FTS_INDEX).await?;
2172        let parsed: Value =
2173            serde_json::from_str(&json).context("failed to parse FTS index_statistics")?;
2174        let total: u64 = parsed["indices"]
2175            .as_array()
2176            .map(|segments| {
2177                segments
2178                    .iter()
2179                    .filter_map(|segment| segment["num_docs"].as_u64())
2180                    .sum()
2181            })
2182            .unwrap_or(0);
2183        Ok(Some(usize::try_from(total).unwrap_or(usize::MAX)))
2184    }
2185
2186    /// Whether any `messages` row carries a vector (spec.md#search) - the
2187    /// signal that lets the `vector` arm run instead of degrading to `fts`. The single-active-
2188    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
2189    /// vector belongs to the current model.
2190    pub async fn has_embeddings(&self) -> Result<bool> {
2191        let scope = Predicate::IsNotNull("vector");
2192        let mut scanner = self
2193            .handle
2194            .scan(
2195                Table::Messages,
2196                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
2197            )
2198            .await?;
2199        scanner.limit(Some(1), None)?;
2200        let batch = scanner.try_into_batch().await?;
2201        Ok(batch.num_rows() > 0)
2202    }
2203
2204    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
2205    /// scalar predicate alone (spec.md#search-prefilter-pushdown) - see
2206    /// `embedded_scope` for why pond does NOT add `vector IS NOT NULL`. nprobes
2207    /// falls back to [`DEFAULT_NPROBES`] when `[search]` leaves it unset, so a
2208    /// default install never inherits Lance's unbounded "probe every partition"
2209    /// behavior on a remote store. No refine (see `apply_vector_search_knobs`).
2210    /// Index-only + map resolve when loaded, else key projection - see `fts_search`.
2211    pub async fn vector_search(
2212        &self,
2213        query: &[f32],
2214        limit: usize,
2215        filter: &Predicate,
2216        search: Option<&config::SearchConfig>,
2217    ) -> Result<Vec<SearchHit>> {
2218        let mut hits = if let Some(map) = self.rowmap.load_full() {
2219            let rowid_hits = self
2220                .vector_search_rowids(query, limit, filter, search)
2221                .await?;
2222            self.resolve_rowid_hits(&map, rowid_hits).await?
2223        } else {
2224            self.vector_search_keys(query, limit, filter, search)
2225                .await?
2226        };
2227        // Stable secondary sort: same reasoning as `fts_search` - IVF_SQ can
2228        // emit hits with effectively identical `_distance` in fragment-dependent
2229        // order, which makes RRF dedup-ranks nondeterministic for tied
2230        // neighbors. Sort by distance asc (smaller = more similar), then by
2231        // `(session_id, message_id)` asc.
2232        hits.sort_by(|left, right| {
2233            left.score
2234                .partial_cmp(&right.score)
2235                .unwrap_or(std::cmp::Ordering::Equal)
2236                .then_with(|| left.key.session_id.cmp(&right.key.session_id))
2237                .then_with(|| left.key.message_id.cmp(&right.key.message_id))
2238        });
2239        Ok(hits)
2240    }
2241
2242    /// Shared vector-scan setup: scope, `nearest`, knobs, `fast_search`.
2243    async fn vector_scanner(
2244        &self,
2245        query: &[f32],
2246        limit: usize,
2247        filter: &Predicate,
2248        search: Option<&config::SearchConfig>,
2249    ) -> Result<lance::dataset::scanner::Scanner> {
2250        let scope = embedded_scope(filter);
2251        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
2252        let key = Float32Array::from(query.to_vec());
2253        scanner.nearest("vector", &key, limit)?;
2254        apply_vector_search_knobs(&mut scanner, search);
2255        if self
2256            .handle
2257            .messages_has_index(MESSAGES_VECTOR_INDEX)
2258            .await?
2259        {
2260            scanner.fast_search();
2261        }
2262        scanner.disable_scoring_autoprojection();
2263        Ok(scanner)
2264    }
2265
2266    /// Index-only vector retriever: `_rowid` + `_distance` only, so no `TakeExec`.
2267    /// `vector_search` resolves the row ids. Mirrors `fts_search_rowids`.
2268    async fn vector_search_rowids(
2269        &self,
2270        query: &[f32],
2271        limit: usize,
2272        filter: &Predicate,
2273        search: Option<&config::SearchConfig>,
2274    ) -> Result<Vec<(u64, f32)>> {
2275        let mut scanner = self.vector_scanner(query, limit, filter, search).await?;
2276        scanner.with_row_id();
2277        scanner.project(&["_distance"])?;
2278        let batch = scanner.try_into_batch().await?;
2279        let rowids = uint64(&batch, "_rowid")?;
2280        let mut hits = Vec::with_capacity(batch.num_rows());
2281        for row in 0..batch.num_rows() {
2282            hits.push((rowids.value(row), float32(&batch, "_distance", row)?));
2283        }
2284        Ok(hits)
2285    }
2286
2287    /// No-map vector fallback: project the key columns plus `_distance` directly.
2288    /// Unsorted; `vector_search` sorts. Mirrors `fts_search_keys`.
2289    async fn vector_search_keys(
2290        &self,
2291        query: &[f32],
2292        limit: usize,
2293        filter: &Predicate,
2294        search: Option<&config::SearchConfig>,
2295    ) -> Result<Vec<SearchHit>> {
2296        let mut scanner = self.vector_scanner(query, limit, filter, search).await?;
2297        scanner.project(&["session_id", "id", "_distance"])?;
2298        let batch = scanner.try_into_batch().await?;
2299        let mut hits = Vec::with_capacity(batch.num_rows());
2300        for row in 0..batch.num_rows() {
2301            let key = MessageKey {
2302                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
2303                message_id: string(&batch, "id", row)?.context("message id is null")?,
2304            };
2305            hits.push(SearchHit {
2306                rowid: None,
2307                key,
2308                score: float32(&batch, "_distance", row)?,
2309            });
2310        }
2311        Ok(hits)
2312    }
2313
2314    /// The DataFusion plan string for a filtered vector scan - the
2315    /// `search-prefilter-pushdown` regression guard reads it.
2316    pub async fn explain_vector_plan(
2317        &self,
2318        query: &[f32],
2319        limit: usize,
2320        filter: &Predicate,
2321        search: Option<&config::SearchConfig>,
2322    ) -> Result<String> {
2323        let scope = embedded_scope(filter);
2324        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
2325        let key = Float32Array::from(query.to_vec());
2326        scanner.nearest("vector", &key, limit)?;
2327        apply_vector_search_knobs(&mut scanner, search);
2328        if self
2329            .handle
2330            .messages_has_index(MESSAGES_VECTOR_INDEX)
2331            .await?
2332        {
2333            scanner.fast_search();
2334        }
2335        scanner
2336            .explain_plan(true)
2337            .await
2338            .context("explain_plan failed")
2339    }
2340
2341    pub async fn explain_fts_plan(
2342        &self,
2343        query: &str,
2344        limit: usize,
2345        filter: &Predicate,
2346    ) -> Result<String> {
2347        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
2348        scanner.full_text_search(
2349            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
2350        )?;
2351        if self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2352            scanner.fast_search();
2353        }
2354        scanner.project(&["session_id", "id"])?;
2355        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
2356        scanner
2357            .explain_plan(true)
2358            .await
2359            .context("explain_plan failed")
2360    }
2361
2362    /// Hydrate search hits by stable row id (spec.md#search). Resolves each
2363    /// rowid from the resident meta map in memory (no object-store round-trip -
2364    /// Lance caches index/metadata but never data column values, so a `take_rows`
2365    /// re-reads `search_text` from storage every query). Rowids the map lacks
2366    /// (appended since it was built, or no map loaded) fall back to a single
2367    /// `take_rows` batch. The caller indexes the result by key, so order is
2368    /// irrelevant.
2369    pub async fn message_metas_by_rowids(&self, rowids: &[u64]) -> Result<Vec<MessageMeta>> {
2370        if rowids.is_empty() {
2371            return Ok(Vec::new());
2372        }
2373        let mut metas = Vec::with_capacity(rowids.len());
2374        let misses: Vec<u64> = if let Some(map) = self.rowmap.load_full() {
2375            let (hits, misses) = map.hydrate(rowids);
2376            metas.extend(hits.into_iter().map(|entry| MessageMeta {
2377                message_id: entry.message_id,
2378                session_id: entry.session_id,
2379                role: entry.role,
2380                project: entry.project,
2381                source_agent: entry.source_agent,
2382                timestamp:
2383                    DateTime::from_timestamp_micros(entry.timestamp_micros).unwrap_or_default(),
2384                search_text: entry.search_text,
2385            }));
2386            misses
2387        } else {
2388            rowids.to_vec()
2389        };
2390        if !misses.is_empty() {
2391            metas.extend(self.message_metas_by_rowids_take(&misses).await?);
2392        }
2393        Ok(metas)
2394    }
2395
2396    /// `take_rows` hydration of exactly `rowids` - the cache-miss fallback for
2397    /// rows the resident meta map lacks. `take_rows` coalesces the reads per
2398    /// fragment (Lance's own batching), so a scattered take is few requests, not
2399    /// one per row.
2400    async fn message_metas_by_rowids_take(&self, rowids: &[u64]) -> Result<Vec<MessageMeta>> {
2401        let dataset = self.handle.dataset(Table::Messages).await?;
2402        let projection = ProjectionRequest::from_columns(
2403            [
2404                "id",
2405                "session_id",
2406                "role",
2407                "project",
2408                "source_agent",
2409                "timestamp",
2410                "search_text",
2411            ],
2412            dataset.schema(),
2413        );
2414        let batch = dataset.take_rows(rowids, projection).await?;
2415        let mut metas = Vec::with_capacity(batch.num_rows());
2416        for row in 0..batch.num_rows() {
2417            metas.push(message_meta_from_batch(&batch, row)?);
2418        }
2419        Ok(metas)
2420    }
2421
2422    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
2423    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
2424        if keys.is_empty() {
2425            return Ok(Vec::new());
2426        }
2427        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
2428        let session_ids = keys
2429            .iter()
2430            .map(|key| key.session_id.clone())
2431            .collect::<Vec<_>>();
2432        let message_ids = keys
2433            .iter()
2434            .map(|key| key.message_id.clone())
2435            .collect::<Vec<_>>();
2436        let predicate = Predicate::And(vec![
2437            in_predicate("session_id", &session_ids),
2438            in_predicate("id", &message_ids),
2439        ]);
2440        let batch = self
2441            .handle
2442            .scan_batch(
2443                Table::Messages,
2444                Some(&predicate),
2445                &[
2446                    "id",
2447                    "session_id",
2448                    "role",
2449                    "project",
2450                    "source_agent",
2451                    "timestamp",
2452                    "search_text",
2453                ],
2454            )
2455            .await?;
2456        let mut metas = Vec::with_capacity(batch.num_rows());
2457        for row in 0..batch.num_rows() {
2458            // The IN x IN predicate is a cross-product, so the scan can return
2459            // pairs that were never asked for; keep only the wanted keys.
2460            let meta = message_meta_from_batch(&batch, row)?;
2461            if wanted.contains(&MessageKey {
2462                session_id: meta.session_id.clone(),
2463                message_id: meta.message_id.clone(),
2464            }) {
2465                metas.push(meta);
2466            }
2467        }
2468        Ok(metas)
2469    }
2470
2471    /// Total message count per session, for search session summaries. One
2472    /// `session_id IN (...)` scan projecting only `session_id`, aggregated in
2473    /// pond, instead of `N` concurrent `count_rows(session_id = X)` round-trips
2474    /// against `messages_session_id_btree`. Same wire shape for any backend,
2475    /// but one S3 operation instead of `N` on remote stores. Sessions with
2476    /// zero matching messages are present in the map with count `0` so the
2477    /// caller can distinguish "filter excluded everything" from "session
2478    /// missing from the response."
2479    pub async fn session_message_counts(
2480        &self,
2481        session_ids: &[String],
2482    ) -> Result<BTreeMap<String, usize>> {
2483        if session_ids.is_empty() {
2484            return Ok(BTreeMap::new());
2485        }
2486        // A version-matched resident map covers every current row, so its
2487        // per-session counts are authoritative (a session absent from it has 0
2488        // messages) - serve them with no scan. The version gate is load-bearing:
2489        // unlike meta hydration, a count cannot detect staleness by a row-id
2490        // miss, so a map that predates appended rows would undercount. A stale
2491        // or absent map falls through to the IN-scan.
2492        if let Some(map) = self.rowmap.load_full()
2493            && map.version() == self.messages_version().await?
2494        {
2495            return Ok(session_ids
2496                .iter()
2497                .map(|id| (id.clone(), map.lookup_count(id).unwrap_or(0)))
2498                .collect());
2499        }
2500        let predicate = in_predicate("session_id", session_ids);
2501        let scanner = self
2502            .handle
2503            .scan(
2504                Table::Messages,
2505                ScanOpts::with_predicate_and_projection(&predicate, &["session_id"]),
2506            )
2507            .await?;
2508        let mut stream = scanner
2509            .try_into_stream()
2510            .await
2511            .context("failed to open session_message_counts stream")?;
2512        let mut counts: BTreeMap<String, usize> =
2513            session_ids.iter().map(|id| (id.clone(), 0)).collect();
2514        while let Some(batch) = stream.next().await {
2515            let batch = batch.context("failed to read session_message_counts batch")?;
2516            let column = batch
2517                .column_by_name("session_id")
2518                .context("session_message_counts: session_id column missing")?
2519                .as_any()
2520                .downcast_ref::<StringArray>()
2521                .context("session_message_counts: session_id column is not Utf8")?;
2522            for value in column.iter().flatten() {
2523                if let Some(entry) = counts.get_mut(value) {
2524                    *entry += 1;
2525                }
2526            }
2527        }
2528        Ok(counts)
2529    }
2530
2531    /// Rows appended to `messages` since the FTS index was last optimized.
2532    /// A missing index reports the whole table; the query is manifest-only.
2533    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
2534        self.handle
2535            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
2536            .await
2537    }
2538
2539    /// Rows added or rewritten in `messages` since the IVF_SQ vector index
2540    /// was last optimized. Below
2541    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
2542    /// must read [`embedding_progress`](Self::embedding_progress) too and
2543    /// distinguish "index not built yet" from "index trails data".
2544    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
2545        self.handle
2546            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
2547            .await
2548    }
2549
2550    /// Embedding coverage: how many `messages` rows carry a vector and how
2551    /// many are still eligible. Drives the `pond status` embeddings line and
2552    /// the `pond optimize` progress bar's known total.
2553    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
2554        let dataset = self.handle.dataset(Table::Messages).await?;
2555        // `embedded` counts `embedding_model IS NOT NULL`, not `vector`: the two
2556        // are co-set (spec.md#session-embed-from-canonical) so the count is
2557        // identical, but the model-id string column is ~50x narrower than the
2558        // Float16 vector (Lance 7.0.0 has no per-column null_count, so this is a
2559        // data-page read).
2560        let embedded = dataset
2561            .count_rows(Some(Predicate::IsNotNull("embedding_model").to_lance()))
2562            .await?;
2563        // `backlog` and `total` come from live, deletion-aware counts, not the
2564        // FTS `num_docs`: num_docs counts indexed docs incl. deleted-but-unpurged
2565        // ones, so `num_docs - embedded` reports a phantom backlog that survives
2566        // every embed. `embedded` (model present) + `backlog` (model absent,
2567        // search_text present) is exactly the live eligible set, since embedding
2568        // a row requires its search_text.
2569        let backlog = self.embed_backlog_count().await?;
2570        Ok(EmbeddingProgress {
2571            embedded,
2572            total: embedded + backlog,
2573            backlog,
2574            model: embed::model_id(),
2575        })
2576    }
2577
2578    /// Messages eligible but not yet embedded (`search_text` present,
2579    /// `embedding_model` null) - the exact set [`crate::embed::EmbedWorker`]
2580    /// processes. Read straight from the dataset so it is correct right after
2581    /// ingest, unlike the FTS `num_docs` `embedding_progress` shows (which lags
2582    /// until the index is rebuilt - the embed stage runs before that).
2583    pub async fn embed_backlog_count(&self) -> Result<usize> {
2584        let dataset = self.handle.dataset(Table::Messages).await?;
2585        let filter = Predicate::And(vec![
2586            Predicate::IsNull("embedding_model"),
2587            Predicate::IsNotNull("search_text"),
2588        ]);
2589        Ok(dataset.count_rows(Some(filter.to_lance())).await?)
2590    }
2591
2592    /// Count rows whose `embedding_model` is not the currently configured
2593    /// model AND whose `vector` is still populated - the signal `pond optimize`
2594    /// uses to detect a model swap and require `--force-embed`.
2595    pub async fn stale_embedding_count(&self) -> Result<usize> {
2596        let dataset = self.handle.dataset(Table::Messages).await?;
2597        // Same shape as the original (IsNotNull AND Ne), but the null check is on
2598        // the narrow model-id column, not the ~50x-wider Float16 vector: the two
2599        // are co-set (spec.md#session-embed-from-canonical), so `embedding_model
2600        // IS NOT NULL` equals `vector IS NOT NULL`, and the model-id page read is
2601        // far cheaper than the vector's.
2602        dataset
2603            .count_rows(Some(
2604                Predicate::And(vec![
2605                    Predicate::IsNotNull("embedding_model"),
2606                    Predicate::Ne("embedding_model", embed::model_id().into()),
2607                ])
2608                .to_lance(),
2609            ))
2610            .await
2611            .map_err(Into::into)
2612    }
2613
2614    /// Run the per-table maintenance cycle (compact + indices) across every
2615    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
2616    /// and compaction commit independently, so a hot writer that starves
2617    /// compaction on one table does not abort the index work the operator
2618    /// asked for on other tables (or even on the same table).
2619    pub async fn optimize_indices(
2620        &self,
2621        progress: Option<OptimizeProgressFn>,
2622        maintenance: &MaintenancePolicy,
2623    ) -> Result<OptimizeOutcome> {
2624        let intents = pond_index_intents();
2625        let mut tables = Vec::with_capacity(3);
2626        for (table, intents) in intents.all() {
2627            let outcome = self
2628                .handle
2629                .optimize_table(table, intents, progress.as_ref(), maintenance)
2630                .await;
2631            tables.push(outcome);
2632        }
2633        Ok(OptimizeOutcome { tables })
2634    }
2635
2636    /// Fold trailing fragments into existing indices across every table,
2637    /// without running compaction. Used by `pond optimize`'s tail so newly
2638    /// written vectors land in the FTS / IVF_SQ / btree / bitmap indices
2639    /// without paying the compaction retry budget while embed itself may
2640    /// still be writing in a sibling process.
2641    pub async fn build_indices_only(
2642        &self,
2643        progress: Option<OptimizeProgressFn>,
2644    ) -> Result<OptimizeOutcome> {
2645        let policy = pond_index_intents();
2646        let mut tables = Vec::with_capacity(3);
2647        for (table, intents) in policy.all() {
2648            let indices = self
2649                .handle
2650                .optimize_table_indices_only(table, intents, progress.as_ref())
2651                .await;
2652            tables.push(TableOptimizeOutcome {
2653                table,
2654                indices,
2655                compaction: PhaseOutcome::NotAttempted,
2656            });
2657        }
2658        Ok(OptimizeOutcome { tables })
2659    }
2660
2661    #[cfg(test)]
2662    async fn optimize_indices_with_vector_threshold(
2663        &self,
2664        vector_threshold: usize,
2665    ) -> Result<OptimizeOutcome> {
2666        let intents = pond_index_intents_with_vector_threshold(vector_threshold);
2667        let policy = MaintenancePolicy::always_compact();
2668        let mut tables = Vec::with_capacity(3);
2669        for (table, intents) in intents.all() {
2670            let outcome = self
2671                .handle
2672                .optimize_table(table, intents, None, &policy)
2673                .await;
2674            tables.push(outcome);
2675        }
2676        Ok(OptimizeOutcome { tables })
2677    }
2678
2679    /// Reclaim superseded data/index files across every indexed table (Lance
2680    /// `cleanup_old_versions`), without compaction. `pond optimize --rebuild`
2681    /// runs this after the rebuild so the index segments it just replaced are
2682    /// dropped immediately. The retention floor still protects versions a live
2683    /// reader may have pinned (spec.md#concurrency).
2684    pub async fn cleanup_old_versions(&self, older_than: chrono::Duration) -> Result<()> {
2685        for (table, _) in pond_index_intents().all() {
2686            self.handle
2687                .cleanup_table_versions(table, older_than)
2688                .await?;
2689        }
2690        Ok(())
2691    }
2692
2693    pub async fn rebuild_indices(
2694        &self,
2695        intent_name: Option<&str>,
2696        progress: Option<OptimizeProgressFn>,
2697    ) -> Result<()> {
2698        let policy = pond_index_intents();
2699        let mut matched = false;
2700        for (table, intents) in policy.all() {
2701            for intent in intents {
2702                if intent_name.is_none_or(|name| name == intent.name) {
2703                    matched = true;
2704                    self.handle
2705                        .rebuild_index(table, intent, progress.as_ref())
2706                        .await?;
2707                }
2708            }
2709        }
2710        if let Some(name) = intent_name
2711            && !matched
2712        {
2713            anyhow::bail!("unknown index intent {name:?}");
2714        }
2715        Ok(())
2716    }
2717
2718    /// Drop a named index from whichever table owns it. Used by `pond optimize
2719    /// --drop-index <name>` to clean up orphaned indices (e.g. after renaming
2720    /// an intent whose on-disk name no longer matches the policy). Finds the
2721    /// owning table via parallel `load_indices` lookups, then drops on just
2722    /// that table - so real I/O errors surface with the right context instead
2723    /// of being hidden behind "no such index" from the wrong table.
2724    pub async fn drop_index_by_name(&self, name: &str) -> Result<()> {
2725        let Some(owner) = self.handle.find_index_owner(name).await? else {
2726            anyhow::bail!("no index named {name:?} found on any table");
2727        };
2728        self.handle.drop_index(owner, name).await
2729    }
2730
2731    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
2732        let policy = pond_index_intents();
2733        let mut statuses = Vec::new();
2734        for (table, intents) in policy.all() {
2735            statuses.extend(self.handle.index_status(table, intents).await?);
2736        }
2737        Ok(statuses)
2738    }
2739
2740    /// Drop the IVF_SQ index on `messages.vector`. Used by `pond optimize
2741    /// --force-embed` before re-bootstrapping under a different model. Silent
2742    /// when the index does not exist.
2743    pub async fn drop_vector_index(&self) -> Result<()> {
2744        match self
2745            .handle
2746            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
2747            .await
2748        {
2749            Ok(()) => Ok(()),
2750            Err(error) => {
2751                let msg = error.to_string();
2752                if msg.contains("not found") || msg.contains("does not exist") {
2753                    Ok(())
2754                } else {
2755                    Err(error)
2756                }
2757            }
2758        }
2759    }
2760
2761    /// On-disk byte totals per dataset, sized through Lance's object store
2762    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
2763    pub async fn table_sizes(&self) -> Result<TableSizes> {
2764        self.handle.table_sizes().await
2765    }
2766
2767    pub async fn initialized(&self) -> Result<bool> {
2768        self.handle.initialized().await
2769    }
2770
2771    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
2772        let batch = self
2773            .handle
2774            .scan_batch(
2775                Table::Sessions,
2776                Some(&Predicate::Eq("id", session_id.into())),
2777                &[],
2778            )
2779            .await?;
2780        if batch.num_rows() == 0 {
2781            Ok(None)
2782        } else {
2783            Ok(Some(session_from_batch(&batch, 0)?))
2784        }
2785    }
2786
2787    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
2788        let batch = self
2789            .handle
2790            .scan_batch(
2791                Table::Messages,
2792                Some(&Predicate::Eq("session_id", session_id.into())),
2793                &[
2794                    "session_id",
2795                    "id",
2796                    "timestamp",
2797                    "role",
2798                    "content",
2799                    "options",
2800                ],
2801            )
2802            .await?;
2803        let mut messages = Vec::with_capacity(batch.num_rows());
2804        for row in 0..batch.num_rows() {
2805            messages.push(message_from_batch(&batch, row)?);
2806        }
2807        messages.sort_by(|left, right| {
2808            left.timestamp()
2809                .cmp(&right.timestamp())
2810                .then_with(|| left.id().cmp(right.id()))
2811        });
2812
2813        let message_ids = messages
2814            .iter()
2815            .map(|message| message.id().to_owned())
2816            .collect::<Vec<_>>();
2817        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
2818
2819        Ok(messages
2820            .into_iter()
2821            .map(|message| {
2822                let key = (message.session_id().to_owned(), message.id().to_owned());
2823                let parts = parts_by_message.remove(&key).unwrap_or_default();
2824                MessageWithParts { message, parts }
2825            })
2826            .collect())
2827    }
2828
    /// Every part of these messages, full fidelity (file blobs included). The
    /// canonical read primitive - restore/export, verbatim mode, and the
    /// message-mode target all need the complete set.
    ///
    /// Delegates to `scan_parts` with no part-type filter. The result is keyed
    /// by `(session_id, message_id)`; messages without parts have no entry.
    pub async fn parts_for_messages(
        &self,
        session_id: &str,
        message_ids: &[String],
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        self.scan_parts(session_id, message_ids, None).await
    }
2839
    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
    /// nothing. For the summary-only reads (conversational/complete session
    /// views, search hits) - it never feeds restore/export.
    ///
    /// Delegates to `scan_parts` with `SUMMARY_PART_TYPES` as the part-type
    /// filter; the result has the same `(session_id, message_id)` keying as
    /// [`Self::parts_for_messages`].
    pub async fn summary_parts_for_messages(
        &self,
        session_id: &str,
        message_ids: &[String],
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
            .await
    }
2852
2853    async fn scan_parts(
2854        &self,
2855        session_id: &str,
2856        message_ids: &[String],
2857        part_types: Option<&[&str]>,
2858    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
2859        if message_ids.is_empty() {
2860            return Ok(BTreeMap::new());
2861        }
2862        let mut clauses = vec![
2863            Predicate::Eq("session_id", session_id.into()),
2864            in_predicate("message_id", message_ids),
2865        ];
2866        if let Some(types) = part_types {
2867            clauses.push(Predicate::In(
2868                "type",
2869                types.iter().map(|&t| t.into()).collect(),
2870            ));
2871        }
2872        let predicate = Predicate::And(clauses);
2873        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
2874        let mut scanner = self
2875            .handle
2876            .scan(
2877                Table::Parts,
2878                ScanOpts::with_predicate_and_projection(
2879                    &predicate,
2880                    &[
2881                        "session_id",
2882                        "message_id",
2883                        "id",
2884                        "ordinal",
2885                        "type",
2886                        "provenance",
2887                        "variant_data",
2888                        "options",
2889                    ],
2890                ),
2891            )
2892            .await?;
2893        scanner.with_row_address();
2894        let batch = scanner.try_into_batch().await.context("scan failed")?;
2895        let row_addresses = uint64(&batch, "_rowaddr")?;
2896        let mut file_payloads = BTreeMap::<usize, FileData>::new();
2897        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
2898        for row in 0..batch.num_rows() {
2899            if string(&batch, "type", row)?.as_deref() == Some("file") {
2900                let variant_data =
2901                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
2902                file_rows.push((row, row_addresses.value(row), variant_data));
2903            }
2904        }
2905        if !file_rows.is_empty() {
2906            let addresses = file_rows
2907                .iter()
2908                .map(|(_, address, _)| *address)
2909                .collect::<Vec<_>>();
2910            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
2911            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
2912                // Legacy blob (lance-encoding:blob): payload is bytes; the
2913                // url variant stored its URL as UTF-8 bytes, recovered via
2914                // `file_data_from_blob`'s `data_kind = "url"` branch.
2915                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
2916                file_payloads.insert(row, payload);
2917            }
2918        }
2919        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
2920        for row in 0..batch.num_rows() {
2921            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
2922            parts_by_message
2923                .entry((part.session_id.clone(), part.message_id.clone()))
2924                .or_default()
2925                .push(part);
2926        }
2927        for parts in parts_by_message.values_mut() {
2928            parts.sort_by_key(|part| part.ordinal);
2929        }
2930        Ok(parts_by_message)
2931    }
2932}
2933
/// One event of an ingest stream: a session header, a message, or a part.
///
/// Serialized adjacently tagged: the variant name in snake_case under
/// `"kind"` and the payload under `"data"`, e.g.
/// `{"kind": "message", "data": { ... }}`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// Opens a session substream.
    Session(Session),
    /// A message belonging to the current session.
    Message(Message),
    /// A part belonging to the current message.
    Part(Part),
}
2941
/// Aggregate accounting for an ingest pass (CLI sync, adapter-driven).
/// The wire layer (`pond_ingest`) instead returns per-row results; the
/// aggregate is derived from those at the wire boundary.
///
/// Fields are bucketed by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape is set by spec.md#adapter-integrity-event-ordering.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows actually written to Lance, summed across all three tables.
    /// Use the per-table fields below for user-facing counts; this stays
    /// for `accepted()` and existing wire callers.
    pub inserted: usize,
    /// Rows that already existed (merge_insert no-op match), summed across
    /// all three tables.
    pub matched: usize,
    /// Session rows inserted this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted this pass (total - includes tool calls,
    /// tool results, and other non-searchable messages).
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` whose `search_text` is non-null
    /// (eligible for FTS + semantic indexing). The user-facing "messages"
    /// count in `pond sync` / `pond status` reads this field.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted this pass.
    pub parts_inserted: usize,
    /// Session rows already-present (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows already-present (merge_insert matched), total.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` with `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows already-present.
    pub parts_matched: usize,
    /// Events the validator dropped under per-event-drop policy (ordering
    /// violation, orphan part, mismatched parent, adapter parse failure,
    /// duplicate-id collision, ...). Counted by event, not by session: a
    /// session with one bad part stays in this bucket as 1, not as "the
    /// whole substream." Per spec.md#adapter-integrity-dedup, adapters SHOULD
    /// dedupe their own emissions upstream when source replay is expected;
    /// the validator's in-batch HashSet is a safety net, not a feature
    /// adapters may rely on. If this bucket grows on a clean adapter,
    /// inspect `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions whose Session-level invariants (immutable `source_agent` /
    /// `project` against the stored row) failed at flush time and
    /// whose substream got rejected wholesale. Always small relative to
    /// `inserted`; if not, there's a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter couldn't decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    ///
    /// NOTE(review): the causes listed here overlap with `skipped_empty`
    /// (empty `.jsonl`, unextractable header) - confirm which bucket each
    /// cause actually lands in and tighten one of the two descriptions.
    pub skipped_files: usize,
    /// Files that produced no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `-vv` (debug) verbosity.
    pub skipped_empty: usize,
    /// Sessions short-circuited via the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): file `mtime` was at or
    /// before the wall-clock time pond last wrote that session's row, so
    /// re-decode was bypassed.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries were exhausted (commit
    /// conflicts, transient IO that didn't recover). Hard zero on healthy
    /// runs.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys for the combined `dropped_events +
    /// dropped_sessions` populations. Keys are `&'static str` (see the
    /// `DROP_REASON_*` constants) so consumers can match by identity.
    /// Empty on a clean run. Used by `pond sync` to print the top reasons
    /// and by `benches/ingest_bench.rs` to bucket Partial drops by cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
3017
// Stable reason keys for the `IngestSummary::drop_reasons` histogram and
// the per-row `RowError::reason_key`. `&'static str` so consumers can
// match by identity rather than prose. Adding a new key: pick a short
// snake_case identifier, route it from the validator/adapter, and update
// the per-row outcome docs in `docs/spec.md#adapter-integrity-event-ordering`.

/// A message id repeated within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// Presumably a `(message_id, part_id)` key repeated within one session
/// substream - the emitting code is outside this view.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was buffered.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message's `session_id` differs from the buffered session's id.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was buffered.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part does not belong to the current message.
/// NOTE(review): confirm the exact checks in `push_part`.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session's `source_agent` is empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session carries `parent_message_id` without `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Presumably a re-written session whose `project` differs from the stored
/// row - the emitting code is outside this view.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Presumably a re-written session whose `source_agent` differs from the
/// stored row - the emitting code is outside this view.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback bucket for error outcomes that carry no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
3034
/// Honest per-table outcome of one batched flush. Built from `merge_insert`'s
/// returned counts together with the pre-existence sets captured by
/// `upsert_session_batch`. Folded into a per-sync summary via
/// [`IngestSummary::add_batch`]. spec.md#adapter-integrity-additive-sync: matched
/// is a no-op write, so the inserted/matched split is informational - we still
/// surface it because both `pond sync` and `pond_ingest` clients reconcile
/// against "which rows landed this call."
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows inserted by this flush.
    pub sessions_inserted: usize,
    /// Session rows that were already present.
    pub sessions_matched: usize,
    /// Message rows inserted by this flush, searchable or not.
    pub messages_inserted_total: usize,
    /// Searchable subset of `messages_inserted_total`; `add_batch` does not
    /// add it to the grand `inserted` sum a second time.
    pub messages_inserted_searchable: usize,
    /// Message rows that were already present, searchable or not.
    pub messages_matched_total: usize,
    /// Searchable subset of `messages_matched_total`; likewise excluded from
    /// the grand `matched` sum.
    pub messages_matched_searchable: usize,
    /// Part rows inserted by this flush.
    pub parts_inserted: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
}
3053
3054impl IngestSummary {
3055    pub fn accepted(&self) -> usize {
3056        self.inserted + self.matched
3057    }
3058
3059    /// Sole writer of the per-table counters on the CLI batched flush path.
3060    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
3061    /// both for the same rows would double-count.
3062    pub fn add_batch(&mut self, counts: &BatchCounts) {
3063        self.sessions_inserted += counts.sessions_inserted;
3064        self.sessions_matched += counts.sessions_matched;
3065        self.messages_inserted_total += counts.messages_inserted_total;
3066        self.messages_inserted_searchable += counts.messages_inserted_searchable;
3067        self.messages_matched_total += counts.messages_matched_total;
3068        self.messages_matched_searchable += counts.messages_matched_searchable;
3069        self.parts_inserted += counts.parts_inserted;
3070        self.parts_matched += counts.parts_matched;
3071        self.inserted +=
3072            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
3073        self.matched +=
3074            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
3075    }
3076
3077    /// Sum every counter from `other` into `self`. Used by the multi-source
3078    /// `pond sync` loop so adding a new field to this struct doesn't silently
3079    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
3080    pub fn merge(&mut self, other: &Self) {
3081        self.inserted += other.inserted;
3082        self.matched += other.matched;
3083        self.sessions_inserted += other.sessions_inserted;
3084        self.messages_inserted_total += other.messages_inserted_total;
3085        self.messages_inserted_searchable += other.messages_inserted_searchable;
3086        self.parts_inserted += other.parts_inserted;
3087        self.sessions_matched += other.sessions_matched;
3088        self.messages_matched_total += other.messages_matched_total;
3089        self.messages_matched_searchable += other.messages_matched_searchable;
3090        self.parts_matched += other.parts_matched;
3091        self.dropped_events += other.dropped_events;
3092        self.dropped_sessions += other.dropped_sessions;
3093        self.skipped_files += other.skipped_files;
3094        self.skipped_empty += other.skipped_empty;
3095        self.skipped_fresh += other.skipped_fresh;
3096        self.storage_errors += other.storage_errors;
3097        self.truncated_values += other.truncated_values;
3098        for (key, value) in &other.drop_reasons {
3099            *self.drop_reasons.entry(key).or_insert(0) += value;
3100        }
3101    }
3102
3103    /// Same dispatch as [`Self::add_outcomes`] but ignores
3104    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
3105    /// via [`Self::add_batch`] and uses this method to attribute per-row
3106    /// `Error` outcomes from the same flush.
3107    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
3108        for outcome in outcomes {
3109            if !matches!(outcome.status, OutcomeStatus::Error) {
3110                continue;
3111            }
3112            if outcome.kind == "session" {
3113                self.dropped_sessions += 1;
3114            } else {
3115                self.dropped_events += 1;
3116            }
3117            let reason = outcome
3118                .error
3119                .as_ref()
3120                .and_then(|error| error.reason_key)
3121                .unwrap_or(DROP_REASON_UNCATEGORIZED);
3122            *self.drop_reasons.entry(reason).or_insert(0) += 1;
3123        }
3124    }
3125
3126    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
3127        for outcome in outcomes {
3128            match outcome.status {
3129                OutcomeStatus::Inserted => {
3130                    self.inserted += 1;
3131                    match outcome.kind {
3132                        "session" => self.sessions_inserted += 1,
3133                        "message" => {
3134                            self.messages_inserted_total += 1;
3135                            if outcome.searchable {
3136                                self.messages_inserted_searchable += 1;
3137                            }
3138                        }
3139                        "part" => self.parts_inserted += 1,
3140                        _ => {}
3141                    }
3142                }
3143                OutcomeStatus::Matched => {
3144                    self.matched += 1;
3145                    match outcome.kind {
3146                        "session" => self.sessions_matched += 1,
3147                        "message" => {
3148                            self.messages_matched_total += 1;
3149                            if outcome.searchable {
3150                                self.messages_matched_searchable += 1;
3151                            }
3152                        }
3153                        "part" => self.parts_matched += 1,
3154                        _ => {}
3155                    }
3156                }
3157                OutcomeStatus::Error => {
3158                    // Session-level rejection: exactly one session-kind Error
3159                    // outcome (see `error_outcomes_for_substream`). Per-event
3160                    // drop: one Error per message/part. The two populations
3161                    // are counted separately so the operator can tell a
3162                    // structural reject from a row-level skip.
3163                    if outcome.kind == "session" {
3164                        self.dropped_sessions += 1;
3165                    } else {
3166                        self.dropped_events += 1;
3167                    }
3168                    let reason = outcome
3169                        .error
3170                        .as_ref()
3171                        .and_then(|e| e.reason_key)
3172                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
3173                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
3174                }
3175            }
3176        }
3177    }
3178}
3179
/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol). One
/// row per input event from the request's `events` array. The validator
/// returns these in array order so the wire layer can pack them directly
/// into [`crate::wire::IngestResult`] entries.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Position of the originating event in the input `events` array.
    pub index: usize,
    /// Row kind; the summary code distinguishes `"session"`, `"message"`
    /// and `"part"`.
    pub kind: &'static str,
    /// Primary key of the row. The validator emits the session id as a
    /// string for sessions, `[session_id, id]` for messages and
    /// `[session_id, message_id, id]` for parts.
    pub pk: Value,
    /// Whether the row was inserted, already present, or rejected.
    pub status: OutcomeStatus,
    /// Error body; the summary code reads it only for `Error` outcomes.
    pub error: Option<RowError>,
    /// True iff `kind == "message"` AND the underlying row carries
    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
    /// so the CLI can show "searchable" message deltas distinct from raw
    /// inserts. Always false for session/part rows.
    pub searchable: bool,
}
3197
/// What happened to one ingested row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row was newly written.
    Inserted,
    /// The row already existed (merge_insert no-op match).
    Matched,
    /// The row was rejected; counted as a dropped session or dropped event.
    Error,
}
3204
/// Structured per-row error body. Mirrors the wire shape so the handler
/// can pass it straight through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of what was wrong with the row.
    pub message: String,
    /// Name of the offending field when one can be singled out, e.g.
    /// `"source_agent"` or `"session_id"`.
    pub field: Option<&'static str>,
    /// Optional human-prose reason; the session rejections built in this
    /// file leave it `None`.
    pub reason: Option<&'static str>,
    /// Stable key for histogramming - see `DROP_REASON_*` constants. The
    /// `reason` field above is human-prose; `reason_key` is the machine
    /// bucket. `None` means uncategorized; consumers attribute to
    /// `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
3218
/// Buffered session events tagged with their input array index, so the
/// per-row outcomes can be re-attributed once `merge_insert` returns its
/// per-row Inserted/Matched stats.
#[derive(Debug)]
struct BufferedSession {
    /// Position of the Session event in the input array.
    index: usize,
    /// The session header, with `source_agent` already trimmed.
    session: Session,
}
3227
/// A buffered message event together with the parts collected for it.
#[derive(Debug)]
struct BufferedMessage {
    /// Position of the Message event in the input array.
    index: usize,
    /// The message, with `options.pond` already restamped by `push_message`.
    message: Message,
    /// Parts attached to this message; empty when the message is first
    /// buffered.
    parts: Vec<BufferedPart>,
    /// `None` when the message is first buffered.
    /// NOTE(review): presumably filled in when the message is flushed -
    /// confirm in `flush_current_message`.
    search_text: Option<String>,
}
3235
/// A buffered part event tagged with its input array index.
#[derive(Debug)]
struct BufferedPart {
    /// Position of the Part event in the input array.
    index: usize,
    /// The part itself.
    part: Part,
}
3241
/// State machine that turns the `events: Vec<IngestEvent>` array into a
/// flat `Vec<RowOutcome>` matching the array's index space. Buffers a whole
/// session substream so `merge_insert` runs once per substream (three
/// batches: sessions, messages, parts). A validation error on a single event
/// drops *that event* (one [`OutcomeStatus::Error`] outcome) and the substream
/// continues; only Session-level invariants (immutable source_agent / project
/// on re-write) drop the whole substream (spec.md#adapter-integrity-event-ordering).
///
/// Writes are batched at flush time. As complete substreams arrive (a new
/// `Session` event closes out the current one), they accumulate in
/// `completed` rather than each one calling `merge_insert` immediately.
/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
/// at which point one batched 3-parallel-merge-insert covers all pending
/// substreams. This is the load-bearing perf change: per-substream commit
/// overhead dominated the ingest profile (see `benches/ingest_bench.rs`),
/// and amortizing it across N sessions cuts wall time materially.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// Header of the substream currently being buffered. `None` before the
    /// first accepted Session event, after a Session event is rejected, and
    /// once the substream has been closed.
    session: Option<BufferedSession>,
    /// The most recently accepted message; incoming parts are checked
    /// against it.
    current_message: Option<BufferedMessage>,
    /// NOTE(review): presumably parts accumulated for `current_message` -
    /// its usage is outside this view.
    current_parts: Vec<BufferedPart>,
    /// Messages of the current substream; moved into a `CompletedSubstream`
    /// when the substream closes.
    messages: Vec<BufferedMessage>,
    /// Message ids already buffered in the current substream. Duplicate ids
    /// drop the offending event in-line rather than failing the whole batch
    /// downstream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` keys already buffered in the current
    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
    seen_part_keys: HashSet<(String, String)>,
    /// Substreams whose end-of-stream boundary has been observed but whose
    /// rows haven't been written yet. Flushed in batched mode by
    /// [`Self::flush`].
    completed: Vec<CompletedSubstream>,
}
3276
/// One closed substream ready for the batched flush path.
#[derive(Debug)]
struct CompletedSubstream {
    /// Input array index of the Session event that opened the substream.
    session_index: usize,
    /// The session header.
    session: Session,
    /// Every message buffered for the session, each carrying its parts.
    messages: Vec<BufferedMessage>,
}
3284
3285/// Ingest host provenance (`options.pond`, spec.md#model-pond-options),
3286/// computed once per process. An audit fact - "the process that inserted this
3287/// row" - not identity. Fallible lookups are omitted, never synthesized as
3288/// placeholders.
3289fn ingest_host_stamp() -> Option<&'static Value> {
3290    static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
3291    STAMP
3292        .get_or_init(|| {
3293            let mut host = serde_json::Map::new();
3294            if let Ok(username) = whoami::username() {
3295                host.insert("username".to_owned(), username.into());
3296            }
3297            if let Ok(hostname) = whoami::hostname() {
3298                host.insert("hostname".to_owned(), hostname.into());
3299            }
3300            if let Ok(devicename) = whoami::devicename() {
3301                host.insert("device_name".to_owned(), devicename.into());
3302            }
3303            (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
3304        })
3305        .as_ref()
3306}
3307
3308impl IngestValidator {
3309    /// Drive one input event through the validator. Returns the per-row
3310    /// outcomes the event triggered: empty when the event is just buffered,
3311    /// or N entries when a session substream just flushed (success or
3312    /// failure). `Err` is reserved for catastrophic storage failures that
3313    /// should fail the whole `pond_ingest` request.
3314    pub async fn push(
3315        &mut self,
3316        store: &Store,
3317        index: usize,
3318        event: IngestEvent,
3319    ) -> Result<Vec<RowOutcome>> {
3320        match event {
3321            IngestEvent::Session(session) => self.push_session(store, index, session).await,
3322            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
3323            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
3324        }
3325    }
3326
    /// Final flush at end-of-batch. Closes the in-flight substream and
    /// drains the pending-flush buffer. Returns the per-row outcomes (for
    /// the wire layer) alongside the honest per-table counts (for
    /// `IngestSummary::add_batch`).
    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        // Move the in-flight substream into `completed` first so the flush
        // below includes it.
        self.close_current_substream();
        self.flush(store).await
    }
3335
3336    /// Drain every completed substream into batched 3-parallel-merge_insert
3337    /// writes. Caller invokes this periodically (every N completed
3338    /// substreams) to keep memory bounded; in adapter-driven sync that
3339    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
3340    /// in-flight substream stays buffered - close it explicitly via
3341    /// [`Self::finish`] or by feeding the next Session event.
3342    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
3343        if self.completed.is_empty() {
3344            return Ok((Vec::new(), BatchCounts::default()));
3345        }
3346        let completed = std::mem::take(&mut self.completed);
3347        store.upsert_session_batch(completed).await
3348    }
3349
    /// Number of fully-buffered substreams awaiting batched write. Used by
    /// the adapter caller to decide when to call [`Self::flush`]. The
    /// in-flight substream is not counted until it has been closed.
    pub fn pending_substreams(&self) -> usize {
        self.completed.len()
    }
3355
3356    async fn push_session(
3357        &mut self,
3358        _store: &Store,
3359        index: usize,
3360        mut session: Session,
3361    ) -> Result<Vec<RowOutcome>> {
3362        // Close out the current substream (if any) - move it to the pending
3363        // buffer instead of writing immediately. The actual write happens
3364        // when the caller invokes `flush` / `finish`.
3365        self.close_current_substream();
3366
3367        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
3368        // if empty after trim. A Session event with empty source_agent is
3369        // dropped on the spot - the substream that would follow has nothing
3370        // to anchor on, so subsequent message/part events will also drop.
3371        let trimmed = session.source_agent.trim();
3372        if trimmed.is_empty() {
3373            return Ok(vec![RowOutcome {
3374                index,
3375                kind: "session",
3376                pk: Value::String(session.id.clone()),
3377                status: OutcomeStatus::Error,
3378                error: Some(RowError {
3379                    message: format!("session {} has empty source_agent after trim", session.id),
3380                    field: Some("source_agent"),
3381                    reason: None,
3382                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
3383                }),
3384                searchable: false,
3385            }]);
3386        }
3387        if trimmed.len() != session.source_agent.len() {
3388            session.source_agent = trimmed.to_owned();
3389        }
3390
3391        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
3392            return Ok(vec![RowOutcome {
3393                index,
3394                kind: "session",
3395                pk: Value::String(session.id.clone()),
3396                status: OutcomeStatus::Error,
3397                error: Some(RowError {
3398                    message: format!(
3399                        "session {} has parent_message_id without parent_session_id",
3400                        session.id,
3401                    ),
3402                    field: Some("parent_message_id"),
3403                    reason: None,
3404                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
3405                }),
3406                searchable: false,
3407            }]);
3408        }
3409
3410        self.seen_message_ids.clear();
3411        self.seen_part_keys.clear();
3412        self.session = Some(BufferedSession { index, session });
3413        Ok(Vec::new())
3414    }
3415
3416    fn close_current_substream(&mut self) {
3417        self.flush_current_message();
3418        let Some(BufferedSession {
3419            index: session_index,
3420            session,
3421        }) = self.session.take()
3422        else {
3423            return;
3424        };
3425        let messages = std::mem::take(&mut self.messages);
3426        self.seen_message_ids.clear();
3427        self.seen_part_keys.clear();
3428        self.completed.push(CompletedSubstream {
3429            session_index,
3430            session,
3431            messages,
3432        });
3433    }
3434
3435    fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
3436        let pk = Value::Array(vec![
3437            Value::String(message.session_id().to_owned()),
3438            Value::String(message.id().to_owned()),
3439        ]);
3440        let Some(session) = &self.session else {
3441            return vec![error_outcome(
3442                index,
3443                "message",
3444                pk,
3445                "first event in a session stream must be Session",
3446                None,
3447                DROP_REASON_MESSAGE_BEFORE_SESSION,
3448            )];
3449        };
3450        if message.session_id() != session.session.id {
3451            let msg = format!(
3452                "message {} references session {}, expected {}",
3453                message.id(),
3454                message.session_id(),
3455                session.session.id
3456            );
3457            return vec![error_outcome(
3458                index,
3459                "message",
3460                pk,
3461                &msg,
3462                Some("session_id"),
3463                DROP_REASON_MESSAGE_SESSION_MISMATCH,
3464            )];
3465        }
3466        if !self.seen_message_ids.insert(message.id().to_owned()) {
3467            // Keep same-substream duplicate ids visible in `dropped_events`;
3468            // adapters are expected to dedupe upstream (see claude-code's
3469            // per-file `seen_uuids`), so a hit here is worth investigating.
3470            let msg = format!("duplicate message id {} in session substream", message.id());
3471            return vec![error_outcome(
3472                index,
3473                "message",
3474                pk,
3475                &msg,
3476                None,
3477                DROP_REASON_DUPLICATE_MESSAGE_ID,
3478            )];
3479        }
3480        // `options.pond` is core-owned (spec.md#model-pond-options): stripped
3481        // and restamped at ingest so neither adapters nor wire clients can
3482        // spoof provenance. Matched rows are merge_insert no-ops, so re-ingest
3483        // never restamps stored rows.
3484        match ingest_host_stamp() {
3485            Some(stamp) => {
3486                message
3487                    .options_mut()
3488                    .insert("pond".to_owned(), stamp.clone());
3489            }
3490            None => {
3491                message.options_mut().remove("pond");
3492            }
3493        }
3494        self.flush_current_message();
3495        self.current_message = Some(BufferedMessage {
3496            index,
3497            message,
3498            parts: Vec::new(),
3499            search_text: None,
3500        });
3501        Vec::new()
3502    }
3503
3504    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
3505        let pk = Value::Array(vec![
3506            Value::String(part.session_id.clone()),
3507            Value::String(part.message_id.clone()),
3508            Value::String(part.id.clone()),
3509        ]);
3510        let Some(current) = &self.current_message else {
3511            return vec![error_outcome(
3512                index,
3513                "part",
3514                pk,
3515                "part event appeared before a message",
3516                None,
3517                DROP_REASON_PART_BEFORE_MESSAGE,
3518            )];
3519        };
3520        if part.session_id != current.message.session_id() {
3521            let msg = format!(
3522                "part {} references session {}, expected {}",
3523                part.id,
3524                part.session_id,
3525                current.message.session_id()
3526            );
3527            return vec![error_outcome(
3528                index,
3529                "part",
3530                pk,
3531                &msg,
3532                Some("session_id"),
3533                DROP_REASON_PART_MESSAGE_MISMATCH,
3534            )];
3535        }
3536        if part.message_id != current.message.id() {
3537            let msg = format!(
3538                "part {} references message {}, expected {}",
3539                part.id,
3540                part.message_id,
3541                current.message.id()
3542            );
3543            return vec![error_outcome(
3544                index,
3545                "part",
3546                pk,
3547                &msg,
3548                Some("message_id"),
3549                DROP_REASON_PART_MESSAGE_MISMATCH,
3550            )];
3551        }
3552        let part_key = (part.message_id.clone(), part.id.clone());
3553        if !self.seen_part_keys.insert(part_key) {
3554            let msg = format!(
3555                "duplicate part id {} for message {} in session substream",
3556                part.id, part.message_id
3557            );
3558            return vec![error_outcome(
3559                index,
3560                "part",
3561                pk,
3562                &msg,
3563                None,
3564                DROP_REASON_DUPLICATE_PART_KEY,
3565            )];
3566        }
3567        self.current_parts.push(BufferedPart { index, part });
3568        Vec::new()
3569    }
3570
3571    fn flush_current_message(&mut self) {
3572        let Some(mut buffered) = self.current_message.take() else {
3573            return;
3574        };
3575        let parts = std::mem::take(&mut self.current_parts);
3576        let mut canonical_parts = Vec::with_capacity(parts.len());
3577        for part in &parts {
3578            canonical_parts.push(part.part.clone());
3579        }
3580        buffered.search_text = search_text(&buffered.message, &canonical_parts);
3581        buffered.parts = parts;
3582        self.messages.push(buffered);
3583    }
3584}
3585
3586fn error_outcome(
3587    index: usize,
3588    kind: &'static str,
3589    pk: Value,
3590    message: &str,
3591    field: Option<&'static str>,
3592    reason_key: &'static str,
3593) -> RowOutcome {
3594    RowOutcome {
3595        index,
3596        kind,
3597        pk,
3598        status: OutcomeStatus::Error,
3599        error: Some(RowError {
3600            message: message.to_owned(),
3601            field,
3602            reason: None,
3603            reason_key: Some(reason_key),
3604        }),
3605        searchable: false,
3606    }
3607}
3608
3609/// Session-level rejection (immutable `source_agent` / `project` violation):
3610/// emit exactly one Error outcome on the Session row. The buffered messages
3611/// and parts of this substream are *not* surfaced as per-row errors - their
3612/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
3613fn error_outcomes_for_substream(
3614    session_index: usize,
3615    session: &Session,
3616    _messages: &[BufferedMessage],
3617    message: impl Into<String>,
3618    field: Option<&'static str>,
3619    reason_key: &'static str,
3620) -> Vec<RowOutcome> {
3621    let reason = field.map(|_| "immutable");
3622    vec![RowOutcome {
3623        index: session_index,
3624        kind: "session",
3625        pk: Value::String(session.id.clone()),
3626        status: OutcomeStatus::Error,
3627        error: Some(RowError {
3628            message: message.into(),
3629            field,
3630            reason,
3631            reason_key: Some(reason_key),
3632        }),
3633        searchable: false,
3634    }]
3635}
3636
3637/// Batched-path success helper. Each row's Inserted/Matched status is read
3638/// from the pre-existence sets captured by `upsert_session_batch` before its
3639/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
3640/// Also accumulates the per-table totals into `counts` so the CLI summary
3641/// gets the same truth without re-walking the outcomes.
3642fn success_outcomes_for_substream(
3643    session_index: usize,
3644    session: &Session,
3645    messages: &[BufferedMessage],
3646    existing_sessions: &std::collections::HashMap<String, Session>,
3647    existing_message_pks: &HashSet<(String, String)>,
3648    existing_part_pks: &HashSet<(String, String, String)>,
3649    counts: &mut BatchCounts,
3650) -> Vec<RowOutcome> {
3651    let session_was_present = existing_sessions.contains_key(&session.id);
3652    let session_status = if session_was_present {
3653        counts.sessions_matched += 1;
3654        UpsertStatus::Matched
3655    } else {
3656        counts.sessions_inserted += 1;
3657        UpsertStatus::Inserted
3658    };
3659
3660    let mut outcomes = Vec::with_capacity(1 + messages.len());
3661    outcomes.push(success_outcome(
3662        session_index,
3663        "session",
3664        Value::String(session.id.clone()),
3665        session_status,
3666        false,
3667    ));
3668    for buffered in messages {
3669        let key = (
3670            buffered.message.session_id().to_owned(),
3671            buffered.message.id().to_owned(),
3672        );
3673        let searchable = buffered.search_text.is_some();
3674        let message_status = if existing_message_pks.contains(&key) {
3675            counts.messages_matched_total += 1;
3676            if searchable {
3677                counts.messages_matched_searchable += 1;
3678            }
3679            UpsertStatus::Matched
3680        } else {
3681            counts.messages_inserted_total += 1;
3682            if searchable {
3683                counts.messages_inserted_searchable += 1;
3684            }
3685            UpsertStatus::Inserted
3686        };
3687        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
3688        outcomes.push(success_outcome(
3689            buffered.index,
3690            "message",
3691            pk,
3692            message_status,
3693            searchable,
3694        ));
3695        for part in &buffered.parts {
3696            let part_key = (
3697                part.part.session_id.clone(),
3698                part.part.message_id.clone(),
3699                part.part.id.clone(),
3700            );
3701            let part_status = if existing_part_pks.contains(&part_key) {
3702                counts.parts_matched += 1;
3703                UpsertStatus::Matched
3704            } else {
3705                counts.parts_inserted += 1;
3706                UpsertStatus::Inserted
3707            };
3708            let part_pk = Value::Array(vec![
3709                Value::String(part_key.0),
3710                Value::String(part_key.1),
3711                Value::String(part_key.2),
3712            ]);
3713            outcomes.push(success_outcome(
3714                part.index,
3715                "part",
3716                part_pk,
3717                part_status,
3718                false,
3719            ));
3720        }
3721    }
3722    outcomes
3723}
3724
3725fn success_outcome(
3726    index: usize,
3727    kind: &'static str,
3728    pk: Value,
3729    status: UpsertStatus,
3730    searchable: bool,
3731) -> RowOutcome {
3732    let status = match status {
3733        UpsertStatus::Inserted => OutcomeStatus::Inserted,
3734        UpsertStatus::Matched => OutcomeStatus::Matched,
3735    };
3736    RowOutcome {
3737        index,
3738        kind,
3739        pk,
3740        status,
3741        error: None,
3742        searchable,
3743    }
3744}
3745
/// Ingest-time rejection of a whole session write.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// An incoming Session tried to change a field that is frozen after the
    /// first write. Per spec.md#protocol, `Session.source_agent` and
    /// `Session.project` are immutable because the denormalized copies on
    /// `messages` were stamped from the prior Session at first ingest; a
    /// re-write that changes either would silently desync them.
    ImmutableField {
        /// Name of the field that differed.
        field: &'static str,
        /// Id of the session being re-written.
        session_id: String,
        /// Value already stored.
        stored: String,
        /// Value the incoming row carried.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the session id, the field name, and both values (the values
    /// in `Debug` form, i.e. quoted).
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ImmutableField {
                field,
                session_id,
                stored,
                attempted,
            } => write!(
                formatter,
                "session {} {} is immutable: stored {:?}, attempted {:?}",
                session_id, field, stored, attempted,
            ),
        }
    }
}

impl std::error::Error for IngestError {}
3777
3778/// Compare an incoming Session row against the stored row on the two
3779/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
3780/// counts a NULL-vs-non-NULL change as a mismatch.
3781fn ensure_immutable_match(
3782    existing: &Session,
3783    incoming: &Session,
3784) -> std::result::Result<(), IngestError> {
3785    if existing.source_agent != incoming.source_agent {
3786        return Err(IngestError::ImmutableField {
3787            field: "source_agent",
3788            session_id: incoming.id.clone(),
3789            stored: existing.source_agent.clone(),
3790            attempted: incoming.source_agent.clone(),
3791        });
3792    }
3793    if existing.project != incoming.project {
3794        return Err(IngestError::ImmutableField {
3795            field: "project",
3796            session_id: incoming.id.clone(),
3797            stored: (*existing.project).clone(),
3798            attempted: (*incoming.project).clone(),
3799        });
3800    }
3801    Ok(())
3802}
3803
3804pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
3805    use crate::wire::Provenance;
3806    let mut chunks: Vec<String> = Vec::new();
3807    for part in parts {
3808        // spec.md#search: only conversational parts contribute to the indexed
3809        // text; harness-injected scaffolding is excluded from search.
3810        if part.provenance != Provenance::Conversational {
3811            continue;
3812        }
3813        match (message.role(), &part.kind) {
3814            (Role::User | Role::Assistant, PartKind::Text { text }) => {
3815                if let Some(text) = text {
3816                    chunks.push(text.to_string());
3817                }
3818            }
3819            (
3820                Role::User | Role::Assistant,
3821                PartKind::File {
3822                    media_type,
3823                    file_name,
3824                    data,
3825                },
3826            ) => {
3827                if let Some(file_name) = file_name {
3828                    chunks.push(file_name.clone());
3829                }
3830                if let Some(media_type) = media_type {
3831                    chunks.push(media_type.clone());
3832                }
3833                if let FileData::Url(uri) = data {
3834                    chunks.push(uri.clone());
3835                }
3836            }
3837            (
3838                Role::System | Role::Tool,
3839                PartKind::Text { .. }
3840                | PartKind::Reasoning { .. }
3841                | PartKind::File { .. }
3842                | PartKind::ToolCall { .. }
3843                | PartKind::ToolResult { .. }
3844                | PartKind::ToolApprovalRequest { .. }
3845                | PartKind::ToolApprovalResponse { .. },
3846            )
3847            | (
3848                Role::User | Role::Assistant,
3849                PartKind::Reasoning { .. }
3850                | PartKind::ToolCall { .. }
3851                | PartKind::ToolResult { .. }
3852                | PartKind::ToolApprovalRequest { .. }
3853                | PartKind::ToolApprovalResponse { .. },
3854            ) => {}
3855        }
3856    }
3857
3858    let text = chunks
3859        .into_iter()
3860        .filter(|chunk| !chunk.trim().is_empty())
3861        .collect::<Vec<_>>()
3862        .join("\n");
3863    if text.is_empty() { None } else { Some(text) }
3864}
3865
/// Conversational text that is known to be non-empty (spec.md#search).
///
/// The inner string is private; read it through [`SearchText::as_str`] /
/// `AsRef<str>` or take ownership with [`SearchText::into_inner`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrow the text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consume the wrapper and return the owned text.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
3885
3886#[derive(Debug, Clone, PartialEq)]
3887pub struct MessageWithParts {
3888    pub message: Message,
3889    pub parts: Vec<Part>,
3890}
3891
3892#[derive(Debug, Clone, PartialEq)]
3893pub struct SessionWithMessages {
3894    pub session: Session,
3895    pub messages: Vec<MessageWithParts>,
3896}
3897
3898#[derive(Debug, Clone)]
3899pub struct SessionViewParams<'a> {
3900    /// Page forward: messages strictly after this id.
3901    pub after_message_id: Option<&'a str>,
3902    /// Page backward: messages strictly before this id.
3903    pub before_message_id: Option<&'a str>,
3904    pub limit: usize,
3905    pub budget_bytes: usize,
3906    /// First-page end when neither anchor is set.
3907    pub session_from: SessionFrom,
3908}
3909
/// Context and budget parameters for a message-mode view.
#[derive(Debug, Clone)]
pub struct MessageViewParams {
    /// How many conversational siblings to include before the target
    /// (like `grep -B`).
    pub context_before: usize,
    /// How many conversational siblings to include after the target
    /// (like `grep -A`).
    pub context_after: usize,
    /// Byte budget for the view.
    pub budget_bytes: usize,
}
3918
/// Result of a `pond_get` lookup, keeping two failure modes apart.
///
/// A missing target is `NotFound` (the handler maps it to `not_found`); a
/// stale or unknown pagination anchor is `UnknownAnchor` (mapped to
/// `validation_failed`). The distinction matters because the message stream
/// is append-only: an anchor that was ever valid never disappears, so an
/// unknown one is always a client error and never a reason to silently
/// restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested target does not exist.
    NotFound,
    /// The pagination anchor is not known.
    UnknownAnchor,
    /// The lookup succeeded.
    Found(T),
}
3930
3931/// Canonical retrieval result for `pond_get` session mode: the stored session
3932/// plus the page of messages (each with its `Part`s) and a remaining count.
3933/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
3934#[derive(Debug, Clone, PartialEq)]
3935pub struct SessionPage {
3936    pub session: Session,
3937    pub messages: Vec<RetrievedMessage>,
3938    pub before_remaining: usize,
3939    pub after_remaining: usize,
3940}
3941
3942/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
3943/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
3944/// their parts so the handler can summarize them.
3945#[derive(Debug, Clone, PartialEq)]
3946pub struct MessagePage {
3947    pub session: Session,
3948    pub target: RetrievedMessage,
3949    pub target_parts: Vec<Part>,
3950    pub target_parts_remaining: usize,
3951    pub siblings: Vec<RetrievedMessage>,
3952}
3953
3954#[derive(Debug, Clone, PartialEq)]
3955pub struct RetrievedMessage {
3956    pub id: String,
3957    pub role: Role,
3958    pub timestamp: DateTime<Utc>,
3959    pub text: Option<String>,
3960    pub content: Option<String>,
3961    pub parts: Vec<Part>,
3962}
3963
3964#[derive(Debug, Clone)]
3965struct ScanRow {
3966    id: String,
3967    role: Role,
3968    timestamp: DateTime<Utc>,
3969    text: Option<String>,
3970    content: Option<String>,
3971}
3972
3973/// One row of the conversational scan. `text` is non-empty by
3974/// `IsNotNull("search_text")` pushdown (spec.md#search).
3975#[derive(Debug, Clone)]
3976pub struct ConversationalRow {
3977    pub session_id: String,
3978    pub message_id: String,
3979    pub role: Role,
3980    pub timestamp: DateTime<Utc>,
3981    pub text: SearchText,
3982}
3983
/// Count how many leading `items` fit on one page.
///
/// The page holds at most `limit` items (clamped to `1..=1000`) and stops at
/// the first item boundary where the running total of `size(item)` would
/// exceed `budget_bytes`. The first item is always admitted, so a single
/// oversize item never blocks its own page. Returns 0 only for an empty
/// slice.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = limit.clamp(1, 1000);
    let mut used = 0usize;
    let mut count = 0usize;
    for item in items.iter().take(window) {
        let projected = used.saturating_add(size(item));
        let over_budget = projected > budget_bytes;
        // The budget only applies once something has already been admitted.
        if over_budget && count != 0 {
            break;
        }
        used = projected;
        count += 1;
    }
    count
}
4002
/// Tail-anchored counterpart of `page_by`: count how many trailing `items`
/// fit on one page, giving up the oldest first.
///
/// Items are admitted from the end of the slice, at most `limit` of them
/// (clamped to `1..=1000`), until the running total of `size(item)` would
/// exceed `budget_bytes`. The last (newest) item is always admitted, so the
/// result is >= 1 for a non-empty slice. The page itself is
/// `items[len - n..]`, which keeps it in chronological order.
fn page_tail<T>(
    items: &[T],
    limit: usize,
    budget_bytes: usize,
    size: impl Fn(&T) -> usize,
) -> usize {
    let window = limit.clamp(1, 1000);
    let mut used = 0usize;
    let mut count = 0usize;
    for item in items.iter().rev().take(window) {
        let projected = used.saturating_add(size(item));
        let over_budget = projected > budget_bytes;
        // The budget only applies once something has already been admitted.
        if over_budget && count != 0 {
            break;
        }
        used = projected;
        count += 1;
    }
    count
}
4029
4030fn role_from_str(value: &str) -> Result<Role> {
4031    match value {
4032        "system" => Ok(Role::System),
4033        "user" => Ok(Role::User),
4034        "assistant" => Ok(Role::Assistant),
4035        "tool" => Ok(Role::Tool),
4036        other => anyhow::bail!("unknown message role {other}"),
4037    }
4038}
4039
4040/// Scalar indexes on `messages` (spec.md#datasets): only columns whose index
4041/// type matches the predicate actually issued against them. `project` is
4042/// filtered solely by `LikeContains`/`Regex` (substring), which a BTree cannot
4043/// accelerate, and `role` is never filtered - both are deliberately unindexed
4044/// (substring lookup stays on the SQL `LIKE` path). There is no index on
4045/// `embedding_model`: pond's invariant is one active model at a time (a model
4046/// swap goes through `pond optimize --force-embed` which drops the IVF_SQ,
4047/// clears stale rows, and re-bootstraps), so the only embedding-state filter is
4048/// `vector IS NOT NULL`. `id` lookups are rare and full-scan.
4049const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
4050    (
4051        "session_id",
4052        BuiltinIndexType::BTree,
4053        "messages_session_id_btree",
4054    ),
4055    // Range-only column (`from_date`/`to_date` -> `timestamp >=` / `<=`,
4056    // never exact-equality, never `ORDER BY` against the index). ZoneMap's
4057    // per-fragment min/max prunes those filters with no recall loss (measured:
4058    // 258 zones -> ~6, ~42x fewer rows scanned on the real S3 corpus), and
4059    // skips the global ExternalSort that a BTree would pay during build.
4060    (
4061        "timestamp",
4062        BuiltinIndexType::ZoneMap,
4063        "messages_timestamp_zonemap",
4064    ),
4065    (
4066        "source_agent",
4067        BuiltinIndexType::Bitmap,
4068        "messages_source_agent_bitmap",
4069    ),
4070];
4071
4072/// Scalar indexes on `parts`: `(session_id, message_id)` is the hot-path lookup key for
4073/// `parts_for_messages` (hydration on every `get` and grouped search).
4074const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
4075    (
4076        "session_id",
4077        BuiltinIndexType::BTree,
4078        "parts_session_id_btree",
4079    ),
4080    (
4081        "message_id",
4082        BuiltinIndexType::BTree,
4083        "parts_message_id_btree",
4084    ),
4085];
4086
4087/// Scalar index on `sessions`: `id` is filtered by `find_session` on every
4088/// `get` and every grouped search.
4089const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
4090    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
4091
/// How many session ids go into each `session_id IN (...)` chunk of an
/// incremental copy. The value trades off two costs: it is large enough to
/// amortize per-scan setup, yet small enough to keep the pushed-down
/// predicate string and its btree lookup batch bounded.
const COPY_SESSION_IN_CHUNK: usize = 512;
4096
4097fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
4098    Predicate::In(
4099        column,
4100        values.iter().cloned().map(ScalarValue::String).collect(),
4101    )
4102}
4103
4104/// The kNN prefilter is the caller's scalar filter alone - pond does NOT add
4105/// `vector IS NOT NULL`. That looks like a safe guard but it is a remote-read
4106/// trap: Lance v2 keeps no per-column null metadata, so `IsNotNull(vector)`
4107/// forces a full read of the ~3 GiB `vector` column from the object store on
4108/// every query (the ANN prefilter is evaluated as a `LanceScan` over the
4109/// column) - measured at ~57 s/query on the 2M-row S3 corpus, dwarfing the
4110/// IVF probe itself. It is also redundant: the IVF_SQ index only contains
4111/// embedded rows, and Lance's `_distance IS NOT NULL` post-filter (present in
4112/// both the ANN and brute-force branches of the plan) already drops any
4113/// null-vector row the brute-force tail might surface. So an empty caller
4114/// filter yields an empty prefilter and a pure index probe (spec.md#search,
4115/// spec.md#search-prefilter-pushdown).
4116fn embedded_scope(filter: &Predicate) -> Predicate {
4117    filter.clone()
4118}
4119
/// IVF `nprobes` used when `[search].nprobes` is not configured.
///
/// Without an explicit value Lance probes up to every partition
/// (~num_rows/4096, ~500 on the 2M-row corpus), one object-store read each -
/// the dominant cost of a vector scan on a remote store. 32 bounds the reads
/// while keeping recall (benchmarked, spec.md#search).
pub const DEFAULT_NPROBES: usize = 32;
4125
4126/// Apply pond's vector-search tuning to a kNN scanner, defaulting any unset
4127/// `[search]` knob so a default install never inherits Lance's unbounded
4128/// probe-every-partition behavior. No refine: IVF_SQ's per-dimension codes are
4129/// precise enough to rank from the prewarmed partition, so pond never re-reads
4130/// exact vectors from the data files (the remote-store GET storm PQ+refine
4131/// incurred).
4132fn apply_vector_search_knobs(
4133    scanner: &mut lance::dataset::scanner::Scanner,
4134    search: Option<&config::SearchConfig>,
4135) {
4136    let nprobes = search
4137        .and_then(|cfg| cfg.nprobes)
4138        .unwrap_or(DEFAULT_NPROBES);
4139    scanner.nprobes(nprobes);
4140}
4141
// Logical table names, without any suffix: the lance-namespace Directory impl
// owns the `.lance` directory suffix (spec.md#lance-chokepoints-catalog), and
// no consumer reconstructs a `.lance` path from these.
pub(crate) const SESSIONS: &str = "sessions";
pub(crate) const MESSAGES: &str = "messages";
pub(crate) const PARTS: &str = "parts";
4148
/// Name of the full-text-search index on `messages.search_text`. Kept stable
/// so that status reporting and index creation refer to the same index.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";
4152
/// Name of the IVF_SQ index on `messages.vector` (spec.md#search). Kept
/// stable so the activation check, optimize/append, and status all refer to
/// the same index.
///
/// The `_ivfpq` suffix is historical and intentionally retained: renaming the
/// literal would orphan the existing segment under a new name. Note that a
/// plain `optimize` folds into whatever index type already exists, so moving
/// an existing IVF_PQ store to IVF_SQ requires `pond optimize --rebuild`.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
4160
/// IVF_SQ tuning (spec.md#search): 8 bits per dimension for the scalar
/// quantization. The index is built with the cosine metric (e5 vectors are
/// L2-normalized).
const IVF_SQ_NUM_BITS: u16 = 8;
/// IVF_SQ tuning (spec.md#search): cap on kmeans iterations.
const IVF_SQ_MAX_ITERS: usize = 15;
4167
4168/// Pond's production IndexIntents: the per-table intent set
4169/// `Store::open_with_options` registers with the substrate.
4170pub fn pond_index_intents() -> IndexIntents {
4171    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
4172}
4173
4174/// Same as [`pond_index_intents`] but with an overridable IVF_SQ activation
4175/// threshold. Used by tests that need to exercise the activation boundary
4176/// without writing 100k vectors.
4177pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
4178    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
4179    messages.push(IndexIntent {
4180        name: MESSAGES_FTS_INDEX,
4181        column: "search_text",
4182        trigger: IndexTrigger::OnAnyRows,
4183        params: IndexParamsKind::InvertedFtsWord,
4184    });
4185    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
4186        messages.push(IndexIntent {
4187            name,
4188            column,
4189            trigger: IndexTrigger::OnAnyRows,
4190            params: IndexParamsKind::Scalar(kind.clone()),
4191        });
4192    }
4193    messages.push(IndexIntent {
4194        name: MESSAGES_VECTOR_INDEX,
4195        column: "vector",
4196        trigger: IndexTrigger::OnNonNullCount {
4197            column: "vector",
4198            threshold: vector_threshold,
4199        },
4200        params: IndexParamsKind::IvfSqCosine {
4201            num_bits: IVF_SQ_NUM_BITS,
4202            max_iters: IVF_SQ_MAX_ITERS,
4203        },
4204    });
4205    let parts = PARTS_SCALAR_INDICES
4206        .iter()
4207        .map(|(column, kind, name)| IndexIntent {
4208            name,
4209            column,
4210            trigger: IndexTrigger::OnAnyRows,
4211            params: IndexParamsKind::Scalar(kind.clone()),
4212        })
4213        .collect();
4214    let sessions = SESSIONS_SCALAR_INDICES
4215        .iter()
4216        .map(|(column, kind, name)| IndexIntent {
4217            name,
4218            column,
4219            trigger: IndexTrigger::OnAnyRows,
4220            params: IndexParamsKind::Scalar(kind.clone()),
4221        })
4222        .collect();
4223    IndexIntents {
4224        sessions,
4225        messages,
4226        parts,
4227    }
4228}
4229
/// Width of the `messages.vector` embedding column when `[embeddings].dim` is
/// absent (spec.md#search). 384 matches [`embed::DEFAULT_MODEL_ID`]
/// (`intfloat/multilingual-e5-small`).
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector dimension, installed at most once (at startup, from
/// `[embeddings].dim`) through [`init_embedding_dim`]. It is a `OnceLock`
/// rather than a `const` so a temporary config file can select a model with
/// a different dimension for an experiment without touching every use site.
/// While it is unset, readers see [`DEFAULT_EMBEDDING_DIM`], which keeps unit
/// tests config-free.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// The embedding dimension in effect: the value installed by
/// [`init_embedding_dim`], or [`DEFAULT_EMBEDDING_DIM`] if none was installed.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&dim) => dim,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Install the embedding dimension from config. Only the first call has any
/// effect; later calls are ignored.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails when a value is already present; first call wins, so that
    // failure is intentionally ignored.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
4255
4256/// Initial-`CREATE` write params for the namespace-mediated path. The
4257/// substrate seam stamps in `session`, `mode`, and `store_params`.
4258/// `auto_cleanup` is short; long-term recovery is `pond copy --to <file>`
4259/// snapshots plus deferred Lance tags (spec.md#session-durable-copy).
4260/// `skip_auto_cleanup` suppresses the per-commit hook so cleanup stays
4261/// operator-driven via `pond optimize` (one LIST per command instead of per write).
4262pub(crate) fn write_params_for_create() -> WriteParams {
4263    WriteParams {
4264        data_storage_version: Some(LanceFileVersion::V2_1),
4265        enable_v2_manifest_paths: true,
4266        enable_stable_row_ids: true,
4267        auto_cleanup: Some(AutoCleanupParams {
4268            interval: 20,
4269            older_than: chrono::TimeDelta::days(1),
4270        }),
4271        skip_auto_cleanup: true,
4272        ..WriteParams::default()
4273    }
4274}
4275
4276fn export_schema(table: Table) -> Arc<Schema> {
4277    match table {
4278        Table::Sessions => session_schema(),
4279        Table::Messages => message_schema(),
4280        Table::Parts => part_schema(),
4281    }
4282}
4283
4284fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
4285    let expected = export_schema(table);
4286    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
4287    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
4288    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
4289    if actual_names != expected_names {
4290        anyhow::bail!(
4291            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
4292            table.as_str(),
4293        );
4294    }
4295    Ok(())
4296}
4297
4298async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
4299    let source_uri = source
4300        .to_str()
4301        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
4302    let dataset = Dataset::open(source_uri)
4303        .await
4304        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
4305    ensure_schema_matches_archive(&dataset, table)?;
4306    Ok(dataset)
4307}
4308
4309pub(crate) fn session_schema() -> Arc<Schema> {
4310    Arc::new(Schema::new(vec![
4311        primary_field("id", DataType::Utf8, false),
4312        Field::new("parent_session_id", DataType::Utf8, true),
4313        Field::new("parent_message_id", DataType::Utf8, true),
4314        Field::new("source_agent", DataType::Utf8, false),
4315        Field::new(
4316            "created_at",
4317            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
4318            false,
4319        ),
4320        Field::new("project", DataType::Utf8, false),
4321        json_field("options", false),
4322    ]))
4323}
4324
4325pub(crate) fn message_schema() -> Arc<Schema> {
4326    Arc::new(Schema::new(vec![
4327        primary_field("session_id", DataType::Utf8, false),
4328        primary_field("id", DataType::Utf8, false),
4329        Field::new(
4330            "timestamp",
4331            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
4332            false,
4333        ),
4334        Field::new("role", DataType::Utf8, false),
4335        Field::new("source_agent", DataType::Utf8, false),
4336        Field::new("project", DataType::Utf8, false),
4337        Field::new("content", DataType::Utf8, true),
4338        Field::new("search_text", DataType::Utf8, true),
4339        // The message's derived embedding (spec.md#session-embed-from-canonical):
4340        // both null until `pond optimize` fills them, set together thereafter.
4341        Field::new("vector", embedding_vector_type(), true),
4342        Field::new("embedding_model", DataType::Utf8, true),
4343        json_field("options", false),
4344    ]))
4345}
4346
4347pub(crate) fn part_schema() -> Arc<Schema> {
4348    Arc::new(Schema::new(vec![
4349        primary_field("session_id", DataType::Utf8, false),
4350        primary_field("message_id", DataType::Utf8, false),
4351        primary_field("id", DataType::Utf8, false),
4352        Field::new("ordinal", DataType::Int32, false),
4353        Field::new("type", DataType::Utf8, false),
4354        // spec.md#model-part-provenance: conversation vs harness-injected; search
4355        // reads this column to exclude injected scaffolding.
4356        Field::new("provenance", DataType::Utf8, false),
4357        json_field("variant_data", false),
4358        legacy_blob_field("data", true),
4359        json_field("options", false),
4360    ]))
4361}
4362
4363pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
4364    let arrays = schema
4365        .fields()
4366        .iter()
4367        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
4368        .collect();
4369    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
4370}
4371
4372pub(crate) fn empty_reader(
4373    schema: Arc<Schema>,
4374) -> Result<
4375    RecordBatchIterator<
4376        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
4377    >,
4378> {
4379    let batch = empty_batch(schema.clone())?;
4380    Ok(RecordBatchIterator::new(
4381        vec![Ok(batch)].into_iter(),
4382        schema,
4383    ))
4384}
4385
/// One input row for [`messages_batches`]: a borrowed message plus the
/// per-row values that are written next to it in the `messages` table.
pub(crate) struct MessageBatchRow<'a> {
    /// The message whose id, session id, timestamp, role, system content and
    /// options are written.
    pub message: &'a Message,
    /// Value written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Value written to the `project` column.
    pub project: &'a str,
    /// Value written to the nullable `search_text` column; `None` is stored
    /// as null.
    pub search_text: Option<&'a str>,
}
4392
4393// Lance v7.0.0-beta.16's IVF_SQ build path (`rust/lance/src/index/vector/utils.rs`
4394// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
4395// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
4396// BFloat16 as a future-supported embedding type; until the Rust IVF_SQ build
4397// path catches up, store as Float16 (half-precision, also 2 bytes/element).
4398fn embedding_vector_type() -> DataType {
4399    DataType::FixedSizeList(
4400        Arc::new(Field::new("item", DataType::Float16, true)),
4401        embedding_dim() as i32,
4402    )
4403}
4404
4405/// The partial-schema source for the embedding column update: the `messages`
4406/// primary key plus the two columns `pond optimize` fills. The field definitions
4407/// match `message_schema` exactly so Lance accepts it as a subset upsert.
4408fn embedding_update_schema() -> Arc<Schema> {
4409    Arc::new(Schema::new(vec![
4410        primary_field("session_id", DataType::Utf8, false),
4411        primary_field("id", DataType::Utf8, false),
4412        Field::new("vector", embedding_vector_type(), true),
4413        Field::new("embedding_model", DataType::Utf8, true),
4414    ]))
4415}
4416
4417/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
4418/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
4419pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
4420    let dim = embedding_dim();
4421    let mut flat = Vec::with_capacity(rows.len() * dim);
4422    for row in rows {
4423        if row.vector.len() != dim {
4424            anyhow::bail!(
4425                "embedding for message {} has dim {}, expected {dim}",
4426                row.id,
4427                row.vector.len(),
4428            );
4429        }
4430        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
4431    }
4432    let values = Float16Array::from(flat);
4433    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
4434    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
4435        .context("failed to build embedding vector column")?;
4436
4437    RecordBatch::try_new(
4438        embedding_update_schema(),
4439        vec![
4440            Arc::new(StringArray::from(
4441                rows.iter()
4442                    .map(|row| row.session_id.as_str())
4443                    .collect::<Vec<_>>(),
4444            )),
4445            Arc::new(StringArray::from(
4446                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
4447            )),
4448            Arc::new(vectors),
4449            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
4450        ],
4451    )
4452    .context("failed to build embedding update batch")
4453}
4454
/// The runtime backstop against Arrow's 2 GiB `i32` offset wall: a flush batch
/// is split before the running total of its text columns reaches this, and a
/// single cell at or above it is rejected rather than left to panic inside
/// `StringArray::from` (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Contiguous row ranges whose summed text-column byte cost each stays within
/// `COLUMN_BYTE_BUDGET`. Budgeting the all-column total bounds every individual
/// column too, since no single column's total can exceed it.
///
/// `cells[i]` is row `i`'s byte cost summed across every text column. The
/// returned ranges are non-empty, in order, and together cover `0..cells.len()`;
/// an empty `cells` yields no ranges. A row whose own cost exceeds the budget
/// is never dropped: it ends up in a range by itself.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        // Saturating arithmetic: a plain `running + row` would panic on overflow
        // in debug builds and wrap in release builds, where the wrapped total
        // could slip under the budget and suppress a required split.
        let projected = running.saturating_add(row);
        // `index > start` keeps every chunk non-empty even for an oversized row.
        if index > start && projected > COLUMN_BYTE_BUDGET {
            chunks.push(start..index);
            start = index;
            running = 0;
        }
        running = running.saturating_add(row);
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
4482
4483fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
4484    if bytes >= COLUMN_BYTE_BUDGET {
4485        anyhow::bail!(
4486            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
4487             overflow Arrow's i32 offset buffer"
4488        );
4489    }
4490    Ok(())
4491}
4492
4493async fn merge_insert_chunks(
4494    handle: &Handle,
4495    table: Table,
4496    batches: Vec<RecordBatch>,
4497) -> Result<u64> {
4498    let mut inserted = 0u64;
4499    for batch in batches {
4500        let rows = batch.num_rows();
4501        inserted += handle.merge_insert(table, batch, rows).await?;
4502    }
4503    Ok(inserted)
4504}
4505
4506pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
4507    let options = sessions
4508        .iter()
4509        .map(|session| json_bytes(&session.options))
4510        .collect::<Result<Vec<_>>>()?;
4511    let mut cells = Vec::with_capacity(sessions.len());
4512    for (session, encoded) in sessions.iter().zip(&options) {
4513        let columns = [
4514            session.id.len(),
4515            session.parent_session_id.as_deref().map_or(0, str::len),
4516            session.parent_message_id.as_deref().map_or(0, str::len),
4517            session.source_agent.len(),
4518            session.project.as_str().len(),
4519            encoded.len(),
4520        ];
4521        for bytes in columns {
4522            guard_cell("sessions", &session.id, bytes)?;
4523        }
4524        cells.push(columns.iter().sum());
4525    }
4526    chunk_ranges(&cells)
4527        .into_iter()
4528        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
4529        .collect()
4530}
4531
4532fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
4533    let schema = session_schema();
4534    RecordBatch::try_new(
4535        schema.clone(),
4536        vec![
4537            Arc::new(StringArray::from(
4538                sessions
4539                    .iter()
4540                    .map(|session| session.id.as_str())
4541                    .collect::<Vec<_>>(),
4542            )),
4543            Arc::new(StringArray::from(
4544                sessions
4545                    .iter()
4546                    .map(|session| session.parent_session_id.as_deref())
4547                    .collect::<Vec<_>>(),
4548            )),
4549            Arc::new(StringArray::from(
4550                sessions
4551                    .iter()
4552                    .map(|session| session.parent_message_id.as_deref())
4553                    .collect::<Vec<_>>(),
4554            )),
4555            Arc::new(StringArray::from(
4556                sessions
4557                    .iter()
4558                    .map(|session| session.source_agent.as_str())
4559                    .collect::<Vec<_>>(),
4560            )),
4561            Arc::new(
4562                TimestampMicrosecondArray::from(
4563                    sessions
4564                        .iter()
4565                        .map(|session| micros(session.created_at))
4566                        .collect::<Vec<_>>(),
4567                )
4568                .with_timezone("UTC"),
4569            ),
4570            Arc::new(StringArray::from(
4571                sessions
4572                    .iter()
4573                    .map(|session| session.project.as_str())
4574                    .collect::<Vec<_>>(),
4575            )),
4576            Arc::new(LargeBinaryArray::from_iter_values(
4577                options.iter().map(Vec::as_slice),
4578            )),
4579        ],
4580    )
4581    .context("failed to build session batch")
4582}
4583
4584pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
4585    let options = rows
4586        .iter()
4587        .map(|row| json_bytes(row.message.options()))
4588        .collect::<Result<Vec<_>>>()?;
4589    let mut cells = Vec::with_capacity(rows.len());
4590    for (row, encoded) in rows.iter().zip(&options) {
4591        let columns = [
4592            row.message.session_id().len(),
4593            row.message.id().len(),
4594            row.message.role().as_str().len(),
4595            row.source_agent.len(),
4596            row.project.len(),
4597            row.message.system_content().map_or(0, str::len),
4598            row.search_text.map_or(0, str::len),
4599            encoded.len(),
4600        ];
4601        for bytes in columns {
4602            guard_cell("messages", row.message.id(), bytes)?;
4603        }
4604        cells.push(columns.iter().sum());
4605    }
4606    chunk_ranges(&cells)
4607        .into_iter()
4608        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
4609        .collect()
4610}
4611
4612fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
4613    let schema = message_schema();
4614    RecordBatch::try_new(
4615        schema.clone(),
4616        vec![
4617            Arc::new(StringArray::from(
4618                rows.iter()
4619                    .map(|row| row.message.session_id())
4620                    .collect::<Vec<_>>(),
4621            )),
4622            Arc::new(StringArray::from(
4623                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
4624            )),
4625            Arc::new(
4626                TimestampMicrosecondArray::from(
4627                    rows.iter()
4628                        .map(|row| micros(row.message.timestamp()))
4629                        .collect::<Vec<_>>(),
4630                )
4631                .with_timezone("UTC"),
4632            ),
4633            Arc::new(StringArray::from(
4634                rows.iter()
4635                    .map(|row| row.message.role().as_str())
4636                    .collect::<Vec<_>>(),
4637            )),
4638            Arc::new(StringArray::from(
4639                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
4640            )),
4641            Arc::new(StringArray::from(
4642                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
4643            )),
4644            Arc::new(StringArray::from(
4645                rows.iter()
4646                    .map(|row| row.message.system_content())
4647                    .collect::<Vec<_>>(),
4648            )),
4649            Arc::new(StringArray::from(
4650                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
4651            )),
4652            // `vector` / `embedding_model` are written null at ingest; every
4653            // message starts un-embedded and `pond optimize` fills them later
4654            // (spec.md#session-embed-from-canonical).
4655            new_null_array(&embedding_vector_type(), rows.len()),
4656            new_null_array(&DataType::Utf8, rows.len()),
4657            Arc::new(LargeBinaryArray::from_iter_values(
4658                options.iter().map(Vec::as_slice),
4659            )),
4660        ],
4661    )
4662    .context("failed to build message batch")
4663}
4664
4665pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
4666    let variant_data = parts
4667        .iter()
4668        .map(|part| part_variant_json(&part.kind))
4669        .collect::<Result<Vec<_>>>()?;
4670    let options = parts
4671        .iter()
4672        .map(|part| json_bytes(&part.options))
4673        .collect::<Result<Vec<_>>>()?;
4674    let mut cells = Vec::with_capacity(parts.len());
4675    // The blob column is a BinaryArray, exempt from the text-column bound
4676    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
4677    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
4678        let columns = [
4679            part.session_id.len(),
4680            part.message_id.len(),
4681            part.id.len(),
4682            part.kind.type_name().len(),
4683            part.provenance.as_str().len(),
4684            variant.len(),
4685            encoded.len(),
4686        ];
4687        for bytes in columns {
4688            guard_cell("parts", &part.id, bytes)?;
4689        }
4690        cells.push(columns.iter().sum());
4691    }
4692    chunk_ranges(&cells)
4693        .into_iter()
4694        .map(|range| {
4695            parts_chunk(
4696                &parts[range.clone()],
4697                &variant_data[range.clone()],
4698                &options[range],
4699            )
4700        })
4701        .collect()
4702}
4703
4704fn parts_chunk(
4705    parts: &[Part],
4706    variant_data: &[Vec<u8>],
4707    options: &[Vec<u8>],
4708) -> Result<RecordBatch> {
4709    let schema = part_schema();
4710    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
4711    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
4712    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
4713    let blob_payloads: Vec<Option<&[u8]>> = parts
4714        .iter()
4715        .map(|part| match &part.kind {
4716            PartKind::File { data, .. } => Some(match data {
4717                FileData::String(value) => value.as_bytes(),
4718                FileData::Bytes(value) => value.as_slice(),
4719                FileData::Url(value) => value.as_bytes(),
4720            }),
4721            PartKind::Text { .. }
4722            | PartKind::Reasoning { .. }
4723            | PartKind::ToolCall { .. }
4724            | PartKind::ToolResult { .. }
4725            | PartKind::ToolApprovalRequest { .. }
4726            | PartKind::ToolApprovalResponse { .. } => None,
4727        })
4728        .collect();
4729    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
4730
4731    RecordBatch::try_new(
4732        schema.clone(),
4733        vec![
4734            Arc::new(StringArray::from(
4735                parts
4736                    .iter()
4737                    .map(|part| part.session_id.as_str())
4738                    .collect::<Vec<_>>(),
4739            )),
4740            Arc::new(StringArray::from(
4741                parts
4742                    .iter()
4743                    .map(|part| part.message_id.as_str())
4744                    .collect::<Vec<_>>(),
4745            )),
4746            Arc::new(StringArray::from(
4747                parts
4748                    .iter()
4749                    .map(|part| part.id.as_str())
4750                    .collect::<Vec<_>>(),
4751            )),
4752            Arc::new(Int32Array::from(
4753                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
4754            )),
4755            Arc::new(StringArray::from(
4756                parts
4757                    .iter()
4758                    .map(|part| part.kind.type_name())
4759                    .collect::<Vec<_>>(),
4760            )),
4761            Arc::new(StringArray::from(
4762                parts
4763                    .iter()
4764                    .map(|part| part.provenance.as_str())
4765                    .collect::<Vec<_>>(),
4766            )),
4767            Arc::new(LargeBinaryArray::from_iter_values(
4768                variant_data.iter().map(Vec::as_slice),
4769            )),
4770            Arc::new(blob_array),
4771            Arc::new(LargeBinaryArray::from_iter_values(
4772                options.iter().map(Vec::as_slice),
4773            )),
4774        ],
4775    )
4776    .context("failed to build parts batch")
4777}
4778
4779pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
4780    Ok(Session {
4781        id: string(batch, "id", row)?.context("session id is null")?,
4782        parent_session_id: string(batch, "parent_session_id", row)?,
4783        parent_message_id: string(batch, "parent_message_id", row)?,
4784        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4785        created_at: datetime(batch, "created_at", row)?,
4786        project: crate::adapter::Extracted::from_stored(
4787            string(batch, "project", row)?.context("project is null")?,
4788        ),
4789        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
4790    })
4791}
4792
4793/// [`SkipOracle`](crate::adapter::SkipOracle) over the resident row-meta map:
4794/// `pond sync` reads each session's stored max message timestamp from memory, so
4795/// the staleness check costs zero S3 (the map is rebuilt from the store, so the
4796/// check stays deterministic with no local cursor). A `None` map (never
4797/// prewarmed, or the build failed) yields no watermark, so every source
4798/// re-reads - safe, just slower.
4799pub struct RowmapOracle(pub Option<Arc<RowMetaSet>>);
4800
4801impl crate::adapter::SkipOracle for RowmapOracle {
4802    fn session_max_ts(&self, session_id: &str) -> Option<i64> {
4803        self.0.as_ref()?.lookup_max_ts(session_id)
4804    }
4805
4806    fn is_empty(&self) -> bool {
4807        self.0.as_ref().is_none_or(|set| set.is_empty())
4808    }
4809}
4810
4811fn row_meta_entry(batch: &RecordBatch, row_id: u64, row: usize) -> Result<RowMetaEntry> {
4812    Ok(RowMetaEntry {
4813        row_id,
4814        session_id: string(batch, "session_id", row)?.context("session_id is null")?,
4815        message_id: string(batch, "id", row)?.context("message id is null")?,
4816        role: string(batch, "role", row)?.context("role is null")?,
4817        project: string(batch, "project", row)?.context("project is null")?,
4818        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4819        timestamp_micros: datetime(batch, "timestamp", row)?.timestamp_micros(),
4820        search_text: string(batch, "search_text", row)?.unwrap_or_default(),
4821    })
4822}
4823
4824pub(crate) fn message_meta_from_batch(batch: &RecordBatch, row: usize) -> Result<MessageMeta> {
4825    Ok(MessageMeta {
4826        message_id: string(batch, "id", row)?.context("id is null")?,
4827        session_id: string(batch, "session_id", row)?.context("session_id is null")?,
4828        role: string(batch, "role", row)?.context("role is null")?,
4829        project: string(batch, "project", row)?.context("project is null")?,
4830        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4831        timestamp: datetime(batch, "timestamp", row)?,
4832        search_text: string(batch, "search_text", row)?.unwrap_or_default(),
4833    })
4834}
4835
4836pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
4837    let id = string(batch, "id", row)?.context("message id is null")?;
4838    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
4839    let timestamp = datetime(batch, "timestamp", row)?;
4840    let options =
4841        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
4842
4843    match string(batch, "role", row)?
4844        .context("message role is null")?
4845        .as_str()
4846    {
4847        "system" => Ok(Message::System {
4848            id,
4849            session_id,
4850            timestamp,
4851            // `content` is nullable in the schema; preserve the distinction
4852            // between "no content row stored" (`None`) and "empty string
4853            // stored" (`Some(extracted_empty)`). The value originally
4854            // came from a `Source` extraction at ingest time; rewrap via
4855            // the storage-internal `from_stored` so the type-system seal
4856            // for adapters stays intact.
4857            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
4858            options,
4859        }),
4860        "user" => Ok(Message::User {
4861            id,
4862            session_id,
4863            timestamp,
4864            options,
4865        }),
4866        "assistant" => Ok(Message::Assistant {
4867            id,
4868            session_id,
4869            timestamp,
4870            options,
4871        }),
4872        "tool" => Ok(Message::Tool {
4873            id,
4874            session_id,
4875            timestamp,
4876            options,
4877        }),
4878        other => anyhow::bail!("unknown message role {other}"),
4879    }
4880}
4881
4882pub(crate) fn part_from_batch(
4883    batch: &RecordBatch,
4884    row: usize,
4885    file_data: Option<FileData>,
4886) -> Result<Part> {
4887    let type_name = string(batch, "type", row)?.context("part type is null")?;
4888    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
4889    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
4890    Ok(Part {
4891        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
4892        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
4893        id: string(batch, "id", row)?.context("part id is null")?,
4894        ordinal: int32(batch, "ordinal", row)?,
4895        provenance: provenance_from_str(&provenance)?,
4896        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
4897        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
4898    })
4899}
4900
4901fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
4902    match value {
4903        "conversational" => Ok(crate::wire::Provenance::Conversational),
4904        "injected" => Ok(crate::wire::Provenance::Injected),
4905        other => anyhow::bail!("unknown part provenance {other}"),
4906    }
4907}
4908
4909fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
4910    let kind = file_data_kind(variant_data)?;
4911    match kind.as_str() {
4912        "string" => {
4913            let text = std::str::from_utf8(bytes)
4914                .context("file string payload is not UTF-8")?
4915                .to_owned();
4916            Ok(FileData::String(text))
4917        }
4918        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
4919        "url" => Ok(FileData::Url(
4920            std::str::from_utf8(bytes)
4921                .context("file URL payload is not UTF-8")?
4922                .to_owned(),
4923        )),
4924        other => anyhow::bail!("unknown file data_kind {other}"),
4925    }
4926}
4927
4928fn file_data_kind(variant_data: &[u8]) -> Result<String> {
4929    let value = json_parse::<Value>(variant_data)?;
4930    value
4931        .get("data_kind")
4932        .and_then(Value::as_str)
4933        .map(str::to_owned)
4934        .context("file part variant_data missing data_kind")
4935}
4936
4937fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
4938    batch
4939        .column_by_name(name)
4940        .with_context(|| format!("missing column {name}"))?
4941        .as_any()
4942        .downcast_ref::<UInt64Array>()
4943        .with_context(|| format!("column {name} is not UInt64"))
4944}
4945
4946pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
4947    let array = batch
4948        .column_by_name(name)
4949        .with_context(|| format!("missing column {name}"))?
4950        .as_any()
4951        .downcast_ref::<StringArray>()
4952        .with_context(|| format!("column {name} is not Utf8"))?;
4953    if array.is_null(row) {
4954        Ok(None)
4955    } else {
4956        Ok(Some(array.value(row).to_owned()))
4957    }
4958}
4959
4960fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
4961    // Lance can return a `lance.json` column either as raw JSONB bytes
4962    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
4963    // LargeUtf8), depending on the read path. Handle both.
4964    let column = batch
4965        .column_by_name(name)
4966        .with_context(|| format!("missing column {name}"))?;
4967    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
4968        return if array.is_null(row) {
4969            Ok(None)
4970        } else {
4971            Ok(Some(
4972                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
4973            ))
4974        };
4975    }
4976    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
4977        return if array.is_null(row) {
4978            Ok(None)
4979        } else {
4980            Ok(Some(array.value(row).as_bytes().to_vec()))
4981        };
4982    }
4983    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
4984        return if array.is_null(row) {
4985            Ok(None)
4986        } else {
4987            Ok(Some(array.value(row).as_bytes().to_vec()))
4988        };
4989    }
4990    anyhow::bail!("column {name} is not a JSON-compatible array")
4991}
4992
4993fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
4994    let array = batch
4995        .column_by_name(name)
4996        .with_context(|| format!("missing column {name}"))?
4997        .as_any()
4998        .downcast_ref::<Int32Array>()
4999        .with_context(|| format!("column {name} is not Int32"))?;
5000    Ok(array.value(row))
5001}
5002
5003pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
5004    let array = batch
5005        .column_by_name(name)
5006        .with_context(|| format!("missing column {name}"))?
5007        .as_any()
5008        .downcast_ref::<Float32Array>()
5009        .with_context(|| format!("column {name} is not Float32"))?;
5010    Ok(array.value(row))
5011}
5012
5013pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
5014    let array = batch
5015        .column_by_name(name)
5016        .with_context(|| format!("missing column {name}"))?
5017        .as_any()
5018        .downcast_ref::<TimestampMicrosecondArray>()
5019        .with_context(|| format!("column {name} is not timestamp_micros"))?;
5020    Utc.timestamp_micros(array.value(row))
5021        .single()
5022        .context("timestamp is out of range")
5023}
5024
5025fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
5026    Field::new(name, data_type, nullable).with_metadata(
5027        [(
5028            "lance-schema:unenforced-primary-key".to_owned(),
5029            "true".to_owned(),
5030        )]
5031        .into(),
5032    )
5033}
5034
5035// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
5036// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
5037// is marked unstable in Lance docs (`format/file/versioning.md`) and at
5038// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
5039// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
5040// allocated only one column_info, so the decoder's second `expect_next()`
5041// fires `"there were more fields in the schema than provided column
5042// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
5043// handles correctly (covered by Lance's own `test_compact_blob_columns`).
5044fn legacy_blob_field(name: &str, nullable: bool) -> Field {
5045    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
5046        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
5047            .into_iter()
5048            .collect(),
5049    )
5050}
5051
/// Build a JSON column field named `name` by delegating to
/// `lance_arrow::json::json_field`; kept as a local shorthand so the schema
/// builders read uniformly.
fn json_field(name: &str, nullable: bool) -> Field {
    lance_arrow::json::json_field(name, nullable)
}
5055
5056fn micros(timestamp: DateTime<Utc>) -> i64 {
5057    timestamp.timestamp_micros()
5058}
5059
5060fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
5061    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
5062    // matches the `lance.json` extension contract. Lance's compact path
5063    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
5064    // applies `decode_json -> encode_json` on this column; with proper JSONB
5065    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
5066    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
5067    // `try_into_dfstream`; compact still goes through the adapter).
5068    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
5069    lance_arrow::json::encode_json(&text)
5070        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
5071}
5072
5073fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
5074    serde_json::from_slice(value).context("failed to parse JSON field")
5075}
5076
5077fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
5078    if let PartKind::File {
5079        media_type,
5080        file_name,
5081        data,
5082    } = kind
5083    {
5084        let data_kind = match data {
5085            FileData::String(_) => "string",
5086            FileData::Bytes(_) => "bytes",
5087            FileData::Url(_) => "url",
5088        };
5089        return json_bytes(&serde_json::json!({
5090            "media_type": media_type,
5091            "file_name": file_name,
5092            "data_kind": data_kind,
5093        }));
5094    }
5095    let value = serde_json::to_value(kind)?;
5096    let mut object = value
5097        .as_object()
5098        .cloned()
5099        .context("part variant did not serialize to an object")?;
5100    object.remove("type");
5101    json_bytes(&object)
5102}
5103
5104fn part_kind_from_json(
5105    type_name: &str,
5106    variant_data: &[u8],
5107    file_data: Option<FileData>,
5108) -> Result<PartKind> {
5109    let mut value = json_parse::<Value>(variant_data)?;
5110    let object = value
5111        .as_object_mut()
5112        .context("part variant data is not an object")?;
5113    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
5114    if let Some(data) = file_data {
5115        object.remove("data_kind");
5116        object.insert("data".to_owned(), serde_json::to_value(data)?);
5117    }
5118    serde_json::from_value(value).context("failed to parse part kind")
5119}
5120
5121#[cfg(test)]
5122mod tests {
5123    #![allow(clippy::expect_used, clippy::unwrap_used)]
5124
5125    use super::*;
5126    use crate::{
5127        adapter::Extracted,
5128        handlers::ingest_events,
5129        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
5130    };
5131    use chrono::Utc;
5132    use serde_json::json;
5133    use tempfile::TempDir;
5134
5135    fn synthetic_session(id: &str) -> Session {
5136        Session {
5137            id: id.to_owned(),
5138            parent_session_id: None,
5139            parent_message_id: None,
5140            source_agent: "claude-code".to_owned(),
5141            created_at: Utc::now(),
5142            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
5143            options: ProviderOptions::new(),
5144        }
5145    }
5146
5147    #[test]
5148    fn search_text_excludes_injected_parts() {
5149        use crate::wire::Provenance;
5150        let message = Message::User {
5151            id: "m1".to_owned(),
5152            session_id: "s1".to_owned(),
5153            timestamp: Utc::now(),
5154            options: ProviderOptions::new(),
5155        };
5156        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
5157            session_id: "s1".to_owned(),
5158            id: id.to_owned(),
5159            message_id: "m1".to_owned(),
5160            ordinal: 0,
5161            provenance,
5162            options: ProviderOptions::new(),
5163            kind: PartKind::Text {
5164                text: Some(Extracted::from_test_value(text.to_owned())),
5165            },
5166        };
5167
5168        // A conversational part contributes; an injected one is excluded
5169        // (spec.md#search).
5170        let conversational = search_text(
5171            &message,
5172            &[text_part(
5173                "p1",
5174                "real human prompt",
5175                Provenance::Conversational,
5176            )],
5177        );
5178        assert_eq!(conversational.as_deref(), Some("real human prompt"));
5179
5180        let injected = search_text(
5181            &message,
5182            &[text_part(
5183                "p2",
5184                "<task-notification>...</task-notification>",
5185                Provenance::Injected,
5186            )],
5187        );
5188        assert!(
5189            injected.is_none(),
5190            "a message whose only part is injected has null search_text"
5191        );
5192    }
5193
5194    #[test]
5195    fn chunk_ranges_splits_on_byte_budget() {
5196        assert!(chunk_ranges(&[]).is_empty());
5197        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
5198
5199        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
5200        assert_eq!(
5201            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
5202            vec![0..1, 1..2, 2..3],
5203        );
5204
5205        // An oversized single row gets its own chunk, never an infinite loop.
5206        assert_eq!(
5207            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
5208            vec![0..1, 1..2, 2..3],
5209        );
5210    }
5211
5212    #[tokio::test]
5213    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
5214        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
5215        // Message is dropped on the spot, with one Error outcome surfaced. The
5216        // rest of the substream continues normally - subsequent valid messages
5217        // and parts get written.
5218        let temp = TempDir::new()?;
5219        let store = Store::open_local(temp.path()).await?;
5220        let session = synthetic_session("ordering");
5221        let orphan_part = Part {
5222            session_id: session.id.clone(),
5223            id: "orphan-part".to_owned(),
5224            message_id: "missing-message".to_owned(),
5225            ordinal: 0,
5226            provenance: crate::wire::Provenance::Conversational,
5227            options: ProviderOptions::new(),
5228            kind: PartKind::Text {
5229                text: Some(Extracted::from_test_value("orphan".to_owned())),
5230            },
5231        };
5232        let valid_message = Message::User {
5233            id: "valid-message".to_owned(),
5234            session_id: session.id.clone(),
5235            timestamp: Utc::now(),
5236            options: ProviderOptions::new(),
5237        };
5238        let valid_part = Part {
5239            session_id: session.id.clone(),
5240            id: "valid-part".to_owned(),
5241            message_id: valid_message.id().to_owned(),
5242            ordinal: 0,
5243            provenance: crate::wire::Provenance::Conversational,
5244            options: ProviderOptions::new(),
5245            kind: PartKind::Text {
5246                text: Some(Extracted::from_test_value("kept".to_owned())),
5247            },
5248        };
5249
5250        let mut validator = IngestValidator::default();
5251        validator
5252            .push(&store, 0, IngestEvent::Session(session.clone()))
5253            .await?;
5254        let part_outcomes = validator
5255            .push(&store, 1, IngestEvent::Part(orphan_part))
5256            .await?;
5257        assert_eq!(part_outcomes.len(), 1);
5258        assert_eq!(part_outcomes[0].kind, "part");
5259        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
5260        assert!(
5261            part_outcomes[0]
5262                .error
5263                .as_ref()
5264                .map(|e| e.message.contains("part event appeared before a message"))
5265                .unwrap_or(false),
5266            "error message must explain the ordering violation: {part_outcomes:?}"
5267        );
5268        validator
5269            .push(&store, 2, IngestEvent::Message(valid_message))
5270            .await?;
5271        validator
5272            .push(&store, 3, IngestEvent::Part(valid_part))
5273            .await?;
5274        validator.finish(&store).await?;
5275
5276        let (sessions, messages, parts) = store.row_counts().await?;
5277        assert_eq!(sessions, 1, "session committed despite the orphan part");
5278        assert_eq!(messages, 1, "valid message committed");
5279        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
5280
5281        Ok(())
5282    }
5283
5284    #[tokio::test]
5285    async fn resident_meta_map_hydration_matches_take_rows_fallback() -> anyhow::Result<()> {
5286        // The resident meta map must hydrate hits identically to the take_rows
5287        // fallback - same fields, and the microsecond timestamp survives the
5288        // i64 round-trip through the mmap blob.
5289        let temp = TempDir::new()?;
5290        let store = Store::open_local(temp.path()).await?;
5291        let session = synthetic_session("hydration-parity");
5292
5293        let messages = [
5294            (
5295                "m1",
5296                "the auth refactor landed cleanly",
5297                1_700_000_000_123_456_i64,
5298            ),
5299            (
5300                "m2",
5301                "balance handler now retries on rpc timeout",
5302                1_700_000_050_654_321,
5303            ),
5304        ];
5305        let mut validator = IngestValidator::default();
5306        validator
5307            .push(&store, 0, IngestEvent::Session(session.clone()))
5308            .await?;
5309        let mut seq = 1;
5310        for (mid, text, micros) in messages {
5311            let message = Message::User {
5312                id: mid.to_owned(),
5313                session_id: session.id.clone(),
5314                timestamp: DateTime::from_timestamp_micros(micros).unwrap(),
5315                options: ProviderOptions::new(),
5316            };
5317            validator
5318                .push(&store, seq, IngestEvent::Message(message))
5319                .await?;
5320            seq += 1;
5321            let part = Part {
5322                session_id: session.id.clone(),
5323                id: format!("{mid}-p0"),
5324                message_id: mid.to_owned(),
5325                ordinal: 0,
5326                provenance: crate::wire::Provenance::Conversational,
5327                options: ProviderOptions::new(),
5328                kind: PartKind::Text {
5329                    text: Some(Extracted::from_test_value(text.to_owned())),
5330                },
5331            };
5332            validator.push(&store, seq, IngestEvent::Part(part)).await?;
5333            seq += 1;
5334        }
5335        validator.finish(&store).await?;
5336
5337        let rowids: Vec<u64> = store
5338            .collect_row_metas()
5339            .await?
5340            .into_iter()
5341            .map(|entry| entry.row_id)
5342            .collect();
5343        assert_eq!(rowids.len(), 2);
5344
5345        let sort_by_id = |mut metas: Vec<MessageMeta>| {
5346            metas.sort_by(|left, right| left.message_id.cmp(&right.message_id));
5347            metas
5348        };
5349
5350        let fallback = sort_by_id(store.message_metas_by_rowids(&rowids).await?);
5351
5352        // Build and install the resident meta map; the same call now hydrates
5353        // from memory (zero misses - the map covers the whole table).
5354        store.ensure_rowmap(&temp.path().join("cache")).await?;
5355        let resident = sort_by_id(store.message_metas_by_rowids(&rowids).await?);
5356
5357        assert_eq!(
5358            resident, fallback,
5359            "resident-map hydration must match the take_rows fallback"
5360        );
5361        assert_eq!(
5362            resident[0].timestamp.timestamp_micros(),
5363            1_700_000_000_123_456
5364        );
5365        Ok(())
5366    }
5367
5368    #[tokio::test]
5369    async fn initialized_flips_only_after_first_ingest() -> anyhow::Result<()> {
5370        // `open` eagerly creates sessions/messages but `parts` is lazy, so a
5371        // configured-but-never-synced store reports uninitialized - the signal
5372        // `pond status` uses to render an empty state instead of
5373        // erroring on the first parts describe.
5374        let temp = TempDir::new()?;
5375        let store = Store::open_local(temp.path()).await?;
5376        assert!(
5377            !store.initialized().await?,
5378            "fresh store has no parts table"
5379        );
5380
5381        let session = synthetic_session("initialized-probe");
5382        let message = Message::User {
5383            id: "message-1".to_owned(),
5384            session_id: session.id.clone(),
5385            timestamp: Utc::now(),
5386            options: ProviderOptions::new(),
5387        };
5388        let part = Part {
5389            session_id: session.id.clone(),
5390            id: "part-1".to_owned(),
5391            message_id: message.id().to_owned(),
5392            ordinal: 0,
5393            provenance: crate::wire::Provenance::Conversational,
5394            options: ProviderOptions::new(),
5395            kind: PartKind::Text {
5396                text: Some(Extracted::from_test_value("hello".to_owned())),
5397            },
5398        };
5399        let mut validator = IngestValidator::default();
5400        validator
5401            .push(&store, 0, IngestEvent::Session(session))
5402            .await?;
5403        validator
5404            .push(&store, 1, IngestEvent::Message(message))
5405            .await?;
5406        validator.push(&store, 2, IngestEvent::Part(part)).await?;
5407        validator.finish(&store).await?;
5408
5409        assert!(store.initialized().await?, "ingest creates the parts table");
5410        Ok(())
5411    }
5412
5413    #[tokio::test]
5414    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
5415        // Per-event drop: a duplicate message id within a substream drops the
5416        // *duplicate* and surfaces an Error outcome for it. The first wins; the
5417        // session still commits.
5418        let temp = TempDir::new()?;
5419        let store = Store::open_local(temp.path()).await?;
5420        let session = synthetic_session("duplicate-message");
5421        let first = Message::User {
5422            id: "message-1".to_owned(),
5423            session_id: session.id.clone(),
5424            timestamp: Utc::now(),
5425            options: ProviderOptions::new(),
5426        };
5427        let second = Message::Assistant {
5428            id: "message-1".to_owned(),
5429            session_id: session.id.clone(),
5430            timestamp: Utc::now(),
5431            options: ProviderOptions::new(),
5432        };
5433
5434        let mut validator = IngestValidator::default();
5435        validator
5436            .push(&store, 0, IngestEvent::Session(session.clone()))
5437            .await?;
5438        validator
5439            .push(&store, 1, IngestEvent::Message(first))
5440            .await?;
5441        let dup_outcomes = validator
5442            .push(&store, 2, IngestEvent::Message(second))
5443            .await?;
5444        assert_eq!(dup_outcomes.len(), 1);
5445        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
5446        assert!(
5447            dup_outcomes[0]
5448                .error
5449                .as_ref()
5450                .map(|e| e.message.contains("duplicate message id message-1"))
5451                .unwrap_or(false),
5452            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
5453        );
5454
5455        validator.finish(&store).await?;
5456        let (sessions, messages, _) = store.row_counts().await?;
5457        assert_eq!(sessions, 1, "session committed");
5458        assert_eq!(messages, 1, "only the first message committed");
5459
5460        Ok(())
5461    }
5462
5463    #[tokio::test]
5464    async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
5465    -> anyhow::Result<()> {
5466        // spec.md#model-pond-options: `options.pond` is core-owned. A stored
5467        // message carries the process's host stamp (when resolvable) and never
5468        // a client-supplied value; session and part options stay untouched.
5469        let temp = TempDir::new()?;
5470        let store = Store::open_local(temp.path()).await?;
5471        let session = synthetic_session("host-provenance");
5472        let mut spoofed = ProviderOptions::new();
5473        spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
5474        let message = Message::User {
5475            id: "message-1".to_owned(),
5476            session_id: session.id.clone(),
5477            timestamp: Utc::now(),
5478            options: spoofed,
5479        };
5480        let part = Part {
5481            session_id: session.id.clone(),
5482            id: "part-1".to_owned(),
5483            message_id: "message-1".to_owned(),
5484            ordinal: 0,
5485            provenance: crate::wire::Provenance::Conversational,
5486            options: ProviderOptions::new(),
5487            kind: PartKind::Text {
5488                text: Some(Extracted::from_test_value("hello".to_owned())),
5489            },
5490        };
5491
5492        let mut validator = IngestValidator::default();
5493        validator
5494            .push(&store, 0, IngestEvent::Session(session.clone()))
5495            .await?;
5496        validator
5497            .push(&store, 1, IngestEvent::Message(message))
5498            .await?;
5499        validator.push(&store, 2, IngestEvent::Part(part)).await?;
5500        validator.finish(&store).await?;
5501
5502        let stored = store
5503            .get_session(&session.id)
5504            .await?
5505            .expect("ingested session is readable");
5506        assert!(
5507            !stored.session.options.contains_key("pond"),
5508            "session rows are not stamped (attribution derives from messages)"
5509        );
5510        let stored_message = &stored.messages[0].message;
5511        match ingest_host_stamp() {
5512            Some(stamp) => {
5513                assert_eq!(
5514                    stored_message.options().get("pond"),
5515                    Some(stamp),
5516                    "stored message carries the real stamp, never the spoof"
5517                );
5518                let host = stamp
5519                    .pointer("/ingest/host")
5520                    .and_then(Value::as_object)
5521                    .expect("stamp shape is {ingest: {host: {..}}}");
5522                assert!(!host.is_empty(), "an all-empty stamp must be None instead");
5523                assert!(
5524                    host.values()
5525                        .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
5526                    "stamp fields are omitted when unavailable, never empty: {host:?}"
5527                );
5528            }
5529            None => assert!(
5530                stored_message.options().get("pond").is_none(),
5531                "with no resolvable stamp the spoofed key is still stripped"
5532            ),
5533        }
5534        assert!(
5535            !stored.messages[0].parts[0].options.contains_key("pond"),
5536            "part rows are not stamped (covered by their message's stamp)"
5537        );
5538
5539        Ok(())
5540    }
5541
5542    /// Regression: compact_files on `parts` with the blob column tripped a
5543    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
5544    /// batches give compact fragments to merge; every `FileData` variant
5545    /// exercises the blob round-trip. All-File batches sidestep a debug-only
5546    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
5547    /// write batch mixes null + valid rows in the blob column - benign in
5548    /// release, irrelevant to this regression's scope.
5549    #[tokio::test(flavor = "multi_thread")]
5550    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
5551        use crate::wire::{FileData, PartKind, Provenance};
5552        let temp = TempDir::new()?;
5553        let store = Store::open_local(temp.path()).await?;
5554
5555        let session = synthetic_session("compact-blob");
5556        store
5557            .upsert_sessions(std::slice::from_ref(&session))
5558            .await?;
5559
5560        let make_part = |idx: usize, kind: PartKind| Part {
5561            session_id: session.id.clone(),
5562            message_id: format!("msg-{idx}"),
5563            id: format!("part-{idx}"),
5564            ordinal: 0,
5565            provenance: Provenance::Conversational,
5566            options: ProviderOptions::new(),
5567            kind,
5568        };
5569
5570        let batch_a = vec![
5571            make_part(
5572                0,
5573                PartKind::File {
5574                    media_type: Some("text/plain".to_owned()),
5575                    file_name: Some("a.txt".to_owned()),
5576                    data: FileData::Bytes(b"alpha".to_vec()),
5577                },
5578            ),
5579            make_part(
5580                1,
5581                PartKind::File {
5582                    media_type: Some("text/plain".to_owned()),
5583                    file_name: Some("b.txt".to_owned()),
5584                    data: FileData::String("beta".to_owned()),
5585                },
5586            ),
5587        ];
5588        store.upsert_parts(&batch_a).await?;
5589
5590        let batch_b = vec![
5591            make_part(
5592                2,
5593                PartKind::File {
5594                    media_type: Some("application/octet-stream".to_owned()),
5595                    file_name: None,
5596                    data: FileData::Url("https://example.com/file".to_owned()),
5597                },
5598            ),
5599            make_part(
5600                3,
5601                PartKind::File {
5602                    media_type: Some("image/png".to_owned()),
5603                    file_name: Some("c.png".to_owned()),
5604                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
5605                },
5606            ),
5607        ];
5608        store.upsert_parts(&batch_b).await?;
5609
5610        store
5611            .optimize_indices(None, &MaintenancePolicy::always_compact())
5612            .await?
5613            .into_result()?;
5614
5615        Ok(())
5616    }
5617
5618    #[tokio::test]
5619    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
5620        let temp = TempDir::new()?;
5621        let store = Store::open_local(temp.path()).await?;
5622        let session = synthetic_session("blob");
5623        let message = Message::User {
5624            id: "message-1".to_owned(),
5625            session_id: session.id.clone(),
5626            timestamp: Utc::now(),
5627            options: ProviderOptions::new(),
5628        };
5629        let part = Part {
5630            session_id: session.id.clone(),
5631            id: "part-1".to_owned(),
5632            message_id: message.id().to_owned(),
5633            ordinal: 0,
5634            provenance: crate::wire::Provenance::Conversational,
5635            options: ProviderOptions::new(),
5636            kind: PartKind::File {
5637                media_type: Some("text/plain".to_owned()),
5638                file_name: Some("payload.txt".to_owned()),
5639                data: FileData::Bytes(b"pond".to_vec()),
5640            },
5641        };
5642
5643        let mut validator = IngestValidator::default();
5644        validator
5645            .push(&store, 0, IngestEvent::Session(session.clone()))
5646            .await?;
5647        validator
5648            .push(&store, 1, IngestEvent::Message(message.clone()))
5649            .await?;
5650        validator
5651            .push(&store, 2, IngestEvent::Part(part.clone()))
5652            .await?;
5653        validator.finish(&store).await?;
5654
5655        let stored = store
5656            .get_session(&session.id)
5657            .await?
5658            .expect("session should exist");
5659        let stored_part = &stored.messages[0].parts[0];
5660        assert_eq!(stored_part, &part);
5661
5662        Ok(())
5663    }
5664
    // Immutable session fields.
    //
    // `Session.source_agent` and `Session.project` are immutable
    // post-first-write because `messages` denormalizes them at first
    // ingest; a silent overwrite would desync the denormalized
    // copies. pond core's `IngestValidator` probes the existing session
    // before the merge_insert and emits a per-row `validation_failed`
    // outcome with the typed field name when either changes. Other Session
    // fields (options, parent_session_id, created_at, parent_message_id)
    // re-write idempotently via merge_insert.
5674
5675    fn base_session() -> Session {
5676        Session {
5677            id: "01HXY00000000001".to_owned(),
5678            parent_session_id: None,
5679            parent_message_id: None,
5680            source_agent: "claude-code".to_owned(),
5681            created_at: Utc::now(),
5682            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
5683            options: ProviderOptions::new(),
5684        }
5685    }
5686
5687    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
5688        outcomes
5689            .iter()
5690            .filter(|outcome| outcome.status == target)
5691            .count()
5692    }
5693
5694    #[tokio::test(flavor = "multi_thread")]
5695    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
5696    -> anyhow::Result<()> {
5697        let temp = TempDir::new()?;
5698        let store = Store::open_local(temp.path()).await?;
5699
5700        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5701        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
5702
5703        let mut again = base_session();
5704        again.options.insert("title".to_owned(), json!("renamed"));
5705        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
5706        assert_eq!(
5707            count_status(&second, OutcomeStatus::Error),
5708            0,
5709            "options is mutable; the re-ingest must not surface an error: {second:?}",
5710        );
5711        assert_eq!(
5712            count_status(&second, OutcomeStatus::Matched),
5713            1,
5714            "unchanged immutable fields must match-insert via merge_insert",
5715        );
5716
5717        Ok(())
5718    }
5719
5720    #[tokio::test(flavor = "multi_thread")]
5721    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
5722        let temp = TempDir::new()?;
5723        let store = Store::open_local(temp.path()).await?;
5724
5725        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5726        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
5727
5728        let mut tampered = base_session();
5729        tampered.source_agent = "codex-cli".to_owned();
5730        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
5731        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
5732        let err_row = second
5733            .iter()
5734            .find(|outcome| outcome.status == OutcomeStatus::Error)
5735            .expect("error outcome present");
5736        let err = err_row.error.as_ref().expect("error body present");
5737        assert_eq!(err.field, Some("source_agent"));
5738        assert_eq!(err.reason, Some("immutable"));
5739
5740        // The stored row stayed on the original adapter - no silent rewrite.
5741        let stored = store
5742            .get_session(&base_session().id)
5743            .await?
5744            .expect("session row survives the rejected re-ingest");
5745        assert_eq!(stored.session.source_agent, "claude-code");
5746
5747        Ok(())
5748    }
5749
5750    #[tokio::test(flavor = "multi_thread")]
5751    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
5752        let temp = TempDir::new()?;
5753        let store = Store::open_local(temp.path()).await?;
5754
5755        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5756        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
5757
5758        let mut tampered = base_session();
5759        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
5760        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
5761        let err_row = second
5762            .iter()
5763            .find(|outcome| outcome.status == OutcomeStatus::Error)
5764            .expect("project change must surface an error outcome");
5765        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
5766
5767        let stored = store
5768            .get_session(&base_session().id)
5769            .await?
5770            .expect("session row survives");
5771        assert_eq!(
5772            stored.session.project.as_str(),
5773            "/home/me/proj",
5774            "stored project must remain the original",
5775        );
5776
5777        Ok(())
5778    }
5779
5780    #[tokio::test(flavor = "multi_thread")]
5781    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
5782        // Regression guard: re-ingesting an existing session with NEW
5783        // messages must surface as sessions_inserted=0, messages_inserted_*>0
5784        // on `BatchCounts`, and per-row outcomes must mark the new message
5785        // rows `Inserted` while the session row is `Matched`. The prior
5786        // implementation derived all per-row statuses from the batch-level
5787        // session inserted count, which silently flipped the new messages
5788        // into `Matched` (visible as "up to date" in the CLI bar tail).
5789        use crate::wire::Provenance;
5790        let temp = TempDir::new()?;
5791        let store = Store::open_local(temp.path()).await?;
5792        let session = base_session();
5793
5794        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
5795            session_id: session.id.clone(),
5796            id: part_id.to_owned(),
5797            message_id: message_id.to_owned(),
5798            ordinal: 0,
5799            provenance: Provenance::Conversational,
5800            options: ProviderOptions::new(),
5801            kind: PartKind::Text {
5802                text: Some(Extracted::from_test_value(body.to_owned())),
5803            },
5804        };
5805        let user_message = |id: &str| Message::User {
5806            id: id.to_owned(),
5807            session_id: session.id.clone(),
5808            timestamp: Utc::now(),
5809            options: ProviderOptions::new(),
5810        };
5811
5812        // First pass: 2 messages land fresh.
5813        let mut validator = IngestValidator::default();
5814        validator
5815            .push(&store, 0, IngestEvent::Session(session.clone()))
5816            .await?;
5817        validator
5818            .push(&store, 1, IngestEvent::Message(user_message("m1")))
5819            .await?;
5820        validator
5821            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
5822            .await?;
5823        validator
5824            .push(&store, 3, IngestEvent::Message(user_message("m2")))
5825            .await?;
5826        validator
5827            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
5828            .await?;
5829        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
5830        assert_eq!(first_counts.sessions_inserted, 1);
5831        assert_eq!(first_counts.messages_inserted_total, 2);
5832        assert_eq!(first_counts.messages_inserted_searchable, 2);
5833
5834        // Second pass: same session id, 3 NEW messages.
5835        let mut validator = IngestValidator::default();
5836        validator
5837            .push(&store, 0, IngestEvent::Session(session.clone()))
5838            .await?;
5839        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
5840            let pid = format!("p{}", idx + 3);
5841            validator
5842                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
5843                .await?;
5844            validator
5845                .push(
5846                    &store,
5847                    idx * 2 + 2,
5848                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
5849                )
5850                .await?;
5851        }
5852        let (second_outcomes, second_counts) = validator.finish(&store).await?;
5853
5854        assert_eq!(
5855            second_counts.sessions_inserted, 0,
5856            "existing session row must report as Matched, not Inserted",
5857        );
5858        assert_eq!(second_counts.sessions_matched, 1);
5859        assert_eq!(
5860            second_counts.messages_inserted_total, 3,
5861            "the three NEW messages must register as Inserted in BatchCounts",
5862        );
5863        assert_eq!(
5864            second_counts.messages_inserted_searchable, 3,
5865            "all three new messages carry conversational text -> searchable",
5866        );
5867        assert_eq!(second_counts.messages_matched_total, 0);
5868        assert_eq!(second_counts.parts_inserted, 3);
5869        assert_eq!(second_counts.parts_matched, 0);
5870
5871        // Per-row outcomes mirror the BatchCounts shape: the session row is
5872        // Matched, every new message + part row is Inserted.
5873        let session_outcome = second_outcomes
5874            .iter()
5875            .find(|outcome| outcome.kind == "session")
5876            .expect("session-row outcome present");
5877        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
5878        for outcome in &second_outcomes {
5879            if outcome.kind == "message" || outcome.kind == "part" {
5880                assert_eq!(
5881                    outcome.status,
5882                    OutcomeStatus::Inserted,
5883                    "new row must be Inserted, got: {outcome:?}",
5884                );
5885            }
5886        }
5887        Ok(())
5888    }
5889
5890    /// Ingest `count` synthetic messages spread across a handful of sessions
5891    /// and projects, each with conversational `search_text`. Returns the store
5892    /// and the message keys in `msg-{i}` order; every `vector` starts null.
5893    async fn store_with_messages(
5894        temp: &TempDir,
5895        count: usize,
5896    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
5897        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
5898    }
5899
5900    /// Same as [`store_with_messages`] but tests optimize with a custom
5901    /// IVF_SQ activation threshold.
5902    async fn store_with_messages_at_threshold(
5903        temp: &TempDir,
5904        count: usize,
5905        _vector_threshold: usize,
5906    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
5907        let store = Store::open_local(temp.path()).await?;
5908        let sessions = 8.min(count.max(1));
5909        let mut events = Vec::new();
5910        for s in 0..sessions {
5911            events.push(IngestEvent::Session(Session {
5912                id: format!("session-{s}"),
5913                parent_session_id: None,
5914                parent_message_id: None,
5915                source_agent: "claude-code".to_owned(),
5916                created_at: Utc::now(),
5917                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
5918                options: ProviderOptions::new(),
5919            }));
5920            for i in (s..count).step_by(sessions) {
5921                let message_id = format!("msg-{i}");
5922                events.push(IngestEvent::Message(Message::User {
5923                    id: message_id.clone(),
5924                    session_id: format!("session-{s}"),
5925                    timestamp: Utc::now(),
5926                    options: ProviderOptions::new(),
5927                }));
5928                events.push(IngestEvent::Part(Part {
5929                    session_id: format!("session-{s}"),
5930                    id: format!("{message_id}-part"),
5931                    message_id,
5932                    ordinal: 0,
5933                    provenance: crate::wire::Provenance::Conversational,
5934                    options: ProviderOptions::new(),
5935                    kind: PartKind::Text {
5936                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
5937                    },
5938                }));
5939            }
5940        }
5941        ingest_events(&store, events).await?;
5942        let keys = (0..count)
5943            .map(|i| MessageKey {
5944                session_id: format!("session-{}", i % sessions),
5945                message_id: format!("msg-{i}"),
5946            })
5947            .collect();
5948        Ok((store, keys))
5949    }
5950
5951    /// A deterministic pseudo-random vector of the production dimension.
5952    fn synthetic_vector(seed: usize) -> Vec<f32> {
5953        let mut state = (seed as u64)
5954            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
5955            .wrapping_add(1);
5956        (0..embedding_dim())
5957            .map(|_| {
5958                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
5959                #[allow(clippy::cast_precision_loss)]
5960                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
5961                unit - 1.0
5962            })
5963            .collect()
5964    }
5965
5966    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
5967    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
5968        keys.iter()
5969            .enumerate()
5970            .map(|(seed, key)| EmbeddedMessage {
5971                session_id: key.session_id.clone(),
5972                id: key.message_id.clone(),
5973                vector: synthetic_vector(seed),
5974            })
5975            .collect()
5976    }
5977
5978    fn embedding_update_batch_with_model(
5979        rows: &[EmbeddedMessage],
5980        model: &str,
5981    ) -> Result<RecordBatch> {
5982        let mut batch = embedding_update_batch(rows)?;
5983        let columns = batch
5984            .columns()
5985            .iter()
5986            .take(3)
5987            .cloned()
5988            .chain(std::iter::once(
5989                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
5990            ))
5991            .collect::<Vec<_>>();
5992        batch = RecordBatch::try_new(batch.schema(), columns)?;
5993        Ok(batch)
5994    }
5995
5996    #[tokio::test]
5997    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
5998        let temp = TempDir::new()?;
5999        // 4 messages cycle session-0..session-3, so `session-3` is a real
6000        // partition. Scalar-index pushdown is volume-independent: the planner
6001        // emits `ScalarIndexQuery` whenever the index exists.
6002        let (store, keys) = store_with_messages(&temp, 4).await?;
6003        store.write_embeddings(&embedded(&keys)).await?;
6004        store
6005            .optimize_indices(None, &MaintenancePolicy::always_compact())
6006            .await?
6007            .into_result()?;
6008
6009        let query = vec![0.01_f32; embedding_dim()];
6010        let plan = store
6011            .explain_vector_plan(
6012                &query,
6013                10,
6014                &Predicate::Eq("session_id", "session-3".into()),
6015                None,
6016            )
6017            .await?;
6018
6019        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
6020        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
6021        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
6022        // and unrelated.)
6023        assert!(
6024            plan.contains("ScalarIndexQuery"),
6025            "expected a ScalarIndexQuery node in the plan:\n{plan}",
6026        );
6027        let predicate_postfiltered = plan
6028            .lines()
6029            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
6030        assert!(
6031            !predicate_postfiltered,
6032            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
6033        );
6034        Ok(())
6035    }
6036
6037    #[tokio::test]
6038    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
6039        let temp = TempDir::new()?;
6040        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
6041
6042        // First batch: 255 vectors, one below threshold. Optimize does not
6043        // create the IVF_SQ because the trigger is not met.
6044        store.write_embeddings(&embedded(&keys[..255])).await?;
6045        store
6046            .optimize_indices_with_vector_threshold(256)
6047            .await?
6048            .into_result()?;
6049        assert!(
6050            !store
6051                .handle
6052                .messages_index_names()
6053                .await?
6054                .iter()
6055                .any(|name| name == MESSAGES_VECTOR_INDEX),
6056            "IVF_SQ must not exist below the activation threshold",
6057        );
6058
6059        // Next batch: one more vector. Total reaches 256; optimize creates
6060        // the IVF_SQ.
6061        store.write_embeddings(&embedded(&keys[255..256])).await?;
6062        store
6063            .optimize_indices_with_vector_threshold(256)
6064            .await?
6065            .into_result()?;
6066        assert!(
6067            store
6068                .handle
6069                .messages_index_names()
6070                .await?
6071                .iter()
6072                .any(|name| name == MESSAGES_VECTOR_INDEX),
6073            "optimize must create the IVF_SQ once the threshold is crossed",
6074        );
6075
6076        // The remaining 44 rows stay un-embedded; the IVF_SQ trains over the
6077        // non-null subset and a planted vector is retrievable.
6078        let hits = store
6079            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
6080            .await?;
6081        assert!(
6082            hits.iter().any(|hit| hit.key == keys[0]),
6083            "an embedded row is retrievable via the index",
6084        );
6085        Ok(())
6086    }
6087
6088    #[tokio::test]
6089    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
6090    {
6091        let temp = TempDir::new()?;
6092        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
6093        let old_rows = embedded(&keys);
6094        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
6095        store
6096            .handle
6097            .merge_update(Table::Messages, old_batch, old_rows.len())
6098            .await?;
6099        store
6100            .optimize_indices_with_vector_threshold(256)
6101            .await?
6102            .into_result()?;
6103        assert!(
6104            store
6105                .handle
6106                .messages_index_names()
6107                .await?
6108                .iter()
6109                .any(|name| name == MESSAGES_VECTOR_INDEX),
6110            "IVF_SQ must exist before a model swap",
6111        );
6112        assert_eq!(store.stale_embedding_count().await?, keys.len());
6113
6114        store.drop_vector_index().await?;
6115        let mut pending = Vec::new();
6116        let stream = store.pending_or_stale_messages();
6117        tokio::pin!(stream);
6118        while let Some(row) = stream.next().await {
6119            pending.push(row?);
6120        }
6121        assert_eq!(
6122            pending.len(),
6123            keys.len(),
6124            "force stream should see stale rows"
6125        );
6126        store.write_embeddings(&embedded(&keys)).await?;
6127        assert_eq!(store.stale_embedding_count().await?, 0);
6128        store
6129            .optimize_indices_with_vector_threshold(256)
6130            .await?
6131            .into_result()?;
6132        assert!(
6133            store
6134                .handle
6135                .messages_index_names()
6136                .await?
6137                .iter()
6138                .any(|name| name == MESSAGES_VECTOR_INDEX),
6139            "optimize must rebuild IVF_SQ after force re-embed",
6140        );
6141
6142        let stream = store.pending_or_stale_messages();
6143        tokio::pin!(stream);
6144        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
6145        Ok(())
6146    }
6147
6148    #[tokio::test]
6149    async fn session_last_message_ids_come_from_durable_messages() -> anyhow::Result<()> {
6150        let temp = TempDir::new()?;
6151        let store = Store::open_local(temp.path()).await?;
6152        let session = synthetic_session("oracle");
6153        store
6154            .upsert_sessions(std::slice::from_ref(&session))
6155            .await?;
6156        let timestamp =
6157            chrono::DateTime::from_timestamp(1_700_000_000, 0).expect("valid timestamp");
6158        let message_a = Message::User {
6159            id: "oracle-a".to_owned(),
6160            session_id: session.id.clone(),
6161            timestamp,
6162            options: ProviderOptions::new(),
6163        };
6164        let message_b = Message::User {
6165            id: "oracle-b".to_owned(),
6166            session_id: session.id.clone(),
6167            timestamp,
6168            options: ProviderOptions::new(),
6169        };
6170        store
6171            .upsert_messages(
6172                &session,
6173                &[
6174                    MessageWrite {
6175                        message: &message_a,
6176                        parts: &[],
6177                        search_text: Some("a"),
6178                    },
6179                    MessageWrite {
6180                        message: &message_b,
6181                        parts: &[],
6182                        search_text: Some("b"),
6183                    },
6184                ],
6185            )
6186            .await?;
6187
6188        let empty_session = synthetic_session("session-row-only");
6189        store.upsert_sessions(&[empty_session]).await?;
6190
6191        // Orphan: messages committed but the session row never was (the crash
6192        // window `upsert_session_batch`'s write order can leave). The gate must
6193        // NOT key on it, so the source re-ingests and heals the missing row.
6194        let orphan = synthetic_session("messages-no-row");
6195        let orphan_message = Message::User {
6196            id: "orphan-a".to_owned(),
6197            session_id: orphan.id.clone(),
6198            timestamp,
6199            options: ProviderOptions::new(),
6200        };
6201        store
6202            .upsert_messages(
6203                &orphan,
6204                &[MessageWrite {
6205                    message: &orphan_message,
6206                    parts: &[],
6207                    search_text: Some("a"),
6208                }],
6209            )
6210            .await?;
6211
6212        let map = store.session_last_message_ids().await?;
6213        assert_eq!(map.get("oracle").map(String::as_str), Some("oracle-b"));
6214        assert!(
6215            !map.contains_key("session-row-only"),
6216            "a session row without durable messages must not produce a freshness key",
6217        );
6218        assert!(
6219            !map.contains_key("messages-no-row"),
6220            "messages without a durable session row must not produce a freshness key",
6221        );
6222        Ok(())
6223    }
6224
6225    #[tokio::test]
6226    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
6227        let temp = TempDir::new()?;
6228        let (store, keys) = store_with_messages(&temp, 10).await?;
6229
6230        let before = store.embedding_progress().await?;
6231        assert_eq!(before.embedded, 0);
6232        assert_eq!(before.total, 10);
6233        assert_eq!(before.backlog, 10);
6234        assert_eq!(before.model, crate::embed::model_id());
6235
6236        store.write_embeddings(&embedded(&keys[..4])).await?;
6237        let partial = store.embedding_progress().await?;
6238        assert_eq!(partial.embedded, 4);
6239        assert_eq!(partial.total, 10);
6240        assert_eq!(partial.backlog, 6);
6241
6242        store.write_embeddings(&embedded(&keys[4..])).await?;
6243        let full = store.embedding_progress().await?;
6244        assert_eq!(full.embedded, 10);
6245        assert_eq!(full.total, 10);
6246        // The pending signal is the live un-embedded count and matches the
6247        // authoritative backlog - never derived from FTS num_docs.
6248        assert_eq!(full.backlog, 0);
6249        assert_eq!(full.backlog, store.embed_backlog_count().await?);
6250        Ok(())
6251    }
6252
6253    #[tokio::test]
6254    async fn ensure_rowmap_layers_a_delta_on_new_ingest() -> anyhow::Result<()> {
6255        let temp = TempDir::new()?;
6256        let (store, _keys) = store_with_messages(&temp, 6).await?;
6257        let cache = temp.path().join("cache");
6258
6259        store.ensure_rowmap(&cache).await?;
6260        assert_eq!(
6261            store.rowmap_delta_count(),
6262            Some(0),
6263            "first build is a lone base"
6264        );
6265
6266        // A new session's message bumps the version with a fresh fragment.
6267        ingest_events(
6268            &store,
6269            vec![
6270                IngestEvent::Session(Session {
6271                    id: "session-new".to_owned(),
6272                    parent_session_id: None,
6273                    parent_message_id: None,
6274                    source_agent: "claude-code".to_owned(),
6275                    created_at: Utc::now(),
6276                    project: Extracted::from_test_value("/proj/new".to_owned()),
6277                    options: ProviderOptions::new(),
6278                }),
6279                IngestEvent::Message(Message::User {
6280                    id: "m-new".to_owned(),
6281                    session_id: "session-new".to_owned(),
6282                    timestamp: Utc::now(),
6283                    options: ProviderOptions::new(),
6284                }),
6285                IngestEvent::Part(Part {
6286                    session_id: "session-new".to_owned(),
6287                    id: "m-new-part".to_owned(),
6288                    message_id: "m-new".to_owned(),
6289                    ordinal: 0,
6290                    provenance: crate::wire::Provenance::Conversational,
6291                    options: ProviderOptions::new(),
6292                    kind: PartKind::Text {
6293                        text: Some(Extracted::from_test_value("brand new message".to_owned())),
6294                    },
6295                }),
6296            ],
6297        )
6298        .await?;
6299
6300        // The refresh scans only the new fragment and layers a delta - not a
6301        // full rebuild.
6302        store.ensure_rowmap(&cache).await?;
6303        assert_eq!(
6304            store.rowmap_delta_count(),
6305            Some(1),
6306            "new ingest layered a delta"
6307        );
6308
6309        // The new session's count is served from the chain (base + delta sum).
6310        let counts = store
6311            .session_message_counts(&["session-new".to_owned()])
6312            .await?;
6313        assert_eq!(counts.get("session-new").copied(), Some(1));
6314        Ok(())
6315    }
6316
6317    #[tokio::test]
6318    async fn rowmap_chain_compacts_and_stays_bounded() -> anyhow::Result<()> {
6319        // Many version bumps (the remote-writers case) must not grow the chain
6320        // unboundedly: deltas cap at MAX, then compact into a fresh base.
6321        let temp = TempDir::new()?;
6322        let (store, _keys) = store_with_messages(&temp, 4).await?;
6323        let cache = temp.path().join("cache");
6324        store.ensure_rowmap(&cache).await?;
6325
6326        let mut reached_cap = false;
6327        let mut compacted = false;
6328        for i in 0..(Store::MAX_ROWMAP_DELTAS + 2) {
6329            let session = format!("session-x{i}");
6330            ingest_events(
6331                &store,
6332                vec![
6333                    IngestEvent::Session(Session {
6334                        id: session.clone(),
6335                        parent_session_id: None,
6336                        parent_message_id: None,
6337                        source_agent: "claude-code".to_owned(),
6338                        created_at: Utc::now(),
6339                        project: Extracted::from_test_value("/proj/x".to_owned()),
6340                        options: ProviderOptions::new(),
6341                    }),
6342                    IngestEvent::Message(Message::User {
6343                        id: format!("mx{i}"),
6344                        session_id: session.clone(),
6345                        timestamp: Utc::now(),
6346                        options: ProviderOptions::new(),
6347                    }),
6348                    IngestEvent::Part(Part {
6349                        session_id: session.clone(),
6350                        id: format!("mx{i}-part"),
6351                        message_id: format!("mx{i}"),
6352                        ordinal: 0,
6353                        provenance: crate::wire::Provenance::Conversational,
6354                        options: ProviderOptions::new(),
6355                        kind: PartKind::Text {
6356                            text: Some(Extracted::from_test_value(format!("msg {i}"))),
6357                        },
6358                    }),
6359                ],
6360            )
6361            .await?;
6362            store.ensure_rowmap(&cache).await?;
6363            let deltas = store.rowmap_delta_count().unwrap();
6364            assert!(
6365                deltas <= Store::MAX_ROWMAP_DELTAS,
6366                "delta count {deltas} exceeded the cap",
6367            );
6368            if deltas == Store::MAX_ROWMAP_DELTAS {
6369                reached_cap = true;
6370            }
6371            if reached_cap && deltas < Store::MAX_ROWMAP_DELTAS {
6372                compacted = true;
6373            }
6374        }
6375        assert!(reached_cap, "deltas accumulated to the cap (append path)");
6376        assert!(compacted, "the chain compacted back into a base");
6377
6378        // Files stay bounded and no build temps leak.
6379        let mut rmm = 0;
6380        for entry in std::fs::read_dir(&cache)? {
6381            let name = entry?.file_name().into_string().unwrap_or_default();
6382            assert!(!name.contains(".tmp-"), "leaked build temp: {name}");
6383            if name.ends_with(".rmm") {
6384                rmm += 1;
6385            }
6386        }
6387        assert!(
6388            rmm <= Store::MAX_ROWMAP_DELTAS + 1,
6389            "files unbounded: {rmm}"
6390        );
6391        Ok(())
6392    }
6393
6394    #[tokio::test]
6395    async fn embed_backlog_count_tracks_eligible_unembedded_rows() -> anyhow::Result<()> {
6396        let temp = TempDir::new()?;
6397        let (store, keys) = store_with_messages(&temp, 10).await?;
6398
6399        // Read straight from the dataset (no FTS index here), so it is correct
6400        // right after ingest - the case that lagged `embedding_progress`.
6401        assert_eq!(store.embed_backlog_count().await?, 10);
6402
6403        store.write_embeddings(&embedded(&keys[..4])).await?;
6404        assert_eq!(store.embed_backlog_count().await?, 6);
6405
6406        store.write_embeddings(&embedded(&keys[4..])).await?;
6407        assert_eq!(store.embed_backlog_count().await?, 0);
6408        Ok(())
6409    }
6410
6411    #[tokio::test]
6412    async fn session_message_counts_returns_per_session_counts_with_zeros_for_unknown_sessions()
6413    -> anyhow::Result<()> {
6414        // store_with_messages stripes `count` messages across 8 sessions
6415        // round-robin. 32 messages -> 4 per session, 0..8 deterministic.
6416        let temp = TempDir::new()?;
6417        let (store, _keys) = store_with_messages(&temp, 32).await?;
6418
6419        let mut requested: Vec<String> = (0..8).map(|s| format!("session-{s}")).collect();
6420        requested.push("session-unknown-a".to_owned());
6421        requested.push("session-unknown-b".to_owned());
6422        let counts = store.session_message_counts(&requested).await?;
6423
6424        // Map has an entry for every requested id (the contract): known
6425        // sessions hit 4, unknown sessions sit at 0.
6426        assert_eq!(counts.len(), requested.len());
6427        for s in 0..8 {
6428            assert_eq!(
6429                counts.get(&format!("session-{s}")).copied(),
6430                Some(4),
6431                "session-{s} should have 4 messages",
6432            );
6433        }
6434        assert_eq!(counts.get("session-unknown-a").copied(), Some(0));
6435        assert_eq!(counts.get("session-unknown-b").copied(), Some(0));
6436
6437        // Empty input is the documented zero-path.
6438        let empty = store.session_message_counts(&[]).await?;
6439        assert!(empty.is_empty());
6440        Ok(())
6441    }
6442}