Skip to main content

pond/
sessions.rs

1//! The session datasets (spec.md#datasets): the three Lance tables, the
2//! `Store` facade, ingest validation, and `search_text` extraction.
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet},
6    path::Path,
7    sync::Arc,
8};
9
10use anyhow::{Context, Result};
11use arc_swap::ArcSwapOption;
12use async_stream::try_stream;
13use chrono::{DateTime, TimeZone, Utc};
14use lance::Dataset;
15use lance::dataset::{AutoCleanupParams, ProjectionRequest, WriteMode, WriteParams};
16use lance::deps::arrow_array::{
17    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
18    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
19    UInt64Array, new_null_array,
20};
21use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
22use lance::deps::datafusion::physical_plan::SendableRecordBatchStream;
23use lance::index::DatasetIndexExt;
24use lance_file::version::LanceFileVersion;
25use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
26use serde::{Deserialize, Serialize, de::DeserializeOwned};
27use serde_json::Value;
28use tokio_stream::{Stream, StreamExt};
29
30use crate::{
31    config, embed,
32    rowmap::{RowMetaEntry, RowMetaMap, RowMetaSet, discover_chain},
33    substrate::{
34        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
35        OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
36        TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
37    },
38    wire::{FileData, Message, Part, PartKind, Role, SUMMARY_PART_TYPES, Session, SessionFrom},
39};
40use url::Url;
41
/// Facade over the session datasets: pairs the storage [`Handle`] with an
/// optional resident row-meta map.
#[derive(Debug)]
pub struct Store {
    /// Storage handle the `Store` methods delegate dataset opens, scans, and
    /// writes to.
    handle: Handle,
    /// Resident per-message meta map for index-only hit resolution and in-memory
    /// hydration (see [`crate::rowmap`]). `None` until [`Store::ensure_rowmap`]
    /// builds it (local tests, pre-prewarm), where the arms fall back to a
    /// data-projection scan and hydration to `take_rows`. `ArcSwap` so a
    /// version-bump rebuild swaps it under concurrent searches.
    rowmap: ArcSwapOption<RowMetaSet>,
}
52
/// Row counts broken out per table. Used both for "rows read from the source"
/// and "rows inserted into the destination" in the archive and copy receipts.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Rows counted for the `sessions` table.
    pub sessions: usize,
    /// Rows counted for the `messages` table.
    pub messages: usize,
    /// Rows counted for the `parts` table.
    pub parts: usize,
}
59
/// Dataset version ids broken out per table. The export path fills this with
/// each source dataset's `version_id()` as read when the export ran.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the `sessions` dataset.
    pub sessions: u64,
    /// Version id of the `messages` dataset.
    pub messages: u64,
    /// Version id of the `parts` dataset.
    pub parts: u64,
}
66
/// Receipt returned by [`Store::export_clean_lance_datasets`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Rows written to the archive, per table.
    pub rows: LanceArchiveCounts,
    /// Version id of each source dataset the rows were read from.
    pub source_versions: LanceArchiveVersions,
}
72
/// Receipt returned by [`Store::import_clean_lance_datasets`] and
/// [`Store::copy_delta_from`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Rows read from the source, per table.
    pub rows: LanceArchiveCounts,
    /// Rows the destination reported as written, per table; can be lower than
    /// `rows` when a merge skips rows that are already present.
    pub inserted: LanceArchiveCounts,
}
78
/// The per-table part of a store-to-store copy plan
/// (spec.md#session-durable-copy): the sessions whose rows for this table are
/// **appended** and the sessions whose rows are **merged**.
///
/// The split is decided table by table from what the destination already
/// holds, because the three tables commit separately and an interrupted copy
/// can leave them out of step (for instance the small `sessions` table landed
/// while `messages` did not):
/// - `append`: the destination holds **no** rows for the session in this
///   table, so nothing can collide -> plain append (no merge join, no target
///   probe; the bandwidth-bound fast path).
/// - `merge`: the destination holds *some* rows and the source holds more ->
///   merge so rows already present are deduplicated
///   (`WhenMatched::DoNothing`).
#[derive(Debug, Clone, Default)]
pub struct TablePlan {
    pub append: Vec<String>,
    pub merge: Vec<String>,
}

impl TablePlan {
    /// True when the plan names no session on either list, i.e. this table
    /// has nothing to copy.
    pub fn is_empty(&self) -> bool {
        [&self.append, &self.merge]
            .iter()
            .all(|session_ids| session_ids.is_empty())
    }
}
101
102/// A store-to-store `pond copy` plan, decided per table (see [`TablePlan`]).
103/// `source_sessions` is the full source session count, kept so the caller can
104/// tell "destination already up to date" (empty plan, non-empty source) from
105/// "empty source", and so each table can recognize a from-empty/resumed run
106/// (`append.len() == source_sessions`) and skip the per-session `IN` filter.
107#[derive(Debug, Clone, Default)]
108pub struct DeltaPlan {
109    pub sessions: TablePlan,
110    pub messages: TablePlan,
111    pub parts: TablePlan,
112    pub source_sessions: usize,
113}
114
115impl DeltaPlan {
116    pub fn is_empty(&self) -> bool {
117        self.sessions.is_empty() && self.messages.is_empty() && self.parts.is_empty()
118    }
119
120    /// Sessions whose own row is absent on the destination - the "new" count for
121    /// the plan receipt. Sessions never grow in row count (one immutable row
122    /// each), so the `sessions` table only ever appends.
123    pub fn new_sessions(&self) -> usize {
124        self.sessions.append.len()
125    }
126
127    /// Distinct sessions touched by the copy across all three tables - the
128    /// figure the progress bar totals against.
129    pub fn total(&self) -> usize {
130        let mut seen = std::collections::HashSet::new();
131        for plan in [&self.sessions, &self.messages, &self.parts] {
132            seen.extend(plan.append.iter());
133            seen.extend(plan.merge.iter());
134        }
135        seen.len()
136    }
137}
138
139#[derive(Debug, Clone, Default)]
140pub struct IndexIntents {
141    pub sessions: Vec<IndexIntent>,
142    pub messages: Vec<IndexIntent>,
143    pub parts: Vec<IndexIntent>,
144}
145
146impl IndexIntents {
147    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
148        [
149            (Table::Sessions, &self.sessions),
150            (Table::Messages, &self.messages),
151            (Table::Parts, &self.parts),
152        ]
153    }
154}
155
/// A message awaiting embedding: its primary key plus the `search_text` to
/// embed. The vector lives on the same `messages` row, so no denormalized
/// filter columns are needed (spec.md#session-embed-from-canonical).
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Session half of the message's primary key.
    pub session_id: String,
    /// Message half of the message's primary key.
    pub id: String,
    /// The text that will be embedded for this message.
    pub search_text: String,
}
165
/// One embedded message: a primary key and the vector to store. `pond optimize`
/// writes a batch of these into `messages.vector` keyed on `(session_id, id)`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Session half of the `(session_id, id)` key.
    pub session_id: String,
    /// Message half of the `(session_id, id)` key.
    pub id: String,
    /// The embedding to store in `messages.vector`.
    /// NOTE(review): expected dimensionality is not visible here - confirm.
    pub vector: Vec<f32>,
}
174
/// Message metadata used to hydrate search hits after retriever ranking.
///
/// NOTE(review): `message_id`/`session_id` presumably form the same pair as
/// [`MessageKey`]; the source columns of the remaining fields are not visible
/// in this part of the file - confirm against the hydration code.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    pub message_id: String,
    pub session_id: String,
    pub role: String,
    pub project: String,
    pub source_agent: String,
    pub timestamp: DateTime<Utc>,
    pub search_text: String,
}
186
/// Identifies one message by its session id and message id. Derives `Ord` and
/// `Hash`, so it can key both ordered and hashed collections; the derived
/// ordering compares `session_id` first, then `message_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    pub session_id: String,
    pub message_id: String,
}
192
/// One retrieval-arm hit. `rowid` is `Some` when the row meta map (or its
/// take_rows miss-fallback) resolved a stable row id, which lets hydration
/// `take_rows` the exact row instead of re-finding it with an `IN`-predicate
/// scan; `None` on the no-map fallback path (local tests, pre-prewarm).
#[derive(Debug, Clone, PartialEq)]
pub struct SearchHit {
    /// Stable row id when one was resolved; see the type-level doc.
    pub rowid: Option<u64>,
    /// The message this hit refers to.
    pub key: MessageKey,
    /// Score attached to the hit.
    /// NOTE(review): scale and sort direction are not visible here - confirm.
    pub score: f32,
}
203
/// Outcome of upserting one row.
/// NOTE(review): presumably produced by the pre-existence scan on the batch
/// upsert path - confirm against where it is constructed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row was not present before and was written.
    Inserted,
    /// The row was already present.
    Matched,
}
209
210/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
211/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
212/// results so the CLI can render compaction-skipped (under writer contention)
213/// distinctly from index-build failure (real problem).
214#[derive(Debug, Default)]
215pub struct OptimizeOutcome {
216    pub tables: Vec<TableOptimizeOutcome>,
217}
218
219impl OptimizeOutcome {
220    /// True if any table's indices phase reported a non-conflict failure.
221    /// `SkippedConflict` is expected under contention and does not count.
222    pub fn any_indices_failed(&self) -> bool {
223        self.tables.iter().any(|t| t.indices.is_failed())
224    }
225
226    /// Treat any `Failed` phase as an error. Tests that don't run under
227    /// contention use this to keep their existing `.await?` style: a real
228    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
229    pub fn into_result(self) -> Result<Self> {
230        for table in &self.tables {
231            if let PhaseOutcome::Failed(error) = &table.indices {
232                anyhow::bail!(
233                    "indices phase failed on {}: {error:#}",
234                    table.table.as_str()
235                );
236            }
237            if let PhaseOutcome::Failed(error) = &table.compaction {
238                anyhow::bail!(
239                    "compaction phase failed on {}: {error:#}",
240                    table.table.as_str()
241                );
242            }
243        }
244        Ok(self)
245    }
246}
247
/// Row counts for the three tables.
/// NOTE(review): where these totals are read from is not visible in this part
/// of the file - confirm against the producer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}
254
/// Embedding coverage for `pond status` / `pond optimize`. `total` is the count of
/// `messages` rows that carry `search_text` (i.e. are eligible to embed); rows
/// without `search_text` produce no vector. `embedded` is the subset of those
/// already carrying a vector under the current [`embed::model_id()`]. `backlog`
/// is the authoritative count still owed an embedding (`total - embedded` by
/// construction), read live from the dataset rather than derived by subtracting
/// the FTS `num_docs`, which over-counts deleted-but-unpurged docs and would
/// otherwise report a phantom backlog that never clears.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Eligible rows that already carry a vector for the current model.
    pub embedded: usize,
    /// Rows that carry `search_text` and are therefore eligible to embed.
    pub total: usize,
    /// Rows still owed an embedding.
    pub backlog: usize,
    /// NOTE(review): presumably the value of `embed::model_id()` the counts
    /// were taken under - confirm against the producer.
    pub model: &'static str,
}
270
/// Borrowed bundle of one message, its parts, and its optional search text.
/// NOTE(review): presumably the unit handed to the message write path -
/// confirm against callers.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message itself.
    pub message: &'a Message,
    /// The parts that travel with `message`.
    pub parts: &'a [Part],
    /// Search text for the message, when it has any.
    pub search_text: Option<&'a str>,
}
277
278impl Store {
279    /// Open against a local filesystem URL or a remote one for which the
280    /// caller has no extra options to pass (env vars suffice). CLI verbs
281    /// that load `[storage]` from config should call
282    /// [`Store::open_with_options`] instead so the same options flow into
283    /// every dataset open and write.
284    pub async fn open(location: &Url) -> Result<Self> {
285        Ok(Self {
286            handle: Handle::open(location).await?,
287            rowmap: ArcSwapOption::empty(),
288        })
289    }
290
    /// Live byte size of the shared Lance session caches (index + metadata),
    /// as reported by the underlying handle. Diagnostic only - walks the
    /// caches.
    pub fn lance_cache_bytes(&self) -> u64 {
        self.handle.lance_cache_bytes()
    }
296
297    /// Open with object-store options (S3 creds, region, endpoint, ...)
298    /// threaded through Lance verbatim. Keys are the standard `object_store`
299    /// config names; pond does not parse them. Empty options + default caps
300    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
301    /// config block via [`crate::substrate::RuntimeCaps`].
302    pub async fn open_with_options(
303        location: &Url,
304        storage_options: std::collections::HashMap<String, String>,
305        caps: crate::substrate::RuntimeCaps,
306    ) -> Result<Self> {
307        Ok(Self {
308            handle: Handle::open_with_options(location, storage_options, caps).await?,
309            rowmap: ArcSwapOption::empty(),
310        })
311    }
312
313    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
314    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
315    /// through [`Store::open_with_options`] so the production policy is
316    /// applied; tests get the backend-aware local-FS defaults.
317    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
318        let url = config::url_for_path(path)?;
319        Self::open_with_options(
320            &url,
321            std::collections::HashMap::new(),
322            crate::substrate::RuntimeCaps::default(),
323        )
324        .await
325    }
326
327    /// Export clean, index-free Lance datasets into `dest`.
328    ///
329    /// This rewrites the visible rows of each table instead of copying the
330    /// dataset roots. The resulting manifests therefore contain no references
331    /// to the source store's `_indices`, while `messages.vector` and
332    /// `messages.embedding_model` remain ordinary data columns and are
333    /// preserved.
334    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
335        std::fs::create_dir_all(dest)
336            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
337        let (sessions, sessions_version) = self
338            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
339            .await?;
340        let (messages, messages_version) = self
341            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
342            .await?;
343        let (parts, parts_version) = self
344            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
345            .await?;
346        Ok(LanceArchiveExport {
347            rows: LanceArchiveCounts {
348                sessions,
349                messages,
350                parts,
351            },
352            source_versions: LanceArchiveVersions {
353                sessions: sessions_version,
354                messages: messages_version,
355                parts: parts_version,
356            },
357        })
358    }
359
360    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
361        let sessions_dataset =
362            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
363        let messages_dataset =
364            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
365        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
366        let (sessions, sessions_inserted) = self
367            .import_clean_table(Table::Sessions, sessions_dataset)
368            .await?;
369        let (messages, messages_inserted) = self
370            .import_clean_table(Table::Messages, messages_dataset)
371            .await?;
372        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
373        Ok(LanceArchiveImport {
374            rows: LanceArchiveCounts {
375                sessions,
376                messages,
377                parts,
378            },
379            inserted: LanceArchiveCounts {
380                sessions: sessions_inserted,
381                messages: messages_inserted,
382                parts: parts_inserted,
383            },
384        })
385    }
386
387    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
388        let dataset = self.handle.dataset(table).await?;
389        let source_version = dataset.version_id();
390        let schema = export_schema(table);
391        let mut scan = dataset.scan();
392        // The default scan projects blob columns as descriptor structs
393        // (position/size into the source's blob storage) - meaningless in an
394        // archive and unwritable at V2_1. `AllBinary` materializes the bytes
395        // so the rewritten table is self-contained.
396        scan.blob_handling(lance::datatypes::BlobHandling::AllBinary);
397        let mut stream = scan
398            .try_into_stream()
399            .await
400            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
401        let dest_uri = dest
402            .to_str()
403            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
404
405        let mut rows = 0usize;
406        let mut wrote = false;
407        while let Some(batch) = stream.next().await {
408            let batch = batch
409                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
410            rows += batch.num_rows();
411            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
412            let mut params = write_params_for_create();
413            if wrote {
414                params.mode = WriteMode::Append;
415            }
416            Dataset::write(reader, dest_uri, Some(params))
417                .await
418                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
419            wrote = true;
420        }
421
422        if !wrote {
423            let batch = RecordBatch::new_empty(schema.clone());
424            let reader = RecordBatchIterator::new([Ok(batch)], schema);
425            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
426                .await
427                .with_context(|| {
428                    format!("failed to write empty {} archive table", table.as_str())
429                })?;
430        }
431        Ok((rows, source_version))
432    }
433
434    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
435        // Force the destination table into existence up front: an empty
436        // archive table yields zero batches, so merge_insert alone would
437        // leave a lazily-created table (parts) missing on the destination.
438        let _ = self.handle.dataset(table).await?;
439        self.merge_scanner(table, dataset.scan(), "archive import")
440            .await
441    }
442
443    /// Stream a prepared source `scanner` into this store's `table` via
444    /// `merge_insert_stats`, materializing blob bytes (not descriptor structs)
445    /// so the merge writes them into the destination's own schema. Shared by
446    /// the archive-restore and store-to-store copy paths; `context` names the
447    /// caller in error messages. Returns (rows scanned, rows inserted).
448    async fn merge_scanner(
449        &self,
450        table: Table,
451        mut scanner: lance::dataset::scanner::Scanner,
452        context: &'static str,
453    ) -> Result<(usize, usize)> {
454        scanner.blob_handling(lance::datatypes::BlobHandling::AllBinary);
455        let mut stream = scanner
456            .try_into_stream()
457            .await
458            .with_context(|| format!("failed to scan {} for {context}", table.as_str()))?;
459        let mut rows = 0usize;
460        let mut inserted = 0usize;
461        while let Some(batch) = stream.next().await {
462            let batch = batch
463                .with_context(|| format!("failed to read {} {context} batch", table.as_str()))?;
464            let row_count = batch.num_rows();
465            rows += row_count;
466            let stats = self
467                .handle
468                .merge_insert_stats(table, batch, row_count)
469                .await
470                .with_context(|| format!("failed to merge {} during {context}", table.as_str()))?;
471            inserted += (stats.num_inserted_rows + stats.num_updated_rows) as usize;
472        }
473        Ok((rows, inserted))
474    }
475
    /// Per-session message count - the data-intrinsic freshness key for
    /// incremental `pond copy`. pond is append-only (merge is
    /// `WhenMatched::DoNothing`; no edits or deletes), so this count rises iff a
    /// session gained messages, catching growth a `MAX(timestamp)` key would
    /// miss when a new message shares the session's latest timestamp. The count
    /// is source-authored and survives the copy unchanged, so it compares
    /// soundly across two stores with independent clocks
    /// (spec.md#session-durable-copy).
    ///
    /// Delegates to `all_session_row_counts` on the `messages` table, which
    /// projects only the `session_id` column and allocates a key only on a
    /// session's first row. Distinct from `session_message_counts`, which
    /// counts a supplied id list one query each; this counts every session in
    /// a single scan.
    pub async fn all_session_message_counts(&self) -> Result<HashMap<String, usize>> {
        self.all_session_row_counts(Table::Messages).await
    }
491
    /// Per-session part count: the `parts`-table counterpart of
    /// `all_session_message_counts`, computed by the same single-scan helper.
    pub async fn all_session_part_counts(&self) -> Result<HashMap<String, usize>> {
        self.all_session_row_counts(Table::Parts).await
    }
495
496    /// Count rows per `session_id` across one table in a single scan, projecting
497    /// only the `session_id` column and allocating a key on a session's first
498    /// row. Both `messages` and `parts` lead their primary key with `session_id`
499    /// (`lance-table-creation-session-scoped-pk`).
500    async fn all_session_row_counts(&self, table: Table) -> Result<HashMap<String, usize>> {
501        let scanner = self
502            .handle
503            .scan(table, ScanOpts::project_only(&["session_id"]))
504            .await?;
505        let mut stream = scanner.try_into_stream().await?;
506        let mut out: HashMap<String, usize> = HashMap::new();
507        while let Some(batch) = stream.next().await {
508            let batch = batch?;
509            let session_ids = batch
510                .column_by_name("session_id")
511                .context("scan projection dropped the session_id column")?
512                .as_any()
513                .downcast_ref::<StringArray>()
514                .context("session_id column is not Utf8")?;
515            for row in 0..batch.num_rows() {
516                if session_ids.is_null(row) {
517                    continue;
518                }
519                let session_id = session_ids.value(row);
520                if let Some(count) = out.get_mut(session_id) {
521                    *count += 1;
522                } else {
523                    out.insert(session_id.to_owned(), 1);
524                }
525            }
526        }
527        Ok(out)
528    }
529
530    /// Plan an incremental store-to-store copy into `self` from `source`,
531    /// deciding **per table** whether each source session's rows can be appended
532    /// (the destination has none, so they cannot collide) or must be merged (the
533    /// destination has some, source has more). Reads both id-sets plus
534    /// per-session message and part counts. Parts have their own data-derived
535    /// signal so a part added under an existing message routes through merge
536    /// instead of relying on the closing verify to catch it
537    /// (spec.md#session-movement-complete).
538    pub async fn plan_incremental_from(&self, source: &Store) -> Result<DeltaPlan> {
539        let (
540            source_ids,
541            dest_ids,
542            source_msg_counts,
543            dest_msg_counts,
544            source_part_counts,
545            dest_part_counts,
546        ) = tokio::try_join!(
547            source.collect_ids(Table::Sessions),
548            self.collect_ids(Table::Sessions),
549            source.all_session_message_counts(),
550            self.all_session_message_counts(),
551            source.all_session_part_counts(),
552            self.all_session_part_counts(),
553        )?;
554        let source_sessions = source_ids.len();
555        let mut plan = DeltaPlan {
556            source_sessions,
557            ..DeltaPlan::default()
558        };
559        for id in &source_ids {
560            // The `sessions` table holds one immutable row per session, so it
561            // only ever appends an absent id - a present row is identical.
562            if !dest_ids.contains(id) {
563                plan.sessions.append.push(id.clone());
564            }
565            let source_msgs = source_msg_counts.get(id).copied().unwrap_or(0);
566            let dest_msgs = dest_msg_counts.get(id).copied().unwrap_or(0);
567            if dest_msgs == 0 {
568                if source_msgs > 0 {
569                    plan.messages.append.push(id.clone());
570                }
571            } else if source_msgs > dest_msgs {
572                plan.messages.merge.push(id.clone());
573            }
574            let source_parts = source_part_counts.get(id).copied().unwrap_or(0);
575            let dest_parts = dest_part_counts.get(id).copied().unwrap_or(0);
576            if dest_parts == 0 {
577                if source_parts > 0 {
578                    plan.parts.append.push(id.clone());
579                }
580            } else if source_parts > dest_parts {
581                plan.parts.merge.push(id.clone());
582            }
583        }
584        Ok(plan)
585    }
586
587    /// Copy the planned delta from `source` into `self`, streaming the source
588    /// scan straight into the destination - no local staging copy. Each table
589    /// picks its primitive per session from its [`TablePlan`]
590    /// (spec.md#session-durable-copy): **append** the sessions whose rows are
591    /// absent here (cannot collide; one commit per scan, bandwidth-bound) then
592    /// **merge** the partially-present ones (`WhenMatched::DoNothing` dedups the
593    /// rows already there). Append-only storage is what makes the append safe: a
594    /// re-run re-plans from current destination state, so an
595    /// interrupted-then-resumed copy never double-appends (landed rows are no
596    /// longer absent).
597    pub async fn copy_delta_from(
598        &self,
599        source: &Store,
600        plan: &DeltaPlan,
601    ) -> Result<LanceArchiveImport> {
602        // The three tables are independent Lance datasets with separate write
603        // locks, so copy them concurrently - mirrors the ingest path's
604        // three-table `try_join!` (see `upsert_session_batch`).
605        let ((sessions, sessions_inserted), (messages, messages_inserted), (parts, parts_inserted)) =
606            tokio::try_join!(
607                self.copy_table(
608                    source,
609                    Table::Sessions,
610                    "id",
611                    &plan.sessions,
612                    plan.source_sessions,
613                ),
614                self.copy_table(
615                    source,
616                    Table::Messages,
617                    "session_id",
618                    &plan.messages,
619                    plan.source_sessions,
620                ),
621                self.copy_table(
622                    source,
623                    Table::Parts,
624                    "session_id",
625                    &plan.parts,
626                    plan.source_sessions,
627                ),
628            )?;
629        Ok(LanceArchiveImport {
630            rows: LanceArchiveCounts {
631                sessions,
632                messages,
633                parts,
634            },
635            inserted: LanceArchiveCounts {
636                sessions: sessions_inserted,
637                messages: messages_inserted,
638                parts: parts_inserted,
639            },
640        })
641    }
642
643    /// Copy one table's slice of the plan: append the sessions whose rows are
644    /// absent here, then merge the ones whose rows are partially present.
645    /// Sequential within a table (one write lock); `copy_delta_from` runs the
646    /// three tables in parallel. Returns (rows transferred, rows inserted) -
647    /// equal for the append portion, which never dedups.
648    async fn copy_table(
649        &self,
650        source: &Store,
651        table: Table,
652        key_column: &'static str,
653        table_plan: &TablePlan,
654        source_sessions: usize,
655    ) -> Result<(usize, usize)> {
656        // Force the destination table into existence up front so a lazily
657        // created table (parts) is never left missing when its slice is empty -
658        // same reason as the archive import path.
659        let _ = self.handle.dataset(table).await?;
660
661        let appended = self
662            .append_sessions(
663                source,
664                table,
665                key_column,
666                &table_plan.append,
667                source_sessions,
668            )
669            .await?;
670
671        // Sessions with rows already on the destination take the merge path to
672        // skip the rows already there. Chunk the `IN` list so each chunk pushes
673        // down to the key column's btree index.
674        let mut merged_rows = 0usize;
675        let mut merged_inserted = 0usize;
676        for chunk in table_plan.merge.chunks(COPY_SESSION_IN_CHUNK) {
677            let predicate = in_predicate(key_column, chunk);
678            let scanner = source
679                .handle
680                .scan(
681                    table,
682                    ScanOpts {
683                        predicate: Some(&predicate),
684                        projection: None,
685                    },
686                )
687                .await?;
688            let (r, i) = self.merge_scanner(table, scanner, "copy").await?;
689            merged_rows += r;
690            merged_inserted += i;
691        }
692
693        Ok((appended + merged_rows, appended + merged_inserted))
694    }
695
696    /// Append one table's slice for the listed sessions. A from-empty or resumed
697    /// copy (`session_ids.len() == source_sessions`: every session's rows for
698    /// this table are absent on the destination) scans the source wholesale
699    /// under one commit; a partial copy chunks the `IN` predicate (btree-pushed)
700    /// but still commits once per chunk, not per scan batch. Returns rows
701    /// appended.
702    async fn append_sessions(
703        &self,
704        source: &Store,
705        table: Table,
706        key_column: &'static str,
707        session_ids: &[String],
708        source_sessions: usize,
709    ) -> Result<usize> {
710        if session_ids.is_empty() {
711            return Ok(0);
712        }
713        if session_ids.len() == source_sessions {
714            return self.append_scanner(source, table, None).await;
715        }
716        let mut rows = 0usize;
717        for chunk in session_ids.chunks(COPY_SESSION_IN_CHUNK) {
718            let predicate = in_predicate(key_column, chunk);
719            rows += self.append_scanner(source, table, Some(&predicate)).await?;
720        }
721        Ok(rows)
722    }
723
724    /// Append a prepared source scan into this store's `table` via
725    /// `Handle::append_stream`, materializing blob bytes (`AllBinary`) so the
726    /// write is self-contained. The closure is a *factory*: `append_stream`
727    /// rebuilds the one-shot scan stream on each OCC attempt. Returns rows
728    /// appended.
729    async fn append_scanner(
730        &self,
731        source: &Store,
732        table: Table,
733        predicate: Option<&Predicate>,
734    ) -> Result<usize> {
735        let make_source = || async {
736            let mut scanner = source
737                .handle
738                .scan(
739                    table,
740                    ScanOpts {
741                        predicate,
742                        projection: None,
743                    },
744                )
745                .await?;
746            scanner.blob_handling(lance::datatypes::BlobHandling::AllBinary);
747            let stream: SendableRecordBatchStream = scanner
748                .try_into_stream()
749                .await
750                .with_context(|| format!("failed to scan {} for copy", table.as_str()))?
751                .into();
752            Ok(stream)
753        };
754        let stats = self.handle.append_stream(table, make_source).await?;
755        Ok(stats.rows as usize)
756    }
757
758    /// Append source rows whose `filter_column` is in `values`. Absent rows
759    /// can't collide, so append is safe where the count-based plan would merge
760    /// (spec.md#session-durable-copy). Drives the `copy_bench` append-vs-merge
761    /// regression guard.
762    pub async fn append_absent_rows(
763        &self,
764        source: &Store,
765        table: Table,
766        filter_column: &'static str,
767        values: &[String],
768    ) -> Result<usize> {
769        if values.is_empty() {
770            return Ok(0);
771        }
772        let _ = self.handle.dataset(table).await?;
773        let mut rows = 0usize;
774        for chunk in values.chunks(COPY_SESSION_IN_CHUNK) {
775            let predicate = in_predicate(filter_column, chunk);
776            rows += self.append_scanner(source, table, Some(&predicate)).await?;
777        }
778        Ok(rows)
779    }
780
781    /// Flat write path. Per-row insert/match truth is not synthesized here -
782    /// honest outcomes come from the pre-existence scan on
783    /// [`Self::upsert_session_batch`]; the CLI sync and wire ingest paths use
784    /// that, so these helpers only need to surface write failure.
785    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<()> {
786        if sessions.is_empty() {
787            return Ok(());
788        }
789        let batches = sessions_batches(sessions)?;
790        merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
791        Ok(())
792    }
793
794    /// Batched write path used by the adapter ingest loop and by the wire
795    /// handler's final flush. Receives N completed substreams from the
796    /// validator and:
797    ///
798    ///   1. Runs the immutable-fields check (spec.md#protocol) against the stored row
799    ///      per session, sequentially. Sessions that fail produce one Error
800    ///      outcome and are excluded from the write batch.
801    ///   2. Deduplicates in-batch at the substream level: when two substreams
802    ///      in the same batch share a `session_id` (Claude Code's subagent
803    ///      files reuse their parent's id), the first occurrence wins. The
804    ///      second is either *merged* (same `source_agent` + `project`:
805    ///      messages/parts append, no duplicate rows) or *rejected*
806    ///      (different `project` - the subagent-vs-parent case). Row-level
807    ///      duplicates that slip past here are caught downstream by Lance's
808    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
809    ///      (invariant 17): this layer's job is preserving substream merge
810    ///      semantics, not policing the PK uniqueness Lance handles itself.
811    ///   3. Builds one combined `RecordBatch` per table (sessions, messages,
812    ///      parts) across every valid substream.
813    ///   4. Commits messages + parts first, then sessions. The session row is
814    ///      the freshness-bearing row; writing it last makes a partial
815    ///      non-atomic flush re-ingest and heal (spec.md#session-movement-complete).
816    ///   5. Composes per-session [`RowOutcome`]s in original substream order.
817    async fn upsert_session_batch(
818        &self,
819        substreams: Vec<CompletedSubstream>,
820    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
821        if substreams.is_empty() {
822            return Ok((Vec::new(), BatchCounts::default()));
823        }
824
825        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
826        let mut counts = BatchCounts::default();
827
828        // In-batch dedup. First occurrence of each session_id wins; later
829        // occurrences either merge or get rejected. Iteration order preserves
830        // original substream order so outcomes index correctly.
831        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
832        let mut by_session_id: std::collections::HashMap<String, usize> =
833            std::collections::HashMap::with_capacity(substreams.len());
834        for substream in substreams {
835            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
836                let existing = &merged[existing_idx];
837                if existing.session.source_agent != substream.session.source_agent
838                    || existing.session.project != substream.session.project
839                {
840                    // Subagent-vs-parent class. The first occurrence's
841                    // metadata stays authoritative; this substream is
842                    // rejected on the same immutable-field axis as the
843                    // storage-side check.
844                    let reason = if existing.session.source_agent != substream.session.source_agent
845                    {
846                        IngestError::ImmutableField {
847                            field: "source_agent",
848                            session_id: substream.session.id.clone(),
849                            stored: existing.session.source_agent.clone(),
850                            attempted: substream.session.source_agent.clone(),
851                        }
852                    } else {
853                        IngestError::ImmutableField {
854                            field: "project",
855                            session_id: substream.session.id.clone(),
856                            stored: (*existing.session.project).clone(),
857                            attempted: (*substream.session.project).clone(),
858                        }
859                    };
860                    let field = match &reason {
861                        IngestError::ImmutableField { field, .. } => Some(*field),
862                    };
863                    let reason_key = match field {
864                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
865                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
866                        _ => DROP_REASON_UNCATEGORIZED,
867                    };
868                    outcomes.extend(error_outcomes_for_substream(
869                        substream.session_index,
870                        &substream.session,
871                        &substream.messages,
872                        reason.to_string(),
873                        field,
874                        reason_key,
875                    ));
876                    continue;
877                }
878                // Same session, same metadata: merge messages. Dedup message
879                // ids defensively (within one batch, the validator's seen
880                // sets are per-substream so cross-substream dups can happen
881                // legally if both files re-emit the same row).
882                let existing = &mut merged[existing_idx];
883                let mut seen: std::collections::HashSet<String> = existing
884                    .messages
885                    .iter()
886                    .map(|m| m.message.id().to_owned())
887                    .collect();
888                for msg in substream.messages {
889                    if seen.insert(msg.message.id().to_owned()) {
890                        existing.messages.push(msg);
891                    }
892                }
893                continue;
894            }
895            by_session_id.insert(substream.session.id.clone(), merged.len());
896            merged.push(substream);
897        }
898
899        // Pre-existence sweep: one scan per table keyed on the batch's
900        // session_ids, capped at the substream count. Replaces the prior
901        // N-sequential `find_session` calls and gives us honest per-row
902        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
903        let session_id_values: Vec<ScalarValue> = merged
904            .iter()
905            .map(|substream| ScalarValue::String(substream.session.id.clone()))
906            .collect();
907        let existing_sessions: std::collections::HashMap<String, Session> =
908            if session_id_values.is_empty() {
909                std::collections::HashMap::new()
910            } else {
911                let batch = self
912                    .handle
913                    .scan_batch(
914                        Table::Sessions,
915                        Some(&Predicate::In("id", session_id_values.clone())),
916                        &[],
917                    )
918                    .await?;
919                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
920                for row in 0..batch.num_rows() {
921                    let session = session_from_batch(&batch, row)?;
922                    map.insert(session.id.clone(), session);
923                }
924                map
925            };
926        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
927            HashSet::new()
928        } else {
929            let batch = self
930                .handle
931                .scan_batch(
932                    Table::Messages,
933                    Some(&Predicate::In("session_id", session_id_values.clone())),
934                    &["session_id", "id"],
935                )
936                .await?;
937            let mut set = HashSet::with_capacity(batch.num_rows());
938            for row in 0..batch.num_rows() {
939                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
940                let mid = string(&batch, "id", row)?.context("message id is null")?;
941                set.insert((sid, mid));
942            }
943            set
944        };
945        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
946            HashSet::new()
947        } else {
948            let batch = self
949                .handle
950                .scan_batch(
951                    Table::Parts,
952                    Some(&Predicate::In("session_id", session_id_values)),
953                    &["session_id", "message_id", "id"],
954                )
955                .await?;
956            let mut set = HashSet::with_capacity(batch.num_rows());
957            for row in 0..batch.num_rows() {
958                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
959                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
960                let pid = string(&batch, "id", row)?.context("part id is null")?;
961                set.insert((sid, mid, pid));
962            }
963            set
964        };
965
966        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
967        for substream in merged {
968            if let Some(existing) = existing_sessions.get(&substream.session.id)
969                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
970            {
971                let field = match &failure {
972                    IngestError::ImmutableField { field, .. } => Some(*field),
973                };
974                let reason_key = match field {
975                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
976                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
977                    _ => DROP_REASON_UNCATEGORIZED,
978                };
979                outcomes.extend(error_outcomes_for_substream(
980                    substream.session_index,
981                    &substream.session,
982                    &substream.messages,
983                    failure.to_string(),
984                    field,
985                    reason_key,
986                ));
987                continue;
988            }
989            writeable.push(substream);
990        }
991
992        if writeable.is_empty() {
993            outcomes.sort_by_key(|outcome| outcome.index);
994            return Ok((outcomes, counts));
995        }
996
997        let sessions_owned: Vec<Session> = writeable
998            .iter()
999            .map(|substream| substream.session.clone())
1000            .collect();
1001        // Append only rows absent from the pre-existence sweep; present rows are
1002        // merge no-ops (spec.md#adapter-integrity-additive-sync). `seen_*` carries
1003        // the in-batch dedup floor (spec.md#adapter-integrity-dedup).
1004        let mut seen_messages: HashSet<(String, String)> = HashSet::new();
1005        let message_rows: Vec<MessageBatchRow<'_>> = writeable
1006            .iter()
1007            .flat_map(|substream| {
1008                substream.messages.iter().map(|buffered| MessageBatchRow {
1009                    message: &buffered.message,
1010                    source_agent: &substream.session.source_agent,
1011                    project: &substream.session.project,
1012                    search_text: buffered.search_text.as_deref(),
1013                })
1014            })
1015            .filter(|row| {
1016                let key = (
1017                    row.message.session_id().to_owned(),
1018                    row.message.id().to_owned(),
1019                );
1020                !existing_message_pks.contains(&key) && seen_messages.insert(key)
1021            })
1022            .collect();
1023        let mut seen_parts: HashSet<(String, String, String)> = HashSet::new();
1024        let part_rows: Vec<Part> = writeable
1025            .iter()
1026            .flat_map(|substream| {
1027                substream.messages.iter().flat_map(|buffered| {
1028                    buffered
1029                        .parts
1030                        .iter()
1031                        .map(|buffered_part| buffered_part.part.clone())
1032                })
1033            })
1034            .filter(|part| {
1035                let key = (
1036                    part.session_id.clone(),
1037                    part.message_id.clone(),
1038                    part.id.clone(),
1039                );
1040                !existing_part_pks.contains(&key) && seen_parts.insert(key)
1041            })
1042            .collect();
1043
1044        let session_batches = sessions_batches(&sessions_owned)?;
1045        let message_batches = messages_batches(&message_rows)?;
1046        let part_batches = parts_batches(&part_rows)?;
1047
1048        let (_messages_appended, _parts_appended) = tokio::try_join!(
1049            self.handle.append_batches(Table::Messages, message_batches),
1050            self.handle.append_batches(Table::Parts, part_batches),
1051        )?;
1052        let _sessions_inserted =
1053            merge_insert_chunks(&self.handle, Table::Sessions, session_batches).await?;
1054
1055        for substream in &writeable {
1056            outcomes.extend(success_outcomes_for_substream(
1057                substream.session_index,
1058                &substream.session,
1059                &substream.messages,
1060                &existing_sessions,
1061                &existing_message_pks,
1062                &existing_part_pks,
1063                &mut counts,
1064            ));
1065        }
1066
1067        outcomes.sort_by_key(|outcome| outcome.index);
1068        Ok((outcomes, counts))
1069    }
1070
1071    pub async fn upsert_messages(
1072        &self,
1073        session: &Session,
1074        messages: &[MessageWrite<'_>],
1075    ) -> Result<()> {
1076        if messages.is_empty() {
1077            return Ok(());
1078        }
1079
1080        let rows = messages
1081            .iter()
1082            .map(|write| MessageBatchRow {
1083                message: write.message,
1084                source_agent: &session.source_agent,
1085                project: &session.project,
1086                search_text: write.search_text,
1087            })
1088            .collect::<Vec<_>>();
1089        let batches = messages_batches(&rows)?;
1090        merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
1091        Ok(())
1092    }
1093
1094    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<()> {
1095        if parts.is_empty() {
1096            return Ok(());
1097        }
1098        let batches = parts_batches(parts)?;
1099        merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
1100        Ok(())
1101    }
1102
1103    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
1104        let Some(session) = self.find_session(session_id).await? else {
1105            return Ok(None);
1106        };
1107        let messages = self.messages_for_session(session_id).await?;
1108        Ok(Some(SessionWithMessages { session, messages }))
1109    }
1110
1111    /// Every session id currently in the store, unsorted.
1112    pub async fn session_ids(&self) -> Result<Vec<String>> {
1113        let batch = self
1114            .handle
1115            .scan_batch(Table::Sessions, None, &["id"])
1116            .await?;
1117        let mut ids = Vec::with_capacity(batch.num_rows());
1118        for row in 0..batch.num_rows() {
1119            if let Some(id) = string(&batch, "id", row)? {
1120                ids.push(id);
1121            }
1122        }
1123        Ok(ids)
1124    }
1125
1126    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
1127        let batch = self
1128            .handle
1129            .scan_batch(
1130                Table::Sessions,
1131                Some(&Predicate::Eq(
1132                    "parent_session_id",
1133                    parent_session_id.into(),
1134                )),
1135                &[
1136                    "id",
1137                    "parent_session_id",
1138                    "parent_message_id",
1139                    "source_agent",
1140                    "created_at",
1141                    "project",
1142                    "options",
1143                ],
1144            )
1145            .await?;
1146        let mut sessions = Vec::with_capacity(batch.num_rows());
1147        for row in 0..batch.num_rows() {
1148            sessions.push(session_from_batch(&batch, row)?);
1149        }
1150        sessions.sort_by(|left, right| left.id.cmp(&right.id));
1151        Ok(sessions)
1152    }
1153
1154    /// `session_id -> last durable message id` for the sync freshness gate.
1155    /// Scans stored message data only, never Lance version history:
1156    /// `Dataset::versions()` is remote-manifest-bound on object stores, and a
1157    /// write timestamp can exist even when a non-atomic ingest did not commit
1158    /// the messages (spec.md#session-movement-complete).
1159    ///
1160    /// Only emits a key when the session row is ALSO durable. `upsert_session_batch`
1161    /// commits messages+parts before the session row, so a partial flush can leave
1162    /// a session whose messages are stored but whose session row is not; keying on
1163    /// messages alone would report it fresh and orphan the missing row. Intersecting
1164    /// with the sessions id-set forces a re-ingest that heals it
1165    /// (spec.md#session-movement-complete).
1166    pub async fn session_last_message_ids(&self) -> Result<HashMap<String, String>> {
1167        let (session_ids, latest) = tokio::try_join!(self.collect_ids(Table::Sessions), async {
1168            let scanner = self
1169                .handle
1170                .scan(
1171                    Table::Messages,
1172                    ScanOpts::project_only(&["session_id", "id", "timestamp"]),
1173                )
1174                .await?;
1175            let mut stream = scanner.try_into_stream().await?;
1176            let mut latest: HashMap<String, (DateTime<Utc>, String)> = HashMap::new();
1177            while let Some(batch) = stream.next().await {
1178                let batch = batch?;
1179                let session_ids = batch
1180                    .column_by_name("session_id")
1181                    .context("scan projection dropped the session_id column")?
1182                    .as_any()
1183                    .downcast_ref::<StringArray>()
1184                    .context("session_id column is not Utf8")?;
1185                for row in 0..batch.num_rows() {
1186                    if session_ids.is_null(row) {
1187                        continue;
1188                    }
1189                    let session_id = session_ids.value(row);
1190                    let Some(id) = string(&batch, "id", row)? else {
1191                        continue;
1192                    };
1193                    let timestamp = datetime(&batch, "timestamp", row)?;
1194                    match latest.get_mut(session_id) {
1195                        Some((stored_ts, stored_id))
1196                            if timestamp > *stored_ts
1197                                || (timestamp == *stored_ts
1198                                    && id.as_str() > stored_id.as_str()) =>
1199                        {
1200                            *stored_ts = timestamp;
1201                            *stored_id = id;
1202                        }
1203                        None => {
1204                            latest.insert(session_id.to_owned(), (timestamp, id));
1205                        }
1206                        _ => {}
1207                    }
1208                }
1209            }
1210            Ok::<_, anyhow::Error>(latest)
1211        })?;
1212        Ok(latest
1213            .into_iter()
1214            .filter(|(session_id, _)| session_ids.contains(session_id))
1215            .map(|(session_id, (_, message_id))| (session_id, message_id))
1216            .collect())
1217    }
1218
1219    /// Whole-session view for `pond_get` session scope (spec.md#protocol).
1220    /// Always the conversational view (`search_text IS NOT NULL`) with one-line
1221    /// part summaries - full part bodies are reached by `message_id` scope, not
1222    /// here. The page is the window selected by the anchors (`after_message_id`
1223    /// pages forward, `before_message_id` pages backward) or, with neither,
1224    /// `session_from` (start/end); it is bounded by `limit` and a byte budget,
1225    /// never cutting mid-message. `before_remaining`/`after_remaining` drive the
1226    /// bidirectional page markers.
1227    pub async fn session_view(
1228        &self,
1229        session_id: &str,
1230        params: SessionViewParams<'_>,
1231    ) -> Result<GetLookup<SessionPage>> {
1232        let Some(session) = self.find_session(session_id).await? else {
1233            return Ok(GetLookup::NotFound);
1234        };
1235        let mut rows: Vec<ScanRow> = self
1236            .scan_conversational_messages(session_id)
1237            .await?
1238            .into_iter()
1239            .map(|row| ScanRow {
1240                id: row.message_id,
1241                role: row.role,
1242                timestamp: row.timestamp,
1243                text: Some(row.text.into_inner()),
1244                content: None,
1245            })
1246            .collect();
1247        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
1248
1249        let size = |row: &ScanRow| row.text.as_deref().map_or(0, str::len);
1250        let total = rows.len();
1251        // Append-only stream: a real anchor never vanishes, so an unknown
1252        // anchor is a stale/mistyped client cursor, not "start over".
1253        let (win_start, win_end) = match (params.after_message_id, params.before_message_id) {
1254            (Some(after), _) => {
1255                let pos = match rows.iter().position(|row| row.id == after) {
1256                    Some(idx) => idx + 1,
1257                    None => return Ok(GetLookup::UnknownAnchor),
1258                };
1259                let n = page_by(&rows[pos..], params.limit, params.budget_bytes, size);
1260                (pos, pos + n)
1261            }
1262            (None, Some(before)) => {
1263                let pos = match rows.iter().position(|row| row.id == before) {
1264                    Some(idx) => idx,
1265                    None => return Ok(GetLookup::UnknownAnchor),
1266                };
1267                let n = page_tail(&rows[..pos], params.limit, params.budget_bytes, size);
1268                (pos - n, pos)
1269            }
1270            (None, None) => match params.session_from {
1271                SessionFrom::Start => (0, page_by(&rows, params.limit, params.budget_bytes, size)),
1272                SessionFrom::End => {
1273                    let n = page_tail(&rows, params.limit, params.budget_bytes, size);
1274                    (total - n, total)
1275                }
1276            },
1277        };
1278        let emitted = &rows[win_start..win_end];
1279        let before_remaining = win_start;
1280        let after_remaining = total - win_end;
1281        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
1282
1283        let mut parts_by_message = self.summary_parts_for_messages(session_id, &ids).await?;
1284        let messages = emitted
1285            .iter()
1286            .map(|row| RetrievedMessage {
1287                id: row.id.clone(),
1288                role: row.role,
1289                timestamp: row.timestamp,
1290                text: row.text.clone(),
1291                content: row.content.clone(),
1292                parts: parts_by_message
1293                    .remove(&(session_id.to_owned(), row.id.clone()))
1294                    .unwrap_or_default(),
1295            })
1296            .collect();
1297
1298        Ok(GetLookup::Found(SessionPage {
1299            session,
1300            messages,
1301            before_remaining,
1302            after_remaining,
1303        }))
1304    }
1305
1306    /// Message-scope retrieval for `pond_get` message scope (spec.md#protocol):
1307    /// the target with its full parts (budget-bounded) plus `context_before`
1308    /// conversational siblings before and `context_after` after it. `NotFound`
1309    /// when no stored message carries `message_id`. Sibling parts are carried
1310    /// for summarizing; the target's parts ride `target_parts`.
1311    pub async fn message_view(
1312        &self,
1313        message_id: &str,
1314        params: MessageViewParams,
1315    ) -> Result<GetLookup<MessagePage>> {
1316        let Some(session_id) = self.session_id_for_message(message_id).await? else {
1317            return Ok(GetLookup::NotFound);
1318        };
1319        let Some(session) = self.find_session(&session_id).await? else {
1320            return Ok(GetLookup::NotFound);
1321        };
1322        let mut rows = self.scan_all_messages(&session_id).await?;
1323        // Siblings are always the conversational view: in carrier-heavy sessions
1324        // the system/tool rows would otherwise fill the whole window and push
1325        // the actual conversation out of it. The target stays regardless of its
1326        // own role - the caller asked for that message.
1327        rows.retain(|row| row.text.is_some() || row.id == message_id);
1328        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
1329        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
1330            return Ok(GetLookup::NotFound);
1331        };
1332
1333        let start = target_pos.saturating_sub(params.context_before);
1334        let end = (target_pos + params.context_after + 1).min(rows.len());
1335        let window = &rows[start..end];
1336        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
1337        // The target's full parts (blobs included) ride the response; siblings
1338        // are only summarized, but they share this one window scan.
1339        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
1340
1341        let all_parts = parts_by_message
1342            .remove(&(session_id.clone(), message_id.to_owned()))
1343            .unwrap_or_default();
1344        // Target parts are budget-bounded (no per-part pagination cursor). The
1345        // 1000 cap is the page_by hard ceiling; the budget is the real bound.
1346        let part_count = page_by(&all_parts, 1000, params.budget_bytes, |part| {
1347            serde_json::to_string(part).map_or(0, |json| json.len())
1348        });
1349        let target_parts = all_parts[..part_count].to_vec();
1350        let target_parts_remaining = all_parts.len() - part_count;
1351
1352        let target_row = &rows[target_pos];
1353        let target = RetrievedMessage {
1354            id: target_row.id.clone(),
1355            role: target_row.role,
1356            timestamp: target_row.timestamp,
1357            text: target_row.text.clone(),
1358            content: target_row.content.clone(),
1359            // Target structure is carried in full by `target_parts`.
1360            parts: Vec::new(),
1361        };
1362        let siblings = window
1363            .iter()
1364            .enumerate()
1365            .filter(|(idx, _)| start + idx != target_pos)
1366            .map(|(_, row)| RetrievedMessage {
1367                id: row.id.clone(),
1368                role: row.role,
1369                timestamp: row.timestamp,
1370                text: row.text.clone(),
1371                content: row.content.clone(),
1372                parts: parts_by_message
1373                    .get(&(session_id.clone(), row.id.clone()))
1374                    .cloned()
1375                    .unwrap_or_default(),
1376            })
1377            .collect();
1378
1379        Ok(GetLookup::Found(MessagePage {
1380            session,
1381            target,
1382            target_parts,
1383            target_parts_remaining,
1384            siblings,
1385        }))
1386    }
1387
1388    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1389        let batch = self
1390            .handle
1391            .scan_batch(
1392                Table::Messages,
1393                Some(&Predicate::Eq("session_id", session_id.into())),
1394                &["id", "timestamp", "role", "search_text", "content"],
1395            )
1396            .await?;
1397        let mut rows = Vec::with_capacity(batch.num_rows());
1398        for row in 0..batch.num_rows() {
1399            let id = string(&batch, "id", row)?.context("message id is null")?;
1400            let role =
1401                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1402            let timestamp = datetime(&batch, "timestamp", row)?;
1403            rows.push(ScanRow {
1404                id,
1405                role,
1406                timestamp,
1407                text: string(&batch, "search_text", row)?,
1408                content: string(&batch, "content", row)?,
1409            });
1410        }
1411        Ok(rows)
1412    }
1413
1414    /// Conversational scan over one session: rows ordered by
1415    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1416    /// read seam (spec.md#search-prefilter-pushdown).
1417    pub async fn scan_conversational_messages(
1418        &self,
1419        session_id: &str,
1420    ) -> Result<Vec<ConversationalRow>> {
1421        let filter = Predicate::And(vec![
1422            Predicate::Eq("session_id", session_id.into()),
1423            Predicate::IsNotNull("search_text"),
1424        ]);
1425        let batch = self
1426            .handle
1427            .scan_batch(
1428                Table::Messages,
1429                Some(&filter),
1430                &["id", "timestamp", "role", "search_text"],
1431            )
1432            .await?;
1433
1434        let mut rows = Vec::with_capacity(batch.num_rows());
1435        for row in 0..batch.num_rows() {
1436            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1437            let role =
1438                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1439            let timestamp = datetime(&batch, "timestamp", row)?;
1440            let text_str = string(&batch, "search_text", row)?.context(
1441                "search_text null after IsNotNull pushdown - storage invariant violated",
1442            )?;
1443            rows.push(ConversationalRow {
1444                session_id: session_id.to_owned(),
1445                message_id,
1446                role,
1447                timestamp,
1448                text: SearchText(text_str),
1449            });
1450        }
1451        rows.sort_by(|a, b| {
1452            a.timestamp
1453                .cmp(&b.timestamp)
1454                .then_with(|| a.message_id.cmp(&b.message_id))
1455        });
1456        Ok(rows)
1457    }
1458
1459    /// Locate the session id for a stored message. Cheap when only the routing
1460    /// hint is needed - callers that need the messages use `scan_all_messages`.
1461    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1462        let batch = self
1463            .handle
1464            .scan_batch(
1465                Table::Messages,
1466                Some(&Predicate::Eq("id", message_id.into())),
1467                &["session_id"],
1468            )
1469            .await?;
1470        if batch.num_rows() == 0 {
1471            return Ok(None);
1472        }
1473        string(&batch, "session_id", 0)
1474    }
1475
1476    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1477        self.handle.row_counts().await
1478    }
1479
1480    /// The primary-key (`id`) set for `table`. Powers storage verification
1481    /// (`pond copy --verify-only` and copy's closing check).
1482    pub async fn collect_ids(&self, table: Table) -> Result<std::collections::HashSet<String>> {
1483        self.handle.collect_ids(table).await
1484    }
1485
1486    /// Stream `table`'s `id` column and return `(rows_scanned, rows whose id is
1487    /// absent from `present`)`. Streaming means the scanned side is never
1488    /// materialized into a set, so a verify holds only the `present` (other)
1489    /// side per table instead of both.
1490    pub async fn id_diff_against(
1491        &self,
1492        table: Table,
1493        present: &std::collections::HashSet<String>,
1494    ) -> Result<(usize, usize)> {
1495        let scanner = self
1496            .handle
1497            .scan(table, ScanOpts::project_only(&["id"]))
1498            .await?;
1499        let mut stream = scanner.try_into_stream().await?;
1500        let (mut rows, mut absent) = (0usize, 0usize);
1501        while let Some(batch) = stream.next().await {
1502            let batch = batch?;
1503            let ids = batch
1504                .column_by_name("id")
1505                .context("scan projection dropped the id column")?
1506                .as_any()
1507                .downcast_ref::<StringArray>()
1508                .context("id column is not Utf8")?;
1509            for id in ids.iter().flatten() {
1510                rows += 1;
1511                if !present.contains(id) {
1512                    absent += 1;
1513                }
1514            }
1515        }
1516        Ok((rows, absent))
1517    }
1518
1519    /// A point-in-time `Arc<Dataset>` for `table`, for registering as a
1520    /// DataFusion `LanceTableProvider` in `pond_sql_query`. Goes through the
1521    /// handle's freshness gate, so each query sees a current snapshot.
1522    pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1523        Ok(Arc::new(self.handle.dataset(table).await?))
1524    }
1525
1526    /// Page the heavy search indices in from storage so the first user query
1527    /// after process start never eats the 175-442 s cold S3 load
1528    /// (spec.md#search). Vector via `prewarm_index` (loads the IVF_SQ partition
1529    /// storage). FTS is warmed with one synthetic query: Lance's full FTS
1530    /// `prewarm_index` loads *every* token's posting list, which resident-sets
1531    /// the whole inverted index (~1.7 GiB on the 2M-row corpus) and blows the
1532    /// server RAM budget - so we settle the term dictionary + a hot token
1533    /// instead and let real queries page their own postings. Best effort: a
1534    /// missing index (IVF_SQ below activation, or no FTS yet on an empty store)
1535    /// is logged, not fatal.
1536    pub async fn prewarm(&self, cache_dir: &Path) -> Result<()> {
1537        let messages = self.dataset(Table::Messages).await?;
1538        if let Err(error) = messages.prewarm_index(MESSAGES_VECTOR_INDEX).await {
1539            tracing::debug!(%error, "vector index prewarm skipped");
1540        }
1541        // Best-effort: on failure `rowmap` stays empty and the arms fall back to
1542        // the data-take path, so search still works (slower on a remote store).
1543        if let Err(error) = self.ensure_rowmap(cache_dir).await {
1544            tracing::warn!(%error, "rowmap build skipped; arms fall back to data-take resolution");
1545        }
1546        // Warm the FTS posting lists; the rowmap build above touched only the
1547        // data columns.
1548        if let Err(error) = self
1549            .fts_search("pond", 1, &Predicate::And(Vec::new()))
1550            .await
1551        {
1552            tracing::debug!(%error, "fts index prewarm skipped");
1553        }
1554        Ok(())
1555    }
1556
1557    /// Stable filesystem-safe cache key: same store URL -> same key, so sibling
1558    /// pond processes share one map file and distinct stores never collide.
1559    fn store_key(&self) -> String {
1560        let digest = blake3::hash(self.handle.location().as_str().as_bytes());
1561        digest.to_hex()[..16].to_owned()
1562    }
1563
    /// Max delta segments before the chain is compacted into a fresh base.
    /// The rowmap build layers a new delta only while the chain holds fewer
    /// than this many; at the limit it merges the existing segments plus the
    /// new delta into a single new base instead.
    const MAX_ROWMAP_DELTAS: usize = 16;
1566
    /// Columns the resident meta map is built from; this is the projection the
    /// full scan hands to the scanner. The full scan and the delta scan MUST
    /// project the same set in the same order - both feed [`row_meta_entry`],
    /// so a column added to one only would silently corrupt delta hydration.
    const ROW_META_COLUMNS: [&str; 7] = [
        "session_id",
        "id",
        "role",
        "project",
        "source_agent",
        "timestamp",
        "search_text",
    ];
1580
1581    /// Install the resident meta map covering the current `messages` version.
1582    /// Idempotent - a chain already at that version is kept. On a version bump
1583    /// it layers a delta segment (scanning only the new fragments), compacts the
1584    /// deltas locally once they pile up, and full-rebuilds the base only on a
1585    /// store compaction - all under a build `flock` so N local processes don't
1586    /// rescan the store at once.
1587    pub async fn ensure_rowmap(&self, cache_dir: &Path) -> Result<()> {
1588        let version = self.messages_version().await?;
1589        if let Some(current) = self.rowmap.load_full()
1590            && current.version() == version
1591        {
1592            return Ok(());
1593        }
1594        std::fs::create_dir_all(cache_dir)
1595            .with_context(|| format!("create cache dir {}", cache_dir.display()))?;
1596        let store_key = self.store_key();
1597
1598        // A sibling may already have published a chain at this version; install
1599        // it without rebuilding.
1600        if let Some(chain) = discover_chain(cache_dir, &store_key)
1601            && chain.version() == version
1602            && let Ok(set) = RowMetaSet::open(&chain)
1603        {
1604            self.rowmap.store(Some(Arc::new(set)));
1605            Self::sweep_stale_rowmaps(cache_dir, &store_key, chain.base_version);
1606            return Ok(());
1607        }
1608        if let Some(set) = self
1609            .extend_rowmap_coordinated(cache_dir, &store_key, version)
1610            .await?
1611        {
1612            self.rowmap.store(Some(Arc::new(set)));
1613        }
1614        Ok(())
1615    }
1616
1617    /// Extend the chain to `version` under the build `flock` (spec: lock the
1618    /// build only; atomic rename already prevents corruption). `None` when
1619    /// another local process holds the lock - this caller keeps its current map
1620    /// (or the take_rows fallback) until a later refresh opens what the winner
1621    /// published.
1622    async fn extend_rowmap_coordinated(
1623        &self,
1624        cache_dir: &Path,
1625        store_key: &str,
1626        version: u64,
1627    ) -> Result<Option<RowMetaSet>> {
1628        let lock_path = cache_dir.join(format!("rowmetamap-{store_key}.lock"));
1629        let lock = std::fs::File::create(&lock_path)
1630            .with_context(|| format!("create rowmap build lock {}", lock_path.display()))?;
1631        match lock.try_lock() {
1632            Ok(()) => {}
1633            Err(std::fs::TryLockError::WouldBlock) => return Ok(None),
1634            Err(std::fs::TryLockError::Error(error)) => {
1635                return Err(error).context("lock rowmap build");
1636            }
1637        }
1638
1639        // Re-check after acquiring: a sibling may have published `version`. An
1640        // open failure here (older MAGIC after an upgrade, or corruption) falls
1641        // through to the purge+rebuild below rather than erroring.
1642        if let Some(chain) = discover_chain(cache_dir, store_key)
1643            && chain.version() == version
1644            && let Ok(set) = RowMetaSet::open(&chain)
1645        {
1646            return Ok(Some(set));
1647        }
1648
1649        // Holding the lock makes us the only builder, so every build temp is a
1650        // dead orphan from a crashed build - clear them before writing ours.
1651        Self::sweep_orphan_temps(cache_dir, store_key);
1652
1653        // Validate any existing chain opens; an unreadable segment (an older
1654        // MAGIC after a pond upgrade, or a corrupt file) is purged so the build
1655        // below is a clean full rebuild instead of erroring forever or appending
1656        // a fresh delta onto an unreadable base. The opened set also feeds the
1657        // delta its high-water mark and row count (cheap mmap reads).
1658        let chain = discover_chain(cache_dir, store_key);
1659        let existing = match &chain {
1660            Some(paths) => match RowMetaSet::open(paths) {
1661                Ok(set) => Some((paths, set)),
1662                Err(error) => {
1663                    tracing::warn!(%error, store = store_key, "rowmap unreadable; purging and rebuilding");
1664                    Self::purge_rowmaps(cache_dir, store_key);
1665                    None
1666                }
1667            },
1668            None => None,
1669        };
1670        // A row-id-keyed append delta (None on a reclaimed base or net deletion)
1671        // decides the path.
1672        let delta = match &existing {
1673            Some((_, set)) => {
1674                self.collect_row_metas_delta(
1675                    set.version(),
1676                    set.max_row_id().unwrap_or(0),
1677                    set.len(),
1678                )
1679                .await?
1680            }
1681            None => None,
1682        };
1683
1684        let base_version = match (&existing, delta) {
1685            // Append with room: layer a new delta segment.
1686            (Some((paths, _)), Some(entries)) if paths.deltas.len() < Self::MAX_ROWMAP_DELTAS => {
1687                let path = RowMetaMap::delta_path(cache_dir, store_key, version);
1688                RowMetaMap::build(&path, version, entries)?;
1689                paths.base_version
1690            }
1691            // Append but the deltas are full: compact the existing segments
1692            // (read locally from their mmaps) plus this delta into a fresh base -
1693            // no full store re-read.
1694            (Some((_, set)), Some(entries)) => {
1695                let mut merged = set.merged_entries();
1696                merged.extend(entries);
1697                let path = RowMetaMap::path_for(cache_dir, store_key, version);
1698                RowMetaMap::build(&path, version, merged)?;
1699                version
1700            }
1701            // No chain, or a reclaimed base / deletion since it: full scan -> base.
1702            _ => {
1703                let entries = self.collect_row_metas().await?;
1704                let path = RowMetaMap::path_for(cache_dir, store_key, version);
1705                RowMetaMap::build(&path, version, entries)?;
1706                version
1707            }
1708        };
1709
1710        let chain =
1711            discover_chain(cache_dir, store_key).context("rowmap chain missing after build")?;
1712        let set = RowMetaSet::open(&chain)?;
1713        Self::sweep_stale_rowmaps(cache_dir, store_key, base_version);
1714        Ok(Some(set))
1715    }
1716
1717    /// Remove this store's segment files (`-v{V}` bases, `-d{V}` deltas) for
1718    /// versions strictly older than `keep` (best-effort). A newer file belongs
1719    /// to a sibling that advanced past us; unlinking a superseded file is safe
1720    /// even if a sibling has it mapped - Unix keeps the inode alive until unmap.
1721    fn sweep_stale_rowmaps(cache_dir: &Path, store_key: &str, keep: u64) {
1722        let prefix = format!("rowmetamap-{store_key}-");
1723        let Ok(entries) = std::fs::read_dir(cache_dir) else {
1724            return;
1725        };
1726        for entry in entries.flatten() {
1727            let name = entry.file_name();
1728            let Some(rest) = name
1729                .to_str()
1730                .and_then(|name| name.strip_prefix(&prefix))
1731                .and_then(|rest| rest.strip_suffix(".rmm"))
1732            else {
1733                continue;
1734            };
1735            let version = rest
1736                .strip_prefix('v')
1737                .or_else(|| rest.strip_prefix('d'))
1738                .and_then(|digits| digits.parse::<u64>().ok());
1739            if let Some(version) = version
1740                && version < keep
1741            {
1742                let _ = std::fs::remove_file(entry.path());
1743            }
1744        }
1745    }
1746
1747    /// Remove every segment file (`-v{V}` / `-d{V}`) for this store regardless of
1748    /// version - used when a discovered chain is unreadable (older MAGIC after an
1749    /// upgrade, or corruption) so the next build starts clean. Sound under the
1750    /// build lock; POSIX keeps any inode a sibling still has mapped alive.
1751    fn purge_rowmaps(cache_dir: &Path, store_key: &str) {
1752        let prefix = format!("rowmetamap-{store_key}-");
1753        let Ok(entries) = std::fs::read_dir(cache_dir) else {
1754            return;
1755        };
1756        for entry in entries.flatten() {
1757            if let Some(name) = entry.file_name().to_str()
1758                && name.starts_with(&prefix)
1759                && name.ends_with(".rmm")
1760            {
1761                let _ = std::fs::remove_file(entry.path());
1762            }
1763        }
1764    }
1765
1766    /// Remove abandoned build temp files (`*.tmp-*`) for this store. Best-effort,
1767    /// and only sound under the build lock - the holder is the sole builder, so
1768    /// any temp present is a crashed-build orphan, not a live write.
1769    fn sweep_orphan_temps(cache_dir: &Path, store_key: &str) {
1770        let prefix = format!("rowmetamap-{store_key}-");
1771        let Ok(entries) = std::fs::read_dir(cache_dir) else {
1772            return;
1773        };
1774        for entry in entries.flatten() {
1775            let name = entry.file_name();
1776            let Some(name) = name.to_str() else { continue };
1777            if name.starts_with(&prefix) && name.contains(".tmp-") {
1778                let _ = std::fs::remove_file(entry.path());
1779            }
1780        }
1781    }
1782
1783    #[cfg(test)]
1784    pub(crate) fn rowmap_delta_count(&self) -> Option<usize> {
1785        self.rowmap.load_full().map(|set| set.delta_count())
1786    }
1787
1788    /// The currently-installed resident meta map, if any. `pond sync` reads it
1789    /// (via [`RowmapOracle`]) as the freshness oracle (max timestamp per
1790    /// session); `None` falls back to re-reading every source.
1791    pub fn rowmap_snapshot(&self) -> Option<Arc<RowMetaSet>> {
1792        self.rowmap.load_full()
1793    }
1794
1795    /// Resolve index-only `(row_id, score)` hits to keys via the map; row ids the
1796    /// map lacks (appended since build) fall back to one `take_rows` batch. The
1797    /// caller re-sorts, so the misses appended at the end carry no order meaning.
1798    async fn resolve_rowid_hits(
1799        &self,
1800        map: &RowMetaSet,
1801        hits: Vec<(u64, f32)>,
1802    ) -> Result<Vec<SearchHit>> {
1803        let mut resolved = Vec::with_capacity(hits.len());
1804        let mut misses: Vec<(u64, f32)> = Vec::new();
1805        for (rowid, score) in hits {
1806            match map.lookup(rowid) {
1807                Some((session_id, message_id)) => resolved.push(SearchHit {
1808                    rowid: Some(rowid),
1809                    key: MessageKey {
1810                        session_id: session_id.to_owned(),
1811                        message_id: message_id.to_owned(),
1812                    },
1813                    score,
1814                }),
1815                None => misses.push((rowid, score)),
1816            }
1817        }
1818        // A miss still knows its rowid; carry it so hydration can take_rows it
1819        // alongside the hits the map resolved.
1820        if !misses.is_empty() {
1821            let rowids: Vec<u64> = misses.iter().map(|(rowid, _)| *rowid).collect();
1822            let keys = self.message_keys_by_rowids(&rowids).await?;
1823            for ((rowid, score), key) in misses.into_iter().zip(keys) {
1824                resolved.push(SearchHit {
1825                    rowid: Some(rowid),
1826                    key,
1827                    score,
1828                });
1829            }
1830        }
1831        Ok(resolved)
1832    }
1833
1834    /// Resolve stable row ids to `(session_id, id)` via `take_rows`, which
1835    /// returns rows in `rowids` order - the caller's `zip` relies on that.
1836    async fn message_keys_by_rowids(&self, rowids: &[u64]) -> Result<Vec<MessageKey>> {
1837        let dataset = self.handle.dataset(Table::Messages).await?;
1838        let projection = ProjectionRequest::from_columns(["session_id", "id"], dataset.schema());
1839        let batch = dataset.take_rows(rowids, projection).await?;
1840        let mut keys = Vec::with_capacity(batch.num_rows());
1841        for row in 0..batch.num_rows() {
1842            keys.push(MessageKey {
1843                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1844                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1845            });
1846        }
1847        Ok(keys)
1848    }
1849
1850    /// Write a `pond_sql_query` export artifact.
1851    pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1852        self.handle.export_write(name, bytes).await
1853    }
1854
1855    /// Read a `pond_sql_query` export artifact back.
1856    pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1857        self.handle.export_read(name).await
1858    }
1859
1860    /// Local filesystem path of an export artifact on `file://` installs.
1861    pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1862        self.handle.export_local_path(name)
1863    }
1864
1865    /// Distinct adapter names present in the corpus, sorted. Scans only the
1866    /// `source_agent` column of the small `sessions` table, so `pond status`
1867    /// gets its adapter count without touching the 2M-row `messages` table.
1868    /// `include_subagents=false` drops `source_agent` values containing `/`
1869    /// (e.g. `claude-code/general-purpose`).
1870    pub async fn adapter_names(&self, include_subagents: bool) -> Result<Vec<String>> {
1871        let scanner = self
1872            .handle
1873            .scan(Table::Sessions, ScanOpts::project_only(&["source_agent"]))
1874            .await?;
1875        let mut stream = scanner.try_into_stream().await?;
1876        let mut names: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
1877        while let Some(batch) = stream.next().await {
1878            let batch = batch?;
1879            for row in 0..batch.num_rows() {
1880                let agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1881                if !include_subagents && agent.contains('/') {
1882                    continue;
1883                }
1884                names.insert(agent);
1885            }
1886        }
1887        Ok(names.into_iter().collect())
1888    }
1889
1890    /// Write a batch of embeddings into `messages`: set `vector` and
1891    /// `embedding_model` on each row by `(session_id, id)`
1892    /// (spec.md#session-embed-from-canonical). The column update goes through the
1893    /// write seam and lands as a new manifest version (`append-only`).
1894    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1895        if rows.is_empty() {
1896            return Ok(());
1897        }
1898        let batch = embedding_update_batch(rows)?;
1899        self.handle
1900            .merge_update(Table::Messages, batch, rows.len())
1901            .await?;
1902        Ok(())
1903    }
1904
1905    /// Stream the backlog of messages needing embedding: rows with `search_text`
1906    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1907    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1908        try_stream! {
1909            // Filter on `embedding_model IS NULL`, not `vector IS NULL`: the two
1910            // are co-set (write_embeddings sets both, spec.md#session-embed-from-canonical),
1911            // but evaluating the predicate over the narrow model-id column reads
1912            // ~50x fewer bytes than scanning the Float16 vector column - the
1913            // difference between a whole-table vector decode and a cheap scan.
1914            let filter = Predicate::And(vec![
1915                Predicate::IsNull("embedding_model"),
1916                Predicate::IsNotNull("search_text"),
1917            ]);
1918            let projection: &[&str] = &["session_id", "id", "search_text"];
1919            let scanner = self
1920                .handle
1921                .scan(
1922                    Table::Messages,
1923                    ScanOpts::with_predicate_and_projection(&filter, projection),
1924                )
1925                .await?;
1926            let mut batches = scanner
1927                .try_into_stream()
1928                .await
1929                .context("failed to open messages stream")?;
1930            while let Some(batch) = batches.next().await {
1931                let batch = batch?;
1932                for row in 0..batch.num_rows() {
1933                    yield PendingMessage {
1934                        session_id: string(&batch, "session_id", row)?
1935                            .context("session_id is null")?,
1936                        id: string(&batch, "id", row)?.context("message id is null")?,
1937                        search_text: string(&batch, "search_text", row)?
1938                            .context("search_text is null")?,
1939                    };
1940                }
1941            }
1942        }
1943    }
1944
1945    /// Stream messages that are either never embedded or stale under the
1946    /// current model. `pond optimize --force-embed` feeds this to the same unconditional
1947    /// merge_update as the normal backlog; the filter makes that semantically
1948    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1949    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1950        try_stream! {
1951            // `embedding_model IS NULL` (co-set with `vector IS NULL`, but a ~50x
1952            // narrower column read) for the never-embedded rows, OR a model
1953            // mismatch for the stale ones - both decided off the model-id column.
1954            let filter = Predicate::And(vec![
1955                Predicate::IsNotNull("search_text"),
1956                Predicate::Or(vec![
1957                    Predicate::IsNull("embedding_model"),
1958                    Predicate::Ne("embedding_model", embed::model_id().into()),
1959                ]),
1960            ]);
1961            let projection: &[&str] = &["session_id", "id", "search_text"];
1962            let scanner = self
1963                .handle
1964                .scan(
1965                    Table::Messages,
1966                    ScanOpts::with_predicate_and_projection(&filter, projection),
1967                )
1968                .await?;
1969            let mut batches = scanner
1970                .try_into_stream()
1971                .await
1972                .context("failed to open pending-or-stale messages stream")?;
1973            while let Some(batch) = batches.next().await {
1974                let batch = batch?;
1975                for row in 0..batch.num_rows() {
1976                    yield PendingMessage {
1977                        session_id: string(&batch, "session_id", row)?
1978                            .context("session_id is null")?,
1979                        id: string(&batch, "id", row)?.context("message id is null")?,
1980                        search_text: string(&batch, "search_text", row)?
1981                            .context("search_text is null")?,
1982                    };
1983                }
1984            }
1985        }
1986    }
1987
1988    /// BM25 full-text retriever over `messages.search_text`. With the row meta map
1989    /// loaded the scan is index-only (no data columns -> no `TakeExec`, no
1990    /// scattered GETs) and hits resolve through the map; otherwise it falls back
1991    /// to `fts_search_keys` so search works before prewarm.
1992    pub async fn fts_search(
1993        &self,
1994        query: &str,
1995        limit: usize,
1996        filter: &Predicate,
1997    ) -> Result<Vec<SearchHit>> {
1998        let mut hits = if let Some(map) = self.rowmap.load_full() {
1999            let rowid_hits = self.fts_search_rowids(query, limit, filter).await?;
2000            self.resolve_rowid_hits(&map, rowid_hits).await?
2001        } else {
2002            self.fts_search_keys(query, limit, filter).await?
2003        };
2004        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
2005        // order, which varies between runs and across calls with different pool
2006        // sizes. Without an explicit tiebreak the downstream session grouping and
2007        // rank for a tied target can flip session-to-session, making results
2008        // nondeterministic. Sort by `score desc`, then `(session_id, message_id)` asc.
2009        hits.sort_by(|left, right| {
2010            right
2011                .score
2012                .partial_cmp(&left.score)
2013                .unwrap_or(std::cmp::Ordering::Equal)
2014                .then_with(|| left.key.session_id.cmp(&right.key.session_id))
2015                .then_with(|| left.key.message_id.cmp(&right.key.message_id))
2016        });
2017        Ok(hits)
2018    }
2019
2020    /// Shared FTS-scan setup: scope filter, the `search_text` full-text query,
2021    /// `fast_search` when indexed, and `limit`. Callers set only their projection.
2022    async fn fts_scanner(
2023        &self,
2024        query: &str,
2025        limit: usize,
2026        filter: &Predicate,
2027    ) -> Result<lance::dataset::scanner::Scanner> {
2028        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
2029        scanner.full_text_search(
2030            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
2031        )?;
2032        if self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2033            scanner.fast_search();
2034        }
2035        // Lance ships an autoprojection that silently appends `_score` to FTS
2036        // output when the projection omits it. That behavior is going away;
2037        // we opt into the future explicit-projection contract here so the
2038        // scanner stops emitting a per-call deprecation warning, and each caller
2039        // lists `_score` in its own projection.
2040        scanner.disable_scoring_autoprojection();
2041        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
2042        Ok(scanner)
2043    }
2044
2045    /// No-map FTS fallback: project the key columns plus `_score` directly,
2046    /// taking the `TakeExec` cost. Unsorted; `fts_search` applies the sort.
2047    async fn fts_search_keys(
2048        &self,
2049        query: &str,
2050        limit: usize,
2051        filter: &Predicate,
2052    ) -> Result<Vec<SearchHit>> {
2053        let mut scanner = self.fts_scanner(query, limit, filter).await?;
2054        scanner.project(&["session_id", "id", "_score"])?;
2055        let batch = scanner.try_into_batch().await?;
2056        let mut hits = Vec::with_capacity(batch.num_rows());
2057        for row in 0..batch.num_rows() {
2058            let key = MessageKey {
2059                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
2060                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
2061            };
2062            hits.push(SearchHit {
2063                rowid: None,
2064                key,
2065                score: float32(&batch, "_score", row)?,
2066            });
2067        }
2068        Ok(hits)
2069    }
2070
2071    /// Current `messages` dataset version - the key a `RowMetaMap` is built
2072    /// against (pond's stable row ids keep a built map valid until this advances).
2073    pub async fn messages_version(&self) -> Result<u64> {
2074        Ok(self
2075            .handle
2076            .dataset(Table::Messages)
2077            .await?
2078            .version()
2079            .version)
2080    }
2081
2082    /// Scan the hydration columns with row ids into a `Vec`, the input to
2083    /// `RowMetaMap::build`. One large sequential scan (few big reads), unlike the
2084    /// scattered per-hit take it replaces; `search_text` dominates the bytes.
2085    pub async fn collect_row_metas(&self) -> Result<Vec<RowMetaEntry>> {
2086        let mut scanner = self.handle.scanner(Table::Messages, None).await?;
2087        scanner.with_row_id();
2088        scanner.project(&Self::ROW_META_COLUMNS)?;
2089        let mut stream = scanner.try_into_stream().await?;
2090        let mut out = Vec::new();
2091        while let Some(batch) = stream.next().await {
2092            let batch = batch?;
2093            let rowids = uint64(&batch, "_rowid")?;
2094            for row in 0..batch.num_rows() {
2095                out.push(row_meta_entry(&batch, rowids.value(row), row)?);
2096            }
2097        }
2098        Ok(out)
2099    }
2100
2101    /// Row metas for the rows appended since the base segment - the input to a
2102    /// delta layered on a base whose high-water mark is `base_max_row_id` and
2103    /// which covers `base_row_count` rows. `None` (caller rebuilds the base from
2104    /// a full scan) when the chain can't be cheaply extended:
2105    /// - `base_version`'s manifest was reclaimed by the cleanup retention window
2106    ///   (spec.md#concurrency), so the version no longer resolves; or
2107    /// - the live row count dropped below the base: rows were deleted, and a
2108    ///   pure append can't remove the base's now-stale entries.
2109    ///
2110    /// Stable row ids (`enable_stable_row_ids`) make this an append: embedding's
2111    /// `merge_update` and compaction rewrite message fragments but preserve
2112    /// row_ids and never touch a ROW_META column, so existing base entries stay
2113    /// valid under that churn. Only genuine appends carry `row_id >
2114    /// base_max_row_id`; emitting just those keeps the delta disjoint from the
2115    /// base, which the per-segment count sums depend on.
2116    async fn collect_row_metas_delta(
2117        &self,
2118        base_version: u64,
2119        base_max_row_id: u64,
2120        base_row_count: usize,
2121    ) -> Result<Option<Vec<RowMetaEntry>>> {
2122        let dataset = self.handle.dataset(Table::Messages).await?;
2123        let Ok(old) = dataset.checkout_version(base_version).await else {
2124            return Ok(None);
2125        };
2126        if dataset.count_rows(None).await? < base_row_count {
2127            return Ok(None);
2128        }
2129        // Restrict the scan to fragments added since the base (recent churn -
2130        // not the untouched bulk). Rewritten/compacted fragments carry only
2131        // existing row_ids (<= base_max_row_id) and are filtered out row-wise;
2132        // genuine appends carry higher ids and are kept.
2133        let old_ids: HashSet<u64> = old.get_fragments().iter().map(|f| f.id() as u64).collect();
2134        let added: Vec<_> = dataset
2135            .get_fragments()
2136            .iter()
2137            .filter(|fragment| !old_ids.contains(&(fragment.id() as u64)))
2138            .map(|fragment| fragment.metadata().clone())
2139            .collect();
2140        if added.is_empty() {
2141            return Ok(Some(Vec::new()));
2142        }
2143        let mut scanner = dataset.scan();
2144        scanner.with_fragments(added);
2145        scanner.with_row_id();
2146        scanner.project(&Self::ROW_META_COLUMNS)?;
2147        let mut stream = scanner.try_into_stream().await?;
2148        let mut out = Vec::new();
2149        while let Some(batch) = stream.next().await {
2150            let batch = batch?;
2151            let rowids = uint64(&batch, "_rowid")?;
2152            for row in 0..batch.num_rows() {
2153                let row_id = rowids.value(row);
2154                if row_id > base_max_row_id {
2155                    out.push(row_meta_entry(&batch, row_id, row)?);
2156                }
2157            }
2158        }
2159        Ok(Some(out))
2160    }
2161
2162    /// Index-only FTS retriever: `_rowid` + `_score` only, so Lance inserts no
2163    /// `TakeExec` and issues no scattered GETs. `fts_search` resolves the row ids.
2164    async fn fts_search_rowids(
2165        &self,
2166        query: &str,
2167        limit: usize,
2168        filter: &Predicate,
2169    ) -> Result<Vec<(u64, f32)>> {
2170        let mut scanner = self.fts_scanner(query, limit, filter).await?;
2171        scanner.with_row_id();
2172        scanner.project(&["_score"])?;
2173        let batch = scanner.try_into_batch().await?;
2174        let rowids = uint64(&batch, "_rowid")?;
2175        let mut hits = Vec::with_capacity(batch.num_rows());
2176        for row in 0..batch.num_rows() {
2177            hits.push((rowids.value(row), float32(&batch, "_score", row)?));
2178        }
2179        Ok(hits)
2180    }
2181
2182    /// Count of searchable messages (non-null `search_text`) inside the
2183    /// caller's filter scope - the universe a search actually ran over.
2184    /// Powers the response's absence honesty (spec.md#search): "no relevant
2185    /// hits" only means something relative to how many messages were
2186    /// searchable at all, and 0 tells the caller their filters excluded
2187    /// everything before retrieval even started.
2188    pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
2189        // Unfiltered: the FTS index already counts non-null search_text rows
2190        // (`num_docs`), and fast_search only searches those indexed docs - so
2191        // num_docs is exactly the universe a search ran over. Reading it avoids
2192        // the ~133 MB `IsNotNull(search_text)` column scan Lance pays per query
2193        // (no per-column null metadata). Filtered scopes fall back to the scan.
2194        if matches!(filter, Predicate::And(clauses) if clauses.is_empty())
2195            && let Some(count) = self.fts_num_docs().await?
2196        {
2197            return Ok(count);
2198        }
2199        let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
2200        let dataset = self.handle.dataset(Table::Messages).await?;
2201        let count = dataset.count_rows(Some(scope.to_lance())).await?;
2202        Ok(count)
2203    }
2204
2205    /// Non-null `search_text` count read from the FTS index's `num_docs`
2206    /// statistic (summed across delta segments). `None` when the FTS index is
2207    /// absent (empty store) so the caller falls back to the count scan.
2208    async fn fts_num_docs(&self) -> Result<Option<usize>> {
2209        if !self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2210            return Ok(None);
2211        }
2212        let dataset = self.handle.dataset(Table::Messages).await?;
2213        let json = dataset.index_statistics(MESSAGES_FTS_INDEX).await?;
2214        let parsed: Value =
2215            serde_json::from_str(&json).context("failed to parse FTS index_statistics")?;
2216        let total: u64 = parsed["indices"]
2217            .as_array()
2218            .map(|segments| {
2219                segments
2220                    .iter()
2221                    .filter_map(|segment| segment["num_docs"].as_u64())
2222                    .sum()
2223            })
2224            .unwrap_or(0);
2225        Ok(Some(usize::try_from(total).unwrap_or(usize::MAX)))
2226    }
2227
2228    /// Whether any `messages` row carries a vector (spec.md#search) - the
2229    /// signal that lets the `vector` arm run instead of degrading to `fts`. The single-active-
2230    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
2231    /// vector belongs to the current model.
2232    pub async fn has_embeddings(&self) -> Result<bool> {
2233        let scope = Predicate::IsNotNull("vector");
2234        let mut scanner = self
2235            .handle
2236            .scan(
2237                Table::Messages,
2238                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
2239            )
2240            .await?;
2241        scanner.limit(Some(1), None)?;
2242        let batch = scanner.try_into_batch().await?;
2243        Ok(batch.num_rows() > 0)
2244    }
2245
2246    /// One embedded row's model id, or `None` when nothing is embedded yet. A
2247    /// `LIMIT 1` point read: the single-active-model invariant (see
2248    /// `has_embeddings`) means any embedded row's model is representative, so a
2249    /// model swap is detectable by comparing this to the configured model -
2250    /// without the full-column `stale_embedding_count` scan that ran every sync.
2251    pub async fn sample_embedded_model(&self) -> Result<Option<String>> {
2252        let scope = Predicate::IsNotNull("embedding_model");
2253        let mut scanner = self
2254            .handle
2255            .scan(
2256                Table::Messages,
2257                ScanOpts::with_predicate_and_projection(&scope, &["embedding_model"]),
2258            )
2259            .await?;
2260        scanner.limit(Some(1), None)?;
2261        let batch = scanner.try_into_batch().await?;
2262        if batch.num_rows() == 0 {
2263            return Ok(None);
2264        }
2265        string(&batch, "embedding_model", 0)
2266    }
2267
2268    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
2269    /// scalar predicate alone (spec.md#search-prefilter-pushdown) - see
2270    /// `embedded_scope` for why pond does NOT add `vector IS NOT NULL`. nprobes
2271    /// falls back to [`DEFAULT_NPROBES`] when `[search]` leaves it unset, so a
2272    /// default install never inherits Lance's unbounded "probe every partition"
2273    /// behavior on a remote store. No refine (see `apply_vector_search_knobs`).
2274    /// Index-only + map resolve when loaded, else key projection - see `fts_search`.
2275    pub async fn vector_search(
2276        &self,
2277        query: &[f32],
2278        limit: usize,
2279        filter: &Predicate,
2280        search: Option<&config::SearchConfig>,
2281    ) -> Result<Vec<SearchHit>> {
2282        let mut hits = if let Some(map) = self.rowmap.load_full() {
2283            let rowid_hits = self
2284                .vector_search_rowids(query, limit, filter, search)
2285                .await?;
2286            self.resolve_rowid_hits(&map, rowid_hits).await?
2287        } else {
2288            self.vector_search_keys(query, limit, filter, search)
2289                .await?
2290        };
2291        // Stable secondary sort: same reasoning as `fts_search` - IVF_SQ can
2292        // emit hits with effectively identical `_distance` in fragment-dependent
2293        // order, which makes RRF dedup-ranks nondeterministic for tied
2294        // neighbors. Sort by distance asc (smaller = more similar), then by
2295        // `(session_id, message_id)` asc.
2296        hits.sort_by(|left, right| {
2297            left.score
2298                .partial_cmp(&right.score)
2299                .unwrap_or(std::cmp::Ordering::Equal)
2300                .then_with(|| left.key.session_id.cmp(&right.key.session_id))
2301                .then_with(|| left.key.message_id.cmp(&right.key.message_id))
2302        });
2303        Ok(hits)
2304    }
2305
2306    /// Shared vector-scan setup: scope, `nearest`, knobs, `fast_search`.
2307    async fn vector_scanner(
2308        &self,
2309        query: &[f32],
2310        limit: usize,
2311        filter: &Predicate,
2312        search: Option<&config::SearchConfig>,
2313    ) -> Result<lance::dataset::scanner::Scanner> {
2314        let scope = embedded_scope(filter);
2315        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
2316        let key = Float32Array::from(query.to_vec());
2317        scanner.nearest("vector", &key, limit)?;
2318        apply_vector_search_knobs(&mut scanner, search);
2319        if self
2320            .handle
2321            .messages_has_index(MESSAGES_VECTOR_INDEX)
2322            .await?
2323        {
2324            scanner.fast_search();
2325        }
2326        scanner.disable_scoring_autoprojection();
2327        Ok(scanner)
2328    }
2329
2330    /// Index-only vector retriever: `_rowid` + `_distance` only, so no `TakeExec`.
2331    /// `vector_search` resolves the row ids. Mirrors `fts_search_rowids`.
2332    async fn vector_search_rowids(
2333        &self,
2334        query: &[f32],
2335        limit: usize,
2336        filter: &Predicate,
2337        search: Option<&config::SearchConfig>,
2338    ) -> Result<Vec<(u64, f32)>> {
2339        let mut scanner = self.vector_scanner(query, limit, filter, search).await?;
2340        scanner.with_row_id();
2341        scanner.project(&["_distance"])?;
2342        let batch = scanner.try_into_batch().await?;
2343        let rowids = uint64(&batch, "_rowid")?;
2344        let mut hits = Vec::with_capacity(batch.num_rows());
2345        for row in 0..batch.num_rows() {
2346            hits.push((rowids.value(row), float32(&batch, "_distance", row)?));
2347        }
2348        Ok(hits)
2349    }
2350
2351    /// No-map vector fallback: project the key columns plus `_distance` directly.
2352    /// Unsorted; `vector_search` sorts. Mirrors `fts_search_keys`.
2353    async fn vector_search_keys(
2354        &self,
2355        query: &[f32],
2356        limit: usize,
2357        filter: &Predicate,
2358        search: Option<&config::SearchConfig>,
2359    ) -> Result<Vec<SearchHit>> {
2360        let mut scanner = self.vector_scanner(query, limit, filter, search).await?;
2361        scanner.project(&["session_id", "id", "_distance"])?;
2362        let batch = scanner.try_into_batch().await?;
2363        let mut hits = Vec::with_capacity(batch.num_rows());
2364        for row in 0..batch.num_rows() {
2365            let key = MessageKey {
2366                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
2367                message_id: string(&batch, "id", row)?.context("message id is null")?,
2368            };
2369            hits.push(SearchHit {
2370                rowid: None,
2371                key,
2372                score: float32(&batch, "_distance", row)?,
2373            });
2374        }
2375        Ok(hits)
2376    }
2377
2378    /// The DataFusion plan string for a filtered vector scan - the
2379    /// `search-prefilter-pushdown` regression guard reads it.
2380    pub async fn explain_vector_plan(
2381        &self,
2382        query: &[f32],
2383        limit: usize,
2384        filter: &Predicate,
2385        search: Option<&config::SearchConfig>,
2386    ) -> Result<String> {
2387        let scope = embedded_scope(filter);
2388        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
2389        let key = Float32Array::from(query.to_vec());
2390        scanner.nearest("vector", &key, limit)?;
2391        apply_vector_search_knobs(&mut scanner, search);
2392        if self
2393            .handle
2394            .messages_has_index(MESSAGES_VECTOR_INDEX)
2395            .await?
2396        {
2397            scanner.fast_search();
2398        }
2399        scanner
2400            .explain_plan(true)
2401            .await
2402            .context("explain_plan failed")
2403    }
2404
2405    pub async fn explain_fts_plan(
2406        &self,
2407        query: &str,
2408        limit: usize,
2409        filter: &Predicate,
2410    ) -> Result<String> {
2411        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
2412        scanner.full_text_search(
2413            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
2414        )?;
2415        if self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2416            scanner.fast_search();
2417        }
2418        scanner.project(&["session_id", "id"])?;
2419        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
2420        scanner
2421            .explain_plan(true)
2422            .await
2423            .context("explain_plan failed")
2424    }
2425
2426    /// Hydrate search hits by stable row id (spec.md#search). Resolves each
2427    /// rowid from the resident meta map in memory (no object-store round-trip -
2428    /// Lance caches index/metadata but never data column values, so a `take_rows`
2429    /// re-reads `search_text` from storage every query). Rowids the map lacks
2430    /// (appended since it was built, or no map loaded) fall back to a single
2431    /// `take_rows` batch. The caller indexes the result by key, so order is
2432    /// irrelevant.
2433    pub async fn message_metas_by_rowids(&self, rowids: &[u64]) -> Result<Vec<MessageMeta>> {
2434        if rowids.is_empty() {
2435            return Ok(Vec::new());
2436        }
2437        let mut metas = Vec::with_capacity(rowids.len());
2438        let misses: Vec<u64> = if let Some(map) = self.rowmap.load_full() {
2439            let (hits, misses) = map.hydrate(rowids);
2440            metas.extend(hits.into_iter().map(|entry| MessageMeta {
2441                message_id: entry.message_id,
2442                session_id: entry.session_id,
2443                role: entry.role,
2444                project: entry.project,
2445                source_agent: entry.source_agent,
2446                timestamp:
2447                    DateTime::from_timestamp_micros(entry.timestamp_micros).unwrap_or_default(),
2448                search_text: entry.search_text,
2449            }));
2450            misses
2451        } else {
2452            rowids.to_vec()
2453        };
2454        if !misses.is_empty() {
2455            metas.extend(self.message_metas_by_rowids_take(&misses).await?);
2456        }
2457        Ok(metas)
2458    }
2459
2460    /// `take_rows` hydration of exactly `rowids` - the cache-miss fallback for
2461    /// rows the resident meta map lacks. `take_rows` coalesces the reads per
2462    /// fragment (Lance's own batching), so a scattered take is few requests, not
2463    /// one per row.
2464    async fn message_metas_by_rowids_take(&self, rowids: &[u64]) -> Result<Vec<MessageMeta>> {
2465        let dataset = self.handle.dataset(Table::Messages).await?;
2466        let projection = ProjectionRequest::from_columns(
2467            [
2468                "id",
2469                "session_id",
2470                "role",
2471                "project",
2472                "source_agent",
2473                "timestamp",
2474                "search_text",
2475            ],
2476            dataset.schema(),
2477        );
2478        let batch = dataset.take_rows(rowids, projection).await?;
2479        let mut metas = Vec::with_capacity(batch.num_rows());
2480        for row in 0..batch.num_rows() {
2481            metas.push(message_meta_from_batch(&batch, row)?);
2482        }
2483        Ok(metas)
2484    }
2485
2486    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
2487    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
2488        if keys.is_empty() {
2489            return Ok(Vec::new());
2490        }
2491        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
2492        let session_ids = keys
2493            .iter()
2494            .map(|key| key.session_id.clone())
2495            .collect::<Vec<_>>();
2496        let message_ids = keys
2497            .iter()
2498            .map(|key| key.message_id.clone())
2499            .collect::<Vec<_>>();
2500        let predicate = Predicate::And(vec![
2501            in_predicate("session_id", &session_ids),
2502            in_predicate("id", &message_ids),
2503        ]);
2504        let batch = self
2505            .handle
2506            .scan_batch(
2507                Table::Messages,
2508                Some(&predicate),
2509                &[
2510                    "id",
2511                    "session_id",
2512                    "role",
2513                    "project",
2514                    "source_agent",
2515                    "timestamp",
2516                    "search_text",
2517                ],
2518            )
2519            .await?;
2520        let mut metas = Vec::with_capacity(batch.num_rows());
2521        for row in 0..batch.num_rows() {
2522            // The IN x IN predicate is a cross-product, so the scan can return
2523            // pairs that were never asked for; keep only the wanted keys.
2524            let meta = message_meta_from_batch(&batch, row)?;
2525            if wanted.contains(&MessageKey {
2526                session_id: meta.session_id.clone(),
2527                message_id: meta.message_id.clone(),
2528            }) {
2529                metas.push(meta);
2530            }
2531        }
2532        Ok(metas)
2533    }
2534
2535    /// Total message count per session, for search session summaries. One
2536    /// `session_id IN (...)` scan projecting only `session_id`, aggregated in
2537    /// pond, instead of `N` concurrent `count_rows(session_id = X)` round-trips
2538    /// against `messages_session_id_btree`. Same wire shape for any backend,
2539    /// but one S3 operation instead of `N` on remote stores. Sessions with
2540    /// zero matching messages are present in the map with count `0` so the
2541    /// caller can distinguish "filter excluded everything" from "session
2542    /// missing from the response."
2543    pub async fn session_message_counts(
2544        &self,
2545        session_ids: &[String],
2546    ) -> Result<BTreeMap<String, usize>> {
2547        if session_ids.is_empty() {
2548            return Ok(BTreeMap::new());
2549        }
2550        // A version-matched resident map covers every current row, so its
2551        // per-session counts are authoritative (a session absent from it has 0
2552        // messages) - serve them with no scan. The version gate is load-bearing:
2553        // unlike meta hydration, a count cannot detect staleness by a row-id
2554        // miss, so a map that predates appended rows would undercount. A stale
2555        // or absent map falls through to the IN-scan.
2556        if let Some(map) = self.rowmap.load_full()
2557            && map.version() == self.messages_version().await?
2558        {
2559            return Ok(session_ids
2560                .iter()
2561                .map(|id| (id.clone(), map.lookup_count(id).unwrap_or(0)))
2562                .collect());
2563        }
2564        let predicate = in_predicate("session_id", session_ids);
2565        let scanner = self
2566            .handle
2567            .scan(
2568                Table::Messages,
2569                ScanOpts::with_predicate_and_projection(&predicate, &["session_id"]),
2570            )
2571            .await?;
2572        let mut stream = scanner
2573            .try_into_stream()
2574            .await
2575            .context("failed to open session_message_counts stream")?;
2576        let mut counts: BTreeMap<String, usize> =
2577            session_ids.iter().map(|id| (id.clone(), 0)).collect();
2578        while let Some(batch) = stream.next().await {
2579            let batch = batch.context("failed to read session_message_counts batch")?;
2580            let column = batch
2581                .column_by_name("session_id")
2582                .context("session_message_counts: session_id column missing")?
2583                .as_any()
2584                .downcast_ref::<StringArray>()
2585                .context("session_message_counts: session_id column is not Utf8")?;
2586            for value in column.iter().flatten() {
2587                if let Some(entry) = counts.get_mut(value) {
2588                    *entry += 1;
2589                }
2590            }
2591        }
2592        Ok(counts)
2593    }
2594
2595    /// Rows appended to `messages` since the FTS index was last optimized.
2596    /// A missing index reports the whole table; the query is manifest-only.
2597    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
2598        self.handle
2599            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
2600            .await
2601    }
2602
2603    /// Rows added or rewritten in `messages` since the IVF_SQ vector index
2604    /// was last optimized. Below
2605    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
2606    /// must read [`embedding_progress`](Self::embedding_progress) too and
2607    /// distinguish "index not built yet" from "index trails data".
2608    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
2609        self.handle
2610            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
2611            .await
2612    }
2613
2614    /// Embedding coverage: how many `messages` rows carry a vector and how
2615    /// many are still eligible. Drives the `pond status` embeddings line and
2616    /// the `pond optimize` progress bar's known total.
2617    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
2618        let dataset = self.handle.dataset(Table::Messages).await?;
2619        // `embedded` counts `embedding_model IS NOT NULL`, not `vector`: the two
2620        // are co-set (spec.md#session-embed-from-canonical) so the count is
2621        // identical, but the model-id string column is ~50x narrower than the
2622        // Float16 vector (Lance 7.0.0 has no per-column null_count, so this is a
2623        // data-page read).
2624        let embedded = dataset
2625            .count_rows(Some(Predicate::IsNotNull("embedding_model").to_lance()))
2626            .await?;
2627        // `backlog` and `total` come from live, deletion-aware counts, not the
2628        // FTS `num_docs`: num_docs counts indexed docs incl. deleted-but-unpurged
2629        // ones, so `num_docs - embedded` reports a phantom backlog that survives
2630        // every embed. `embedded` (model present) + `backlog` (model absent,
2631        // search_text present) is exactly the live eligible set, since embedding
2632        // a row requires its search_text.
2633        let backlog = self.embed_backlog_count().await?;
2634        Ok(EmbeddingProgress {
2635            embedded,
2636            total: embedded + backlog,
2637            backlog,
2638            model: embed::model_id(),
2639        })
2640    }
2641
2642    /// Messages eligible but not yet embedded (`search_text` present,
2643    /// `embedding_model` null) - the exact set [`crate::embed::EmbedWorker`]
2644    /// processes. Read straight from the dataset so it is correct right after
2645    /// ingest, unlike the FTS `num_docs` `embedding_progress` shows (which lags
2646    /// until the index is rebuilt - the embed stage runs before that).
2647    pub async fn embed_backlog_count(&self) -> Result<usize> {
2648        let dataset = self.handle.dataset(Table::Messages).await?;
2649        let filter = Predicate::And(vec![
2650            Predicate::IsNull("embedding_model"),
2651            Predicate::IsNotNull("search_text"),
2652        ]);
2653        Ok(dataset.count_rows(Some(filter.to_lance())).await?)
2654    }
2655
2656    /// Count rows whose `embedding_model` is not the currently configured
2657    /// model AND whose `vector` is still populated - the signal `pond optimize`
2658    /// uses to detect a model swap and require `--force-embed`.
2659    pub async fn stale_embedding_count(&self) -> Result<usize> {
2660        let dataset = self.handle.dataset(Table::Messages).await?;
2661        // Same shape as the original (IsNotNull AND Ne), but the null check is on
2662        // the narrow model-id column, not the ~50x-wider Float16 vector: the two
2663        // are co-set (spec.md#session-embed-from-canonical), so `embedding_model
2664        // IS NOT NULL` equals `vector IS NOT NULL`, and the model-id page read is
2665        // far cheaper than the vector's.
2666        dataset
2667            .count_rows(Some(
2668                Predicate::And(vec![
2669                    Predicate::IsNotNull("embedding_model"),
2670                    Predicate::Ne("embedding_model", embed::model_id().into()),
2671                ])
2672                .to_lance(),
2673            ))
2674            .await
2675            .map_err(Into::into)
2676    }
2677
2678    /// Run the per-table maintenance cycle (compact + indices) across every
2679    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
2680    /// and compaction commit independently, so a hot writer that starves
2681    /// compaction on one table does not abort the index work the operator
2682    /// asked for on other tables (or even on the same table).
2683    pub async fn optimize_indices(
2684        &self,
2685        progress: Option<OptimizeProgressFn>,
2686        maintenance: &MaintenancePolicy,
2687    ) -> Result<OptimizeOutcome> {
2688        let intents = pond_index_intents();
2689        let mut tables = Vec::with_capacity(3);
2690        for (table, intents) in intents.all() {
2691            let outcome = self
2692                .handle
2693                .optimize_table(table, intents, progress.as_ref(), maintenance)
2694                .await;
2695            tables.push(outcome);
2696        }
2697        Ok(OptimizeOutcome { tables })
2698    }
2699
2700    /// Fold trailing fragments into existing indices across every table,
2701    /// without running compaction. Used by `pond optimize`'s tail so newly
2702    /// written vectors land in the FTS / IVF_SQ / btree / bitmap indices
2703    /// without paying the compaction retry budget while embed itself may
2704    /// still be writing in a sibling process.
2705    pub async fn build_indices_only(
2706        &self,
2707        progress: Option<OptimizeProgressFn>,
2708    ) -> Result<OptimizeOutcome> {
2709        let policy = pond_index_intents();
2710        let mut tables = Vec::with_capacity(3);
2711        for (table, intents) in policy.all() {
2712            let indices = self
2713                .handle
2714                .optimize_table_indices_only(table, intents, progress.as_ref())
2715                .await;
2716            tables.push(TableOptimizeOutcome {
2717                table,
2718                indices,
2719                compaction: PhaseOutcome::NotAttempted,
2720            });
2721        }
2722        Ok(OptimizeOutcome { tables })
2723    }
2724
2725    #[cfg(test)]
2726    async fn optimize_indices_with_vector_threshold(
2727        &self,
2728        vector_threshold: usize,
2729    ) -> Result<OptimizeOutcome> {
2730        let intents = pond_index_intents_with_vector_threshold(vector_threshold);
2731        let policy = MaintenancePolicy::always_compact();
2732        let mut tables = Vec::with_capacity(3);
2733        for (table, intents) in intents.all() {
2734            let outcome = self
2735                .handle
2736                .optimize_table(table, intents, None, &policy)
2737                .await;
2738            tables.push(outcome);
2739        }
2740        Ok(OptimizeOutcome { tables })
2741    }
2742
2743    /// Reclaim superseded data/index files across every indexed table (Lance
2744    /// `cleanup_old_versions`), without compaction. `pond optimize --rebuild`
2745    /// runs this after the rebuild so the index segments it just replaced are
2746    /// dropped immediately. The retention floor still protects versions a live
2747    /// reader may have pinned (spec.md#concurrency).
2748    pub async fn cleanup_old_versions(&self, older_than: chrono::Duration) -> Result<()> {
2749        for (table, _) in pond_index_intents().all() {
2750            self.handle
2751                .cleanup_table_versions(table, older_than)
2752                .await?;
2753        }
2754        Ok(())
2755    }
2756
2757    pub async fn rebuild_indices(
2758        &self,
2759        intent_name: Option<&str>,
2760        progress: Option<OptimizeProgressFn>,
2761    ) -> Result<()> {
2762        let policy = pond_index_intents();
2763        let mut matched = false;
2764        for (table, intents) in policy.all() {
2765            for intent in intents {
2766                if intent_name.is_none_or(|name| name == intent.name) {
2767                    matched = true;
2768                    self.handle
2769                        .rebuild_index(table, intent, progress.as_ref())
2770                        .await?;
2771                }
2772            }
2773        }
2774        if let Some(name) = intent_name
2775            && !matched
2776        {
2777            anyhow::bail!("unknown index intent {name:?}");
2778        }
2779        Ok(())
2780    }
2781
2782    /// Drop a named index from whichever table owns it. Used by `pond optimize
2783    /// --drop-index <name>` to clean up orphaned indices (e.g. after renaming
2784    /// an intent whose on-disk name no longer matches the policy). Finds the
2785    /// owning table via parallel `load_indices` lookups, then drops on just
2786    /// that table - so real I/O errors surface with the right context instead
2787    /// of being hidden behind "no such index" from the wrong table.
2788    pub async fn drop_index_by_name(&self, name: &str) -> Result<()> {
2789        let Some(owner) = self.handle.find_index_owner(name).await? else {
2790            anyhow::bail!("no index named {name:?} found on any table");
2791        };
2792        self.handle.drop_index(owner, name).await
2793    }
2794
2795    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
2796        let policy = pond_index_intents();
2797        let mut statuses = Vec::new();
2798        for (table, intents) in policy.all() {
2799            statuses.extend(self.handle.index_status(table, intents).await?);
2800        }
2801        Ok(statuses)
2802    }
2803
2804    /// Drop the IVF_SQ index on `messages.vector`. Used by `pond optimize
2805    /// --force-embed` before re-bootstrapping under a different model. Silent
2806    /// when the index does not exist.
2807    pub async fn drop_vector_index(&self) -> Result<()> {
2808        match self
2809            .handle
2810            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
2811            .await
2812        {
2813            Ok(()) => Ok(()),
2814            Err(error) => {
2815                let msg = error.to_string();
2816                if msg.contains("not found") || msg.contains("does not exist") {
2817                    Ok(())
2818                } else {
2819                    Err(error)
2820                }
2821            }
2822        }
2823    }
2824
2825    /// On-disk byte totals per dataset, sized through Lance's object store
2826    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
2827    pub async fn table_sizes(&self) -> Result<TableSizes> {
2828        self.handle.table_sizes().await
2829    }
2830
2831    pub async fn initialized(&self) -> Result<bool> {
2832        self.handle.initialized().await
2833    }
2834
2835    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
2836        let batch = self
2837            .handle
2838            .scan_batch(
2839                Table::Sessions,
2840                Some(&Predicate::Eq("id", session_id.into())),
2841                &[],
2842            )
2843            .await?;
2844        if batch.num_rows() == 0 {
2845            Ok(None)
2846        } else {
2847            Ok(Some(session_from_batch(&batch, 0)?))
2848        }
2849    }
2850
2851    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
2852        let batch = self
2853            .handle
2854            .scan_batch(
2855                Table::Messages,
2856                Some(&Predicate::Eq("session_id", session_id.into())),
2857                &[
2858                    "session_id",
2859                    "id",
2860                    "timestamp",
2861                    "role",
2862                    "content",
2863                    "options",
2864                ],
2865            )
2866            .await?;
2867        let mut messages = Vec::with_capacity(batch.num_rows());
2868        for row in 0..batch.num_rows() {
2869            messages.push(message_from_batch(&batch, row)?);
2870        }
2871        messages.sort_by(|left, right| {
2872            left.timestamp()
2873                .cmp(&right.timestamp())
2874                .then_with(|| left.id().cmp(right.id()))
2875        });
2876
2877        let message_ids = messages
2878            .iter()
2879            .map(|message| message.id().to_owned())
2880            .collect::<Vec<_>>();
2881        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
2882
2883        Ok(messages
2884            .into_iter()
2885            .map(|message| {
2886                let key = (message.session_id().to_owned(), message.id().to_owned());
2887                let parts = parts_by_message.remove(&key).unwrap_or_default();
2888                MessageWithParts { message, parts }
2889            })
2890            .collect())
2891    }
2892
2893    /// Every part of these messages, full fidelity (file blobs included). The
2894    /// canonical read primitive - restore/export, verbatim mode, and the
2895    /// message-mode target all need the complete set.
2896    pub async fn parts_for_messages(
2897        &self,
2898        session_id: &str,
2899        message_ids: &[String],
2900    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
2901        self.scan_parts(session_id, message_ids, None).await
2902    }
2903
2904    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
2905    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
2906    /// nothing. For the summary-only reads (conversational/complete session
2907    /// views, search hits) - it never feeds restore/export.
2908    pub async fn summary_parts_for_messages(
2909        &self,
2910        session_id: &str,
2911        message_ids: &[String],
2912    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
2913        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
2914            .await
2915    }
2916
2917    async fn scan_parts(
2918        &self,
2919        session_id: &str,
2920        message_ids: &[String],
2921        part_types: Option<&[&str]>,
2922    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
2923        if message_ids.is_empty() {
2924            return Ok(BTreeMap::new());
2925        }
2926        let mut clauses = vec![
2927            Predicate::Eq("session_id", session_id.into()),
2928            in_predicate("message_id", message_ids),
2929        ];
2930        if let Some(types) = part_types {
2931            clauses.push(Predicate::In(
2932                "type",
2933                types.iter().map(|&t| t.into()).collect(),
2934            ));
2935        }
2936        let predicate = Predicate::And(clauses);
2937        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
2938        let mut scanner = self
2939            .handle
2940            .scan(
2941                Table::Parts,
2942                ScanOpts::with_predicate_and_projection(
2943                    &predicate,
2944                    &[
2945                        "session_id",
2946                        "message_id",
2947                        "id",
2948                        "ordinal",
2949                        "type",
2950                        "provenance",
2951                        "variant_data",
2952                        "options",
2953                    ],
2954                ),
2955            )
2956            .await?;
2957        scanner.with_row_address();
2958        let batch = scanner.try_into_batch().await.context("scan failed")?;
2959        let row_addresses = uint64(&batch, "_rowaddr")?;
2960        let mut file_payloads = BTreeMap::<usize, FileData>::new();
2961        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
2962        for row in 0..batch.num_rows() {
2963            if string(&batch, "type", row)?.as_deref() == Some("file") {
2964                let variant_data =
2965                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
2966                file_rows.push((row, row_addresses.value(row), variant_data));
2967            }
2968        }
2969        if !file_rows.is_empty() {
2970            let addresses = file_rows
2971                .iter()
2972                .map(|(_, address, _)| *address)
2973                .collect::<Vec<_>>();
2974            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
2975            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
2976                // Legacy blob (lance-encoding:blob): payload is bytes; the
2977                // url variant stored its URL as UTF-8 bytes, recovered via
2978                // `file_data_from_blob`'s `data_kind = "url"` branch.
2979                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
2980                file_payloads.insert(row, payload);
2981            }
2982        }
2983        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
2984        for row in 0..batch.num_rows() {
2985            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
2986            parts_by_message
2987                .entry((part.session_id.clone(), part.message_id.clone()))
2988                .or_default()
2989                .push(part);
2990        }
2991        for parts in parts_by_message.values_mut() {
2992            parts.sort_by_key(|part| part.ordinal);
2993        }
2994        Ok(parts_by_message)
2995    }
2996}
2997
2998#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
2999#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
3000pub enum IngestEvent {
3001    Session(Session),
3002    Message(Message),
3003    Part(Part),
3004}
3005
/// Roll-up of one ingest pass (CLI sync, adapter-driven). The wire layer
/// (`pond_ingest`) reports per-row results instead; the aggregate is derived
/// from those at the wire boundary.
///
/// Counters are split by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape follows spec.md#adapter-integrity-event-ordering.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Total rows actually written to Lance, summed over all three tables.
    /// Kept for `accepted()` and existing wire callers; user-facing counts
    /// should read the per-table fields below.
    pub inserted: usize,
    /// Rows that were already present (merge_insert no-op match).
    pub matched: usize,
    /// Session rows inserted during this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted during this pass. This is the total: tool
    /// calls, tool results, and other non-searchable messages are included.
    pub messages_inserted_total: usize,
    /// The part of `messages_inserted_total` whose `search_text` is non-null,
    /// i.e. eligible for FTS + semantic indexing. `pond sync` / `pond status`
    /// read this field for the user-facing "messages" count.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted during this pass.
    pub parts_inserted: usize,
    /// Session rows that were already present (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows that were already present (merge_insert matched), total.
    pub messages_matched_total: usize,
    /// The part of `messages_matched_total` that carries `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
    /// Events dropped by the validator under the per-event-drop policy
    /// (ordering violation, orphan part, mismatched parent, adapter parse
    /// failure, duplicate-id collision, ...). The unit is the event, not the
    /// session: a session with one bad part contributes 1 here, not "the
    /// whole substream." Per spec.md#adapter-integrity-dedup, adapters SHOULD
    /// dedupe their own emissions upstream when source replay is expected;
    /// the validator's in-batch HashSet is a safety net, not a feature
    /// adapters may rely on. If this bucket grows on a clean adapter, check
    /// `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions rejected wholesale because a Session-level invariant
    /// (immutable `source_agent` / `project` against the stored row) failed
    /// at flush time. Always small relative to `inserted`; if not, there is
    /// a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter could not decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    pub skipped_files: usize,
    /// Files that yielded no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `-vv` (debug) verbosity.
    pub skipped_empty: usize,
    /// Sessions bypassed by the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): the file `mtime` was at or
    /// before the wall-clock time pond last wrote that session's row, so the
    /// file was not re-decoded.
    pub skipped_fresh: usize,
    /// Storage-layer failures that exhausted their retries (commit
    /// conflicts, transient IO that did not recover). Hard zero on healthy
    /// runs.
    pub storage_errors: usize,
    /// Oversized values cut down to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is
    /// intact.
    pub truncated_values: usize,
    /// Histogram over stable reason keys, covering the combined
    /// `dropped_events + dropped_sessions` populations. Keys are
    /// `&'static str` (see the `DROP_REASON_*` constants) so consumers can
    /// match by identity. Empty on a clean run. `pond sync` prints the top
    /// reasons from it and `benches/ingest_bench.rs` buckets Partial drops by
    /// cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
3081
/// Machine-readable reason keys shared by the `IngestSummary::drop_reasons`
/// histogram and the per-row `RowError::reason_key`. They are `&'static str`
/// so consumers match by identity instead of parsing prose. To add a key:
/// pick a short snake_case identifier, route it from the validator/adapter,
/// and update the per-row outcome docs in
/// `docs/spec.md#adapter-integrity-event-ordering`.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback bucket for errors that carry no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
3098
/// Per-table result of one batched flush, assembled from the counts
/// `merge_insert` returns together with the pre-existence sets captured by
/// `upsert_session_batch`. [`IngestSummary::add_batch`] folds it into the
/// per-sync summary.
///
/// Under spec.md#adapter-integrity-additive-sync a match is a no-op write,
/// so the inserted/matched split is informational. It is still surfaced
/// because both `pond sync` and `pond_ingest` clients reconcile against
/// "which rows landed this call."
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows newly inserted.
    pub sessions_inserted: usize,
    /// Session rows already present.
    pub sessions_matched: usize,
    /// Message rows newly inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Searchable subset of `messages_inserted_total`.
    pub messages_inserted_searchable: usize,
    /// Message rows already present, searchable or not.
    pub messages_matched_total: usize,
    /// Searchable subset of `messages_matched_total`.
    pub messages_matched_searchable: usize,
    /// Part rows newly inserted.
    pub parts_inserted: usize,
    /// Part rows already present.
    pub parts_matched: usize,
}
3117
3118impl IngestSummary {
3119    pub fn accepted(&self) -> usize {
3120        self.inserted + self.matched
3121    }
3122
3123    /// Sole writer of the per-table counters on the CLI batched flush path.
3124    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
3125    /// both for the same rows would double-count.
3126    pub fn add_batch(&mut self, counts: &BatchCounts) {
3127        self.sessions_inserted += counts.sessions_inserted;
3128        self.sessions_matched += counts.sessions_matched;
3129        self.messages_inserted_total += counts.messages_inserted_total;
3130        self.messages_inserted_searchable += counts.messages_inserted_searchable;
3131        self.messages_matched_total += counts.messages_matched_total;
3132        self.messages_matched_searchable += counts.messages_matched_searchable;
3133        self.parts_inserted += counts.parts_inserted;
3134        self.parts_matched += counts.parts_matched;
3135        self.inserted +=
3136            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
3137        self.matched +=
3138            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
3139    }
3140
3141    /// Sum every counter from `other` into `self`. Used by the multi-source
3142    /// `pond sync` loop so adding a new field to this struct doesn't silently
3143    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
3144    pub fn merge(&mut self, other: &Self) {
3145        self.inserted += other.inserted;
3146        self.matched += other.matched;
3147        self.sessions_inserted += other.sessions_inserted;
3148        self.messages_inserted_total += other.messages_inserted_total;
3149        self.messages_inserted_searchable += other.messages_inserted_searchable;
3150        self.parts_inserted += other.parts_inserted;
3151        self.sessions_matched += other.sessions_matched;
3152        self.messages_matched_total += other.messages_matched_total;
3153        self.messages_matched_searchable += other.messages_matched_searchable;
3154        self.parts_matched += other.parts_matched;
3155        self.dropped_events += other.dropped_events;
3156        self.dropped_sessions += other.dropped_sessions;
3157        self.skipped_files += other.skipped_files;
3158        self.skipped_empty += other.skipped_empty;
3159        self.skipped_fresh += other.skipped_fresh;
3160        self.storage_errors += other.storage_errors;
3161        self.truncated_values += other.truncated_values;
3162        for (key, value) in &other.drop_reasons {
3163            *self.drop_reasons.entry(key).or_insert(0) += value;
3164        }
3165    }
3166
3167    /// Same dispatch as [`Self::add_outcomes`] but ignores
3168    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
3169    /// via [`Self::add_batch`] and uses this method to attribute per-row
3170    /// `Error` outcomes from the same flush.
3171    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
3172        for outcome in outcomes {
3173            if !matches!(outcome.status, OutcomeStatus::Error) {
3174                continue;
3175            }
3176            if outcome.kind == "session" {
3177                self.dropped_sessions += 1;
3178            } else {
3179                self.dropped_events += 1;
3180            }
3181            let reason = outcome
3182                .error
3183                .as_ref()
3184                .and_then(|error| error.reason_key)
3185                .unwrap_or(DROP_REASON_UNCATEGORIZED);
3186            *self.drop_reasons.entry(reason).or_insert(0) += 1;
3187        }
3188    }
3189
3190    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
3191        for outcome in outcomes {
3192            match outcome.status {
3193                OutcomeStatus::Inserted => {
3194                    self.inserted += 1;
3195                    match outcome.kind {
3196                        "session" => self.sessions_inserted += 1,
3197                        "message" => {
3198                            self.messages_inserted_total += 1;
3199                            if outcome.searchable {
3200                                self.messages_inserted_searchable += 1;
3201                            }
3202                        }
3203                        "part" => self.parts_inserted += 1,
3204                        _ => {}
3205                    }
3206                }
3207                OutcomeStatus::Matched => {
3208                    self.matched += 1;
3209                    match outcome.kind {
3210                        "session" => self.sessions_matched += 1,
3211                        "message" => {
3212                            self.messages_matched_total += 1;
3213                            if outcome.searchable {
3214                                self.messages_matched_searchable += 1;
3215                            }
3216                        }
3217                        "part" => self.parts_matched += 1,
3218                        _ => {}
3219                    }
3220                }
3221                OutcomeStatus::Error => {
3222                    // Session-level rejection: exactly one session-kind Error
3223                    // outcome (see `error_outcomes_for_substream`). Per-event
3224                    // drop: one Error per message/part. The two populations
3225                    // are counted separately so the operator can tell a
3226                    // structural reject from a row-level skip.
3227                    if outcome.kind == "session" {
3228                        self.dropped_sessions += 1;
3229                    } else {
3230                        self.dropped_events += 1;
3231                    }
3232                    let reason = outcome
3233                        .error
3234                        .as_ref()
3235                        .and_then(|e| e.reason_key)
3236                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
3237                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
3238                }
3239            }
3240        }
3241    }
3242}
3243
3244/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol). One
3245/// row per input event from the request's `events` array. The validator
3246/// returns these in array order so the wire layer can pack them directly
3247/// into [`crate::wire::IngestResult`] entries.
3248#[derive(Debug, Clone, PartialEq)]
3249pub struct RowOutcome {
3250    pub index: usize,
3251    pub kind: &'static str,
3252    pub pk: Value,
3253    pub status: OutcomeStatus,
3254    pub error: Option<RowError>,
3255    /// True iff `kind == "message"` AND the underlying row carries
3256    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
3257    /// so the CLI can show "searchable" message deltas distinct from raw
3258    /// inserts. Always false for session/part rows.
3259    pub searchable: bool,
3260}
3261
/// Disposition of one ingested row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row was newly written.
    Inserted,
    /// The row already existed (merge_insert no-op match).
    Matched,
    /// The row was rejected; see the accompanying [`RowError`].
    Error,
}
3268
/// Structured body of a per-row error. It mirrors the wire shape, so the
/// handler can forward it unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the failure.
    pub message: String,
    /// Name of the offending field, when one can be singled out.
    pub field: Option<&'static str>,
    /// Human-prose reason, when available.
    pub reason: Option<&'static str>,
    /// Machine bucket for histogramming (one of the `DROP_REASON_*`
    /// constants), in contrast to the prose in `reason`. `None` means
    /// uncategorized; consumers attribute it to `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
3282
3283/// Buffered session events tagged with their input array index, so the
3284/// per-row outcomes can be re-attributed once `merge_insert` returns its
3285/// per-row Inserted/Matched stats.
3286#[derive(Debug)]
3287struct BufferedSession {
3288    index: usize,
3289    session: Session,
3290}
3291
3292#[derive(Debug)]
3293struct BufferedMessage {
3294    index: usize,
3295    message: Message,
3296    parts: Vec<BufferedPart>,
3297    search_text: Option<String>,
3298}
3299
3300#[derive(Debug)]
3301struct BufferedPart {
3302    index: usize,
3303    part: Part,
3304}
3305
3306/// State machine that turns the `events: Vec<IngestEvent>` array into a
3307/// flat `Vec<RowOutcome>` matching the array's index space. Buffers a whole
3308/// session substream so `merge_insert` runs once per substream (three
3309/// batches: sessions, messages, parts). A validation error on a single event
3310/// drops *that event* (one [`OutcomeStatus::Error`] outcome) and the substream
3311/// continues; only Session-level invariants (immutable source_agent / project
3312/// on re-write) drop the whole substream (spec.md#adapter-integrity-event-ordering).
3313///
3314/// Writes are batched at flush time. As complete substreams arrive (a new
3315/// `Session` event closes out the current one), they accumulate in
3316/// `completed` rather than each one calling `merge_insert` immediately.
3317/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
3318/// at which point one batched 3-parallel-merge-insert covers all pending
3319/// substreams. This is the load-bearing perf change: per-substream commit
3320/// overhead dominated the ingest profile (see `benches/ingest_bench.rs`),
3321/// and amortizing it across N sessions cuts wall time materially.
3322#[derive(Debug, Default)]
3323pub struct IngestValidator {
3324    session: Option<BufferedSession>,
3325    current_message: Option<BufferedMessage>,
3326    current_parts: Vec<BufferedPart>,
3327    messages: Vec<BufferedMessage>,
3328    /// Message ids already buffered in the current substream. Duplicate ids
3329    /// drop the offending event in-line rather than failing the whole batch
3330    /// downstream.
3331    seen_message_ids: HashSet<String>,
3332    /// `(message_id, part_id)` keys already buffered in the current
3333    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
3334    seen_part_keys: HashSet<(String, String)>,
3335    /// Substreams whose end-of-stream boundary has been observed but whose
3336    /// rows haven't been written yet. Flushed in batched mode by
3337    /// [`Self::flush`].
3338    completed: Vec<CompletedSubstream>,
3339}
3340
3341/// One closed substream ready for the batched flush path.
3342#[derive(Debug)]
3343struct CompletedSubstream {
3344    session_index: usize,
3345    session: Session,
3346    messages: Vec<BufferedMessage>,
3347}
3348
3349/// Ingest host provenance (`options.pond`, spec.md#model-pond-options),
3350/// computed once per process. An audit fact - "the process that inserted this
3351/// row" - not identity. Fallible lookups are omitted, never synthesized as
3352/// placeholders.
3353fn ingest_host_stamp() -> Option<&'static Value> {
3354    static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
3355    STAMP
3356        .get_or_init(|| {
3357            let mut host = serde_json::Map::new();
3358            if let Ok(username) = whoami::username() {
3359                host.insert("username".to_owned(), username.into());
3360            }
3361            if let Ok(hostname) = whoami::hostname() {
3362                host.insert("hostname".to_owned(), hostname.into());
3363            }
3364            if let Ok(devicename) = whoami::devicename() {
3365                host.insert("device_name".to_owned(), devicename.into());
3366            }
3367            (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
3368        })
3369        .as_ref()
3370}
3371
3372impl IngestValidator {
3373    /// Drive one input event through the validator. Returns the per-row
3374    /// outcomes the event triggered: empty when the event is just buffered,
3375    /// or N entries when a session substream just flushed (success or
3376    /// failure). `Err` is reserved for catastrophic storage failures that
3377    /// should fail the whole `pond_ingest` request.
3378    pub async fn push(
3379        &mut self,
3380        store: &Store,
3381        index: usize,
3382        event: IngestEvent,
3383    ) -> Result<Vec<RowOutcome>> {
3384        match event {
3385            IngestEvent::Session(session) => self.push_session(store, index, session).await,
3386            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
3387            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
3388        }
3389    }
3390
3391    /// Final flush at end-of-batch. Closes the in-flight substream and
3392    /// drains the pending-flush buffer. Returns the per-row outcomes (for
3393    /// the wire layer) alongside the honest per-table counts (for
3394    /// `IngestSummary::add_batch`).
3395    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
3396        self.close_current_substream();
3397        self.flush(store).await
3398    }
3399
3400    /// Drain every completed substream into batched 3-parallel-merge_insert
3401    /// writes. Caller invokes this periodically (every N completed
3402    /// substreams) to keep memory bounded; in adapter-driven sync that
3403    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
3404    /// in-flight substream stays buffered - close it explicitly via
3405    /// [`Self::finish`] or by feeding the next Session event.
3406    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
3407        if self.completed.is_empty() {
3408            return Ok((Vec::new(), BatchCounts::default()));
3409        }
3410        let completed = std::mem::take(&mut self.completed);
3411        store.upsert_session_batch(completed).await
3412    }
3413
3414    /// Number of fully-buffered substreams awaiting batched write. Used by
3415    /// the adapter caller to decide when to call [`Self::flush`].
3416    pub fn pending_substreams(&self) -> usize {
3417        self.completed.len()
3418    }
3419
3420    async fn push_session(
3421        &mut self,
3422        _store: &Store,
3423        index: usize,
3424        mut session: Session,
3425    ) -> Result<Vec<RowOutcome>> {
3426        // Close out the current substream (if any) - move it to the pending
3427        // buffer instead of writing immediately. The actual write happens
3428        // when the caller invokes `flush` / `finish`.
3429        self.close_current_substream();
3430
3431        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
3432        // if empty after trim. A Session event with empty source_agent is
3433        // dropped on the spot - the substream that would follow has nothing
3434        // to anchor on, so subsequent message/part events will also drop.
3435        let trimmed = session.source_agent.trim();
3436        if trimmed.is_empty() {
3437            return Ok(vec![RowOutcome {
3438                index,
3439                kind: "session",
3440                pk: Value::String(session.id.clone()),
3441                status: OutcomeStatus::Error,
3442                error: Some(RowError {
3443                    message: format!("session {} has empty source_agent after trim", session.id),
3444                    field: Some("source_agent"),
3445                    reason: None,
3446                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
3447                }),
3448                searchable: false,
3449            }]);
3450        }
3451        if trimmed.len() != session.source_agent.len() {
3452            session.source_agent = trimmed.to_owned();
3453        }
3454
3455        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
3456            return Ok(vec![RowOutcome {
3457                index,
3458                kind: "session",
3459                pk: Value::String(session.id.clone()),
3460                status: OutcomeStatus::Error,
3461                error: Some(RowError {
3462                    message: format!(
3463                        "session {} has parent_message_id without parent_session_id",
3464                        session.id,
3465                    ),
3466                    field: Some("parent_message_id"),
3467                    reason: None,
3468                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
3469                }),
3470                searchable: false,
3471            }]);
3472        }
3473
3474        self.seen_message_ids.clear();
3475        self.seen_part_keys.clear();
3476        self.session = Some(BufferedSession { index, session });
3477        Ok(Vec::new())
3478    }
3479
3480    fn close_current_substream(&mut self) {
3481        self.flush_current_message();
3482        let Some(BufferedSession {
3483            index: session_index,
3484            session,
3485        }) = self.session.take()
3486        else {
3487            return;
3488        };
3489        let messages = std::mem::take(&mut self.messages);
3490        self.seen_message_ids.clear();
3491        self.seen_part_keys.clear();
3492        self.completed.push(CompletedSubstream {
3493            session_index,
3494            session,
3495            messages,
3496        });
3497    }
3498
3499    fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
3500        let pk = Value::Array(vec![
3501            Value::String(message.session_id().to_owned()),
3502            Value::String(message.id().to_owned()),
3503        ]);
3504        let Some(session) = &self.session else {
3505            return vec![error_outcome(
3506                index,
3507                "message",
3508                pk,
3509                "first event in a session stream must be Session",
3510                None,
3511                DROP_REASON_MESSAGE_BEFORE_SESSION,
3512            )];
3513        };
3514        if message.session_id() != session.session.id {
3515            let msg = format!(
3516                "message {} references session {}, expected {}",
3517                message.id(),
3518                message.session_id(),
3519                session.session.id
3520            );
3521            return vec![error_outcome(
3522                index,
3523                "message",
3524                pk,
3525                &msg,
3526                Some("session_id"),
3527                DROP_REASON_MESSAGE_SESSION_MISMATCH,
3528            )];
3529        }
3530        if !self.seen_message_ids.insert(message.id().to_owned()) {
3531            // Keep same-substream duplicate ids visible in `dropped_events`;
3532            // adapters are expected to dedupe upstream (see claude-code's
3533            // per-file `seen_uuids`), so a hit here is worth investigating.
3534            let msg = format!("duplicate message id {} in session substream", message.id());
3535            return vec![error_outcome(
3536                index,
3537                "message",
3538                pk,
3539                &msg,
3540                None,
3541                DROP_REASON_DUPLICATE_MESSAGE_ID,
3542            )];
3543        }
3544        // `options.pond` is core-owned (spec.md#model-pond-options): stripped
3545        // and restamped at ingest so neither adapters nor wire clients can
3546        // spoof provenance. Matched rows are merge_insert no-ops, so re-ingest
3547        // never restamps stored rows.
3548        match ingest_host_stamp() {
3549            Some(stamp) => {
3550                message
3551                    .options_mut()
3552                    .insert("pond".to_owned(), stamp.clone());
3553            }
3554            None => {
3555                message.options_mut().remove("pond");
3556            }
3557        }
3558        self.flush_current_message();
3559        self.current_message = Some(BufferedMessage {
3560            index,
3561            message,
3562            parts: Vec::new(),
3563            search_text: None,
3564        });
3565        Vec::new()
3566    }
3567
3568    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
3569        let pk = Value::Array(vec![
3570            Value::String(part.session_id.clone()),
3571            Value::String(part.message_id.clone()),
3572            Value::String(part.id.clone()),
3573        ]);
3574        let Some(current) = &self.current_message else {
3575            return vec![error_outcome(
3576                index,
3577                "part",
3578                pk,
3579                "part event appeared before a message",
3580                None,
3581                DROP_REASON_PART_BEFORE_MESSAGE,
3582            )];
3583        };
3584        if part.session_id != current.message.session_id() {
3585            let msg = format!(
3586                "part {} references session {}, expected {}",
3587                part.id,
3588                part.session_id,
3589                current.message.session_id()
3590            );
3591            return vec![error_outcome(
3592                index,
3593                "part",
3594                pk,
3595                &msg,
3596                Some("session_id"),
3597                DROP_REASON_PART_MESSAGE_MISMATCH,
3598            )];
3599        }
3600        if part.message_id != current.message.id() {
3601            let msg = format!(
3602                "part {} references message {}, expected {}",
3603                part.id,
3604                part.message_id,
3605                current.message.id()
3606            );
3607            return vec![error_outcome(
3608                index,
3609                "part",
3610                pk,
3611                &msg,
3612                Some("message_id"),
3613                DROP_REASON_PART_MESSAGE_MISMATCH,
3614            )];
3615        }
3616        let part_key = (part.message_id.clone(), part.id.clone());
3617        if !self.seen_part_keys.insert(part_key) {
3618            let msg = format!(
3619                "duplicate part id {} for message {} in session substream",
3620                part.id, part.message_id
3621            );
3622            return vec![error_outcome(
3623                index,
3624                "part",
3625                pk,
3626                &msg,
3627                None,
3628                DROP_REASON_DUPLICATE_PART_KEY,
3629            )];
3630        }
3631        self.current_parts.push(BufferedPart { index, part });
3632        Vec::new()
3633    }
3634
3635    fn flush_current_message(&mut self) {
3636        let Some(mut buffered) = self.current_message.take() else {
3637            return;
3638        };
3639        let parts = std::mem::take(&mut self.current_parts);
3640        let mut canonical_parts = Vec::with_capacity(parts.len());
3641        for part in &parts {
3642            canonical_parts.push(part.part.clone());
3643        }
3644        buffered.search_text = search_text(&buffered.message, &canonical_parts);
3645        buffered.parts = parts;
3646        self.messages.push(buffered);
3647    }
3648}
3649
3650fn error_outcome(
3651    index: usize,
3652    kind: &'static str,
3653    pk: Value,
3654    message: &str,
3655    field: Option<&'static str>,
3656    reason_key: &'static str,
3657) -> RowOutcome {
3658    RowOutcome {
3659        index,
3660        kind,
3661        pk,
3662        status: OutcomeStatus::Error,
3663        error: Some(RowError {
3664            message: message.to_owned(),
3665            field,
3666            reason: None,
3667            reason_key: Some(reason_key),
3668        }),
3669        searchable: false,
3670    }
3671}
3672
3673/// Session-level rejection (immutable `source_agent` / `project` violation):
3674/// emit exactly one Error outcome on the Session row. The buffered messages
3675/// and parts of this substream are *not* surfaced as per-row errors - their
3676/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
3677fn error_outcomes_for_substream(
3678    session_index: usize,
3679    session: &Session,
3680    _messages: &[BufferedMessage],
3681    message: impl Into<String>,
3682    field: Option<&'static str>,
3683    reason_key: &'static str,
3684) -> Vec<RowOutcome> {
3685    let reason = field.map(|_| "immutable");
3686    vec![RowOutcome {
3687        index: session_index,
3688        kind: "session",
3689        pk: Value::String(session.id.clone()),
3690        status: OutcomeStatus::Error,
3691        error: Some(RowError {
3692            message: message.into(),
3693            field,
3694            reason,
3695            reason_key: Some(reason_key),
3696        }),
3697        searchable: false,
3698    }]
3699}
3700
3701/// Batched-path success helper. Each row's Inserted/Matched status is read
3702/// from the pre-existence sets captured by `upsert_session_batch` before its
3703/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
3704/// Also accumulates the per-table totals into `counts` so the CLI summary
3705/// gets the same truth without re-walking the outcomes.
3706fn success_outcomes_for_substream(
3707    session_index: usize,
3708    session: &Session,
3709    messages: &[BufferedMessage],
3710    existing_sessions: &std::collections::HashMap<String, Session>,
3711    existing_message_pks: &HashSet<(String, String)>,
3712    existing_part_pks: &HashSet<(String, String, String)>,
3713    counts: &mut BatchCounts,
3714) -> Vec<RowOutcome> {
3715    let session_was_present = existing_sessions.contains_key(&session.id);
3716    let session_status = if session_was_present {
3717        counts.sessions_matched += 1;
3718        UpsertStatus::Matched
3719    } else {
3720        counts.sessions_inserted += 1;
3721        UpsertStatus::Inserted
3722    };
3723
3724    let mut outcomes = Vec::with_capacity(1 + messages.len());
3725    outcomes.push(success_outcome(
3726        session_index,
3727        "session",
3728        Value::String(session.id.clone()),
3729        session_status,
3730        false,
3731    ));
3732    for buffered in messages {
3733        let key = (
3734            buffered.message.session_id().to_owned(),
3735            buffered.message.id().to_owned(),
3736        );
3737        let searchable = buffered.search_text.is_some();
3738        let message_status = if existing_message_pks.contains(&key) {
3739            counts.messages_matched_total += 1;
3740            if searchable {
3741                counts.messages_matched_searchable += 1;
3742            }
3743            UpsertStatus::Matched
3744        } else {
3745            counts.messages_inserted_total += 1;
3746            if searchable {
3747                counts.messages_inserted_searchable += 1;
3748            }
3749            UpsertStatus::Inserted
3750        };
3751        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
3752        outcomes.push(success_outcome(
3753            buffered.index,
3754            "message",
3755            pk,
3756            message_status,
3757            searchable,
3758        ));
3759        for part in &buffered.parts {
3760            let part_key = (
3761                part.part.session_id.clone(),
3762                part.part.message_id.clone(),
3763                part.part.id.clone(),
3764            );
3765            let part_status = if existing_part_pks.contains(&part_key) {
3766                counts.parts_matched += 1;
3767                UpsertStatus::Matched
3768            } else {
3769                counts.parts_inserted += 1;
3770                UpsertStatus::Inserted
3771            };
3772            let part_pk = Value::Array(vec![
3773                Value::String(part_key.0),
3774                Value::String(part_key.1),
3775                Value::String(part_key.2),
3776            ]);
3777            outcomes.push(success_outcome(
3778                part.index,
3779                "part",
3780                part_pk,
3781                part_status,
3782                false,
3783            ));
3784        }
3785    }
3786    outcomes
3787}
3788
3789fn success_outcome(
3790    index: usize,
3791    kind: &'static str,
3792    pk: Value,
3793    status: UpsertStatus,
3794    searchable: bool,
3795) -> RowOutcome {
3796    let status = match status {
3797        UpsertStatus::Inserted => OutcomeStatus::Inserted,
3798        UpsertStatus::Matched => OutcomeStatus::Matched,
3799    };
3800    RowOutcome {
3801        index,
3802        kind,
3803        pk,
3804        status,
3805        error: None,
3806        searchable,
3807    }
3808}
3809
/// Ingest-time rejection of an incoming session row.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// spec.md#protocol: `Session.source_agent` and `Session.project` are
    /// immutable post-first-write because the denormalized copies on
    /// `messages` were stamped from the prior Session at first ingest.
    /// A re-write that changes either would silently desync.
    ImmutableField {
        field: &'static str,
        session_id: String,
        stored: String,
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the rejection as one line naming the session, the immutable
    /// field, and the stored and attempted values (both debug-quoted).
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ImmutableField {
                field,
                session_id,
                stored,
                attempted,
            } => formatter.write_fmt(format_args!(
                "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
            )),
        }
    }
}

impl std::error::Error for IngestError {}
3841
3842/// Compare an incoming Session row against the stored row on the two
3843/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
3844/// counts a NULL-vs-non-NULL change as a mismatch.
3845fn ensure_immutable_match(
3846    existing: &Session,
3847    incoming: &Session,
3848) -> std::result::Result<(), IngestError> {
3849    if existing.source_agent != incoming.source_agent {
3850        return Err(IngestError::ImmutableField {
3851            field: "source_agent",
3852            session_id: incoming.id.clone(),
3853            stored: existing.source_agent.clone(),
3854            attempted: incoming.source_agent.clone(),
3855        });
3856    }
3857    if existing.project != incoming.project {
3858        return Err(IngestError::ImmutableField {
3859            field: "project",
3860            session_id: incoming.id.clone(),
3861            stored: (*existing.project).clone(),
3862            attempted: (*incoming.project).clone(),
3863        });
3864    }
3865    Ok(())
3866}
3867
3868pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
3869    use crate::wire::Provenance;
3870    let mut chunks: Vec<String> = Vec::new();
3871    for part in parts {
3872        // spec.md#search: only conversational parts contribute to the indexed
3873        // text; harness-injected scaffolding is excluded from search.
3874        if part.provenance != Provenance::Conversational {
3875            continue;
3876        }
3877        match (message.role(), &part.kind) {
3878            (Role::User | Role::Assistant, PartKind::Text { text }) => {
3879                if let Some(text) = text {
3880                    chunks.push(text.to_string());
3881                }
3882            }
3883            (
3884                Role::User | Role::Assistant,
3885                PartKind::File {
3886                    media_type,
3887                    file_name,
3888                    data,
3889                },
3890            ) => {
3891                if let Some(file_name) = file_name {
3892                    chunks.push(file_name.clone());
3893                }
3894                if let Some(media_type) = media_type {
3895                    chunks.push(media_type.clone());
3896                }
3897                if let FileData::Url(uri) = data {
3898                    chunks.push(uri.clone());
3899                }
3900            }
3901            (
3902                Role::System | Role::Tool,
3903                PartKind::Text { .. }
3904                | PartKind::Reasoning { .. }
3905                | PartKind::File { .. }
3906                | PartKind::ToolCall { .. }
3907                | PartKind::ToolResult { .. }
3908                | PartKind::ToolApprovalRequest { .. }
3909                | PartKind::ToolApprovalResponse { .. },
3910            )
3911            | (
3912                Role::User | Role::Assistant,
3913                PartKind::Reasoning { .. }
3914                | PartKind::ToolCall { .. }
3915                | PartKind::ToolResult { .. }
3916                | PartKind::ToolApprovalRequest { .. }
3917                | PartKind::ToolApprovalResponse { .. },
3918            ) => {}
3919        }
3920    }
3921
3922    let text = chunks
3923        .into_iter()
3924        .filter(|chunk| !chunk.trim().is_empty())
3925        .collect::<Vec<_>>()
3926        .join("\n");
3927    if text.is_empty() { None } else { Some(text) }
3928}
3929
/// Non-empty conversational text (spec.md#search).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrows the wrapped text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consumes the wrapper and hands back the owned text.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
3949
/// A message paired with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    pub message: Message,
    pub parts: Vec<Part>,
}
3955
/// A session paired with its messages, each carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    pub session: Session,
    pub messages: Vec<MessageWithParts>,
}
3961
/// Paging parameters for viewing a session's messages.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Page forward: messages strictly after this id.
    pub after_message_id: Option<&'a str>,
    /// Page backward: messages strictly before this id.
    pub before_message_id: Option<&'a str>,
    // NOTE(review): presumably the maximum number of messages per page -
    // confirm against the code that consumes these params.
    pub limit: usize,
    // NOTE(review): presumably the byte budget for one page - confirm
    // against the code that consumes these params.
    pub budget_bytes: usize,
    /// First-page end when neither anchor is set.
    pub session_from: SessionFrom,
}
3973
/// Parameters for viewing a single target message together with the
/// conversational siblings around it.
#[derive(Debug, Clone)]
pub struct MessageViewParams {
    /// Conversational siblings before the target (`grep -B`).
    pub context_before: usize,
    /// Conversational siblings after the target (`grep -A`).
    pub context_after: usize,
    // NOTE(review): presumably the byte budget for the returned page -
    // confirm against the code that consumes these params.
    pub budget_bytes: usize,
}
3982
/// Outcome of a `pond_get` lookup. Separates a missing target (the handler
/// maps it to `not_found`) from a stale/unknown pagination anchor (mapped to
/// `validation_failed`): the message stream is append-only, so an anchor that
/// was ever valid never disappears - an unknown one is always a client error,
/// never a reason to silently restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested target does not exist.
    NotFound,
    /// The pagination anchor supplied by the client is not known.
    UnknownAnchor,
    /// The lookup succeeded and carries its result.
    Found(T),
}
3994
/// Canonical retrieval result for `pond_get` session mode: the stored session
/// plus the page of messages (each with its `Part`s) and a remaining count.
/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    pub session: Session,
    pub messages: Vec<RetrievedMessage>,
    // NOTE(review): presumably the number of messages before the returned
    // page that were not included - confirm against the producer.
    pub before_remaining: usize,
    // NOTE(review): presumably the number of messages after the returned
    // page that were not included - confirm against the producer.
    pub after_remaining: usize,
}
4005
/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
/// their parts so the handler can summarize them.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    pub session: Session,
    pub target: RetrievedMessage,
    /// The returned page of the target message's parts.
    pub target_parts: Vec<Part>,
    // NOTE(review): presumably the count of target parts left out of
    // `target_parts` - confirm against the producer.
    pub target_parts_remaining: usize,
    pub siblings: Vec<RetrievedMessage>,
}
4017
/// Message shape carried by [`SessionPage`] and [`MessagePage`].
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    pub id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    // NOTE(review): the difference between `text` and `content` is not
    // visible here - confirm against the code that fills them.
    pub text: Option<String>,
    pub content: Option<String>,
    pub parts: Vec<Part>,
}
4027
/// Private row shape with the same scalar fields as [`RetrievedMessage`],
/// but without `parts`.
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}
4036
/// One row of the conversational scan. `text` is non-empty by
/// `IsNotNull("search_text")` pushdown (spec.md#search).
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    pub session_id: String,
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    /// The row's text, wrapped in the [`SearchText`] newtype.
    pub text: SearchText,
}
4047
/// Counts how many leading `items` make up one page.
///
/// At most `limit` items are considered, with `limit` clamped to `1..=1000`.
/// Sizes (as reported by `size`) are accumulated with saturating addition,
/// and the page stops before the first item that would push the running
/// total past `budget_bytes`. The first item is exempt from the budget, so a
/// single oversize item never blocks its own page. An empty slice yields 0.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let cap = limit.clamp(1, 1000);
    let mut used = 0usize;
    items
        .iter()
        .take(cap)
        .enumerate()
        .take_while(|&(taken, item)| {
            let total = used.saturating_add(size(item));
            // `taken == 0` is the always-admitted first item.
            if taken > 0 && total > budget_bytes {
                return false;
            }
            used = total;
            true
        })
        .count()
}
4066
/// Tail-anchored counterpart of `page_by`: counts how many trailing `items`
/// make up one page, walking from the newest item backwards.
///
/// At most `limit` items are kept (clamped to `1..=1000`) and the walk stops
/// before the first older item that would push the saturating size total past
/// `budget_bytes`. The last (newest) item is exempt from the budget, so the
/// result is >= 1 for a non-empty slice, and the emitted page
/// (`items[len - n..]`) stays chronological.
fn page_tail<T>(
    items: &[T],
    limit: usize,
    budget_bytes: usize,
    size: impl Fn(&T) -> usize,
) -> usize {
    let cap = limit.clamp(1, 1000);
    let mut used = 0usize;
    items
        .iter()
        .rev()
        .take(cap)
        .enumerate()
        .take_while(|&(taken, item)| {
            let total = used.saturating_add(size(item));
            // `taken == 0` is the always-kept newest item.
            if taken > 0 && total > budget_bytes {
                return false;
            }
            used = total;
            true
        })
        .count()
}
4093
4094fn role_from_str(value: &str) -> Result<Role> {
4095    match value {
4096        "system" => Ok(Role::System),
4097        "user" => Ok(Role::User),
4098        "assistant" => Ok(Role::Assistant),
4099        "tool" => Ok(Role::Tool),
4100        other => anyhow::bail!("unknown message role {other}"),
4101    }
4102}
4103
/// Scalar indexes on `messages` (spec.md#datasets): only columns whose index
/// type matches the predicate actually issued against them. `project` is
/// filtered solely by `LikeContains`/`Regex` (substring), which a BTree cannot
/// accelerate, and `role` is never filtered - both are deliberately unindexed
/// (substring lookup stays on the SQL `LIKE` path). There is no index on
/// `embedding_model`: pond's invariant is one active model at a time (a model
/// swap goes through `pond optimize --force-embed` which drops the IVF_SQ,
/// clears stale rows, and re-bootstraps), so the only embedding-state filter is
/// `vector IS NOT NULL`. `id` lookups are rare and full-scan.
///
/// Each entry is `(column, index type, index name)`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    // Range-only column (`from_date`/`to_date` -> `timestamp >=` / `<=`,
    // never exact-equality, never `ORDER BY` against the index). ZoneMap's
    // per-fragment min/max prunes those filters with no recall loss (measured:
    // 258 zones -> ~6, ~42x fewer rows scanned on the real S3 corpus), and
    // skips the global ExternalSort that a BTree would pay during build.
    (
        "timestamp",
        BuiltinIndexType::ZoneMap,
        "messages_timestamp_zonemap",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
];
4135
/// Scalar indexes on `parts`: `(session_id, message_id)` is the hot-path lookup key for
/// `parts_for_messages` (hydration on every `get` and grouped search).
///
/// Each entry is `(column, index type, index name)`; the two columns are
/// indexed separately, one BTree each.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
4150
/// Scalar index on `sessions`: `id` is filtered by `find_session` on every
/// `get` and every grouped search.
///
/// The single entry is `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
4155
4156/// Session ids per `session_id IN (...)` chunk in an incremental copy: large
4157/// enough to amortize per-scan setup, small enough to keep the pushed-down
4158/// predicate string and its btree lookup batch bounded.
4159const COPY_SESSION_IN_CHUNK: usize = 512;
4160
4161fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
4162    Predicate::In(
4163        column,
4164        values.iter().cloned().map(ScalarValue::String).collect(),
4165    )
4166}
4167
/// The kNN prefilter is the caller's scalar filter alone - pond does NOT add
/// `vector IS NOT NULL`. That looks like a safe guard but it is a remote-read
/// trap: Lance v2 keeps no per-column null metadata, so `IsNotNull(vector)`
/// forces a full read of the ~3 GiB `vector` column from the object store on
/// every query (the ANN prefilter is evaluated as a `LanceScan` over the
/// column) - measured at ~57 s/query on the 2M-row S3 corpus, dwarfing the
/// IVF probe itself. It is also redundant: the IVF_SQ index only contains
/// embedded rows, and Lance's `_distance IS NOT NULL` post-filter (present in
/// both the ANN and brute-force branches of the plan) already drops any
/// null-vector row the brute-force tail might surface. So an empty caller
/// filter yields an empty prefilter and a pure index probe (spec.md#search,
/// spec.md#search-prefilter-pushdown).
///
/// The body is therefore nothing more than a clone of `filter`.
fn embedded_scope(filter: &Predicate) -> Predicate {
    filter.clone()
}
4183
4184/// IVF `nprobes` applied when `[search].nprobes` is unset. Left unset, Lance
4185/// probes up to every partition (~num_rows/4096, ~500 on the 2M-row corpus),
4186/// one object-store read each - the dominant cost of a vector scan on a remote
4187/// store. 32 bounds the reads while keeping recall (benchmarked, spec.md#search).
4188pub const DEFAULT_NPROBES: usize = 32;
4189
4190/// Apply pond's vector-search tuning to a kNN scanner, defaulting any unset
4191/// `[search]` knob so a default install never inherits Lance's unbounded
4192/// probe-every-partition behavior. No refine: IVF_SQ's per-dimension codes are
4193/// precise enough to rank from the prewarmed partition, so pond never re-reads
4194/// exact vectors from the data files (the remote-store GET storm PQ+refine
4195/// incurred).
4196fn apply_vector_search_knobs(
4197    scanner: &mut lance::dataset::scanner::Scanner,
4198    search: Option<&config::SearchConfig>,
4199) {
4200    let nprobes = search
4201        .and_then(|cfg| cfg.nprobes)
4202        .unwrap_or(DEFAULT_NPROBES);
4203    scanner.nprobes(nprobes);
4204}
4205
4206// Bare logical table names: the lance-namespace Directory impl owns the
4207// `.lance` directory suffix (spec.md#lance-chokepoints-catalog). No consumer reconstructs
4208// a `.lance` path.
4209pub(crate) const SESSIONS: &str = "sessions";
4210pub(crate) const MESSAGES: &str = "messages";
4211pub(crate) const PARTS: &str = "parts";
4212
4213/// FTS index name on `messages.search_text`. Stable so status and index
4214/// creation name the same index.
4215pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";
4216
4217/// IVF_SQ index name on `messages.vector` (spec.md#search). Stable so the
4218/// activation check, optimize/append, and status all name the same index. The
4219/// literal keeps the historical `_ivfpq` suffix as a stable identifier:
4220/// renaming it would orphan the existing segment under a new name. A plain
4221/// `optimize` folds into whatever index type already exists, so switching an
4222/// existing IVF_PQ store to IVF_SQ needs `pond optimize --rebuild`.
4223pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
4224
/// IVF_SQ tuning constants (spec.md#search):
/// - num_bits = 8 (per-dimension scalar quantization)
/// - max_iters = 15 (kmeans cap)
/// - cosine metric (e5 vectors are L2-normalized)
const IVF_SQ_NUM_BITS: u16 = 8;
/// The kmeans iteration cap from the list above; passed as `max_iters` to the
/// IVF_SQ index intent alongside [`IVF_SQ_NUM_BITS`].
const IVF_SQ_MAX_ITERS: usize = 15;
4231
/// Pond's production IndexIntents: the per-table intent set
/// `Store::open_with_options` registers with the substrate.
///
/// Equivalent to [`pond_index_intents_with_vector_threshold`] called with
/// `VECTOR_INDEX_ACTIVATION_ROWS` as the vector-index activation threshold.
pub fn pond_index_intents() -> IndexIntents {
    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
}
4237
4238/// Same as [`pond_index_intents`] but with an overridable IVF_SQ activation
4239/// threshold. Used by tests that need to exercise the activation boundary
4240/// without writing 100k vectors.
4241pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
4242    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
4243    messages.push(IndexIntent {
4244        name: MESSAGES_FTS_INDEX,
4245        column: "search_text",
4246        trigger: IndexTrigger::OnAnyRows,
4247        params: IndexParamsKind::InvertedFtsWord,
4248    });
4249    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
4250        messages.push(IndexIntent {
4251            name,
4252            column,
4253            trigger: IndexTrigger::OnAnyRows,
4254            params: IndexParamsKind::Scalar(kind.clone()),
4255        });
4256    }
4257    messages.push(IndexIntent {
4258        name: MESSAGES_VECTOR_INDEX,
4259        column: "vector",
4260        trigger: IndexTrigger::OnNonNullCount {
4261            column: "vector",
4262            threshold: vector_threshold,
4263        },
4264        params: IndexParamsKind::IvfSqCosine {
4265            num_bits: IVF_SQ_NUM_BITS,
4266            max_iters: IVF_SQ_MAX_ITERS,
4267        },
4268    });
4269    let parts = PARTS_SCALAR_INDICES
4270        .iter()
4271        .map(|(column, kind, name)| IndexIntent {
4272            name,
4273            column,
4274            trigger: IndexTrigger::OnAnyRows,
4275            params: IndexParamsKind::Scalar(kind.clone()),
4276        })
4277        .collect();
4278    let sessions = SESSIONS_SCALAR_INDICES
4279        .iter()
4280        .map(|(column, kind, name)| IndexIntent {
4281            name,
4282            column,
4283            trigger: IndexTrigger::OnAnyRows,
4284            params: IndexParamsKind::Scalar(kind.clone()),
4285        })
4286        .collect();
4287    IndexIntents {
4288        sessions,
4289        messages,
4290        parts,
4291    }
4292}
4293
/// Fallback width of the `messages.vector` embedding column (spec.md#search).
/// It matches [`embed::DEFAULT_MODEL_ID`] (`intfloat/multilingual-e5-small`,
/// 384 dimensions) and applies when `[embeddings].dim` is absent.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector width, written at most once by [`init_embedding_dim`]
/// at startup from `[embeddings].dim`. It is a `OnceLock` rather than a
/// `const` so a temporary config file can select a model with a different
/// vector width for an experiment without touching every call site. While it
/// is unset, readers see [`DEFAULT_EMBEDDING_DIM`], which keeps unit tests
/// config-free.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// The embedding dimension currently in force: the value installed by
/// [`init_embedding_dim`], or [`DEFAULT_EMBEDDING_DIM`] if none was installed.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&installed) => installed,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Install the configured dimension for [`embedding_dim`]. Only the first call
/// has any effect; later calls are silently ignored.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails once a value is present; ignoring that is the "first call wins" rule.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
4319
4320/// Initial-`CREATE` write params for the namespace-mediated path. The
4321/// substrate seam stamps in `session`, `mode`, and `store_params`.
4322/// `auto_cleanup` is short; long-term recovery is `pond copy --to <file>`
4323/// snapshots plus deferred Lance tags (spec.md#session-durable-copy).
4324/// `skip_auto_cleanup` suppresses the per-commit hook so cleanup stays
4325/// operator-driven via `pond optimize` (one LIST per command instead of per write).
4326pub(crate) fn write_params_for_create() -> WriteParams {
4327    WriteParams {
4328        data_storage_version: Some(LanceFileVersion::V2_1),
4329        enable_v2_manifest_paths: true,
4330        enable_stable_row_ids: true,
4331        auto_cleanup: Some(AutoCleanupParams {
4332            interval: 20,
4333            older_than: chrono::TimeDelta::days(1),
4334        }),
4335        skip_auto_cleanup: true,
4336        ..WriteParams::default()
4337    }
4338}
4339
4340fn export_schema(table: Table) -> Arc<Schema> {
4341    match table {
4342        Table::Sessions => session_schema(),
4343        Table::Messages => message_schema(),
4344        Table::Parts => part_schema(),
4345    }
4346}
4347
4348fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
4349    let expected = export_schema(table);
4350    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
4351    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
4352    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
4353    if actual_names != expected_names {
4354        anyhow::bail!(
4355            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
4356            table.as_str(),
4357        );
4358    }
4359    Ok(())
4360}
4361
4362async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
4363    let source_uri = source
4364        .to_str()
4365        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
4366    let dataset = Dataset::open(source_uri)
4367        .await
4368        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
4369    ensure_schema_matches_archive(&dataset, table)?;
4370    Ok(dataset)
4371}
4372
4373pub(crate) fn session_schema() -> Arc<Schema> {
4374    Arc::new(Schema::new(vec![
4375        primary_field("id", DataType::Utf8, false),
4376        Field::new("parent_session_id", DataType::Utf8, true),
4377        Field::new("parent_message_id", DataType::Utf8, true),
4378        Field::new("source_agent", DataType::Utf8, false),
4379        Field::new(
4380            "created_at",
4381            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
4382            false,
4383        ),
4384        Field::new("project", DataType::Utf8, false),
4385        json_field("options", false),
4386    ]))
4387}
4388
4389pub(crate) fn message_schema() -> Arc<Schema> {
4390    Arc::new(Schema::new(vec![
4391        primary_field("session_id", DataType::Utf8, false),
4392        primary_field("id", DataType::Utf8, false),
4393        Field::new(
4394            "timestamp",
4395            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
4396            false,
4397        ),
4398        Field::new("role", DataType::Utf8, false),
4399        Field::new("source_agent", DataType::Utf8, false),
4400        Field::new("project", DataType::Utf8, false),
4401        Field::new("content", DataType::Utf8, true),
4402        Field::new("search_text", DataType::Utf8, true),
4403        // The message's derived embedding (spec.md#session-embed-from-canonical):
4404        // both null until `pond optimize` fills them, set together thereafter.
4405        Field::new("vector", embedding_vector_type(), true),
4406        Field::new("embedding_model", DataType::Utf8, true),
4407        json_field("options", false),
4408    ]))
4409}
4410
4411pub(crate) fn part_schema() -> Arc<Schema> {
4412    Arc::new(Schema::new(vec![
4413        primary_field("session_id", DataType::Utf8, false),
4414        primary_field("message_id", DataType::Utf8, false),
4415        primary_field("id", DataType::Utf8, false),
4416        Field::new("ordinal", DataType::Int32, false),
4417        Field::new("type", DataType::Utf8, false),
4418        // spec.md#model-part-provenance: conversation vs harness-injected; search
4419        // reads this column to exclude injected scaffolding.
4420        Field::new("provenance", DataType::Utf8, false),
4421        json_field("variant_data", false),
4422        legacy_blob_field("data", true),
4423        json_field("options", false),
4424    ]))
4425}
4426
4427pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
4428    let arrays = schema
4429        .fields()
4430        .iter()
4431        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
4432        .collect();
4433    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
4434}
4435
4436pub(crate) fn empty_reader(
4437    schema: Arc<Schema>,
4438) -> Result<
4439    RecordBatchIterator<
4440        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
4441    >,
4442> {
4443    let batch = empty_batch(schema.clone())?;
4444    Ok(RecordBatchIterator::new(
4445        vec![Ok(batch)].into_iter(),
4446        schema,
4447    ))
4448}
4449
/// One `messages` row as handed to [`messages_batches`]: the wire message plus
/// the per-row values that are supplied alongside it.
pub(crate) struct MessageBatchRow<'a> {
    /// The message supplying the id, session id, timestamp, role, system
    /// content, and options columns.
    pub message: &'a Message,
    /// Value written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Value written to the `project` column.
    pub project: &'a str,
    /// Value written to the nullable `search_text` column; `None` is stored
    /// as null.
    pub search_text: Option<&'a str>,
}
4456
4457// Lance v7.0.0-beta.16's IVF_SQ build path (`rust/lance/src/index/vector/utils.rs`
4458// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
4459// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
4460// BFloat16 as a future-supported embedding type; until the Rust IVF_SQ build
4461// path catches up, store as Float16 (half-precision, also 2 bytes/element).
4462fn embedding_vector_type() -> DataType {
4463    DataType::FixedSizeList(
4464        Arc::new(Field::new("item", DataType::Float16, true)),
4465        embedding_dim() as i32,
4466    )
4467}
4468
4469/// The partial-schema source for the embedding column update: the `messages`
4470/// primary key plus the two columns `pond optimize` fills. The field definitions
4471/// match `message_schema` exactly so Lance accepts it as a subset upsert.
4472fn embedding_update_schema() -> Arc<Schema> {
4473    Arc::new(Schema::new(vec![
4474        primary_field("session_id", DataType::Utf8, false),
4475        primary_field("id", DataType::Utf8, false),
4476        Field::new("vector", embedding_vector_type(), true),
4477        Field::new("embedding_model", DataType::Utf8, true),
4478    ]))
4479}
4480
4481/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
4482/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
4483pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
4484    let dim = embedding_dim();
4485    let mut flat = Vec::with_capacity(rows.len() * dim);
4486    for row in rows {
4487        if row.vector.len() != dim {
4488            anyhow::bail!(
4489                "embedding for message {} has dim {}, expected {dim}",
4490                row.id,
4491                row.vector.len(),
4492            );
4493        }
4494        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
4495    }
4496    let values = Float16Array::from(flat);
4497    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
4498    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
4499        .context("failed to build embedding vector column")?;
4500
4501    RecordBatch::try_new(
4502        embedding_update_schema(),
4503        vec![
4504            Arc::new(StringArray::from(
4505                rows.iter()
4506                    .map(|row| row.session_id.as_str())
4507                    .collect::<Vec<_>>(),
4508            )),
4509            Arc::new(StringArray::from(
4510                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
4511            )),
4512            Arc::new(vectors),
4513            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
4514        ],
4515    )
4516    .context("failed to build embedding update batch")
4517}
4518
/// Runtime backstop against Arrow's 2 GiB `i32` offset wall. A flush batch is
/// split before the running total of its text columns would exceed this, and
/// any single cell at or above it is rejected outright instead of being left
/// to panic inside `StringArray::from` (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Partition rows into contiguous ranges whose summed byte cost each stays
/// within `COLUMN_BYTE_BUDGET`.
///
/// `cells[i]` is row `i`'s byte cost summed across every text column.
/// Budgeting that all-column total also bounds each individual column, since
/// no single column's total can exceed it. A row that is over budget on its
/// own still gets a range to itself, so every returned range is non-empty and
/// together they cover `0..cells.len()`. Empty input yields no ranges.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut ranges = Vec::new();
    let mut chunk_start = 0usize;
    let mut chunk_bytes = 0usize;

    for (row, &cost) in cells.iter().enumerate() {
        let would_overflow = chunk_bytes + cost > COLUMN_BYTE_BUDGET;
        let chunk_has_rows = row > chunk_start;
        if would_overflow && chunk_has_rows {
            // Close the current chunk; this row opens the next one.
            ranges.push(chunk_start..row);
            chunk_start = row;
            chunk_bytes = cost;
        } else {
            chunk_bytes += cost;
        }
    }

    // Flush the trailing chunk, if any rows remain in it.
    if chunk_start < cells.len() {
        ranges.push(chunk_start..cells.len());
    }
    ranges
}
4546
4547fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
4548    if bytes >= COLUMN_BYTE_BUDGET {
4549        anyhow::bail!(
4550            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
4551             overflow Arrow's i32 offset buffer"
4552        );
4553    }
4554    Ok(())
4555}
4556
4557async fn merge_insert_chunks(
4558    handle: &Handle,
4559    table: Table,
4560    batches: Vec<RecordBatch>,
4561) -> Result<u64> {
4562    let mut inserted = 0u64;
4563    for batch in batches {
4564        let rows = batch.num_rows();
4565        inserted += handle.merge_insert(table, batch, rows).await?;
4566    }
4567    Ok(inserted)
4568}
4569
4570pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
4571    let options = sessions
4572        .iter()
4573        .map(|session| json_bytes(&session.options))
4574        .collect::<Result<Vec<_>>>()?;
4575    let mut cells = Vec::with_capacity(sessions.len());
4576    for (session, encoded) in sessions.iter().zip(&options) {
4577        let columns = [
4578            session.id.len(),
4579            session.parent_session_id.as_deref().map_or(0, str::len),
4580            session.parent_message_id.as_deref().map_or(0, str::len),
4581            session.source_agent.len(),
4582            session.project.as_str().len(),
4583            encoded.len(),
4584        ];
4585        for bytes in columns {
4586            guard_cell("sessions", &session.id, bytes)?;
4587        }
4588        cells.push(columns.iter().sum());
4589    }
4590    chunk_ranges(&cells)
4591        .into_iter()
4592        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
4593        .collect()
4594}
4595
4596fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
4597    let schema = session_schema();
4598    RecordBatch::try_new(
4599        schema.clone(),
4600        vec![
4601            Arc::new(StringArray::from(
4602                sessions
4603                    .iter()
4604                    .map(|session| session.id.as_str())
4605                    .collect::<Vec<_>>(),
4606            )),
4607            Arc::new(StringArray::from(
4608                sessions
4609                    .iter()
4610                    .map(|session| session.parent_session_id.as_deref())
4611                    .collect::<Vec<_>>(),
4612            )),
4613            Arc::new(StringArray::from(
4614                sessions
4615                    .iter()
4616                    .map(|session| session.parent_message_id.as_deref())
4617                    .collect::<Vec<_>>(),
4618            )),
4619            Arc::new(StringArray::from(
4620                sessions
4621                    .iter()
4622                    .map(|session| session.source_agent.as_str())
4623                    .collect::<Vec<_>>(),
4624            )),
4625            Arc::new(
4626                TimestampMicrosecondArray::from(
4627                    sessions
4628                        .iter()
4629                        .map(|session| micros(session.created_at))
4630                        .collect::<Vec<_>>(),
4631                )
4632                .with_timezone("UTC"),
4633            ),
4634            Arc::new(StringArray::from(
4635                sessions
4636                    .iter()
4637                    .map(|session| session.project.as_str())
4638                    .collect::<Vec<_>>(),
4639            )),
4640            Arc::new(LargeBinaryArray::from_iter_values(
4641                options.iter().map(Vec::as_slice),
4642            )),
4643        ],
4644    )
4645    .context("failed to build session batch")
4646}
4647
4648pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
4649    let options = rows
4650        .iter()
4651        .map(|row| json_bytes(row.message.options()))
4652        .collect::<Result<Vec<_>>>()?;
4653    let mut cells = Vec::with_capacity(rows.len());
4654    for (row, encoded) in rows.iter().zip(&options) {
4655        let columns = [
4656            row.message.session_id().len(),
4657            row.message.id().len(),
4658            row.message.role().as_str().len(),
4659            row.source_agent.len(),
4660            row.project.len(),
4661            row.message.system_content().map_or(0, str::len),
4662            row.search_text.map_or(0, str::len),
4663            encoded.len(),
4664        ];
4665        for bytes in columns {
4666            guard_cell("messages", row.message.id(), bytes)?;
4667        }
4668        cells.push(columns.iter().sum());
4669    }
4670    chunk_ranges(&cells)
4671        .into_iter()
4672        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
4673        .collect()
4674}
4675
4676fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
4677    let schema = message_schema();
4678    RecordBatch::try_new(
4679        schema.clone(),
4680        vec![
4681            Arc::new(StringArray::from(
4682                rows.iter()
4683                    .map(|row| row.message.session_id())
4684                    .collect::<Vec<_>>(),
4685            )),
4686            Arc::new(StringArray::from(
4687                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
4688            )),
4689            Arc::new(
4690                TimestampMicrosecondArray::from(
4691                    rows.iter()
4692                        .map(|row| micros(row.message.timestamp()))
4693                        .collect::<Vec<_>>(),
4694                )
4695                .with_timezone("UTC"),
4696            ),
4697            Arc::new(StringArray::from(
4698                rows.iter()
4699                    .map(|row| row.message.role().as_str())
4700                    .collect::<Vec<_>>(),
4701            )),
4702            Arc::new(StringArray::from(
4703                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
4704            )),
4705            Arc::new(StringArray::from(
4706                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
4707            )),
4708            Arc::new(StringArray::from(
4709                rows.iter()
4710                    .map(|row| row.message.system_content())
4711                    .collect::<Vec<_>>(),
4712            )),
4713            Arc::new(StringArray::from(
4714                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
4715            )),
4716            // `vector` / `embedding_model` are written null at ingest; every
4717            // message starts un-embedded and `pond optimize` fills them later
4718            // (spec.md#session-embed-from-canonical).
4719            new_null_array(&embedding_vector_type(), rows.len()),
4720            new_null_array(&DataType::Utf8, rows.len()),
4721            Arc::new(LargeBinaryArray::from_iter_values(
4722                options.iter().map(Vec::as_slice),
4723            )),
4724        ],
4725    )
4726    .context("failed to build message batch")
4727}
4728
4729pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
4730    let variant_data = parts
4731        .iter()
4732        .map(|part| part_variant_json(&part.kind))
4733        .collect::<Result<Vec<_>>>()?;
4734    let options = parts
4735        .iter()
4736        .map(|part| json_bytes(&part.options))
4737        .collect::<Result<Vec<_>>>()?;
4738    let mut cells = Vec::with_capacity(parts.len());
4739    // The blob column is a BinaryArray, exempt from the text-column bound
4740    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
4741    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
4742        let columns = [
4743            part.session_id.len(),
4744            part.message_id.len(),
4745            part.id.len(),
4746            part.kind.type_name().len(),
4747            part.provenance.as_str().len(),
4748            variant.len(),
4749            encoded.len(),
4750        ];
4751        for bytes in columns {
4752            guard_cell("parts", &part.id, bytes)?;
4753        }
4754        cells.push(columns.iter().sum());
4755    }
4756    chunk_ranges(&cells)
4757        .into_iter()
4758        .map(|range| {
4759            parts_chunk(
4760                &parts[range.clone()],
4761                &variant_data[range.clone()],
4762                &options[range],
4763            )
4764        })
4765        .collect()
4766}
4767
4768fn parts_chunk(
4769    parts: &[Part],
4770    variant_data: &[Vec<u8>],
4771    options: &[Vec<u8>],
4772) -> Result<RecordBatch> {
4773    let schema = part_schema();
4774    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
4775    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
4776    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
4777    let blob_payloads: Vec<Option<&[u8]>> = parts
4778        .iter()
4779        .map(|part| match &part.kind {
4780            PartKind::File { data, .. } => Some(match data {
4781                FileData::String(value) => value.as_bytes(),
4782                FileData::Bytes(value) => value.as_slice(),
4783                FileData::Url(value) => value.as_bytes(),
4784            }),
4785            PartKind::Text { .. }
4786            | PartKind::Reasoning { .. }
4787            | PartKind::ToolCall { .. }
4788            | PartKind::ToolResult { .. }
4789            | PartKind::ToolApprovalRequest { .. }
4790            | PartKind::ToolApprovalResponse { .. } => None,
4791        })
4792        .collect();
4793    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
4794
4795    RecordBatch::try_new(
4796        schema.clone(),
4797        vec![
4798            Arc::new(StringArray::from(
4799                parts
4800                    .iter()
4801                    .map(|part| part.session_id.as_str())
4802                    .collect::<Vec<_>>(),
4803            )),
4804            Arc::new(StringArray::from(
4805                parts
4806                    .iter()
4807                    .map(|part| part.message_id.as_str())
4808                    .collect::<Vec<_>>(),
4809            )),
4810            Arc::new(StringArray::from(
4811                parts
4812                    .iter()
4813                    .map(|part| part.id.as_str())
4814                    .collect::<Vec<_>>(),
4815            )),
4816            Arc::new(Int32Array::from(
4817                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
4818            )),
4819            Arc::new(StringArray::from(
4820                parts
4821                    .iter()
4822                    .map(|part| part.kind.type_name())
4823                    .collect::<Vec<_>>(),
4824            )),
4825            Arc::new(StringArray::from(
4826                parts
4827                    .iter()
4828                    .map(|part| part.provenance.as_str())
4829                    .collect::<Vec<_>>(),
4830            )),
4831            Arc::new(LargeBinaryArray::from_iter_values(
4832                variant_data.iter().map(Vec::as_slice),
4833            )),
4834            Arc::new(blob_array),
4835            Arc::new(LargeBinaryArray::from_iter_values(
4836                options.iter().map(Vec::as_slice),
4837            )),
4838        ],
4839    )
4840    .context("failed to build parts batch")
4841}
4842
4843pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
4844    Ok(Session {
4845        id: string(batch, "id", row)?.context("session id is null")?,
4846        parent_session_id: string(batch, "parent_session_id", row)?,
4847        parent_message_id: string(batch, "parent_message_id", row)?,
4848        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4849        created_at: datetime(batch, "created_at", row)?,
4850        project: crate::adapter::Extracted::from_stored(
4851            string(batch, "project", row)?.context("project is null")?,
4852        ),
4853        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
4854    })
4855}
4856
4857/// [`SkipOracle`](crate::adapter::SkipOracle) over the resident row-meta map:
4858/// `pond sync` reads each session's stored max message timestamp from memory, so
4859/// the staleness check costs zero S3 (the map is rebuilt from the store, so the
4860/// check stays deterministic with no local cursor). A `None` map (never
4861/// prewarmed, or the build failed) yields no watermark, so every source
4862/// re-reads - safe, just slower.
4863pub struct RowmapOracle(pub Option<Arc<RowMetaSet>>);
4864
4865impl crate::adapter::SkipOracle for RowmapOracle {
4866    fn session_max_ts(&self, session_id: &str) -> Option<i64> {
4867        self.0.as_ref()?.lookup_max_ts(session_id)
4868    }
4869
4870    fn is_empty(&self) -> bool {
4871        self.0.as_ref().is_none_or(|set| set.is_empty())
4872    }
4873}
4874
4875fn row_meta_entry(batch: &RecordBatch, row_id: u64, row: usize) -> Result<RowMetaEntry> {
4876    Ok(RowMetaEntry {
4877        row_id,
4878        session_id: string(batch, "session_id", row)?.context("session_id is null")?,
4879        message_id: string(batch, "id", row)?.context("message id is null")?,
4880        role: string(batch, "role", row)?.context("role is null")?,
4881        project: string(batch, "project", row)?.context("project is null")?,
4882        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4883        timestamp_micros: datetime(batch, "timestamp", row)?.timestamp_micros(),
4884        search_text: string(batch, "search_text", row)?.unwrap_or_default(),
4885    })
4886}
4887
4888pub(crate) fn message_meta_from_batch(batch: &RecordBatch, row: usize) -> Result<MessageMeta> {
4889    Ok(MessageMeta {
4890        message_id: string(batch, "id", row)?.context("id is null")?,
4891        session_id: string(batch, "session_id", row)?.context("session_id is null")?,
4892        role: string(batch, "role", row)?.context("role is null")?,
4893        project: string(batch, "project", row)?.context("project is null")?,
4894        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4895        timestamp: datetime(batch, "timestamp", row)?,
4896        search_text: string(batch, "search_text", row)?.unwrap_or_default(),
4897    })
4898}
4899
4900pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
4901    let id = string(batch, "id", row)?.context("message id is null")?;
4902    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
4903    let timestamp = datetime(batch, "timestamp", row)?;
4904    let options =
4905        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
4906
4907    match string(batch, "role", row)?
4908        .context("message role is null")?
4909        .as_str()
4910    {
4911        "system" => Ok(Message::System {
4912            id,
4913            session_id,
4914            timestamp,
4915            // `content` is nullable in the schema; preserve the distinction
4916            // between "no content row stored" (`None`) and "empty string
4917            // stored" (`Some(extracted_empty)`). The value originally
4918            // came from a `Source` extraction at ingest time; rewrap via
4919            // the storage-internal `from_stored` so the type-system seal
4920            // for adapters stays intact.
4921            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
4922            options,
4923        }),
4924        "user" => Ok(Message::User {
4925            id,
4926            session_id,
4927            timestamp,
4928            options,
4929        }),
4930        "assistant" => Ok(Message::Assistant {
4931            id,
4932            session_id,
4933            timestamp,
4934            options,
4935        }),
4936        "tool" => Ok(Message::Tool {
4937            id,
4938            session_id,
4939            timestamp,
4940            options,
4941        }),
4942        other => anyhow::bail!("unknown message role {other}"),
4943    }
4944}
4945
4946pub(crate) fn part_from_batch(
4947    batch: &RecordBatch,
4948    row: usize,
4949    file_data: Option<FileData>,
4950) -> Result<Part> {
4951    let type_name = string(batch, "type", row)?.context("part type is null")?;
4952    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
4953    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
4954    Ok(Part {
4955        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
4956        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
4957        id: string(batch, "id", row)?.context("part id is null")?,
4958        ordinal: int32(batch, "ordinal", row)?,
4959        provenance: provenance_from_str(&provenance)?,
4960        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
4961        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
4962    })
4963}
4964
4965fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
4966    match value {
4967        "conversational" => Ok(crate::wire::Provenance::Conversational),
4968        "injected" => Ok(crate::wire::Provenance::Injected),
4969        other => anyhow::bail!("unknown part provenance {other}"),
4970    }
4971}
4972
4973fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
4974    let kind = file_data_kind(variant_data)?;
4975    match kind.as_str() {
4976        "string" => {
4977            let text = std::str::from_utf8(bytes)
4978                .context("file string payload is not UTF-8")?
4979                .to_owned();
4980            Ok(FileData::String(text))
4981        }
4982        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
4983        "url" => Ok(FileData::Url(
4984            std::str::from_utf8(bytes)
4985                .context("file URL payload is not UTF-8")?
4986                .to_owned(),
4987        )),
4988        other => anyhow::bail!("unknown file data_kind {other}"),
4989    }
4990}
4991
4992fn file_data_kind(variant_data: &[u8]) -> Result<String> {
4993    let value = json_parse::<Value>(variant_data)?;
4994    value
4995        .get("data_kind")
4996        .and_then(Value::as_str)
4997        .map(str::to_owned)
4998        .context("file part variant_data missing data_kind")
4999}
5000
5001fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
5002    batch
5003        .column_by_name(name)
5004        .with_context(|| format!("missing column {name}"))?
5005        .as_any()
5006        .downcast_ref::<UInt64Array>()
5007        .with_context(|| format!("column {name} is not UInt64"))
5008}
5009
5010pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
5011    let array = batch
5012        .column_by_name(name)
5013        .with_context(|| format!("missing column {name}"))?
5014        .as_any()
5015        .downcast_ref::<StringArray>()
5016        .with_context(|| format!("column {name} is not Utf8"))?;
5017    if array.is_null(row) {
5018        Ok(None)
5019    } else {
5020        Ok(Some(array.value(row).to_owned()))
5021    }
5022}
5023
5024fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
5025    // Lance can return a `lance.json` column either as raw JSONB bytes
5026    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
5027    // LargeUtf8), depending on the read path. Handle both.
5028    let column = batch
5029        .column_by_name(name)
5030        .with_context(|| format!("missing column {name}"))?;
5031    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
5032        return if array.is_null(row) {
5033            Ok(None)
5034        } else {
5035            Ok(Some(
5036                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
5037            ))
5038        };
5039    }
5040    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
5041        return if array.is_null(row) {
5042            Ok(None)
5043        } else {
5044            Ok(Some(array.value(row).as_bytes().to_vec()))
5045        };
5046    }
5047    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
5048        return if array.is_null(row) {
5049            Ok(None)
5050        } else {
5051            Ok(Some(array.value(row).as_bytes().to_vec()))
5052        };
5053    }
5054    anyhow::bail!("column {name} is not a JSON-compatible array")
5055}
5056
5057fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
5058    let array = batch
5059        .column_by_name(name)
5060        .with_context(|| format!("missing column {name}"))?
5061        .as_any()
5062        .downcast_ref::<Int32Array>()
5063        .with_context(|| format!("column {name} is not Int32"))?;
5064    Ok(array.value(row))
5065}
5066
5067pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
5068    let array = batch
5069        .column_by_name(name)
5070        .with_context(|| format!("missing column {name}"))?
5071        .as_any()
5072        .downcast_ref::<Float32Array>()
5073        .with_context(|| format!("column {name} is not Float32"))?;
5074    Ok(array.value(row))
5075}
5076
5077pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
5078    let array = batch
5079        .column_by_name(name)
5080        .with_context(|| format!("missing column {name}"))?
5081        .as_any()
5082        .downcast_ref::<TimestampMicrosecondArray>()
5083        .with_context(|| format!("column {name} is not timestamp_micros"))?;
5084    Utc.timestamp_micros(array.value(row))
5085        .single()
5086        .context("timestamp is out of range")
5087}
5088
5089fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
5090    Field::new(name, data_type, nullable).with_metadata(
5091        [(
5092            "lance-schema:unenforced-primary-key".to_owned(),
5093            "true".to_owned(),
5094        )]
5095        .into(),
5096    )
5097}
5098
5099// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
5100// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
5101// is marked unstable in Lance docs (`format/file/versioning.md`) and at
5102// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
5103// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
5104// allocated only one column_info, so the decoder's second `expect_next()`
5105// fires `"there were more fields in the schema than provided column
5106// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
5107// handles correctly (covered by Lance's own `test_compact_blob_columns`).
5108fn legacy_blob_field(name: &str, nullable: bool) -> Field {
5109    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
5110        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
5111            .into_iter()
5112            .collect(),
5113    )
5114}
5115
/// Builds the schema field for a JSON column called `name`.
///
/// Forwards `name` and `nullable` unchanged to
/// `lance_arrow::json::json_field`; presumably the returned field carries the
/// `lance.json` extension referred to elsewhere in this file - confirm
/// against `lance_arrow`.
fn json_field(name: &str, nullable: bool) -> Field {
    lance_arrow::json::json_field(name, nullable)
}
5119
/// Converts `timestamp` to an `i64` microsecond count via
/// `DateTime::timestamp_micros`.
fn micros(timestamp: DateTime<Utc>) -> i64 {
    timestamp.timestamp_micros()
}
5123
5124fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
5125    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
5126    // matches the `lance.json` extension contract. Lance's compact path
5127    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
5128    // applies `decode_json -> encode_json` on this column; with proper JSONB
5129    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
5130    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
5131    // `try_into_dfstream`; compact still goes through the adapter).
5132    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
5133    lance_arrow::json::encode_json(&text)
5134        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
5135}
5136
/// Deserializes `value` into `T` with `serde_json::from_slice`.
///
/// The input is parsed as JSON text, so JSONB bytes must be decoded to text
/// before they are passed in.
///
/// # Errors
///
/// Fails with the context "failed to parse JSON field" when the bytes are not
/// valid JSON or do not match the shape of `T`.
fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
    serde_json::from_slice(value).context("failed to parse JSON field")
}
5140
5141fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
5142    if let PartKind::File {
5143        media_type,
5144        file_name,
5145        data,
5146    } = kind
5147    {
5148        let data_kind = match data {
5149            FileData::String(_) => "string",
5150            FileData::Bytes(_) => "bytes",
5151            FileData::Url(_) => "url",
5152        };
5153        return json_bytes(&serde_json::json!({
5154            "media_type": media_type,
5155            "file_name": file_name,
5156            "data_kind": data_kind,
5157        }));
5158    }
5159    let value = serde_json::to_value(kind)?;
5160    let mut object = value
5161        .as_object()
5162        .cloned()
5163        .context("part variant did not serialize to an object")?;
5164    object.remove("type");
5165    json_bytes(&object)
5166}
5167
5168fn part_kind_from_json(
5169    type_name: &str,
5170    variant_data: &[u8],
5171    file_data: Option<FileData>,
5172) -> Result<PartKind> {
5173    let mut value = json_parse::<Value>(variant_data)?;
5174    let object = value
5175        .as_object_mut()
5176        .context("part variant data is not an object")?;
5177    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
5178    if let Some(data) = file_data {
5179        object.remove("data_kind");
5180        object.insert("data".to_owned(), serde_json::to_value(data)?);
5181    }
5182    serde_json::from_value(value).context("failed to parse part kind")
5183}
5184
5185#[cfg(test)]
5186mod tests {
5187    #![allow(clippy::expect_used, clippy::unwrap_used)]
5188
5189    use super::*;
5190    use crate::{
5191        adapter::Extracted,
5192        handlers::ingest_events,
5193        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
5194    };
5195    use chrono::Utc;
5196    use serde_json::json;
5197    use tempfile::TempDir;
5198
5199    fn synthetic_session(id: &str) -> Session {
5200        Session {
5201            id: id.to_owned(),
5202            parent_session_id: None,
5203            parent_message_id: None,
5204            source_agent: "claude-code".to_owned(),
5205            created_at: Utc::now(),
5206            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
5207            options: ProviderOptions::new(),
5208        }
5209    }
5210
5211    #[test]
5212    fn search_text_excludes_injected_parts() {
5213        use crate::wire::Provenance;
5214        let message = Message::User {
5215            id: "m1".to_owned(),
5216            session_id: "s1".to_owned(),
5217            timestamp: Utc::now(),
5218            options: ProviderOptions::new(),
5219        };
5220        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
5221            session_id: "s1".to_owned(),
5222            id: id.to_owned(),
5223            message_id: "m1".to_owned(),
5224            ordinal: 0,
5225            provenance,
5226            options: ProviderOptions::new(),
5227            kind: PartKind::Text {
5228                text: Some(Extracted::from_test_value(text.to_owned())),
5229            },
5230        };
5231
5232        // A conversational part contributes; an injected one is excluded
5233        // (spec.md#search).
5234        let conversational = search_text(
5235            &message,
5236            &[text_part(
5237                "p1",
5238                "real human prompt",
5239                Provenance::Conversational,
5240            )],
5241        );
5242        assert_eq!(conversational.as_deref(), Some("real human prompt"));
5243
5244        let injected = search_text(
5245            &message,
5246            &[text_part(
5247                "p2",
5248                "<task-notification>...</task-notification>",
5249                Provenance::Injected,
5250            )],
5251        );
5252        assert!(
5253            injected.is_none(),
5254            "a message whose only part is injected has null search_text"
5255        );
5256    }
5257
5258    #[test]
5259    fn chunk_ranges_splits_on_byte_budget() {
5260        assert!(chunk_ranges(&[]).is_empty());
5261        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
5262
5263        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
5264        assert_eq!(
5265            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
5266            vec![0..1, 1..2, 2..3],
5267        );
5268
5269        // An oversized single row gets its own chunk, never an infinite loop.
5270        assert_eq!(
5271            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
5272            vec![0..1, 1..2, 2..3],
5273        );
5274    }
5275
5276    #[tokio::test]
5277    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
5278        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
5279        // Message is dropped on the spot, with one Error outcome surfaced. The
5280        // rest of the substream continues normally - subsequent valid messages
5281        // and parts get written.
5282        let temp = TempDir::new()?;
5283        let store = Store::open_local(temp.path()).await?;
5284        let session = synthetic_session("ordering");
5285        let orphan_part = Part {
5286            session_id: session.id.clone(),
5287            id: "orphan-part".to_owned(),
5288            message_id: "missing-message".to_owned(),
5289            ordinal: 0,
5290            provenance: crate::wire::Provenance::Conversational,
5291            options: ProviderOptions::new(),
5292            kind: PartKind::Text {
5293                text: Some(Extracted::from_test_value("orphan".to_owned())),
5294            },
5295        };
5296        let valid_message = Message::User {
5297            id: "valid-message".to_owned(),
5298            session_id: session.id.clone(),
5299            timestamp: Utc::now(),
5300            options: ProviderOptions::new(),
5301        };
5302        let valid_part = Part {
5303            session_id: session.id.clone(),
5304            id: "valid-part".to_owned(),
5305            message_id: valid_message.id().to_owned(),
5306            ordinal: 0,
5307            provenance: crate::wire::Provenance::Conversational,
5308            options: ProviderOptions::new(),
5309            kind: PartKind::Text {
5310                text: Some(Extracted::from_test_value("kept".to_owned())),
5311            },
5312        };
5313
5314        let mut validator = IngestValidator::default();
5315        validator
5316            .push(&store, 0, IngestEvent::Session(session.clone()))
5317            .await?;
5318        let part_outcomes = validator
5319            .push(&store, 1, IngestEvent::Part(orphan_part))
5320            .await?;
5321        assert_eq!(part_outcomes.len(), 1);
5322        assert_eq!(part_outcomes[0].kind, "part");
5323        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
5324        assert!(
5325            part_outcomes[0]
5326                .error
5327                .as_ref()
5328                .map(|e| e.message.contains("part event appeared before a message"))
5329                .unwrap_or(false),
5330            "error message must explain the ordering violation: {part_outcomes:?}"
5331        );
5332        validator
5333            .push(&store, 2, IngestEvent::Message(valid_message))
5334            .await?;
5335        validator
5336            .push(&store, 3, IngestEvent::Part(valid_part))
5337            .await?;
5338        validator.finish(&store).await?;
5339
5340        let (sessions, messages, parts) = store.row_counts().await?;
5341        assert_eq!(sessions, 1, "session committed despite the orphan part");
5342        assert_eq!(messages, 1, "valid message committed");
5343        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
5344
5345        Ok(())
5346    }
5347
5348    #[tokio::test]
5349    async fn resident_meta_map_hydration_matches_take_rows_fallback() -> anyhow::Result<()> {
5350        // The resident meta map must hydrate hits identically to the take_rows
5351        // fallback - same fields, and the microsecond timestamp survives the
5352        // i64 round-trip through the mmap blob.
5353        let temp = TempDir::new()?;
5354        let store = Store::open_local(temp.path()).await?;
5355        let session = synthetic_session("hydration-parity");
5356
5357        let messages = [
5358            (
5359                "m1",
5360                "the auth refactor landed cleanly",
5361                1_700_000_000_123_456_i64,
5362            ),
5363            (
5364                "m2",
5365                "balance handler now retries on rpc timeout",
5366                1_700_000_050_654_321,
5367            ),
5368        ];
5369        let mut validator = IngestValidator::default();
5370        validator
5371            .push(&store, 0, IngestEvent::Session(session.clone()))
5372            .await?;
5373        let mut seq = 1;
5374        for (mid, text, micros) in messages {
5375            let message = Message::User {
5376                id: mid.to_owned(),
5377                session_id: session.id.clone(),
5378                timestamp: DateTime::from_timestamp_micros(micros).unwrap(),
5379                options: ProviderOptions::new(),
5380            };
5381            validator
5382                .push(&store, seq, IngestEvent::Message(message))
5383                .await?;
5384            seq += 1;
5385            let part = Part {
5386                session_id: session.id.clone(),
5387                id: format!("{mid}-p0"),
5388                message_id: mid.to_owned(),
5389                ordinal: 0,
5390                provenance: crate::wire::Provenance::Conversational,
5391                options: ProviderOptions::new(),
5392                kind: PartKind::Text {
5393                    text: Some(Extracted::from_test_value(text.to_owned())),
5394                },
5395            };
5396            validator.push(&store, seq, IngestEvent::Part(part)).await?;
5397            seq += 1;
5398        }
5399        validator.finish(&store).await?;
5400
5401        let rowids: Vec<u64> = store
5402            .collect_row_metas()
5403            .await?
5404            .into_iter()
5405            .map(|entry| entry.row_id)
5406            .collect();
5407        assert_eq!(rowids.len(), 2);
5408
5409        let sort_by_id = |mut metas: Vec<MessageMeta>| {
5410            metas.sort_by(|left, right| left.message_id.cmp(&right.message_id));
5411            metas
5412        };
5413
5414        let fallback = sort_by_id(store.message_metas_by_rowids(&rowids).await?);
5415
5416        // Build and install the resident meta map; the same call now hydrates
5417        // from memory (zero misses - the map covers the whole table).
5418        store.ensure_rowmap(&temp.path().join("cache")).await?;
5419        let resident = sort_by_id(store.message_metas_by_rowids(&rowids).await?);
5420
5421        assert_eq!(
5422            resident, fallback,
5423            "resident-map hydration must match the take_rows fallback"
5424        );
5425        assert_eq!(
5426            resident[0].timestamp.timestamp_micros(),
5427            1_700_000_000_123_456
5428        );
5429        Ok(())
5430    }
5431
5432    #[tokio::test]
5433    async fn initialized_flips_only_after_first_ingest() -> anyhow::Result<()> {
5434        // `open` eagerly creates sessions/messages but `parts` is lazy, so a
5435        // configured-but-never-synced store reports uninitialized - the signal
5436        // `pond status` uses to render an empty state instead of
5437        // erroring on the first parts describe.
5438        let temp = TempDir::new()?;
5439        let store = Store::open_local(temp.path()).await?;
5440        assert!(
5441            !store.initialized().await?,
5442            "fresh store has no parts table"
5443        );
5444
5445        let session = synthetic_session("initialized-probe");
5446        let message = Message::User {
5447            id: "message-1".to_owned(),
5448            session_id: session.id.clone(),
5449            timestamp: Utc::now(),
5450            options: ProviderOptions::new(),
5451        };
5452        let part = Part {
5453            session_id: session.id.clone(),
5454            id: "part-1".to_owned(),
5455            message_id: message.id().to_owned(),
5456            ordinal: 0,
5457            provenance: crate::wire::Provenance::Conversational,
5458            options: ProviderOptions::new(),
5459            kind: PartKind::Text {
5460                text: Some(Extracted::from_test_value("hello".to_owned())),
5461            },
5462        };
5463        let mut validator = IngestValidator::default();
5464        validator
5465            .push(&store, 0, IngestEvent::Session(session))
5466            .await?;
5467        validator
5468            .push(&store, 1, IngestEvent::Message(message))
5469            .await?;
5470        validator.push(&store, 2, IngestEvent::Part(part)).await?;
5471        validator.finish(&store).await?;
5472
5473        assert!(store.initialized().await?, "ingest creates the parts table");
5474        Ok(())
5475    }
5476
5477    #[tokio::test]
5478    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
5479        // Per-event drop: a duplicate message id within a substream drops the
5480        // *duplicate* and surfaces an Error outcome for it. The first wins; the
5481        // session still commits.
5482        let temp = TempDir::new()?;
5483        let store = Store::open_local(temp.path()).await?;
5484        let session = synthetic_session("duplicate-message");
5485        let first = Message::User {
5486            id: "message-1".to_owned(),
5487            session_id: session.id.clone(),
5488            timestamp: Utc::now(),
5489            options: ProviderOptions::new(),
5490        };
5491        let second = Message::Assistant {
5492            id: "message-1".to_owned(),
5493            session_id: session.id.clone(),
5494            timestamp: Utc::now(),
5495            options: ProviderOptions::new(),
5496        };
5497
5498        let mut validator = IngestValidator::default();
5499        validator
5500            .push(&store, 0, IngestEvent::Session(session.clone()))
5501            .await?;
5502        validator
5503            .push(&store, 1, IngestEvent::Message(first))
5504            .await?;
5505        let dup_outcomes = validator
5506            .push(&store, 2, IngestEvent::Message(second))
5507            .await?;
5508        assert_eq!(dup_outcomes.len(), 1);
5509        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
5510        assert!(
5511            dup_outcomes[0]
5512                .error
5513                .as_ref()
5514                .map(|e| e.message.contains("duplicate message id message-1"))
5515                .unwrap_or(false),
5516            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
5517        );
5518
5519        validator.finish(&store).await?;
5520        let (sessions, messages, _) = store.row_counts().await?;
5521        assert_eq!(sessions, 1, "session committed");
5522        assert_eq!(messages, 1, "only the first message committed");
5523
5524        Ok(())
5525    }
5526
5527    #[tokio::test]
5528    async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
5529    -> anyhow::Result<()> {
5530        // spec.md#model-pond-options: `options.pond` is core-owned. A stored
5531        // message carries the process's host stamp (when resolvable) and never
5532        // a client-supplied value; session and part options stay untouched.
5533        let temp = TempDir::new()?;
5534        let store = Store::open_local(temp.path()).await?;
5535        let session = synthetic_session("host-provenance");
5536        let mut spoofed = ProviderOptions::new();
5537        spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
5538        let message = Message::User {
5539            id: "message-1".to_owned(),
5540            session_id: session.id.clone(),
5541            timestamp: Utc::now(),
5542            options: spoofed,
5543        };
5544        let part = Part {
5545            session_id: session.id.clone(),
5546            id: "part-1".to_owned(),
5547            message_id: "message-1".to_owned(),
5548            ordinal: 0,
5549            provenance: crate::wire::Provenance::Conversational,
5550            options: ProviderOptions::new(),
5551            kind: PartKind::Text {
5552                text: Some(Extracted::from_test_value("hello".to_owned())),
5553            },
5554        };
5555
5556        let mut validator = IngestValidator::default();
5557        validator
5558            .push(&store, 0, IngestEvent::Session(session.clone()))
5559            .await?;
5560        validator
5561            .push(&store, 1, IngestEvent::Message(message))
5562            .await?;
5563        validator.push(&store, 2, IngestEvent::Part(part)).await?;
5564        validator.finish(&store).await?;
5565
5566        let stored = store
5567            .get_session(&session.id)
5568            .await?
5569            .expect("ingested session is readable");
5570        assert!(
5571            !stored.session.options.contains_key("pond"),
5572            "session rows are not stamped (attribution derives from messages)"
5573        );
5574        let stored_message = &stored.messages[0].message;
5575        match ingest_host_stamp() {
5576            Some(stamp) => {
5577                assert_eq!(
5578                    stored_message.options().get("pond"),
5579                    Some(stamp),
5580                    "stored message carries the real stamp, never the spoof"
5581                );
5582                let host = stamp
5583                    .pointer("/ingest/host")
5584                    .and_then(Value::as_object)
5585                    .expect("stamp shape is {ingest: {host: {..}}}");
5586                assert!(!host.is_empty(), "an all-empty stamp must be None instead");
5587                assert!(
5588                    host.values()
5589                        .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
5590                    "stamp fields are omitted when unavailable, never empty: {host:?}"
5591                );
5592            }
5593            None => assert!(
5594                stored_message.options().get("pond").is_none(),
5595                "with no resolvable stamp the spoofed key is still stripped"
5596            ),
5597        }
5598        assert!(
5599            !stored.messages[0].parts[0].options.contains_key("pond"),
5600            "part rows are not stamped (covered by their message's stamp)"
5601        );
5602
5603        Ok(())
5604    }
5605
5606    /// Regression: compact_files on `parts` with the blob column tripped a
5607    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
5608    /// batches give compact fragments to merge; every `FileData` variant
5609    /// exercises the blob round-trip. All-File batches sidestep a debug-only
5610    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
5611    /// write batch mixes null + valid rows in the blob column - benign in
5612    /// release, irrelevant to this regression's scope.
5613    #[tokio::test(flavor = "multi_thread")]
5614    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
5615        use crate::wire::{FileData, PartKind, Provenance};
5616        let temp = TempDir::new()?;
5617        let store = Store::open_local(temp.path()).await?;
5618
5619        let session = synthetic_session("compact-blob");
5620        store
5621            .upsert_sessions(std::slice::from_ref(&session))
5622            .await?;
5623
5624        let make_part = |idx: usize, kind: PartKind| Part {
5625            session_id: session.id.clone(),
5626            message_id: format!("msg-{idx}"),
5627            id: format!("part-{idx}"),
5628            ordinal: 0,
5629            provenance: Provenance::Conversational,
5630            options: ProviderOptions::new(),
5631            kind,
5632        };
5633
5634        let batch_a = vec![
5635            make_part(
5636                0,
5637                PartKind::File {
5638                    media_type: Some("text/plain".to_owned()),
5639                    file_name: Some("a.txt".to_owned()),
5640                    data: FileData::Bytes(b"alpha".to_vec()),
5641                },
5642            ),
5643            make_part(
5644                1,
5645                PartKind::File {
5646                    media_type: Some("text/plain".to_owned()),
5647                    file_name: Some("b.txt".to_owned()),
5648                    data: FileData::String("beta".to_owned()),
5649                },
5650            ),
5651        ];
5652        store.upsert_parts(&batch_a).await?;
5653
5654        let batch_b = vec![
5655            make_part(
5656                2,
5657                PartKind::File {
5658                    media_type: Some("application/octet-stream".to_owned()),
5659                    file_name: None,
5660                    data: FileData::Url("https://example.com/file".to_owned()),
5661                },
5662            ),
5663            make_part(
5664                3,
5665                PartKind::File {
5666                    media_type: Some("image/png".to_owned()),
5667                    file_name: Some("c.png".to_owned()),
5668                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
5669                },
5670            ),
5671        ];
5672        store.upsert_parts(&batch_b).await?;
5673
5674        store
5675            .optimize_indices(None, &MaintenancePolicy::always_compact())
5676            .await?
5677            .into_result()?;
5678
5679        Ok(())
5680    }
5681
5682    #[tokio::test]
5683    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
5684        let temp = TempDir::new()?;
5685        let store = Store::open_local(temp.path()).await?;
5686        let session = synthetic_session("blob");
5687        let message = Message::User {
5688            id: "message-1".to_owned(),
5689            session_id: session.id.clone(),
5690            timestamp: Utc::now(),
5691            options: ProviderOptions::new(),
5692        };
5693        let part = Part {
5694            session_id: session.id.clone(),
5695            id: "part-1".to_owned(),
5696            message_id: message.id().to_owned(),
5697            ordinal: 0,
5698            provenance: crate::wire::Provenance::Conversational,
5699            options: ProviderOptions::new(),
5700            kind: PartKind::File {
5701                media_type: Some("text/plain".to_owned()),
5702                file_name: Some("payload.txt".to_owned()),
5703                data: FileData::Bytes(b"pond".to_vec()),
5704            },
5705        };
5706
5707        let mut validator = IngestValidator::default();
5708        validator
5709            .push(&store, 0, IngestEvent::Session(session.clone()))
5710            .await?;
5711        validator
5712            .push(&store, 1, IngestEvent::Message(message.clone()))
5713            .await?;
5714        validator
5715            .push(&store, 2, IngestEvent::Part(part.clone()))
5716            .await?;
5717        validator.finish(&store).await?;
5718
5719        let stored = store
5720            .get_session(&session.id)
5721            .await?
5722            .expect("session should exist");
5723        let stored_part = &stored.messages[0].parts[0];
5724        assert_eq!(stored_part, &part);
5725
5726        Ok(())
5727    }
5728
    // Session immutability.
    //
    // `Session.source_agent` and `Session.project` are immutable after the
    // first write because `messages` denormalizes them at first ingest; a
    // silent overwrite would desync the denormalized copies. pond core's
    // `IngestValidator` probes the existing session before the merge_insert
    // and, when either field changes, emits a per-row `validation_failed`
    // outcome carrying the typed field name. The other Session fields
    // (options, parent_session_id, created_at, parent_message_id) are
    // re-written idempotently via merge_insert.
5738
5739    fn base_session() -> Session {
5740        Session {
5741            id: "01HXY00000000001".to_owned(),
5742            parent_session_id: None,
5743            parent_message_id: None,
5744            source_agent: "claude-code".to_owned(),
5745            created_at: Utc::now(),
5746            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
5747            options: ProviderOptions::new(),
5748        }
5749    }
5750
5751    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
5752        outcomes
5753            .iter()
5754            .filter(|outcome| outcome.status == target)
5755            .count()
5756    }
5757
5758    #[tokio::test(flavor = "multi_thread")]
5759    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
5760    -> anyhow::Result<()> {
5761        let temp = TempDir::new()?;
5762        let store = Store::open_local(temp.path()).await?;
5763
5764        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5765        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
5766
5767        let mut again = base_session();
5768        again.options.insert("title".to_owned(), json!("renamed"));
5769        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
5770        assert_eq!(
5771            count_status(&second, OutcomeStatus::Error),
5772            0,
5773            "options is mutable; the re-ingest must not surface an error: {second:?}",
5774        );
5775        assert_eq!(
5776            count_status(&second, OutcomeStatus::Matched),
5777            1,
5778            "unchanged immutable fields must match-insert via merge_insert",
5779        );
5780
5781        Ok(())
5782    }
5783
5784    #[tokio::test(flavor = "multi_thread")]
5785    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
5786        let temp = TempDir::new()?;
5787        let store = Store::open_local(temp.path()).await?;
5788
5789        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5790        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
5791
5792        let mut tampered = base_session();
5793        tampered.source_agent = "codex-cli".to_owned();
5794        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
5795        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
5796        let err_row = second
5797            .iter()
5798            .find(|outcome| outcome.status == OutcomeStatus::Error)
5799            .expect("error outcome present");
5800        let err = err_row.error.as_ref().expect("error body present");
5801        assert_eq!(err.field, Some("source_agent"));
5802        assert_eq!(err.reason, Some("immutable"));
5803
5804        // The stored row stayed on the original adapter - no silent rewrite.
5805        let stored = store
5806            .get_session(&base_session().id)
5807            .await?
5808            .expect("session row survives the rejected re-ingest");
5809        assert_eq!(stored.session.source_agent, "claude-code");
5810
5811        Ok(())
5812    }
5813
5814    #[tokio::test(flavor = "multi_thread")]
5815    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
5816        let temp = TempDir::new()?;
5817        let store = Store::open_local(temp.path()).await?;
5818
5819        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5820        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
5821
5822        let mut tampered = base_session();
5823        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
5824        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
5825        let err_row = second
5826            .iter()
5827            .find(|outcome| outcome.status == OutcomeStatus::Error)
5828            .expect("project change must surface an error outcome");
5829        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
5830
5831        let stored = store
5832            .get_session(&base_session().id)
5833            .await?
5834            .expect("session row survives");
5835        assert_eq!(
5836            stored.session.project.as_str(),
5837            "/home/me/proj",
5838            "stored project must remain the original",
5839        );
5840
5841        Ok(())
5842    }
5843
5844    #[tokio::test(flavor = "multi_thread")]
5845    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
5846        // Regression guard: re-ingesting an existing session with NEW
5847        // messages must surface as sessions_inserted=0, messages_inserted_*>0
5848        // on `BatchCounts`, and per-row outcomes must mark the new message
5849        // rows `Inserted` while the session row is `Matched`. The prior
5850        // implementation derived all per-row statuses from the batch-level
5851        // session inserted count, which silently flipped the new messages
5852        // into `Matched` (visible as "up to date" in the CLI bar tail).
5853        use crate::wire::Provenance;
5854        let temp = TempDir::new()?;
5855        let store = Store::open_local(temp.path()).await?;
5856        let session = base_session();
5857
5858        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
5859            session_id: session.id.clone(),
5860            id: part_id.to_owned(),
5861            message_id: message_id.to_owned(),
5862            ordinal: 0,
5863            provenance: Provenance::Conversational,
5864            options: ProviderOptions::new(),
5865            kind: PartKind::Text {
5866                text: Some(Extracted::from_test_value(body.to_owned())),
5867            },
5868        };
5869        let user_message = |id: &str| Message::User {
5870            id: id.to_owned(),
5871            session_id: session.id.clone(),
5872            timestamp: Utc::now(),
5873            options: ProviderOptions::new(),
5874        };
5875
5876        // First pass: 2 messages land fresh.
5877        let mut validator = IngestValidator::default();
5878        validator
5879            .push(&store, 0, IngestEvent::Session(session.clone()))
5880            .await?;
5881        validator
5882            .push(&store, 1, IngestEvent::Message(user_message("m1")))
5883            .await?;
5884        validator
5885            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
5886            .await?;
5887        validator
5888            .push(&store, 3, IngestEvent::Message(user_message("m2")))
5889            .await?;
5890        validator
5891            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
5892            .await?;
5893        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
5894        assert_eq!(first_counts.sessions_inserted, 1);
5895        assert_eq!(first_counts.messages_inserted_total, 2);
5896        assert_eq!(first_counts.messages_inserted_searchable, 2);
5897
5898        // Second pass: same session id, 3 NEW messages.
5899        let mut validator = IngestValidator::default();
5900        validator
5901            .push(&store, 0, IngestEvent::Session(session.clone()))
5902            .await?;
5903        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
5904            let pid = format!("p{}", idx + 3);
5905            validator
5906                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
5907                .await?;
5908            validator
5909                .push(
5910                    &store,
5911                    idx * 2 + 2,
5912                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
5913                )
5914                .await?;
5915        }
5916        let (second_outcomes, second_counts) = validator.finish(&store).await?;
5917
5918        assert_eq!(
5919            second_counts.sessions_inserted, 0,
5920            "existing session row must report as Matched, not Inserted",
5921        );
5922        assert_eq!(second_counts.sessions_matched, 1);
5923        assert_eq!(
5924            second_counts.messages_inserted_total, 3,
5925            "the three NEW messages must register as Inserted in BatchCounts",
5926        );
5927        assert_eq!(
5928            second_counts.messages_inserted_searchable, 3,
5929            "all three new messages carry conversational text -> searchable",
5930        );
5931        assert_eq!(second_counts.messages_matched_total, 0);
5932        assert_eq!(second_counts.parts_inserted, 3);
5933        assert_eq!(second_counts.parts_matched, 0);
5934
5935        // Per-row outcomes mirror the BatchCounts shape: the session row is
5936        // Matched, every new message + part row is Inserted.
5937        let session_outcome = second_outcomes
5938            .iter()
5939            .find(|outcome| outcome.kind == "session")
5940            .expect("session-row outcome present");
5941        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
5942        for outcome in &second_outcomes {
5943            if outcome.kind == "message" || outcome.kind == "part" {
5944                assert_eq!(
5945                    outcome.status,
5946                    OutcomeStatus::Inserted,
5947                    "new row must be Inserted, got: {outcome:?}",
5948                );
5949            }
5950        }
5951        Ok(())
5952    }
5953
5954    /// Ingest `count` synthetic messages spread across a handful of sessions
5955    /// and projects, each with conversational `search_text`. Returns the store
5956    /// and the message keys in `msg-{i}` order; every `vector` starts null.
5957    async fn store_with_messages(
5958        temp: &TempDir,
5959        count: usize,
5960    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
5961        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
5962    }
5963
5964    /// Same as [`store_with_messages`] but tests optimize with a custom
5965    /// IVF_SQ activation threshold.
5966    async fn store_with_messages_at_threshold(
5967        temp: &TempDir,
5968        count: usize,
5969        _vector_threshold: usize,
5970    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
5971        let store = Store::open_local(temp.path()).await?;
5972        let sessions = 8.min(count.max(1));
5973        let mut events = Vec::new();
5974        for s in 0..sessions {
5975            events.push(IngestEvent::Session(Session {
5976                id: format!("session-{s}"),
5977                parent_session_id: None,
5978                parent_message_id: None,
5979                source_agent: "claude-code".to_owned(),
5980                created_at: Utc::now(),
5981                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
5982                options: ProviderOptions::new(),
5983            }));
5984            for i in (s..count).step_by(sessions) {
5985                let message_id = format!("msg-{i}");
5986                events.push(IngestEvent::Message(Message::User {
5987                    id: message_id.clone(),
5988                    session_id: format!("session-{s}"),
5989                    timestamp: Utc::now(),
5990                    options: ProviderOptions::new(),
5991                }));
5992                events.push(IngestEvent::Part(Part {
5993                    session_id: format!("session-{s}"),
5994                    id: format!("{message_id}-part"),
5995                    message_id,
5996                    ordinal: 0,
5997                    provenance: crate::wire::Provenance::Conversational,
5998                    options: ProviderOptions::new(),
5999                    kind: PartKind::Text {
6000                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
6001                    },
6002                }));
6003            }
6004        }
6005        ingest_events(&store, events).await?;
6006        let keys = (0..count)
6007            .map(|i| MessageKey {
6008                session_id: format!("session-{}", i % sessions),
6009                message_id: format!("msg-{i}"),
6010            })
6011            .collect();
6012        Ok((store, keys))
6013    }
6014
6015    /// A deterministic pseudo-random vector of the production dimension.
6016    fn synthetic_vector(seed: usize) -> Vec<f32> {
6017        let mut state = (seed as u64)
6018            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
6019            .wrapping_add(1);
6020        (0..embedding_dim())
6021            .map(|_| {
6022                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
6023                #[allow(clippy::cast_precision_loss)]
6024                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
6025                unit - 1.0
6026            })
6027            .collect()
6028    }
6029
6030    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
6031    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
6032        keys.iter()
6033            .enumerate()
6034            .map(|(seed, key)| EmbeddedMessage {
6035                session_id: key.session_id.clone(),
6036                id: key.message_id.clone(),
6037                vector: synthetic_vector(seed),
6038            })
6039            .collect()
6040    }
6041
6042    fn embedding_update_batch_with_model(
6043        rows: &[EmbeddedMessage],
6044        model: &str,
6045    ) -> Result<RecordBatch> {
6046        let mut batch = embedding_update_batch(rows)?;
6047        let columns = batch
6048            .columns()
6049            .iter()
6050            .take(3)
6051            .cloned()
6052            .chain(std::iter::once(
6053                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
6054            ))
6055            .collect::<Vec<_>>();
6056        batch = RecordBatch::try_new(batch.schema(), columns)?;
6057        Ok(batch)
6058    }
6059
6060    #[tokio::test]
6061    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
6062        let temp = TempDir::new()?;
6063        // 4 messages cycle session-0..session-3, so `session-3` is a real
6064        // partition. Scalar-index pushdown is volume-independent: the planner
6065        // emits `ScalarIndexQuery` whenever the index exists.
6066        let (store, keys) = store_with_messages(&temp, 4).await?;
6067        store.write_embeddings(&embedded(&keys)).await?;
6068        store
6069            .optimize_indices(None, &MaintenancePolicy::always_compact())
6070            .await?
6071            .into_result()?;
6072
6073        let query = vec![0.01_f32; embedding_dim()];
6074        let plan = store
6075            .explain_vector_plan(
6076                &query,
6077                10,
6078                &Predicate::Eq("session_id", "session-3".into()),
6079                None,
6080            )
6081            .await?;
6082
6083        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
6084        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
6085        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
6086        // and unrelated.)
6087        assert!(
6088            plan.contains("ScalarIndexQuery"),
6089            "expected a ScalarIndexQuery node in the plan:\n{plan}",
6090        );
6091        let predicate_postfiltered = plan
6092            .lines()
6093            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
6094        assert!(
6095            !predicate_postfiltered,
6096            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
6097        );
6098        Ok(())
6099    }
6100
6101    #[tokio::test]
6102    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
6103        let temp = TempDir::new()?;
6104        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
6105
6106        // First batch: 255 vectors, one below threshold. Optimize does not
6107        // create the IVF_SQ because the trigger is not met.
6108        store.write_embeddings(&embedded(&keys[..255])).await?;
6109        store
6110            .optimize_indices_with_vector_threshold(256)
6111            .await?
6112            .into_result()?;
6113        assert!(
6114            !store
6115                .handle
6116                .messages_index_names()
6117                .await?
6118                .iter()
6119                .any(|name| name == MESSAGES_VECTOR_INDEX),
6120            "IVF_SQ must not exist below the activation threshold",
6121        );
6122
6123        // Next batch: one more vector. Total reaches 256; optimize creates
6124        // the IVF_SQ.
6125        store.write_embeddings(&embedded(&keys[255..256])).await?;
6126        store
6127            .optimize_indices_with_vector_threshold(256)
6128            .await?
6129            .into_result()?;
6130        assert!(
6131            store
6132                .handle
6133                .messages_index_names()
6134                .await?
6135                .iter()
6136                .any(|name| name == MESSAGES_VECTOR_INDEX),
6137            "optimize must create the IVF_SQ once the threshold is crossed",
6138        );
6139
6140        // The remaining 44 rows stay un-embedded; the IVF_SQ trains over the
6141        // non-null subset and a planted vector is retrievable.
6142        let hits = store
6143            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
6144            .await?;
6145        assert!(
6146            hits.iter().any(|hit| hit.key == keys[0]),
6147            "an embedded row is retrievable via the index",
6148        );
6149        Ok(())
6150    }
6151
6152    #[tokio::test]
6153    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
6154    {
6155        let temp = TempDir::new()?;
6156        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
6157        let old_rows = embedded(&keys);
6158        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
6159        store
6160            .handle
6161            .merge_update(Table::Messages, old_batch, old_rows.len())
6162            .await?;
6163        store
6164            .optimize_indices_with_vector_threshold(256)
6165            .await?
6166            .into_result()?;
6167        assert!(
6168            store
6169                .handle
6170                .messages_index_names()
6171                .await?
6172                .iter()
6173                .any(|name| name == MESSAGES_VECTOR_INDEX),
6174            "IVF_SQ must exist before a model swap",
6175        );
6176        assert_eq!(store.stale_embedding_count().await?, keys.len());
6177
6178        store.drop_vector_index().await?;
6179        let mut pending = Vec::new();
6180        let stream = store.pending_or_stale_messages();
6181        tokio::pin!(stream);
6182        while let Some(row) = stream.next().await {
6183            pending.push(row?);
6184        }
6185        assert_eq!(
6186            pending.len(),
6187            keys.len(),
6188            "force stream should see stale rows"
6189        );
6190        store.write_embeddings(&embedded(&keys)).await?;
6191        assert_eq!(store.stale_embedding_count().await?, 0);
6192        store
6193            .optimize_indices_with_vector_threshold(256)
6194            .await?
6195            .into_result()?;
6196        assert!(
6197            store
6198                .handle
6199                .messages_index_names()
6200                .await?
6201                .iter()
6202                .any(|name| name == MESSAGES_VECTOR_INDEX),
6203            "optimize must rebuild IVF_SQ after force re-embed",
6204        );
6205
6206        let stream = store.pending_or_stale_messages();
6207        tokio::pin!(stream);
6208        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
6209        Ok(())
6210    }
6211
6212    #[tokio::test]
6213    async fn session_last_message_ids_come_from_durable_messages() -> anyhow::Result<()> {
6214        let temp = TempDir::new()?;
6215        let store = Store::open_local(temp.path()).await?;
6216        let session = synthetic_session("oracle");
6217        store
6218            .upsert_sessions(std::slice::from_ref(&session))
6219            .await?;
6220        let timestamp =
6221            chrono::DateTime::from_timestamp(1_700_000_000, 0).expect("valid timestamp");
6222        let message_a = Message::User {
6223            id: "oracle-a".to_owned(),
6224            session_id: session.id.clone(),
6225            timestamp,
6226            options: ProviderOptions::new(),
6227        };
6228        let message_b = Message::User {
6229            id: "oracle-b".to_owned(),
6230            session_id: session.id.clone(),
6231            timestamp,
6232            options: ProviderOptions::new(),
6233        };
6234        store
6235            .upsert_messages(
6236                &session,
6237                &[
6238                    MessageWrite {
6239                        message: &message_a,
6240                        parts: &[],
6241                        search_text: Some("a"),
6242                    },
6243                    MessageWrite {
6244                        message: &message_b,
6245                        parts: &[],
6246                        search_text: Some("b"),
6247                    },
6248                ],
6249            )
6250            .await?;
6251
6252        let empty_session = synthetic_session("session-row-only");
6253        store.upsert_sessions(&[empty_session]).await?;
6254
6255        // Orphan: messages committed but the session row never was (the crash
6256        // window `upsert_session_batch`'s write order can leave). The gate must
6257        // NOT key on it, so the source re-ingests and heals the missing row.
6258        let orphan = synthetic_session("messages-no-row");
6259        let orphan_message = Message::User {
6260            id: "orphan-a".to_owned(),
6261            session_id: orphan.id.clone(),
6262            timestamp,
6263            options: ProviderOptions::new(),
6264        };
6265        store
6266            .upsert_messages(
6267                &orphan,
6268                &[MessageWrite {
6269                    message: &orphan_message,
6270                    parts: &[],
6271                    search_text: Some("a"),
6272                }],
6273            )
6274            .await?;
6275
6276        let map = store.session_last_message_ids().await?;
6277        assert_eq!(map.get("oracle").map(String::as_str), Some("oracle-b"));
6278        assert!(
6279            !map.contains_key("session-row-only"),
6280            "a session row without durable messages must not produce a freshness key",
6281        );
6282        assert!(
6283            !map.contains_key("messages-no-row"),
6284            "messages without a durable session row must not produce a freshness key",
6285        );
6286        Ok(())
6287    }
6288
6289    #[tokio::test]
6290    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
6291        let temp = TempDir::new()?;
6292        let (store, keys) = store_with_messages(&temp, 10).await?;
6293
6294        let before = store.embedding_progress().await?;
6295        assert_eq!(before.embedded, 0);
6296        assert_eq!(before.total, 10);
6297        assert_eq!(before.backlog, 10);
6298        assert_eq!(before.model, crate::embed::model_id());
6299
6300        store.write_embeddings(&embedded(&keys[..4])).await?;
6301        let partial = store.embedding_progress().await?;
6302        assert_eq!(partial.embedded, 4);
6303        assert_eq!(partial.total, 10);
6304        assert_eq!(partial.backlog, 6);
6305
6306        store.write_embeddings(&embedded(&keys[4..])).await?;
6307        let full = store.embedding_progress().await?;
6308        assert_eq!(full.embedded, 10);
6309        assert_eq!(full.total, 10);
6310        // The pending signal is the live un-embedded count and matches the
6311        // authoritative backlog - never derived from FTS num_docs.
6312        assert_eq!(full.backlog, 0);
6313        assert_eq!(full.backlog, store.embed_backlog_count().await?);
6314        Ok(())
6315    }
6316
6317    #[tokio::test]
6318    async fn ensure_rowmap_layers_a_delta_on_new_ingest() -> anyhow::Result<()> {
6319        let temp = TempDir::new()?;
6320        let (store, _keys) = store_with_messages(&temp, 6).await?;
6321        let cache = temp.path().join("cache");
6322
6323        store.ensure_rowmap(&cache).await?;
6324        assert_eq!(
6325            store.rowmap_delta_count(),
6326            Some(0),
6327            "first build is a lone base"
6328        );
6329
6330        // A new session's message bumps the version with a fresh fragment.
6331        ingest_events(
6332            &store,
6333            vec![
6334                IngestEvent::Session(Session {
6335                    id: "session-new".to_owned(),
6336                    parent_session_id: None,
6337                    parent_message_id: None,
6338                    source_agent: "claude-code".to_owned(),
6339                    created_at: Utc::now(),
6340                    project: Extracted::from_test_value("/proj/new".to_owned()),
6341                    options: ProviderOptions::new(),
6342                }),
6343                IngestEvent::Message(Message::User {
6344                    id: "m-new".to_owned(),
6345                    session_id: "session-new".to_owned(),
6346                    timestamp: Utc::now(),
6347                    options: ProviderOptions::new(),
6348                }),
6349                IngestEvent::Part(Part {
6350                    session_id: "session-new".to_owned(),
6351                    id: "m-new-part".to_owned(),
6352                    message_id: "m-new".to_owned(),
6353                    ordinal: 0,
6354                    provenance: crate::wire::Provenance::Conversational,
6355                    options: ProviderOptions::new(),
6356                    kind: PartKind::Text {
6357                        text: Some(Extracted::from_test_value("brand new message".to_owned())),
6358                    },
6359                }),
6360            ],
6361        )
6362        .await?;
6363
6364        // The refresh scans only the new fragment and layers a delta - not a
6365        // full rebuild.
6366        store.ensure_rowmap(&cache).await?;
6367        assert_eq!(
6368            store.rowmap_delta_count(),
6369            Some(1),
6370            "new ingest layered a delta"
6371        );
6372
6373        // The new session's count is served from the chain (base + delta sum).
6374        let counts = store
6375            .session_message_counts(&["session-new".to_owned()])
6376            .await?;
6377        assert_eq!(counts.get("session-new").copied(), Some(1));
6378        Ok(())
6379    }
6380
6381    /// Regression for the v0.10.0 sync death-spiral: the 1h cleanup retention can
6382    /// reclaim the dataset version the on-disk chain was last built at. The delta
6383    /// extender's `checkout_version(base)` then errored, the error nuked
6384    /// `ensure_rowmap`, and the oracle silently fell back to re-reading every
6385    /// source on every sync forever (the chain never advanced past the reclaimed
6386    /// base). A reclaimed base must degrade to a full rebuild, like compaction.
6387    #[tokio::test]
6388    async fn ensure_rowmap_rebuilds_when_base_manifest_reclaimed() -> anyhow::Result<()> {
6389        let temp = TempDir::new()?;
6390        let (store, _keys) = store_with_messages(&temp, 6).await?;
6391        let cache = temp.path().join("cache");
6392
6393        // Build the chain at the current version, then snapshot the manifests
6394        // that exist at-or-below it - these are exactly what cleanup reclaims.
6395        store.ensure_rowmap(&cache).await?;
6396        assert_eq!(store.rowmap_delta_count(), Some(0), "first build is a base");
6397        let base_version = store.messages_version().await?;
6398        let versions_dir = temp.path().join("messages.lance").join("_versions");
6399        let base_manifests: Vec<_> = std::fs::read_dir(&versions_dir)?
6400            .filter_map(|entry| entry.ok().map(|entry| entry.path()))
6401            .filter(|path| path.extension().is_some_and(|ext| ext == "manifest"))
6402            .collect();
6403        assert!(
6404            !base_manifests.is_empty(),
6405            "the base version has a manifest"
6406        );
6407
6408        // A new session bumps the version, so the on-disk chain now trails the
6409        // dataset and a refresh would normally delta from `base_version`.
6410        ingest_events(
6411            &store,
6412            vec![
6413                IngestEvent::Session(Session {
6414                    id: "session-after".to_owned(),
6415                    parent_session_id: None,
6416                    parent_message_id: None,
6417                    source_agent: "claude-code".to_owned(),
6418                    created_at: Utc::now(),
6419                    project: Extracted::from_test_value("/proj/after".to_owned()),
6420                    options: ProviderOptions::new(),
6421                }),
6422                IngestEvent::Message(Message::User {
6423                    id: "m-after".to_owned(),
6424                    session_id: "session-after".to_owned(),
6425                    timestamp: Utc::now(),
6426                    options: ProviderOptions::new(),
6427                }),
6428                IngestEvent::Part(Part {
6429                    session_id: "session-after".to_owned(),
6430                    id: "m-after-part".to_owned(),
6431                    message_id: "m-after".to_owned(),
6432                    ordinal: 0,
6433                    provenance: crate::wire::Provenance::Conversational,
6434                    options: ProviderOptions::new(),
6435                    kind: PartKind::Text {
6436                        text: Some(Extracted::from_test_value("after the base".to_owned())),
6437                    },
6438                }),
6439            ],
6440        )
6441        .await?;
6442        assert!(
6443            store.messages_version().await? > base_version,
6444            "the new ingest advanced the dataset past the chain's base"
6445        );
6446
6447        // Reclaim the base version's manifest exactly as `cleanup_old_versions`
6448        // would: `checkout_version(base_version)` can no longer resolve.
6449        for manifest in &base_manifests {
6450            std::fs::remove_file(manifest)?;
6451        }
6452
6453        // A fresh Store finds the trailing chain on disk, tries to delta from the
6454        // reclaimed base, and must fall back to a full rebuild - Ok, not Err.
6455        let reopened = Store::open_local(temp.path()).await?;
6456        reopened.ensure_rowmap(&cache).await?;
6457        assert!(
6458            reopened.rowmap_snapshot().is_some(),
6459            "map rebuilt after the base manifest was reclaimed"
6460        );
6461        assert_eq!(
6462            reopened.rowmap_delta_count(),
6463            Some(0),
6464            "a reclaimed base forces a fresh full-scan base, not a stuck chain"
6465        );
6466
6467        // The rebuilt base covers the post-base ingest, so the oracle is whole.
6468        let counts = reopened
6469            .session_message_counts(&["session-after".to_owned()])
6470            .await?;
6471        assert_eq!(counts.get("session-after").copied(), Some(1));
6472        Ok(())
6473    }
6474
6475    /// The steady-state hot path: embedding rewrites the message fragments every
6476    /// sync (merge_update on the `vector` column). Keying the delta off fragment
6477    /// identity made that rewrite force a full 2.1M-row rebuild every sync.
6478    /// Stable row ids preserve row_ids across the rewrite, so the refresh must
6479    /// layer a cheap append-only delta of just the new rows - and must NOT
6480    /// double-count the rewritten rows that still live in the base.
6481    #[tokio::test]
6482    async fn ensure_rowmap_deltas_across_embedding_fragment_rewrite() -> anyhow::Result<()> {
6483        let temp = TempDir::new()?;
6484        let (store, keys) = store_with_messages(&temp, 6).await?;
6485        let cache = temp.path().join("cache");
6486        store.ensure_rowmap(&cache).await?;
6487        assert_eq!(store.rowmap_delta_count(), Some(0), "first build is a base");
6488
6489        // Embedding rewrites every message fragment (new fragment ids, same
6490        // stable row_ids, untouched ROW_META columns).
6491        store.write_embeddings(&embedded(&keys)).await?;
6492
6493        // A new session appends one row on top of the rewritten fragments.
6494        ingest_events(
6495            &store,
6496            vec![
6497                IngestEvent::Session(Session {
6498                    id: "session-after".to_owned(),
6499                    parent_session_id: None,
6500                    parent_message_id: None,
6501                    source_agent: "claude-code".to_owned(),
6502                    created_at: Utc::now(),
6503                    project: Extracted::from_test_value("/proj/after".to_owned()),
6504                    options: ProviderOptions::new(),
6505                }),
6506                IngestEvent::Message(Message::User {
6507                    id: "m-after".to_owned(),
6508                    session_id: "session-after".to_owned(),
6509                    timestamp: Utc::now(),
6510                    options: ProviderOptions::new(),
6511                }),
6512                IngestEvent::Part(Part {
6513                    session_id: "session-after".to_owned(),
6514                    id: "m-after-part".to_owned(),
6515                    message_id: "m-after".to_owned(),
6516                    ordinal: 0,
6517                    provenance: crate::wire::Provenance::Conversational,
6518                    options: ProviderOptions::new(),
6519                    kind: PartKind::Text {
6520                        text: Some(Extracted::from_test_value("after embedding".to_owned())),
6521                    },
6522                }),
6523            ],
6524        )
6525        .await?;
6526
6527        // The refresh layers a delta of just the appended row, not a full
6528        // rebuild - despite every prior fragment having been rewritten.
6529        store.ensure_rowmap(&cache).await?;
6530        assert_eq!(
6531            store.rowmap_delta_count(),
6532            Some(1),
6533            "fragment rewrite + append must layer a delta, not full-rebuild"
6534        );
6535
6536        // Counts stay honest: the rewritten base rows are not re-emitted into the
6537        // delta, so nothing is double-counted across base + delta segments.
6538        let counts = store
6539            .session_message_counts(&["session-after".to_owned(), "session-0".to_owned()])
6540            .await?;
6541        assert_eq!(counts.get("session-after").copied(), Some(1));
6542        assert_eq!(
6543            counts.get("session-0").copied(),
6544            Some(1),
6545            "a base row survived the rewrite without being double-counted"
6546        );
6547        Ok(())
6548    }
6549
6550    #[tokio::test]
6551    async fn rowmap_chain_compacts_and_stays_bounded() -> anyhow::Result<()> {
6552        // Many version bumps (the remote-writers case) must not grow the chain
6553        // unboundedly: deltas cap at MAX, then compact into a fresh base.
6554        let temp = TempDir::new()?;
6555        let (store, _keys) = store_with_messages(&temp, 4).await?;
6556        let cache = temp.path().join("cache");
6557        store.ensure_rowmap(&cache).await?;
6558
6559        let mut reached_cap = false;
6560        let mut compacted = false;
6561        for i in 0..(Store::MAX_ROWMAP_DELTAS + 2) {
6562            let session = format!("session-x{i}");
6563            ingest_events(
6564                &store,
6565                vec![
6566                    IngestEvent::Session(Session {
6567                        id: session.clone(),
6568                        parent_session_id: None,
6569                        parent_message_id: None,
6570                        source_agent: "claude-code".to_owned(),
6571                        created_at: Utc::now(),
6572                        project: Extracted::from_test_value("/proj/x".to_owned()),
6573                        options: ProviderOptions::new(),
6574                    }),
6575                    IngestEvent::Message(Message::User {
6576                        id: format!("mx{i}"),
6577                        session_id: session.clone(),
6578                        timestamp: Utc::now(),
6579                        options: ProviderOptions::new(),
6580                    }),
6581                    IngestEvent::Part(Part {
6582                        session_id: session.clone(),
6583                        id: format!("mx{i}-part"),
6584                        message_id: format!("mx{i}"),
6585                        ordinal: 0,
6586                        provenance: crate::wire::Provenance::Conversational,
6587                        options: ProviderOptions::new(),
6588                        kind: PartKind::Text {
6589                            text: Some(Extracted::from_test_value(format!("msg {i}"))),
6590                        },
6591                    }),
6592                ],
6593            )
6594            .await?;
6595            store.ensure_rowmap(&cache).await?;
6596            let deltas = store.rowmap_delta_count().unwrap();
6597            assert!(
6598                deltas <= Store::MAX_ROWMAP_DELTAS,
6599                "delta count {deltas} exceeded the cap",
6600            );
6601            if deltas == Store::MAX_ROWMAP_DELTAS {
6602                reached_cap = true;
6603            }
6604            if reached_cap && deltas < Store::MAX_ROWMAP_DELTAS {
6605                compacted = true;
6606            }
6607        }
6608        assert!(reached_cap, "deltas accumulated to the cap (append path)");
6609        assert!(compacted, "the chain compacted back into a base");
6610
6611        // Files stay bounded and no build temps leak.
6612        let mut rmm = 0;
6613        for entry in std::fs::read_dir(&cache)? {
6614            let name = entry?.file_name().into_string().unwrap_or_default();
6615            assert!(!name.contains(".tmp-"), "leaked build temp: {name}");
6616            if name.ends_with(".rmm") {
6617                rmm += 1;
6618            }
6619        }
6620        assert!(
6621            rmm <= Store::MAX_ROWMAP_DELTAS + 1,
6622            "files unbounded: {rmm}"
6623        );
6624        Ok(())
6625    }
6626
6627    #[tokio::test]
6628    async fn embed_backlog_count_tracks_eligible_unembedded_rows() -> anyhow::Result<()> {
6629        let temp = TempDir::new()?;
6630        let (store, keys) = store_with_messages(&temp, 10).await?;
6631
6632        // Read straight from the dataset (no FTS index here), so it is correct
6633        // right after ingest - the case that lagged `embedding_progress`.
6634        assert_eq!(store.embed_backlog_count().await?, 10);
6635
6636        store.write_embeddings(&embedded(&keys[..4])).await?;
6637        assert_eq!(store.embed_backlog_count().await?, 6);
6638
6639        store.write_embeddings(&embedded(&keys[4..])).await?;
6640        assert_eq!(store.embed_backlog_count().await?, 0);
6641        Ok(())
6642    }
6643
6644    #[tokio::test]
6645    async fn session_message_counts_returns_per_session_counts_with_zeros_for_unknown_sessions()
6646    -> anyhow::Result<()> {
6647        // store_with_messages stripes `count` messages across 8 sessions
6648        // round-robin. 32 messages -> 4 per session, 0..8 deterministic.
6649        let temp = TempDir::new()?;
6650        let (store, _keys) = store_with_messages(&temp, 32).await?;
6651
6652        let mut requested: Vec<String> = (0..8).map(|s| format!("session-{s}")).collect();
6653        requested.push("session-unknown-a".to_owned());
6654        requested.push("session-unknown-b".to_owned());
6655        let counts = store.session_message_counts(&requested).await?;
6656
6657        // Map has an entry for every requested id (the contract): known
6658        // sessions hit 4, unknown sessions sit at 0.
6659        assert_eq!(counts.len(), requested.len());
6660        for s in 0..8 {
6661            assert_eq!(
6662                counts.get(&format!("session-{s}")).copied(),
6663                Some(4),
6664                "session-{s} should have 4 messages",
6665            );
6666        }
6667        assert_eq!(counts.get("session-unknown-a").copied(), Some(0));
6668        assert_eq!(counts.get("session-unknown-b").copied(), Some(0));
6669
6670        // Empty input is the documented zero-path.
6671        let empty = store.session_message_counts(&[]).await?;
6672        assert!(empty.is_empty());
6673        Ok(())
6674    }
6675}