Skip to main content

pond/
sessions.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    path::Path,
4    sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15    UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25    config, embed,
26    substrate::{
27        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, OptimizeProgressFn,
28        PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table, TableOptimizeOutcome, TableSizes,
29        VECTOR_INDEX_ACTIVATION_ROWS,
30    },
31    wire::{
32        FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33        SessionFrom,
34    },
35};
36use url::Url;
37
/// Store over the `sessions`, `messages` and `parts` tables, wrapping a
/// substrate [`Handle`] through which the methods below read and write.
#[derive(Debug)]
pub struct Store {
    handle: Handle,
}
42
/// Per-table row counts reported by the Lance archive export/import paths
/// (see [`LanceArchiveExport`] and [`LanceArchiveImport`]).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    pub sessions: usize,
    pub messages: usize,
    pub parts: usize,
}
49
/// Per-table dataset version ids. On export these are the source datasets'
/// `version_id()` values captured before each table is scanned.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}
56
/// Result of [`Store::export_clean_lance_datasets`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Rows written to the archive, per table.
    pub rows: LanceArchiveCounts,
    /// Source dataset version each table was exported from.
    pub source_versions: LanceArchiveVersions,
}
62
/// Result of [`Store::import_clean_lance_datasets`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Rows read from the archive, per table.
    pub rows: LanceArchiveCounts,
    /// Sum of the counts returned by the per-batch `merge_insert` calls, per
    /// table (presumably rows that were newly inserted rather than matched —
    /// confirm against `Handle::merge_insert`).
    pub inserted: LanceArchiveCounts,
}
68
/// Index intents grouped by the table they apply to.
#[derive(Debug, Clone, Default)]
pub struct IndexIntents {
    /// Intents for the `sessions` table.
    pub sessions: Vec<IndexIntent>,
    /// Intents for the `messages` table.
    pub messages: Vec<IndexIntent>,
    /// Intents for the `parts` table.
    pub parts: Vec<IndexIntent>,
}
75
76impl IndexIntents {
77    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78        [
79            (Table::Sessions, &self.sessions),
80            (Table::Messages, &self.messages),
81            (Table::Parts, &self.parts),
82        ]
83    }
84}
85
/// A message awaiting embedding: its primary key plus the `search_text` to
/// embed. The vector lives on the same `messages` row, so no denormalized
/// filter columns are needed (spec.md#session-embed-from-canonical).
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Owning session; first half of the `(session_id, id)` key.
    pub session_id: String,
    /// Message id; second half of the key.
    pub id: String,
    /// Text to be embedded for this message.
    pub search_text: String,
}
95
/// One embedded message: a primary key and the vector to store. `pond embed`
/// writes a batch of these into `messages.vector` keyed on `(session_id, id)`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Owning session; first half of the `(session_id, id)` key.
    pub session_id: String,
    /// Message id; second half of the key.
    pub id: String,
    /// Embedding to store in `messages.vector`.
    pub vector: Vec<f32>,
}
104
/// Message metadata used to hydrate search hits after retriever ranking.
///
/// All fields are plain owned values, so a `MessageMeta` can outlive the
/// batch it was read from.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    pub message_id: String,
    pub session_id: String,
    pub role: String,
    pub project: String,
    pub source_agent: String,
    pub timestamp: DateTime<Utc>,
    pub search_text: String,
}
116
/// `(session_id, message_id)` pair identifying one message. Derives `Ord` and
/// `Hash`, so it can be used as a key in both ordered and hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    pub session_id: String,
    pub message_id: String,
}
122
/// Per-row result returned by the `upsert_*` methods.
///
/// NOTE(review): presumably `Inserted` means the row was new and `Matched`
/// means a row with the same key already existed — confirm against
/// `statuses_from_inserted`, which produces these values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    Inserted,
    Matched,
}
128
/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
/// results so the CLI can render compaction-skipped (under writer contention)
/// distinctly from index-build failure (real problem).
#[derive(Debug, Default)]
pub struct OptimizeOutcome {
    /// Per-table outcomes; each carries an `indices` and a `compaction` phase.
    pub tables: Vec<TableOptimizeOutcome>,
}
137
138impl OptimizeOutcome {
139    /// True if any table's indices phase reported a non-conflict failure.
140    /// `SkippedConflict` is expected under contention and does not count.
141    pub fn any_indices_failed(&self) -> bool {
142        self.tables.iter().any(|t| t.indices.is_failed())
143    }
144
145    /// Treat any `Failed` phase as an error. Tests that don't run under
146    /// contention use this to keep their existing `.await?` style: a real
147    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
148    pub fn into_result(self) -> Result<Self> {
149        for table in &self.tables {
150            if let PhaseOutcome::Failed(error) = &table.indices {
151                anyhow::bail!(
152                    "indices phase failed on {}: {error:#}",
153                    table.table.as_str()
154                );
155            }
156            if let PhaseOutcome::Failed(error) = &table.compaction {
157                anyhow::bail!(
158                    "compaction phase failed on {}: {error:#}",
159                    table.table.as_str()
160                );
161            }
162        }
163        Ok(self)
164    }
165}
166
167/// Default manifest-retention window for `pond index optimize`'s explicit
168/// cleanup pass. Matches LanceDB's recommended OSS-operator practice
169/// (lancedb docs: performance.mdx, tables/update.mdx). Note: with
170/// `delete_unverified=false` (default), Lance protects files newer than
171/// 7 days regardless of this value (`UNVERIFIED_THRESHOLD_DAYS` in
172/// lance/dataset/cleanup.rs).
173pub fn default_cleanup_older_than() -> chrono::Duration {
174    chrono::Duration::days(1)
175}
176
/// Cleanup parameters for the compaction phase of `pond index optimize`.
/// `delete_unverified=true` overrides Lance's 7-day in-progress safety
/// guard - required to reclaim files younger than 7 days, unsafe under
/// concurrent writers.
#[derive(Debug, Clone, Copy)]
pub struct CleanupConfig {
    /// Age threshold for cleanup; `Default` uses
    /// [`default_cleanup_older_than`].
    pub older_than: chrono::Duration,
    /// Whether to override Lance's unverified-file guard; `Default` is
    /// `false`.
    pub delete_unverified: bool,
}
186
187impl Default for CleanupConfig {
188    fn default() -> Self {
189        Self {
190            older_than: default_cleanup_older_than(),
191            delete_unverified: false,
192        }
193    }
194}
195
/// What `pond status` reports: where the data lives, total rows per table,
/// and a per-(adapter, project) breakdown built from one `messages` scan.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location of the data being reported on.
    pub data_url: Url,
    /// Total row counts per table.
    pub totals: RowTotals,
    /// One entry per adapter present in the corpus. When `include_subagents`
    /// is false (the CLI default), sub-agent rows (`source_agent` containing
    /// `/`) are filtered out so only the main-agent sessions appear. When
    /// true, each distinct `source_agent` (e.g. `claude-code/general-purpose`)
    /// gets its own entry. Always in alphabetical order; the CLI re-sorts
    /// this into registry order at render time so the tree matches the
    /// discovery picker.
    pub adapters: Vec<AdapterStats>,
    /// Whether the rollup includes sub-agent sessions. The renderer prints a
    /// hint about `--include-subagents` when this is false so users know the
    /// `totals` row above counts sessions that aren't broken down below.
    pub include_subagents: bool,
}
215
/// Total row count of each of the three tables, as carried by
/// [`CorpusStats::totals`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}
222
/// Embedding coverage for `pond status` / `pond embed`. `total` is the count of
/// `messages` rows that carry `search_text` (i.e. are eligible to embed); rows
/// without `search_text` produce no vector. `embedded` is the subset of those
/// already carrying a vector under the current [`embed::model_id()`]. The pending
/// backlog is `total - embedded`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Eligible rows that already carry a vector for the current model.
    pub embedded: usize,
    /// Rows eligible for embedding (those with `search_text`).
    pub total: usize,
    /// Model identifier the `embedded` count refers to.
    pub model: &'static str,
}
234
/// Rollup for one adapter inside [`CorpusStats::adapters`].
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// Either the main-agent name (`claude-code`) when sub-agents are filtered
    /// out, or the full `source_agent` (`claude-code/general-purpose`) when
    /// `include_subagents` is on.
    pub adapter: String,
    /// Session count for this adapter.
    pub sessions: u64,
    /// Message count for this adapter.
    pub messages: u64,
    /// Projects under this adapter, sorted by message count desc, then by
    /// project name asc.
    pub projects: Vec<ProjectStats>,
}
247
/// Rollup for one project under an adapter (see [`AdapterStats::projects`]).
#[derive(Debug, Clone)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Session count for this project.
    pub sessions: u64,
    /// Message count for this project.
    pub messages: u64,
}
254
/// Running tally for one group: a message counter plus the set of session
/// ids seen so far.
///
/// NOTE(review): presumably used while building the per-(adapter, project)
/// breakdown in [`CorpusStats`] — the consuming code is outside this view.
#[derive(Default)]
struct GroupAccumulator {
    messages: u64,
    session_ids: HashSet<String>,
}
260
/// Borrowed input describing one message to write: the message itself, its
/// parts, and optional search text.
///
/// [`Store::upsert_messages`] reads `message` and `search_text` only; `parts`
/// is not consumed by that method.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    pub message: &'a Message,
    pub parts: &'a [Part],
    pub search_text: Option<&'a str>,
}
267
268impl Store {
269    /// Open against a local filesystem URL or a remote one for which the
270    /// caller has no extra options to pass (env vars suffice). CLI verbs
271    /// that load `[storage]` from config should call
272    /// [`Store::open_with_options`] instead so the same options flow into
273    /// every dataset open and write.
274    pub async fn open(location: &Url) -> Result<Self> {
275        Ok(Self {
276            handle: Handle::open(location).await?,
277        })
278    }
279
280    /// Open with object-store options (S3 creds, region, endpoint, ...)
281    /// threaded through Lance verbatim. Keys are the standard `object_store`
282    /// config names; pond does not parse them. Empty options + default caps
283    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
284    /// config block via [`crate::substrate::RuntimeCaps`].
285    pub async fn open_with_options(
286        location: &Url,
287        storage_options: std::collections::HashMap<String, String>,
288        caps: crate::substrate::RuntimeCaps,
289    ) -> Result<Self> {
290        Ok(Self {
291            handle: Handle::open_with_options(location, storage_options, caps).await?,
292        })
293    }
294
295    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
296    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
297    /// through [`Store::open_with_options`] so the production policy is
298    /// applied; tests get the backend-aware local-FS defaults.
299    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
300        let url = config::url_for_path(path)?;
301        Self::open_with_options(
302            &url,
303            std::collections::HashMap::new(),
304            crate::substrate::RuntimeCaps::default(),
305        )
306        .await
307    }
308
309    /// Export clean, index-free Lance datasets into `dest`.
310    ///
311    /// This rewrites the visible rows of each table instead of copying the
312    /// dataset roots. The resulting manifests therefore contain no references
313    /// to the source store's `_indices`, while `messages.vector` and
314    /// `messages.embedding_model` remain ordinary data columns and are
315    /// preserved.
316    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
317        std::fs::create_dir_all(dest)
318            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
319        let (sessions, sessions_version) = self
320            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
321            .await?;
322        let (messages, messages_version) = self
323            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
324            .await?;
325        let (parts, parts_version) = self
326            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
327            .await?;
328        Ok(LanceArchiveExport {
329            rows: LanceArchiveCounts {
330                sessions,
331                messages,
332                parts,
333            },
334            source_versions: LanceArchiveVersions {
335                sessions: sessions_version,
336                messages: messages_version,
337                parts: parts_version,
338            },
339        })
340    }
341
342    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
343        let sessions_dataset =
344            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
345        let messages_dataset =
346            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
347        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
348        let (sessions, sessions_inserted) = self
349            .import_clean_table(Table::Sessions, sessions_dataset)
350            .await?;
351        let (messages, messages_inserted) = self
352            .import_clean_table(Table::Messages, messages_dataset)
353            .await?;
354        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
355        Ok(LanceArchiveImport {
356            rows: LanceArchiveCounts {
357                sessions,
358                messages,
359                parts,
360            },
361            inserted: LanceArchiveCounts {
362                sessions: sessions_inserted,
363                messages: messages_inserted,
364                parts: parts_inserted,
365            },
366        })
367    }
368
369    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
370        let dataset = self.handle.dataset(table).await?;
371        let source_version = dataset.version_id();
372        let schema = export_schema(table);
373        let mut stream = dataset
374            .scan()
375            .try_into_stream()
376            .await
377            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
378        let dest_uri = dest
379            .to_str()
380            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
381
382        let mut rows = 0usize;
383        let mut wrote = false;
384        while let Some(batch) = stream.next().await {
385            let batch = batch
386                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
387            rows += batch.num_rows();
388            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
389            let mut params = write_params_for_create();
390            if wrote {
391                params.mode = WriteMode::Append;
392            }
393            Dataset::write(reader, dest_uri, Some(params))
394                .await
395                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
396            wrote = true;
397        }
398
399        if !wrote {
400            let batch = RecordBatch::new_empty(schema.clone());
401            let reader = RecordBatchIterator::new([Ok(batch)], schema);
402            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
403                .await
404                .with_context(|| {
405                    format!("failed to write empty {} archive table", table.as_str())
406                })?;
407        }
408        Ok((rows, source_version))
409    }
410
411    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
412        let mut stream = dataset
413            .scan()
414            .try_into_stream()
415            .await
416            .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
417        let mut rows = 0usize;
418        let mut inserted = 0usize;
419        while let Some(batch) = stream.next().await {
420            let batch = batch
421                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
422            let row_count = batch.num_rows();
423            rows += row_count;
424            inserted += self
425                .handle
426                .merge_insert(table, batch, row_count)
427                .await
428                .with_context(|| format!("failed to import {} archive table", table.as_str()))?
429                as usize;
430        }
431        Ok((rows, inserted))
432    }
433
434    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
435        if sessions.is_empty() {
436            return Ok(Vec::new());
437        }
438        let batches = sessions_batches(sessions)?;
439        let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
440        // Per-row outcomes; no fold here (see `pond index optimize`).
441        Ok(statuses_from_inserted(sessions.len(), inserted))
442    }
443
    /// Batched write path used by the adapter ingest loop and by the wire
    /// handler's final flush. Receives N completed substreams from the
    /// validator and:
    ///
    ///   1. Runs the immutable-fields check (spec.md#protocol) against the stored row
    ///      per session, sequentially. Sessions that fail produce one Error
    ///      outcome and are excluded from the write batch.
    ///   2. Deduplicates in-batch at the substream level: when two substreams
    ///      in the same batch share a `session_id` (Claude Code's subagent
    ///      files reuse their parent's id), the first occurrence wins. The
    ///      second is either *merged* (same `source_agent` + `project`:
    ///      messages/parts append, no duplicate rows) or *rejected*
    ///      (different `project` - the subagent-vs-parent case). Row-level
    ///      duplicates that slip past here are caught downstream by Lance's
    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
    ///      (invariant 17): this layer's job is preserving substream merge
    ///      semantics, not policing the PK uniqueness Lance handles itself.
    ///   3. Builds one combined `RecordBatch` per table (sessions, messages,
    ///      parts) across every valid substream.
    ///   4. Fires the three `merge_insert` calls in parallel via
    ///      `tokio::try_join!`. Cross-table mutex on `CachedDataset` is
    ///      independent, so these proceed concurrently.
    ///   5. Composes per-session [`RowOutcome`]s in original substream order.
    ///
    /// Note on ordering: in the body below the in-batch dedup (step 2) runs
    /// first, then the pre-existence scans, and only then the stored-row
    /// immutable check (step 1), which is applied to the deduplicated
    /// substreams.
    ///
    /// Returns the outcomes sorted by `index` together with the accumulated
    /// [`BatchCounts`]. An empty input returns empty outcomes and default
    /// counts without touching the store.
    async fn upsert_session_batch(
        &self,
        substreams: Vec<CompletedSubstream>,
    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        if substreams.is_empty() {
            return Ok((Vec::new(), BatchCounts::default()));
        }

        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
        let mut counts = BatchCounts::default();

        // In-batch dedup. First occurrence of each session_id wins; later
        // occurrences either merge or get rejected. Iteration order preserves
        // original substream order so outcomes index correctly.
        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
        // session_id -> position of its first occurrence inside `merged`.
        let mut by_session_id: std::collections::HashMap<String, usize> =
            std::collections::HashMap::with_capacity(substreams.len());
        for substream in substreams {
            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
                let existing = &merged[existing_idx];
                if existing.session.source_agent != substream.session.source_agent
                    || existing.session.project != substream.session.project
                {
                    // Subagent-vs-parent class. The first occurrence's
                    // metadata stays authoritative; this substream is
                    // rejected on the same immutable-field axis as the
                    // storage-side check.
                    // `source_agent` is reported in preference to `project`
                    // when both differ.
                    let reason = if existing.session.source_agent != substream.session.source_agent
                    {
                        IngestError::ImmutableField {
                            field: "source_agent",
                            session_id: substream.session.id.clone(),
                            stored: existing.session.source_agent.clone(),
                            attempted: substream.session.source_agent.clone(),
                        }
                    } else {
                        IngestError::ImmutableField {
                            field: "project",
                            session_id: substream.session.id.clone(),
                            stored: (*existing.session.project).clone(),
                            attempted: (*substream.session.project).clone(),
                        }
                    };
                    let field = match &reason {
                        IngestError::ImmutableField { field, .. } => Some(*field),
                    };
                    let reason_key = match field {
                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                        _ => DROP_REASON_UNCATEGORIZED,
                    };
                    outcomes.extend(error_outcomes_for_substream(
                        substream.session_index,
                        &substream.session,
                        &substream.messages,
                        reason.to_string(),
                        field,
                        reason_key,
                    ));
                    continue;
                }
                // Same session, same metadata: merge messages. Dedup message
                // ids defensively (within one batch, the validator's seen
                // sets are per-substream so cross-substream dups can happen
                // legally if both files re-emit the same row).
                let existing = &mut merged[existing_idx];
                let mut seen: std::collections::HashSet<String> = existing
                    .messages
                    .iter()
                    .map(|m| m.message.id().to_owned())
                    .collect();
                for msg in substream.messages {
                    if seen.insert(msg.message.id().to_owned()) {
                        existing.messages.push(msg);
                    }
                }
                // The merged-in substream's own `session_index` is not carried
                // forward; its messages are reported under the first
                // occurrence.
                continue;
            }
            by_session_id.insert(substream.session.id.clone(), merged.len());
            merged.push(substream);
        }

        // Pre-existence sweep: one scan per table keyed on the batch's
        // session_ids, capped at the substream count. Replaces the prior
        // N-sequential `find_session` calls and gives us honest per-row
        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
        let session_id_values: Vec<ScalarValue> = merged
            .iter()
            .map(|substream| ScalarValue::String(substream.session.id.clone()))
            .collect();
        let existing_sessions: std::collections::HashMap<String, Session> =
            if session_id_values.is_empty() {
                std::collections::HashMap::new()
            } else {
                let batch = self
                    .handle
                    .scan_batch(
                        Table::Sessions,
                        Some(&Predicate::In("id", session_id_values.clone())),
                        &[],
                    )
                    .await?;
                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
                for row in 0..batch.num_rows() {
                    let session = session_from_batch(&batch, row)?;
                    map.insert(session.id.clone(), session);
                }
                map
            };
        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Messages,
                    Some(&Predicate::In("session_id", session_id_values.clone())),
                    &["session_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "id", row)?.context("message id is null")?;
                set.insert((sid, mid));
            }
            set
        };
        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Parts,
                    // Last use of the id list, so it is moved rather than cloned.
                    Some(&Predicate::In("session_id", session_id_values)),
                    &["session_id", "message_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
                let pid = string(&batch, "id", row)?.context("part id is null")?;
                set.insert((sid, mid, pid));
            }
            set
        };

        // Stored-row immutable check: a substream whose session already
        // exists must pass `ensure_immutable_match`, otherwise it yields error
        // outcomes and is left out of the write.
        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
        for substream in merged {
            if let Some(existing) = existing_sessions.get(&substream.session.id)
                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
            {
                let field = match &failure {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    failure.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            writeable.push(substream);
        }

        // Nothing survived both checks: report the error outcomes only.
        if writeable.is_empty() {
            outcomes.sort_by_key(|outcome| outcome.index);
            return Ok((outcomes, counts));
        }

        // Flatten the surviving substreams into per-table row collections.
        let sessions_owned: Vec<Session> = writeable
            .iter()
            .map(|substream| substream.session.clone())
            .collect();
        let message_rows: Vec<MessageBatchRow<'_>> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().map(|buffered| MessageBatchRow {
                    message: &buffered.message,
                    source_agent: &substream.session.source_agent,
                    project: &substream.session.project,
                    search_text: buffered.search_text.as_deref(),
                })
            })
            .collect();
        let part_rows: Vec<Part> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().flat_map(|buffered| {
                    buffered
                        .parts
                        .iter()
                        .map(|buffered_part| buffered_part.part.clone())
                })
            })
            .collect();

        let session_batches = sessions_batches(&sessions_owned)?;
        let message_batches = messages_batches(&message_rows)?;
        let part_batches = parts_batches(&part_rows)?;

        // Merge_insert returns a batch-level inserted count which we cross-
        // check against our pre-existence sets, but for per-row truth we
        // attribute through the sets themselves (next loop). Under
        // single-writer the two agree exactly; under a hostile concurrent
        // writer the sets are authoritative for THIS request's wire shape -
        // matched-no-op (spec.md#adapter-integrity-additive-sync) makes the
        // distinction informational, not behavioral.
        // NOTE(review): the returned counts are bound to `_`-prefixed names
        // and never read in this function, so no cross-check happens here.
        let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
        )?;

        for substream in &writeable {
            outcomes.extend(success_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                &existing_sessions,
                &existing_message_pks,
                &existing_part_pks,
                &mut counts,
            ));
        }

        // Error outcomes were appended as they were found; restore input order.
        outcomes.sort_by_key(|outcome| outcome.index);
        Ok((outcomes, counts))
    }
706
707    pub async fn upsert_messages(
708        &self,
709        session: &Session,
710        messages: &[MessageWrite<'_>],
711    ) -> Result<Vec<UpsertStatus>> {
712        if messages.is_empty() {
713            return Ok(Vec::new());
714        }
715
716        let rows = messages
717            .iter()
718            .map(|write| MessageBatchRow {
719                message: write.message,
720                source_agent: &session.source_agent,
721                project: &session.project,
722                search_text: write.search_text,
723            })
724            .collect::<Vec<_>>();
725        let batches = messages_batches(&rows)?;
726        let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
727        Ok(statuses_from_inserted(messages.len(), inserted))
728    }
729
730    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
731        if parts.is_empty() {
732            return Ok(Vec::new());
733        }
734        let batches = parts_batches(parts)?;
735        let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
736        Ok(statuses_from_inserted(parts.len(), inserted))
737    }
738
739    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
740        let Some(session) = self.find_session(session_id).await? else {
741            return Ok(None);
742        };
743        let messages = self.messages_for_session(session_id).await?;
744        Ok(Some(SessionWithMessages { session, messages }))
745    }
746
747    /// Every session id currently in the store, unsorted.
748    pub async fn session_ids(&self) -> Result<Vec<String>> {
749        let batch = self
750            .handle
751            .scan_batch(Table::Sessions, None, &["id"])
752            .await?;
753        let mut ids = Vec::with_capacity(batch.num_rows());
754        for row in 0..batch.num_rows() {
755            if let Some(id) = string(&batch, "id", row)? {
756                ids.push(id);
757            }
758        }
759        Ok(ids)
760    }
761
762    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
763        let batch = self
764            .handle
765            .scan_batch(
766                Table::Sessions,
767                Some(&Predicate::Eq(
768                    "parent_session_id",
769                    parent_session_id.into(),
770                )),
771                &[
772                    "id",
773                    "parent_session_id",
774                    "parent_message_id",
775                    "source_agent",
776                    "created_at",
777                    "project",
778                    "options",
779                ],
780            )
781            .await?;
782        let mut sessions = Vec::with_capacity(batch.num_rows());
783        for row in 0..batch.num_rows() {
784            sessions.push(session_from_batch(&batch, row)?);
785        }
786        sessions.sort_by(|left, right| left.id.cmp(&right.id));
787        Ok(sessions)
788    }
789
790    /// `session_id -> wall-clock time of the Lance manifest version that
791    /// last wrote the row` for the per-session staleness skip
792    /// (spec.md#adapter-integrity-event-ordering). Reads Lance's `_row_last_updated_at_version` system
793    /// column (available because pond enables stable row ids per spec.md#lance-table-creation-stable-row-ids)
794    /// and joins it against `Dataset::versions()` for commit timestamps.
795    pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
796        use lance::deps::arrow_array::UInt64Array;
797
798        let dataset = self.handle.dataset(Table::Sessions).await?;
799        let version_list = dataset.versions().await?;
800        let versions: HashMap<u64, DateTime<Utc>> = version_list
801            .iter()
802            .map(|v| (v.version, v.timestamp))
803            .collect();
804        // `Dataset::cleanup_old_versions` (and the auto_cleanup hook) drops
805        // pruned versions from the manifest list, leaving rows whose
806        // `_row_last_updated_at_version` points at a version that no longer
807        // resolves. Those rows are still real and were ingested at some time
808        // <= the oldest still-visible version's commit timestamp - so falling
809        // back to that bound preserves a sound `mtime <= ingested` upper edge
810        // and keeps the staleness skip working after cleanup.
811        let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
812
813        let scanner = self
814            .handle
815            .scan(
816                Table::Sessions,
817                ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
818            )
819            .await?;
820        let mut stream = scanner.try_into_stream().await?;
821        let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
822        while let Some(batch) = stream.next().await {
823            let batch = batch?;
824            let version_array = batch
825                .column_by_name("_row_last_updated_at_version")
826                .context("missing _row_last_updated_at_version column")?
827                .as_any()
828                .downcast_ref::<UInt64Array>()
829                .context("_row_last_updated_at_version is not UInt64")?;
830            for row in 0..batch.num_rows() {
831                let Some(id) = string(&batch, "id", row)? else {
832                    continue;
833                };
834                if version_array.is_null(row) {
835                    continue;
836                }
837                let version = version_array.value(row);
838                let ts = versions.get(&version).copied().or(oldest_visible_ts);
839                if let Some(ts) = ts {
840                    out.insert(id, ts);
841                }
842            }
843        }
844        Ok(out)
845    }
846
847    /// Whole-session view for `pond_get` session mode (spec.md#protocol).
848    /// Conversational filters to `search_text IS NOT NULL`; Complete and
849    /// Verbatim scan every message. Every mode attaches compact part summaries;
850    /// Verbatim additionally inlines full parts. `after_id` is an exclusive
851    /// lower bound (a message id); the page is bounded by `limit` and a byte
852    /// budget and never cuts mid-message.
853    pub async fn session_view(
854        &self,
855        session_id: &str,
856        params: SessionViewParams<'_>,
857    ) -> Result<GetLookup<SessionPage>> {
858        let Some(session) = self.find_session(session_id).await? else {
859            return Ok(GetLookup::NotFound);
860        };
861
862        let mut rows = match params.mode {
863            ResponseMode::Conversational => self
864                .scan_conversational_messages(session_id)
865                .await?
866                .into_iter()
867                .map(|row| ScanRow {
868                    id: row.message_id,
869                    role: row.role,
870                    timestamp: row.timestamp,
871                    text: Some(row.text.into_inner()),
872                    content: None,
873                })
874                .collect(),
875            ResponseMode::Complete | ResponseMode::Verbatim => {
876                self.scan_all_messages(session_id).await?
877            }
878        };
879        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
880
881        let start_at = match params.after_id {
882            // Append-only stream: a real anchor never vanishes, so an unknown
883            // `after_id` is a stale/mistyped client cursor, not "start over".
884            Some(after) => match rows.iter().position(|row| row.id == after) {
885                Some(idx) => idx + 1,
886                None => return Ok(GetLookup::UnknownAfterId),
887            },
888            None => 0,
889        };
890        let remaining = rows.get(start_at..).unwrap_or(&[]);
891        let (emitted, messages_remaining) = match params.session_from {
892            SessionFrom::Start => {
893                let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
894                    row.text.as_deref().map_or(0, str::len)
895                });
896                (&remaining[..n], remaining.len() - n)
897            }
898            // Tail: the newest messages that fit `limit` and the byte budget,
899            // dropping oldest first; the newest is always kept and the page
900            // stays chronological so the agent reads the flow forward.
901            SessionFrom::End => {
902                let mut bytes = 0usize;
903                let mut start = remaining.len();
904                for row in remaining.iter().rev() {
905                    if remaining.len() - start >= params.limit {
906                        break;
907                    }
908                    let size = row.text.as_deref().map_or(0, str::len);
909                    if start < remaining.len() && bytes + size > params.budget_bytes {
910                        break;
911                    }
912                    bytes += size;
913                    start -= 1;
914                }
915                (&remaining[start..], start)
916            }
917        };
918        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
919
920        // Conversational/Complete only summarize parts; Verbatim inlines every
921        // part (blobs included).
922        let mut parts_by_message = match params.mode {
923            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
924            ResponseMode::Conversational | ResponseMode::Complete => {
925                self.summary_parts_for_messages(session_id, &ids).await?
926            }
927        };
928        let messages = emitted
929            .iter()
930            .map(|row| RetrievedMessage {
931                id: row.id.clone(),
932                role: row.role,
933                timestamp: row.timestamp,
934                text: row.text.clone(),
935                content: row.content.clone(),
936                parts: parts_by_message
937                    .remove(&(session_id.to_owned(), row.id.clone()))
938                    .unwrap_or_default(),
939            })
940            .collect();
941
942        Ok(GetLookup::Found(SessionPage {
943            session,
944            messages,
945            messages_remaining,
946        }))
947    }
948
949    /// Message-scope retrieval for `pond_get` message mode (spec.md#protocol):
950    /// the target with its full parts (paginated by `after_id` over part
951    /// ordinals, then budget) plus up to `2*context_depth` siblings around it.
952    /// `None` when no stored message carries `message_id`. Sibling parts are
953    /// carried for summarizing; the target's parts ride `target_parts`.
954    pub async fn message_view(
955        &self,
956        message_id: &str,
957        params: MessageViewParams<'_>,
958    ) -> Result<GetLookup<MessagePage>> {
959        let Some(session_id) = self.session_id_for_message(message_id).await? else {
960            return Ok(GetLookup::NotFound);
961        };
962        let Some(session) = self.find_session(&session_id).await? else {
963            return Ok(GetLookup::NotFound);
964        };
965        let mut rows = self.scan_all_messages(&session_id).await?;
966        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
967        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
968            return Ok(GetLookup::NotFound);
969        };
970
971        let start = target_pos.saturating_sub(params.context_depth);
972        let end = (target_pos + params.context_depth + 1).min(rows.len());
973        let window = &rows[start..end];
974        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
975        // The target's full parts (blobs included) ride the response; siblings
976        // are only summarized, but they share this one window scan.
977        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
978
979        let all_parts = parts_by_message
980            .remove(&(session_id.clone(), message_id.to_owned()))
981            .unwrap_or_default();
982        let start_part = match params.after_id {
983            // Exclusive over ordinals: parts are ordinal-sorted, so the first
984            // part past the anchor's ordinal is the page start. An anchor absent
985            // from the target's parts is a stale/mistyped client cursor.
986            Some(after) => match all_parts.iter().find(|part| part.id == after) {
987                Some(anchor) => all_parts
988                    .iter()
989                    .position(|part| part.ordinal > anchor.ordinal)
990                    .unwrap_or(all_parts.len()),
991                None => return Ok(GetLookup::UnknownAfterId),
992            },
993            None => 0,
994        };
995        let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
996        let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
997            serde_json::to_string(part).map_or(0, |json| json.len())
998        });
999        let target_parts = remaining_parts[..part_count].to_vec();
1000        let target_parts_remaining = remaining_parts.len() - part_count;
1001
1002        let target_row = &rows[target_pos];
1003        let target = RetrievedMessage {
1004            id: target_row.id.clone(),
1005            role: target_row.role,
1006            timestamp: target_row.timestamp,
1007            text: target_row.text.clone(),
1008            content: target_row.content.clone(),
1009            // Target structure is carried in full by `target_parts`.
1010            parts: Vec::new(),
1011        };
1012        let siblings = window
1013            .iter()
1014            .enumerate()
1015            .filter(|(idx, _)| start + idx != target_pos)
1016            .map(|(_, row)| RetrievedMessage {
1017                id: row.id.clone(),
1018                role: row.role,
1019                timestamp: row.timestamp,
1020                text: row.text.clone(),
1021                content: row.content.clone(),
1022                parts: parts_by_message
1023                    .get(&(session_id.clone(), row.id.clone()))
1024                    .cloned()
1025                    .unwrap_or_default(),
1026            })
1027            .collect();
1028
1029        Ok(GetLookup::Found(MessagePage {
1030            session,
1031            target,
1032            target_parts,
1033            target_parts_remaining,
1034            siblings,
1035        }))
1036    }
1037
1038    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1039        let batch = self
1040            .handle
1041            .scan_batch(
1042                Table::Messages,
1043                Some(&Predicate::Eq("session_id", session_id.into())),
1044                &["id", "timestamp", "role", "search_text", "content"],
1045            )
1046            .await?;
1047        let mut rows = Vec::with_capacity(batch.num_rows());
1048        for row in 0..batch.num_rows() {
1049            let id = string(&batch, "id", row)?.context("message id is null")?;
1050            let role =
1051                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1052            let timestamp = datetime(&batch, "timestamp", row)?;
1053            rows.push(ScanRow {
1054                id,
1055                role,
1056                timestamp,
1057                text: string(&batch, "search_text", row)?,
1058                content: string(&batch, "content", row)?,
1059            });
1060        }
1061        Ok(rows)
1062    }
1063
1064    /// Conversational scan over one session: rows ordered by
1065    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1066    /// read seam (spec.md#search-prefilter-pushdown).
1067    pub async fn scan_conversational_messages(
1068        &self,
1069        session_id: &str,
1070    ) -> Result<Vec<ConversationalRow>> {
1071        let filter = Predicate::And(vec![
1072            Predicate::Eq("session_id", session_id.into()),
1073            Predicate::IsNotNull("search_text"),
1074        ]);
1075        let batch = self
1076            .handle
1077            .scan_batch(
1078                Table::Messages,
1079                Some(&filter),
1080                &["id", "timestamp", "role", "search_text"],
1081            )
1082            .await?;
1083
1084        let mut rows = Vec::with_capacity(batch.num_rows());
1085        for row in 0..batch.num_rows() {
1086            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1087            let role =
1088                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1089            let timestamp = datetime(&batch, "timestamp", row)?;
1090            let text_str = string(&batch, "search_text", row)?.context(
1091                "search_text null after IsNotNull pushdown - storage invariant violated",
1092            )?;
1093            rows.push(ConversationalRow {
1094                session_id: session_id.to_owned(),
1095                message_id,
1096                role,
1097                timestamp,
1098                text: SearchText(text_str),
1099            });
1100        }
1101        rows.sort_by(|a, b| {
1102            a.timestamp
1103                .cmp(&b.timestamp)
1104                .then_with(|| a.message_id.cmp(&b.message_id))
1105        });
1106        Ok(rows)
1107    }
1108
1109    /// Locate the session id for a stored message. Cheap when only the routing
1110    /// hint is needed - callers that need the messages use `scan_all_messages`.
1111    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1112        let batch = self
1113            .handle
1114            .scan_batch(
1115                Table::Messages,
1116                Some(&Predicate::Eq("id", message_id.into())),
1117                &["session_id"],
1118            )
1119            .await?;
1120        if batch.num_rows() == 0 {
1121            return Ok(None);
1122        }
1123        string(&batch, "session_id", 0)
1124    }
1125
1126    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1127        self.handle.row_counts().await
1128    }
1129
1130    /// Compute the per-adapter / per-project rollup that drives
1131    /// `pond status`. One scan over `messages` projecting the three
1132    /// columns the rollup keys on (`source_agent`, `project`, `session_id`),
1133    /// aggregated in-memory. Bounded by the cross product of adapters and
1134    /// projects, which stays small on real corpora.
1135    pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1136        let scanner = self
1137            .handle
1138            .scan(
1139                Table::Messages,
1140                ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1141            )
1142            .await?;
1143        let mut stream = scanner.try_into_stream().await?;
1144        let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1145        while let Some(batch) = stream.next().await {
1146            let batch = batch?;
1147            for row in 0..batch.num_rows() {
1148                let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1149                let project = string(&batch, "project", row)?.unwrap_or_default();
1150                let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1151                let is_subagent = source_agent.contains('/');
1152                if is_subagent && !include_subagents {
1153                    continue;
1154                }
1155                let entry = groups.entry((source_agent, project)).or_default();
1156                entry.messages += 1;
1157                entry.session_ids.insert(session_id);
1158            }
1159        }
1160
1161        let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1162        let totals = RowTotals {
1163            sessions: totals_sessions as u64,
1164            messages: totals_messages as u64,
1165            parts: totals_parts as u64,
1166        };
1167
1168        let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1169        for ((adapter, project), acc) in groups {
1170            by_adapter.entry(adapter).or_default().push(ProjectStats {
1171                project,
1172                sessions: acc.session_ids.len() as u64,
1173                messages: acc.messages,
1174            });
1175        }
1176
1177        let mut adapters = Vec::with_capacity(by_adapter.len());
1178        for (adapter, mut projects) in by_adapter {
1179            projects.sort_by(|a, b| {
1180                b.messages
1181                    .cmp(&a.messages)
1182                    .then_with(|| a.project.cmp(&b.project))
1183            });
1184            let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1185            let messages: u64 = projects.iter().map(|p| p.messages).sum();
1186            adapters.push(AdapterStats {
1187                adapter,
1188                sessions,
1189                messages,
1190                projects,
1191            });
1192        }
1193
1194        Ok(CorpusStats {
1195            data_url: self.handle.location().clone(),
1196            totals,
1197            adapters,
1198            include_subagents,
1199        })
1200    }
1201
1202    /// Write a batch of embeddings into `messages`: set `vector` and
1203    /// `embedding_model` on each row by `(session_id, id)`
1204    /// (spec.md#session-embed-from-canonical). The column update goes through the
1205    /// write seam and lands as a new manifest version (`append-only`).
1206    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1207        if rows.is_empty() {
1208            return Ok(());
1209        }
1210        let batch = embedding_update_batch(rows)?;
1211        self.handle
1212            .merge_update(Table::Messages, batch, rows.len())
1213            .await?;
1214        Ok(())
1215    }
1216
1217    /// Stream the backlog of messages needing embedding: rows with `search_text`
1218    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1219    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1220        try_stream! {
1221            let filter = Predicate::And(vec![
1222                Predicate::IsNull("vector"),
1223                Predicate::IsNotNull("search_text"),
1224            ]);
1225            let projection: &[&str] = &["session_id", "id", "search_text"];
1226            let scanner = self
1227                .handle
1228                .scan(
1229                    Table::Messages,
1230                    ScanOpts::with_predicate_and_projection(&filter, projection),
1231                )
1232                .await?;
1233            let mut batches = scanner
1234                .try_into_stream()
1235                .await
1236                .context("failed to open messages stream")?;
1237            while let Some(batch) = batches.next().await {
1238                let batch = batch?;
1239                for row in 0..batch.num_rows() {
1240                    yield PendingMessage {
1241                        session_id: string(&batch, "session_id", row)?
1242                            .context("session_id is null")?,
1243                        id: string(&batch, "id", row)?.context("message id is null")?,
1244                        search_text: string(&batch, "search_text", row)?
1245                            .context("search_text is null")?,
1246                    };
1247                }
1248            }
1249        }
1250    }
1251
1252    /// Stream messages that are either never embedded or stale under the
1253    /// current model. `pond embed --force` feeds this to the same unconditional
1254    /// merge_update as the normal backlog; the filter makes that semantically
1255    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1256    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1257        try_stream! {
1258            let filter = Predicate::And(vec![
1259                Predicate::IsNotNull("search_text"),
1260                Predicate::Or(vec![
1261                    Predicate::IsNull("vector"),
1262                    Predicate::Ne("embedding_model", embed::model_id().into()),
1263                ]),
1264            ]);
1265            let projection: &[&str] = &["session_id", "id", "search_text"];
1266            let scanner = self
1267                .handle
1268                .scan(
1269                    Table::Messages,
1270                    ScanOpts::with_predicate_and_projection(&filter, projection),
1271                )
1272                .await?;
1273            let mut batches = scanner
1274                .try_into_stream()
1275                .await
1276                .context("failed to open pending-or-stale messages stream")?;
1277            while let Some(batch) = batches.next().await {
1278                let batch = batch?;
1279                for row in 0..batch.num_rows() {
1280                    yield PendingMessage {
1281                        session_id: string(&batch, "session_id", row)?
1282                            .context("session_id is null")?,
1283                        id: string(&batch, "id", row)?.context("message id is null")?,
1284                        search_text: string(&batch, "search_text", row)?
1285                            .context("search_text is null")?,
1286                    };
1287                }
1288            }
1289        }
1290    }
1291
1292    /// BM25 full-text retriever over `messages.search_text`.
1293    pub async fn fts_search(
1294        &self,
1295        query: &str,
1296        limit: usize,
1297        filter: &Predicate,
1298    ) -> Result<Vec<(MessageKey, f32)>> {
1299        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1300        scanner.full_text_search(
1301            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1302        )?;
1303        // Lance ships an autoprojection that silently appends `_score` to FTS
1304        // output when the projection omits it. That behavior is going away;
1305        // we opt into the future explicit-projection contract here so the
1306        // scanner stops emitting a per-call deprecation warning, and we list
1307        // `_score` ourselves since the loop below reads it.
1308        scanner.disable_scoring_autoprojection();
1309        scanner.project(&["session_id", "id", "_score"])?;
1310        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1311        let batch = scanner.try_into_batch().await?;
1312        let mut hits = Vec::with_capacity(batch.num_rows());
1313        for row in 0..batch.num_rows() {
1314            let key = MessageKey {
1315                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1316                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1317            };
1318            hits.push((key, float32(&batch, "_score", row)?));
1319        }
1320        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1321        // order, which varies between runs and across calls with different pool
1322        // sizes (the hybrid arm's `pool=100` and FTS-only's `limit=20` produce
1323        // different orderings at the same tied score). Without an explicit
1324        // tiebreak the downstream RRF dedup-rank for a tied target session can
1325        // flip session-to-session, making fusion outcomes nondeterministic.
1326        // Sort by `score desc`, then `(session_id, message_id)` asc.
1327        hits.sort_by(|left, right| {
1328            right
1329                .1
1330                .partial_cmp(&left.1)
1331                .unwrap_or(std::cmp::Ordering::Equal)
1332                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1333                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1334        });
1335        Ok(hits)
1336    }
1337
1338    /// Whether any `messages` row carries a vector (spec.md#search) - the
1339    /// signal that flips search from FTS-only to hybrid. The single-active-
1340    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
1341    /// vector belongs to the current model.
1342    pub async fn has_embeddings(&self) -> Result<bool> {
1343        let scope = Predicate::IsNotNull("vector");
1344        let mut scanner = self
1345            .handle
1346            .scan(
1347                Table::Messages,
1348                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1349            )
1350            .await?;
1351        scanner.limit(Some(1), None)?;
1352        let batch = scanner.try_into_batch().await?;
1353        Ok(batch.num_rows() > 0)
1354    }
1355
1356    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
1357    /// scalar predicate (spec.md#search-prefilter-pushdown). Combines the caller's
1358    /// filter with `vector IS NOT NULL` to exclude un-embedded rows from the
1359    /// scan; the brute-force kNN path requires this (the IVF_PQ path would
1360    /// skip them anyway). The single-active-model invariant lets pond drop
1361    /// the per-row model filter: every non-null vector belongs to the current
1362    /// model.
1363    pub async fn vector_search(
1364        &self,
1365        query: &[f32],
1366        limit: usize,
1367        filter: &Predicate,
1368        search: Option<&config::SearchConfig>,
1369    ) -> Result<Vec<(MessageKey, f32)>> {
1370        let scope = embedded_scope(filter);
1371        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1372        let key = Float32Array::from(query.to_vec());
1373        scanner.nearest("vector", &key, limit)?;
1374        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1375            scanner.nprobes(nprobes);
1376        }
1377        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1378            scanner.refine(refine_factor);
1379        }
1380        // Mirror the explicit-projection contract from `fts_search`: opt out
1381        // of `_distance` autoprojection and list it ourselves since the loop
1382        // below reads it.
1383        scanner.disable_scoring_autoprojection();
1384        scanner.project(&["session_id", "id", "_distance"])?;
1385        let batch = scanner.try_into_batch().await?;
1386        let mut hits = Vec::with_capacity(batch.num_rows());
1387        for row in 0..batch.num_rows() {
1388            let key = MessageKey {
1389                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1390                message_id: string(&batch, "id", row)?.context("message id is null")?,
1391            };
1392            hits.push((key, float32(&batch, "_distance", row)?));
1393        }
1394        // Stable secondary sort: same reasoning as `fts_search` - IVF_PQ can
1395        // emit hits with effectively identical `_distance` in fragment-dependent
1396        // order, which makes RRF dedup-ranks nondeterministic for tied
1397        // neighbors. Sort by distance asc (smaller = more similar), then by
1398        // `(session_id, message_id)` asc.
1399        hits.sort_by(|left, right| {
1400            left.1
1401                .partial_cmp(&right.1)
1402                .unwrap_or(std::cmp::Ordering::Equal)
1403                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1404                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1405        });
1406        Ok(hits)
1407    }
1408
1409    /// The DataFusion plan string for a filtered vector scan - the
1410    /// `search-prefilter-pushdown` regression guard reads it.
1411    pub async fn explain_vector_plan(
1412        &self,
1413        query: &[f32],
1414        limit: usize,
1415        filter: &Predicate,
1416        search: Option<&config::SearchConfig>,
1417    ) -> Result<String> {
1418        let scope = embedded_scope(filter);
1419        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1420        let key = Float32Array::from(query.to_vec());
1421        scanner.nearest("vector", &key, limit)?;
1422        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1423            scanner.nprobes(nprobes);
1424        }
1425        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1426            scanner.refine(refine_factor);
1427        }
1428        scanner
1429            .explain_plan(true)
1430            .await
1431            .context("explain_plan failed")
1432    }
1433
1434    pub async fn explain_fts_plan(
1435        &self,
1436        query: &str,
1437        limit: usize,
1438        filter: &Predicate,
1439    ) -> Result<String> {
1440        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1441        scanner.full_text_search(
1442            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1443        )?;
1444        scanner.project(&["session_id", "id"])?;
1445        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1446        scanner
1447            .explain_plan(true)
1448            .await
1449            .context("explain_plan failed")
1450    }
1451
1452    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
1453    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1454        if keys.is_empty() {
1455            return Ok(Vec::new());
1456        }
1457        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1458        let session_ids = keys
1459            .iter()
1460            .map(|key| key.session_id.clone())
1461            .collect::<Vec<_>>();
1462        let message_ids = keys
1463            .iter()
1464            .map(|key| key.message_id.clone())
1465            .collect::<Vec<_>>();
1466        let predicate = Predicate::And(vec![
1467            in_predicate("session_id", &session_ids),
1468            in_predicate("id", &message_ids),
1469        ]);
1470        let batch = self
1471            .handle
1472            .scan_batch(
1473                Table::Messages,
1474                Some(&predicate),
1475                &[
1476                    "id",
1477                    "session_id",
1478                    "role",
1479                    "project",
1480                    "source_agent",
1481                    "timestamp",
1482                    "search_text",
1483                ],
1484            )
1485            .await?;
1486        let mut metas = Vec::with_capacity(batch.num_rows());
1487        for row in 0..batch.num_rows() {
1488            let message_id = string(&batch, "id", row)?.context("id is null")?;
1489            let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1490            if !wanted.contains(&MessageKey {
1491                session_id: session_id.clone(),
1492                message_id: message_id.clone(),
1493            }) {
1494                continue;
1495            }
1496            metas.push(MessageMeta {
1497                message_id,
1498                session_id,
1499                role: string(&batch, "role", row)?.context("role is null")?,
1500                project: string(&batch, "project", row)?.context("project is null")?,
1501                source_agent: string(&batch, "source_agent", row)?
1502                    .context("source_agent is null")?,
1503                timestamp: datetime(&batch, "timestamp", row)?,
1504                search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1505            });
1506        }
1507        Ok(metas)
1508    }
1509
1510    /// Total message count per session, for search session summaries.
1511    pub async fn session_message_counts(
1512        &self,
1513        session_ids: &[String],
1514    ) -> Result<BTreeMap<String, usize>> {
1515        if session_ids.is_empty() {
1516            return Ok(BTreeMap::new());
1517        }
1518        let dataset = self.handle.dataset(Table::Messages).await?;
1519        let mut tasks = tokio::task::JoinSet::new();
1520        for session_id in session_ids {
1521            let dataset = dataset.clone();
1522            let session_id = session_id.clone();
1523            tasks.spawn(async move {
1524                let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1525                let count = dataset.count_rows(Some(filter)).await?;
1526                anyhow::Ok((session_id, count))
1527            });
1528        }
1529        let mut counts = BTreeMap::new();
1530        while let Some(joined) = tasks.join_next().await {
1531            let (session_id, count) = joined.context("session count task panicked")??;
1532            counts.insert(session_id, count);
1533        }
1534        Ok(counts)
1535    }
1536
1537    /// Rows appended to `messages` since the FTS index was last optimized.
1538    /// A missing index reports the whole table; the query is manifest-only.
1539    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1540        self.handle
1541            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1542            .await
1543    }
1544
1545    /// Rows added or rewritten in `messages` since the IVF_PQ vector index
1546    /// was last optimized. Below
1547    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
1548    /// must read [`embedding_progress`](Self::embedding_progress) too and
1549    /// distinguish "index not built yet" from "index trails data".
1550    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1551        self.handle
1552            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1553            .await
1554    }
1555
1556    /// Embedding coverage: how many `messages` rows carry a vector and how
1557    /// many are still eligible. Drives the `pond status` embeddings line and
1558    /// the `pond embed` progress bar's known total. `embedded` reads the
1559    /// `vector IS NOT NULL` count directly - the single-active-model invariant
1560    /// (see `MESSAGE_SCALAR_INDICES`) means there is no need to scope by the
1561    /// `embedding_model` column.
1562    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1563        let dataset = self.handle.dataset(Table::Messages).await?;
1564        let embedded = dataset
1565            .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1566            .await?;
1567        let total = dataset
1568            .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1569            .await?;
1570        Ok(EmbeddingProgress {
1571            embedded,
1572            total,
1573            model: embed::model_id(),
1574        })
1575    }
1576
1577    /// Count rows whose `embedding_model` is not the currently configured
1578    /// model AND whose `vector` is still populated - the signal `pond embed`
1579    /// uses to detect a model swap and require `--force`.
1580    pub async fn stale_embedding_count(&self) -> Result<usize> {
1581        let dataset = self.handle.dataset(Table::Messages).await?;
1582        dataset
1583            .count_rows(Some(
1584                Predicate::And(vec![
1585                    Predicate::IsNotNull("vector"),
1586                    Predicate::Ne("embedding_model", embed::model_id().into()),
1587                ])
1588                .to_lance(),
1589            ))
1590            .await
1591            .map_err(Into::into)
1592    }
1593
1594    /// Run the per-table maintenance cycle (compact + indices) across every
1595    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
1596    /// and compaction commit independently, so a hot writer that starves
1597    /// compaction on one table does not abort the index work the operator
1598    /// asked for on other tables (or even on the same table).
1599    pub async fn optimize_indices(
1600        &self,
1601        progress: Option<OptimizeProgressFn>,
1602        cleanup: Option<CleanupConfig>,
1603    ) -> Result<OptimizeOutcome> {
1604        let cleanup = cleanup.unwrap_or_default();
1605        let policy = pond_index_intents();
1606        let mut tables = Vec::with_capacity(3);
1607        for (table, intents) in policy.all() {
1608            let outcome = self
1609                .handle
1610                .optimize_table(table, intents, progress.as_ref(), cleanup)
1611                .await;
1612            tables.push(outcome);
1613        }
1614        Ok(OptimizeOutcome { tables })
1615    }
1616
1617    /// Fold trailing fragments into existing indices across every table,
1618    /// without running compaction. Used by `pond embed`'s tail so newly
1619    /// written vectors land in the FTS / IVF_PQ / btree / bitmap indices
1620    /// without paying the compaction retry budget while embed itself may
1621    /// still be writing in a sibling process.
1622    pub async fn build_indices_only(
1623        &self,
1624        progress: Option<OptimizeProgressFn>,
1625    ) -> Result<OptimizeOutcome> {
1626        let policy = pond_index_intents();
1627        let mut tables = Vec::with_capacity(3);
1628        for (table, intents) in policy.all() {
1629            let indices = self
1630                .handle
1631                .optimize_table_indices_only(table, intents, progress.as_ref())
1632                .await;
1633            tables.push(TableOptimizeOutcome {
1634                table,
1635                indices,
1636                compaction: PhaseOutcome::NotAttempted,
1637            });
1638        }
1639        Ok(OptimizeOutcome { tables })
1640    }
1641
1642    #[cfg(test)]
1643    async fn optimize_indices_with_vector_threshold(
1644        &self,
1645        vector_threshold: usize,
1646    ) -> Result<OptimizeOutcome> {
1647        let cleanup = CleanupConfig::default();
1648        let policy = pond_index_intents_with_vector_threshold(vector_threshold);
1649        let mut tables = Vec::with_capacity(3);
1650        for (table, intents) in policy.all() {
1651            let outcome = self
1652                .handle
1653                .optimize_table(table, intents, None, cleanup)
1654                .await;
1655            tables.push(outcome);
1656        }
1657        Ok(OptimizeOutcome { tables })
1658    }
1659
1660    pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1661        let policy = pond_index_intents();
1662        let mut matched = false;
1663        for (table, intents) in policy.all() {
1664            for intent in intents {
1665                if intent_name.is_none_or(|name| name == intent.name) {
1666                    matched = true;
1667                    self.handle.rebuild_index(table, intent).await?;
1668                }
1669            }
1670        }
1671        if let Some(name) = intent_name
1672            && !matched
1673        {
1674            anyhow::bail!("unknown index intent {name:?}");
1675        }
1676        Ok(())
1677    }
1678
1679    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1680        let policy = pond_index_intents();
1681        let mut statuses = Vec::new();
1682        for (table, intents) in policy.all() {
1683            statuses.extend(self.handle.index_status(table, intents).await?);
1684        }
1685        Ok(statuses)
1686    }
1687
1688    /// Drop the IVF_PQ index on `messages.vector`. Used by `pond embed
1689    /// --force` before re-bootstrapping under a different model. Silent
1690    /// when the index does not exist.
1691    pub async fn drop_vector_index(&self) -> Result<()> {
1692        match self
1693            .handle
1694            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1695            .await
1696        {
1697            Ok(()) => Ok(()),
1698            Err(error) => {
1699                let msg = error.to_string();
1700                if msg.contains("not found") || msg.contains("does not exist") {
1701                    Ok(())
1702                } else {
1703                    Err(error)
1704                }
1705            }
1706        }
1707    }
1708
1709    /// On-disk byte totals per dataset, sized through Lance's object store
1710    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
1711    pub async fn table_sizes(&self) -> Result<TableSizes> {
1712        self.handle.table_sizes().await
1713    }
1714
1715    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1716        let batch = self
1717            .handle
1718            .scan_batch(
1719                Table::Sessions,
1720                Some(&Predicate::Eq("id", session_id.into())),
1721                &[],
1722            )
1723            .await?;
1724        if batch.num_rows() == 0 {
1725            Ok(None)
1726        } else {
1727            Ok(Some(session_from_batch(&batch, 0)?))
1728        }
1729    }
1730
1731    /// Fetch the stored vector for one message, for `similar_to`-style
1732    /// retrieval. Returns `Ok(None)` when the message exists but is not yet
1733    /// embedded, or when no message has that id. Vectors are stored Float16
1734    /// (`embedding_vector_type`); the search path takes `&[f32]`, so we
1735    /// widen here. Cheap rare op - one predicate scan, no index.
1736    pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1737        let batch = self
1738            .handle
1739            .scan_batch(
1740                Table::Messages,
1741                Some(&Predicate::Eq("id", message_id.into())),
1742                &["vector"],
1743            )
1744            .await?;
1745        if batch.num_rows() == 0 {
1746            return Ok(None);
1747        }
1748        let column = batch
1749            .column(0)
1750            .as_any()
1751            .downcast_ref::<FixedSizeListArray>();
1752        let Some(list) = column else {
1753            return Ok(None);
1754        };
1755        if list.is_null(0) {
1756            return Ok(None);
1757        }
1758        let values = list.value(0);
1759        let halves = values
1760            .as_any()
1761            .downcast_ref::<Float16Array>()
1762            .context("messages.vector inner array is not Float16")?;
1763        let widened = (0..halves.len())
1764            .map(|i| halves.value(i).to_f32())
1765            .collect();
1766        Ok(Some(widened))
1767    }
1768
1769    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1770        let batch = self
1771            .handle
1772            .scan_batch(
1773                Table::Messages,
1774                Some(&Predicate::Eq("session_id", session_id.into())),
1775                &[
1776                    "session_id",
1777                    "id",
1778                    "timestamp",
1779                    "role",
1780                    "content",
1781                    "options",
1782                ],
1783            )
1784            .await?;
1785        let mut messages = Vec::with_capacity(batch.num_rows());
1786        for row in 0..batch.num_rows() {
1787            messages.push(message_from_batch(&batch, row)?);
1788        }
1789        messages.sort_by(|left, right| {
1790            left.timestamp()
1791                .cmp(&right.timestamp())
1792                .then_with(|| left.id().cmp(right.id()))
1793        });
1794
1795        let message_ids = messages
1796            .iter()
1797            .map(|message| message.id().to_owned())
1798            .collect::<Vec<_>>();
1799        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1800
1801        Ok(messages
1802            .into_iter()
1803            .map(|message| {
1804                let key = (message.session_id().to_owned(), message.id().to_owned());
1805                let parts = parts_by_message.remove(&key).unwrap_or_default();
1806                MessageWithParts { message, parts }
1807            })
1808            .collect())
1809    }
1810
1811    /// Every part of these messages, full fidelity (file blobs included). The
1812    /// canonical read primitive - restore/export, verbatim mode, and the
1813    /// message-mode target all need the complete set.
1814    pub async fn parts_for_messages(
1815        &self,
1816        session_id: &str,
1817        message_ids: &[String],
1818    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1819        self.scan_parts(session_id, message_ids, None).await
1820    }
1821
1822    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
1823    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
1824    /// nothing. For the summary-only reads (conversational/complete session
1825    /// views, search hits) - it never feeds restore/export.
1826    pub async fn summary_parts_for_messages(
1827        &self,
1828        session_id: &str,
1829        message_ids: &[String],
1830    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1831        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1832            .await
1833    }
1834
1835    async fn scan_parts(
1836        &self,
1837        session_id: &str,
1838        message_ids: &[String],
1839        part_types: Option<&[&str]>,
1840    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1841        if message_ids.is_empty() {
1842            return Ok(BTreeMap::new());
1843        }
1844        let mut clauses = vec![
1845            Predicate::Eq("session_id", session_id.into()),
1846            in_predicate("message_id", message_ids),
1847        ];
1848        if let Some(types) = part_types {
1849            clauses.push(Predicate::In(
1850                "type",
1851                types.iter().map(|&t| t.into()).collect(),
1852            ));
1853        }
1854        let predicate = Predicate::And(clauses);
1855        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1856        let mut scanner = self
1857            .handle
1858            .scan(
1859                Table::Parts,
1860                ScanOpts::with_predicate_and_projection(
1861                    &predicate,
1862                    &[
1863                        "session_id",
1864                        "message_id",
1865                        "id",
1866                        "ordinal",
1867                        "type",
1868                        "provenance",
1869                        "variant_data",
1870                        "options",
1871                    ],
1872                ),
1873            )
1874            .await?;
1875        scanner.with_row_address();
1876        let batch = scanner.try_into_batch().await.context("scan failed")?;
1877        let row_addresses = uint64(&batch, "_rowaddr")?;
1878        let mut file_payloads = BTreeMap::<usize, FileData>::new();
1879        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1880        for row in 0..batch.num_rows() {
1881            if string(&batch, "type", row)?.as_deref() == Some("file") {
1882                let variant_data =
1883                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1884                file_rows.push((row, row_addresses.value(row), variant_data));
1885            }
1886        }
1887        if !file_rows.is_empty() {
1888            let addresses = file_rows
1889                .iter()
1890                .map(|(_, address, _)| *address)
1891                .collect::<Vec<_>>();
1892            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1893            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1894                // Legacy blob (lance-encoding:blob): payload is bytes; the
1895                // url variant stored its URL as UTF-8 bytes, recovered via
1896                // `file_data_from_blob`'s `data_kind = "url"` branch.
1897                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1898                file_payloads.insert(row, payload);
1899            }
1900        }
1901        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1902        for row in 0..batch.num_rows() {
1903            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1904            parts_by_message
1905                .entry((part.session_id.clone(), part.message_id.clone()))
1906                .or_default()
1907                .push(part);
1908        }
1909        for parts in parts_by_message.values_mut() {
1910            parts.sort_by_key(|part| part.ordinal);
1911        }
1912        Ok(parts_by_message)
1913    }
1914}
1915
1916#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1917#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
1918pub enum IngestEvent {
1919    Session(Session),
1920    Message(Message),
1921    Part(Part),
1922}
1923
/// Aggregate accounting for one ingest pass (CLI sync, adapter-driven).
///
/// The wire layer (`pond_ingest`) returns per-row results instead; this
/// aggregate is derived from those at the wire boundary.
///
/// Counters are bucketed by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape is set by spec.md#adapter-integrity-event-ordering.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct IngestSummary {
    /// Rows actually written to Lance, summed over all three tables. Prefer
    /// the per-table fields below for user-facing counts; this one remains for
    /// `accepted()` and existing wire callers.
    pub inserted: usize,
    /// Rows that already existed (merge_insert no-op match).
    pub matched: usize,
    /// Session rows inserted this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted this pass. This is the total: it includes tool
    /// calls, tool results, and other non-searchable messages.
    pub messages_inserted_total: usize,
    /// The part of `messages_inserted_total` whose `search_text` is non-null
    /// (eligible for FTS + semantic indexing). The user-facing "messages"
    /// count in `pond sync` / `pond status` reads this field.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted this pass.
    pub parts_inserted: usize,
    /// Session rows that were already present (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows that were already present (merge_insert matched), total.
    pub messages_matched_total: usize,
    /// The part of `messages_matched_total` that has `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
    /// Events the validator dropped under the per-event-drop policy (ordering
    /// violation, orphan part, mismatched parent, adapter parse failure,
    /// duplicate-id collision, ...). Counted per event, not per session: a
    /// session with one bad part contributes 1 here, not "the whole
    /// substream." Per spec.md#adapter-integrity-dedup, adapters SHOULD dedupe
    /// their own emissions upstream when source replay is expected; the
    /// validator's in-batch HashSet is a safety net, not a feature adapters
    /// may rely on. If this bucket grows on a clean adapter, inspect
    /// `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions whose Session-level invariants (immutable `source_agent` /
    /// `project` against the stored row) failed at flush time, causing the
    /// whole substream to be rejected. Always small relative to `inserted`;
    /// if not, there is a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter couldn't decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    pub skipped_files: usize,
    /// Files that produced no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `-vv` (debug) verbosity.
    pub skipped_empty: usize,
    /// Sessions short-circuited by the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): the file `mtime` was at or
    /// before the wall-clock time pond last wrote that session's row, so
    /// re-decoding was bypassed.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries were exhausted (commit conflicts,
    /// transient IO that didn't recover). Hard zero on healthy runs.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is
    /// intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys covering the combined
    /// `dropped_events + dropped_sessions` populations. Keys are
    /// `&'static str` (see the `DROP_REASON_*` constants) so consumers can
    /// match by identity. Empty on a clean run. `pond sync` prints the top
    /// reasons from it and `benches/ingest_bench.rs` buckets Partial drops by
    /// cause with it.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1999
/// Stable reason keys used by the `IngestSummary::drop_reasons` histogram and
/// the per-row `RowError::reason_key`. They are `&'static str` so consumers
/// can match by identity rather than by prose.
///
/// To add a key: pick a short snake_case identifier, route it from the
/// validator/adapter, and update the per-row outcome docs in
/// `docs/spec.md#adapter-integrity-event-ordering`.
// Duplicate-id collisions.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
// Event-ordering and parent-mismatch violations.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
// Session-level checks. NOTE(review): grouping inferred from the key names -
// confirm against the validator.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
// Fallback bucket for error outcomes that carry no reason key.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
2016
/// Honest per-table outcome of one batched flush.
///
/// Built from the counts `merge_insert` returns together with the
/// pre-existence sets captured by `upsert_session_batch`, then folded into a
/// per-sync summary via [`IngestSummary::add_batch`].
///
/// spec.md#adapter-integrity-additive-sync: a match is a no-op write, so the
/// inserted/matched split is informational. It is still surfaced because both
/// `pond sync` and `pond_ingest` clients reconcile against "which rows landed
/// this call."
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct BatchCounts {
    /// Session rows inserted by this flush.
    pub sessions_inserted: usize,
    /// Session rows that were already present.
    pub sessions_matched: usize,
    /// Message rows inserted by this flush (all messages).
    pub messages_inserted_total: usize,
    /// Searchable subset of `messages_inserted_total`.
    pub messages_inserted_searchable: usize,
    /// Message rows that were already present (all messages).
    pub messages_matched_total: usize,
    /// Searchable subset of `messages_matched_total`.
    pub messages_matched_searchable: usize,
    /// Part rows inserted by this flush.
    pub parts_inserted: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
}
2035
2036impl IngestSummary {
2037    pub fn accepted(&self) -> usize {
2038        self.inserted + self.matched
2039    }
2040
2041    /// Sole writer of the per-table counters on the CLI batched flush path.
2042    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
2043    /// both for the same rows would double-count.
2044    pub fn add_batch(&mut self, counts: &BatchCounts) {
2045        self.sessions_inserted += counts.sessions_inserted;
2046        self.sessions_matched += counts.sessions_matched;
2047        self.messages_inserted_total += counts.messages_inserted_total;
2048        self.messages_inserted_searchable += counts.messages_inserted_searchable;
2049        self.messages_matched_total += counts.messages_matched_total;
2050        self.messages_matched_searchable += counts.messages_matched_searchable;
2051        self.parts_inserted += counts.parts_inserted;
2052        self.parts_matched += counts.parts_matched;
2053        self.inserted +=
2054            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2055        self.matched +=
2056            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2057    }
2058
2059    /// Sum every counter from `other` into `self`. Used by the multi-source
2060    /// `pond sync` loop so adding a new field to this struct doesn't silently
2061    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
2062    pub fn merge(&mut self, other: &Self) {
2063        self.inserted += other.inserted;
2064        self.matched += other.matched;
2065        self.sessions_inserted += other.sessions_inserted;
2066        self.messages_inserted_total += other.messages_inserted_total;
2067        self.messages_inserted_searchable += other.messages_inserted_searchable;
2068        self.parts_inserted += other.parts_inserted;
2069        self.sessions_matched += other.sessions_matched;
2070        self.messages_matched_total += other.messages_matched_total;
2071        self.messages_matched_searchable += other.messages_matched_searchable;
2072        self.parts_matched += other.parts_matched;
2073        self.dropped_events += other.dropped_events;
2074        self.dropped_sessions += other.dropped_sessions;
2075        self.skipped_files += other.skipped_files;
2076        self.skipped_empty += other.skipped_empty;
2077        self.skipped_fresh += other.skipped_fresh;
2078        self.storage_errors += other.storage_errors;
2079        self.truncated_values += other.truncated_values;
2080        for (key, value) in &other.drop_reasons {
2081            *self.drop_reasons.entry(key).or_insert(0) += value;
2082        }
2083    }
2084
2085    /// Same dispatch as [`Self::add_outcomes`] but ignores
2086    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
2087    /// via [`Self::add_batch`] and uses this method to attribute per-row
2088    /// `Error` outcomes from the same flush.
2089    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2090        for outcome in outcomes {
2091            if !matches!(outcome.status, OutcomeStatus::Error) {
2092                continue;
2093            }
2094            if outcome.kind == "session" {
2095                self.dropped_sessions += 1;
2096            } else {
2097                self.dropped_events += 1;
2098            }
2099            let reason = outcome
2100                .error
2101                .as_ref()
2102                .and_then(|error| error.reason_key)
2103                .unwrap_or(DROP_REASON_UNCATEGORIZED);
2104            *self.drop_reasons.entry(reason).or_insert(0) += 1;
2105        }
2106    }
2107
2108    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2109        for outcome in outcomes {
2110            match outcome.status {
2111                OutcomeStatus::Inserted => {
2112                    self.inserted += 1;
2113                    match outcome.kind {
2114                        "session" => self.sessions_inserted += 1,
2115                        "message" => {
2116                            self.messages_inserted_total += 1;
2117                            if outcome.searchable {
2118                                self.messages_inserted_searchable += 1;
2119                            }
2120                        }
2121                        "part" => self.parts_inserted += 1,
2122                        _ => {}
2123                    }
2124                }
2125                OutcomeStatus::Matched => {
2126                    self.matched += 1;
2127                    match outcome.kind {
2128                        "session" => self.sessions_matched += 1,
2129                        "message" => {
2130                            self.messages_matched_total += 1;
2131                            if outcome.searchable {
2132                                self.messages_matched_searchable += 1;
2133                            }
2134                        }
2135                        "part" => self.parts_matched += 1,
2136                        _ => {}
2137                    }
2138                }
2139                OutcomeStatus::Error => {
2140                    // Session-level rejection: exactly one session-kind Error
2141                    // outcome (see `error_outcomes_for_substream`). Per-event
2142                    // drop: one Error per message/part. The two populations
2143                    // are counted separately so the operator can tell a
2144                    // structural reject from a row-level skip.
2145                    if outcome.kind == "session" {
2146                        self.dropped_sessions += 1;
2147                    } else {
2148                        self.dropped_events += 1;
2149                    }
2150                    let reason = outcome
2151                        .error
2152                        .as_ref()
2153                        .and_then(|e| e.reason_key)
2154                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
2155                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
2156                }
2157            }
2158        }
2159    }
2160}
2161
2162/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol). One
2163/// row per input event from the request's `events` array. The validator
2164/// returns these in array order so the wire layer can pack them directly
2165/// into [`crate::wire::IngestResult`] entries.
2166#[derive(Debug, Clone, PartialEq)]
2167pub struct RowOutcome {
2168    pub index: usize,
2169    pub kind: &'static str,
2170    pub pk: Value,
2171    pub status: OutcomeStatus,
2172    pub error: Option<RowError>,
2173    /// True iff `kind == "message"` AND the underlying row carries
2174    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
2175    /// so the CLI can show "searchable" message deltas distinct from raw
2176    /// inserts. Always false for session/part rows.
2177    pub searchable: bool,
2178}
2179
/// Disposition of one ingested row, carried by [`RowOutcome::status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row's primary key was not present before the write.
    Inserted,
    /// The row's primary key was already present before the write.
    Matched,
    /// The row was dropped; details ride in [`RowOutcome::error`].
    Error,
}
2186
/// Structured error body attached to a dropped row. It mirrors the wire
/// shape so the handler can forward it unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of what went wrong.
    pub message: String,
    /// Name of the offending field, when the error is tied to one.
    pub field: Option<&'static str>,
    /// Human-prose reason, when one applies.
    pub reason: Option<&'static str>,
    /// Stable key for histogramming - see the `DROP_REASON_*` constants.
    /// Where `reason` is prose, `reason_key` is the machine bucket. `None`
    /// means uncategorized; consumers attribute it to
    /// `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
2200
2201/// Buffered session events tagged with their input array index, so the
2202/// per-row outcomes can be re-attributed once `merge_insert` returns its
2203/// per-row Inserted/Matched stats.
2204#[derive(Debug)]
2205struct BufferedSession {
2206    index: usize,
2207    session: Session,
2208}
2209
2210#[derive(Debug)]
2211struct BufferedMessage {
2212    index: usize,
2213    message: Message,
2214    parts: Vec<BufferedPart>,
2215    search_text: Option<String>,
2216}
2217
2218#[derive(Debug)]
2219struct BufferedPart {
2220    index: usize,
2221    part: Part,
2222}
2223
2224/// State machine that turns the `events: Vec<IngestEvent>` array into a
2225/// flat `Vec<RowOutcome>` matching the array's index space. Buffers a whole
2226/// session substream so `merge_insert` runs once per substream (three
2227/// batches: sessions, messages, parts). A validation error on a single event
2228/// drops *that event* (one [`OutcomeStatus::Error`] outcome) and the substream
2229/// continues; only Session-level invariants (immutable source_agent / project
2230/// on re-write) drop the whole substream (spec.md#adapter-integrity-event-ordering).
2231///
2232/// Writes are batched at flush time. As complete substreams arrive (a new
2233/// `Session` event closes out the current one), they accumulate in
2234/// `completed` rather than each one calling `merge_insert` immediately.
2235/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
2236/// at which point one batched 3-parallel-merge-insert covers all pending
2237/// substreams. This is the load-bearing perf change: per-substream commit
2238/// overhead dominated the ingest profile (see `benches/ingest_bench.rs`),
2239/// and amortizing it across N sessions cuts wall time materially.
2240#[derive(Debug, Default)]
2241pub struct IngestValidator {
2242    session: Option<BufferedSession>,
2243    current_message: Option<BufferedMessage>,
2244    current_parts: Vec<BufferedPart>,
2245    messages: Vec<BufferedMessage>,
2246    /// Message ids already buffered in the current substream. Duplicate ids
2247    /// drop the offending event in-line rather than failing the whole batch
2248    /// downstream.
2249    seen_message_ids: HashSet<String>,
2250    /// `(message_id, part_id)` keys already buffered in the current
2251    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
2252    seen_part_keys: HashSet<(String, String)>,
2253    /// Substreams whose end-of-stream boundary has been observed but whose
2254    /// rows haven't been written yet. Flushed in batched mode by
2255    /// [`Self::flush`].
2256    completed: Vec<CompletedSubstream>,
2257}
2258
2259/// One closed substream ready for the batched flush path.
2260#[derive(Debug)]
2261struct CompletedSubstream {
2262    session_index: usize,
2263    session: Session,
2264    messages: Vec<BufferedMessage>,
2265}
2266
2267impl IngestValidator {
2268    /// Drive one input event through the validator. Returns the per-row
2269    /// outcomes the event triggered: empty when the event is just buffered,
2270    /// or N entries when a session substream just flushed (success or
2271    /// failure). `Err` is reserved for catastrophic storage failures that
2272    /// should fail the whole `pond_ingest` request.
2273    pub async fn push(
2274        &mut self,
2275        store: &Store,
2276        index: usize,
2277        event: IngestEvent,
2278    ) -> Result<Vec<RowOutcome>> {
2279        match event {
2280            IngestEvent::Session(session) => self.push_session(store, index, session).await,
2281            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2282            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2283        }
2284    }
2285
2286    /// Final flush at end-of-batch. Closes the in-flight substream and
2287    /// drains the pending-flush buffer. Returns the per-row outcomes (for
2288    /// the wire layer) alongside the honest per-table counts (for
2289    /// `IngestSummary::add_batch`).
2290    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2291        self.close_current_substream();
2292        self.flush(store).await
2293    }
2294
2295    /// Drain every completed substream into batched 3-parallel-merge_insert
2296    /// writes. Caller invokes this periodically (every N completed
2297    /// substreams) to keep memory bounded; in adapter-driven sync that
2298    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
2299    /// in-flight substream stays buffered - close it explicitly via
2300    /// [`Self::finish`] or by feeding the next Session event.
2301    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2302        if self.completed.is_empty() {
2303            return Ok((Vec::new(), BatchCounts::default()));
2304        }
2305        let completed = std::mem::take(&mut self.completed);
2306        store.upsert_session_batch(completed).await
2307    }
2308
2309    /// Number of fully-buffered substreams awaiting batched write. Used by
2310    /// the adapter caller to decide when to call [`Self::flush`].
2311    pub fn pending_substreams(&self) -> usize {
2312        self.completed.len()
2313    }
2314
2315    async fn push_session(
2316        &mut self,
2317        _store: &Store,
2318        index: usize,
2319        mut session: Session,
2320    ) -> Result<Vec<RowOutcome>> {
2321        // Close out the current substream (if any) - move it to the pending
2322        // buffer instead of writing immediately. The actual write happens
2323        // when the caller invokes `flush` / `finish`.
2324        self.close_current_substream();
2325
2326        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
2327        // if empty after trim. A Session event with empty source_agent is
2328        // dropped on the spot - the substream that would follow has nothing
2329        // to anchor on, so subsequent message/part events will also drop.
2330        let trimmed = session.source_agent.trim();
2331        if trimmed.is_empty() {
2332            return Ok(vec![RowOutcome {
2333                index,
2334                kind: "session",
2335                pk: Value::String(session.id.clone()),
2336                status: OutcomeStatus::Error,
2337                error: Some(RowError {
2338                    message: format!("session {} has empty source_agent after trim", session.id),
2339                    field: Some("source_agent"),
2340                    reason: None,
2341                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2342                }),
2343                searchable: false,
2344            }]);
2345        }
2346        if trimmed.len() != session.source_agent.len() {
2347            session.source_agent = trimmed.to_owned();
2348        }
2349
2350        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2351            return Ok(vec![RowOutcome {
2352                index,
2353                kind: "session",
2354                pk: Value::String(session.id.clone()),
2355                status: OutcomeStatus::Error,
2356                error: Some(RowError {
2357                    message: format!(
2358                        "session {} has parent_message_id without parent_session_id",
2359                        session.id,
2360                    ),
2361                    field: Some("parent_message_id"),
2362                    reason: None,
2363                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2364                }),
2365                searchable: false,
2366            }]);
2367        }
2368
2369        self.seen_message_ids.clear();
2370        self.seen_part_keys.clear();
2371        self.session = Some(BufferedSession { index, session });
2372        Ok(Vec::new())
2373    }
2374
2375    fn close_current_substream(&mut self) {
2376        self.flush_current_message();
2377        let Some(BufferedSession {
2378            index: session_index,
2379            session,
2380        }) = self.session.take()
2381        else {
2382            return;
2383        };
2384        let messages = std::mem::take(&mut self.messages);
2385        self.seen_message_ids.clear();
2386        self.seen_part_keys.clear();
2387        self.completed.push(CompletedSubstream {
2388            session_index,
2389            session,
2390            messages,
2391        });
2392    }
2393
2394    fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
2395        let pk = Value::Array(vec![
2396            Value::String(message.session_id().to_owned()),
2397            Value::String(message.id().to_owned()),
2398        ]);
2399        let Some(session) = &self.session else {
2400            return vec![error_outcome(
2401                index,
2402                "message",
2403                pk,
2404                "first event in a session stream must be Session",
2405                None,
2406                DROP_REASON_MESSAGE_BEFORE_SESSION,
2407            )];
2408        };
2409        if message.session_id() != session.session.id {
2410            let msg = format!(
2411                "message {} references session {}, expected {}",
2412                message.id(),
2413                message.session_id(),
2414                session.session.id
2415            );
2416            return vec![error_outcome(
2417                index,
2418                "message",
2419                pk,
2420                &msg,
2421                Some("session_id"),
2422                DROP_REASON_MESSAGE_SESSION_MISMATCH,
2423            )];
2424        }
2425        if !self.seen_message_ids.insert(message.id().to_owned()) {
2426            // Keep same-substream duplicate ids visible in `dropped_events`;
2427            // adapters are expected to dedupe upstream (see claude-code's
2428            // per-file `seen_uuids`), so a hit here is worth investigating.
2429            let msg = format!("duplicate message id {} in session substream", message.id());
2430            return vec![error_outcome(
2431                index,
2432                "message",
2433                pk,
2434                &msg,
2435                None,
2436                DROP_REASON_DUPLICATE_MESSAGE_ID,
2437            )];
2438        }
2439        self.flush_current_message();
2440        self.current_message = Some(BufferedMessage {
2441            index,
2442            message,
2443            parts: Vec::new(),
2444            search_text: None,
2445        });
2446        Vec::new()
2447    }
2448
2449    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2450        let pk = Value::Array(vec![
2451            Value::String(part.session_id.clone()),
2452            Value::String(part.message_id.clone()),
2453            Value::String(part.id.clone()),
2454        ]);
2455        let Some(current) = &self.current_message else {
2456            return vec![error_outcome(
2457                index,
2458                "part",
2459                pk,
2460                "part event appeared before a message",
2461                None,
2462                DROP_REASON_PART_BEFORE_MESSAGE,
2463            )];
2464        };
2465        if part.session_id != current.message.session_id() {
2466            let msg = format!(
2467                "part {} references session {}, expected {}",
2468                part.id,
2469                part.session_id,
2470                current.message.session_id()
2471            );
2472            return vec![error_outcome(
2473                index,
2474                "part",
2475                pk,
2476                &msg,
2477                Some("session_id"),
2478                DROP_REASON_PART_MESSAGE_MISMATCH,
2479            )];
2480        }
2481        if part.message_id != current.message.id() {
2482            let msg = format!(
2483                "part {} references message {}, expected {}",
2484                part.id,
2485                part.message_id,
2486                current.message.id()
2487            );
2488            return vec![error_outcome(
2489                index,
2490                "part",
2491                pk,
2492                &msg,
2493                Some("message_id"),
2494                DROP_REASON_PART_MESSAGE_MISMATCH,
2495            )];
2496        }
2497        let part_key = (part.message_id.clone(), part.id.clone());
2498        if !self.seen_part_keys.insert(part_key) {
2499            let msg = format!(
2500                "duplicate part id {} for message {} in session substream",
2501                part.id, part.message_id
2502            );
2503            return vec![error_outcome(
2504                index,
2505                "part",
2506                pk,
2507                &msg,
2508                None,
2509                DROP_REASON_DUPLICATE_PART_KEY,
2510            )];
2511        }
2512        self.current_parts.push(BufferedPart { index, part });
2513        Vec::new()
2514    }
2515
2516    fn flush_current_message(&mut self) {
2517        let Some(mut buffered) = self.current_message.take() else {
2518            return;
2519        };
2520        let parts = std::mem::take(&mut self.current_parts);
2521        let mut canonical_parts = Vec::with_capacity(parts.len());
2522        for part in &parts {
2523            canonical_parts.push(part.part.clone());
2524        }
2525        buffered.search_text = search_text(&buffered.message, &canonical_parts);
2526        buffered.parts = parts;
2527        self.messages.push(buffered);
2528    }
2529}
2530
2531fn error_outcome(
2532    index: usize,
2533    kind: &'static str,
2534    pk: Value,
2535    message: &str,
2536    field: Option<&'static str>,
2537    reason_key: &'static str,
2538) -> RowOutcome {
2539    RowOutcome {
2540        index,
2541        kind,
2542        pk,
2543        status: OutcomeStatus::Error,
2544        error: Some(RowError {
2545            message: message.to_owned(),
2546            field,
2547            reason: None,
2548            reason_key: Some(reason_key),
2549        }),
2550        searchable: false,
2551    }
2552}
2553
2554/// Session-level rejection (immutable `source_agent` / `project` violation):
2555/// emit exactly one Error outcome on the Session row. The buffered messages
2556/// and parts of this substream are *not* surfaced as per-row errors - their
2557/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
2558fn error_outcomes_for_substream(
2559    session_index: usize,
2560    session: &Session,
2561    _messages: &[BufferedMessage],
2562    message: impl Into<String>,
2563    field: Option<&'static str>,
2564    reason_key: &'static str,
2565) -> Vec<RowOutcome> {
2566    let reason = field.map(|_| "immutable");
2567    vec![RowOutcome {
2568        index: session_index,
2569        kind: "session",
2570        pk: Value::String(session.id.clone()),
2571        status: OutcomeStatus::Error,
2572        error: Some(RowError {
2573            message: message.into(),
2574            field,
2575            reason,
2576            reason_key: Some(reason_key),
2577        }),
2578        searchable: false,
2579    }]
2580}
2581
2582/// Batched-path success helper. Each row's Inserted/Matched status is read
2583/// from the pre-existence sets captured by `upsert_session_batch` before its
2584/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
2585/// Also accumulates the per-table totals into `counts` so the CLI summary
2586/// gets the same truth without re-walking the outcomes.
2587fn success_outcomes_for_substream(
2588    session_index: usize,
2589    session: &Session,
2590    messages: &[BufferedMessage],
2591    existing_sessions: &std::collections::HashMap<String, Session>,
2592    existing_message_pks: &HashSet<(String, String)>,
2593    existing_part_pks: &HashSet<(String, String, String)>,
2594    counts: &mut BatchCounts,
2595) -> Vec<RowOutcome> {
2596    let session_was_present = existing_sessions.contains_key(&session.id);
2597    let session_status = if session_was_present {
2598        counts.sessions_matched += 1;
2599        UpsertStatus::Matched
2600    } else {
2601        counts.sessions_inserted += 1;
2602        UpsertStatus::Inserted
2603    };
2604
2605    let mut outcomes = Vec::with_capacity(1 + messages.len());
2606    outcomes.push(success_outcome(
2607        session_index,
2608        "session",
2609        Value::String(session.id.clone()),
2610        session_status,
2611        false,
2612    ));
2613    for buffered in messages {
2614        let key = (
2615            buffered.message.session_id().to_owned(),
2616            buffered.message.id().to_owned(),
2617        );
2618        let searchable = buffered.search_text.is_some();
2619        let message_status = if existing_message_pks.contains(&key) {
2620            counts.messages_matched_total += 1;
2621            if searchable {
2622                counts.messages_matched_searchable += 1;
2623            }
2624            UpsertStatus::Matched
2625        } else {
2626            counts.messages_inserted_total += 1;
2627            if searchable {
2628                counts.messages_inserted_searchable += 1;
2629            }
2630            UpsertStatus::Inserted
2631        };
2632        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2633        outcomes.push(success_outcome(
2634            buffered.index,
2635            "message",
2636            pk,
2637            message_status,
2638            searchable,
2639        ));
2640        for part in &buffered.parts {
2641            let part_key = (
2642                part.part.session_id.clone(),
2643                part.part.message_id.clone(),
2644                part.part.id.clone(),
2645            );
2646            let part_status = if existing_part_pks.contains(&part_key) {
2647                counts.parts_matched += 1;
2648                UpsertStatus::Matched
2649            } else {
2650                counts.parts_inserted += 1;
2651                UpsertStatus::Inserted
2652            };
2653            let part_pk = Value::Array(vec![
2654                Value::String(part_key.0),
2655                Value::String(part_key.1),
2656                Value::String(part_key.2),
2657            ]);
2658            outcomes.push(success_outcome(
2659                part.index,
2660                "part",
2661                part_pk,
2662                part_status,
2663                false,
2664            ));
2665        }
2666    }
2667    outcomes
2668}
2669
2670fn success_outcome(
2671    index: usize,
2672    kind: &'static str,
2673    pk: Value,
2674    status: UpsertStatus,
2675    searchable: bool,
2676) -> RowOutcome {
2677    let status = match status {
2678        UpsertStatus::Inserted => OutcomeStatus::Inserted,
2679        UpsertStatus::Matched => OutcomeStatus::Matched,
2680    };
2681    RowOutcome {
2682        index,
2683        kind,
2684        pk,
2685        status,
2686        error: None,
2687        searchable,
2688    }
2689}
2690
/// Errors that reject a session at ingest time.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// spec.md#protocol: `Session.source_agent` and `Session.project` are
    /// immutable after the first write, because the denormalized copies on
    /// `messages` were stamped from the prior Session at first ingest. A
    /// re-write that changes either would silently desync.
    ImmutableField {
        /// Name of the immutable field that differs.
        field: &'static str,
        /// Id of the session being re-written.
        session_id: String,
        /// Value currently stored.
        stored: String,
        /// Value the re-write tried to set.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the violation as one line naming the session, the field and
    /// both values (the values are debug-quoted).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ImmutableField {
                field,
                session_id,
                stored,
                attempted,
            } => f.write_fmt(format_args!(
                "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
            )),
        }
    }
}

impl std::error::Error for IngestError {}
2722
2723/// Compare an incoming Session row against the stored row on the two
2724/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
2725/// counts a NULL-vs-non-NULL change as a mismatch.
2726fn ensure_immutable_match(
2727    existing: &Session,
2728    incoming: &Session,
2729) -> std::result::Result<(), IngestError> {
2730    if existing.source_agent != incoming.source_agent {
2731        return Err(IngestError::ImmutableField {
2732            field: "source_agent",
2733            session_id: incoming.id.clone(),
2734            stored: existing.source_agent.clone(),
2735            attempted: incoming.source_agent.clone(),
2736        });
2737    }
2738    if existing.project != incoming.project {
2739        return Err(IngestError::ImmutableField {
2740            field: "project",
2741            session_id: incoming.id.clone(),
2742            stored: (*existing.project).clone(),
2743            attempted: (*incoming.project).clone(),
2744        });
2745    }
2746    Ok(())
2747}
2748
2749pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2750    use crate::wire::Provenance;
2751    let mut chunks: Vec<String> = Vec::new();
2752    for part in parts {
2753        // spec.md#search: only conversational parts contribute to the indexed
2754        // text; harness-injected scaffolding is excluded from search.
2755        if part.provenance != Provenance::Conversational {
2756            continue;
2757        }
2758        match (message.role(), &part.kind) {
2759            (Role::User | Role::Assistant, PartKind::Text { text }) => {
2760                if let Some(text) = text {
2761                    chunks.push(text.to_string());
2762                }
2763            }
2764            (
2765                Role::User | Role::Assistant,
2766                PartKind::File {
2767                    media_type,
2768                    file_name,
2769                    data,
2770                },
2771            ) => {
2772                if let Some(file_name) = file_name {
2773                    chunks.push(file_name.clone());
2774                }
2775                if let Some(media_type) = media_type {
2776                    chunks.push(media_type.clone());
2777                }
2778                if let FileData::Url(uri) = data {
2779                    chunks.push(uri.clone());
2780                }
2781            }
2782            (
2783                Role::System | Role::Tool,
2784                PartKind::Text { .. }
2785                | PartKind::Reasoning { .. }
2786                | PartKind::File { .. }
2787                | PartKind::ToolCall { .. }
2788                | PartKind::ToolResult { .. }
2789                | PartKind::ToolApprovalRequest { .. }
2790                | PartKind::ToolApprovalResponse { .. },
2791            )
2792            | (
2793                Role::User | Role::Assistant,
2794                PartKind::Reasoning { .. }
2795                | PartKind::ToolCall { .. }
2796                | PartKind::ToolResult { .. }
2797                | PartKind::ToolApprovalRequest { .. }
2798                | PartKind::ToolApprovalResponse { .. },
2799            ) => {}
2800        }
2801    }
2802
2803    let text = chunks
2804        .into_iter()
2805        .filter(|chunk| !chunk.trim().is_empty())
2806        .collect::<Vec<_>>()
2807        .join("\n");
2808    if text.is_empty() { None } else { Some(text) }
2809}
2810
/// Non-empty conversational text (spec.md#search). The wrapped string is
/// private; this block only offers read access and unwrapping.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrow the text as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consume the wrapper and return the owned string.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2830
2831#[derive(Debug, Clone, PartialEq)]
2832pub struct MessageWithParts {
2833    pub message: Message,
2834    pub parts: Vec<Part>,
2835}
2836
2837#[derive(Debug, Clone, PartialEq)]
2838pub struct SessionWithMessages {
2839    pub session: Session,
2840    pub messages: Vec<MessageWithParts>,
2841}
2842
2843#[derive(Debug, Clone)]
2844pub struct SessionViewParams<'a> {
2845    pub mode: ResponseMode,
2846    pub after_id: Option<&'a str>,
2847    pub limit: usize,
2848    pub budget_bytes: usize,
2849    pub session_from: SessionFrom,
2850}
2851
/// Parameters of a message-view lookup.
///
/// NOTE(review): the consumer of this struct is outside this chunk; the
/// field notes below are read off the names and types - confirm against it.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// How much surrounding context to include.
    pub context_depth: usize,
    /// Pagination anchor, when the caller continues a previous page.
    pub after_id: Option<&'a str>,
    /// Item-count limit for the page.
    pub limit: usize,
    /// Byte budget for the page.
    pub budget_bytes: usize,
}
2859
/// Result of a `pond_get` lookup. It keeps a missing target (which the
/// handler maps to `not_found`) apart from a stale or unknown `after_id`
/// (mapped to `validation_failed`). The message/part stream is append-only,
/// so an anchor that was ever valid never disappears - an unknown one is
/// always a client error, never a reason to silently restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested target does not exist.
    NotFound,
    /// The target exists but the supplied `after_id` anchor is unknown.
    UnknownAfterId,
    /// The lookup succeeded and produced a payload.
    Found(T),
}
2871
2872/// Canonical retrieval result for `pond_get` session mode: the stored session
2873/// plus the page of messages (each with its `Part`s) and a remaining count.
2874/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
2875#[derive(Debug, Clone, PartialEq)]
2876pub struct SessionPage {
2877    pub session: Session,
2878    pub messages: Vec<RetrievedMessage>,
2879    pub messages_remaining: usize,
2880}
2881
2882/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
2883/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
2884/// their parts so the handler can summarize them.
2885#[derive(Debug, Clone, PartialEq)]
2886pub struct MessagePage {
2887    pub session: Session,
2888    pub target: RetrievedMessage,
2889    pub target_parts: Vec<Part>,
2890    pub target_parts_remaining: usize,
2891    pub siblings: Vec<RetrievedMessage>,
2892}
2893
2894#[derive(Debug, Clone, PartialEq)]
2895pub struct RetrievedMessage {
2896    pub id: String,
2897    pub role: Role,
2898    pub timestamp: DateTime<Utc>,
2899    pub text: Option<String>,
2900    pub content: Option<String>,
2901    pub parts: Vec<Part>,
2902}
2903
2904#[derive(Debug, Clone)]
2905struct ScanRow {
2906    id: String,
2907    role: Role,
2908    timestamp: DateTime<Utc>,
2909    text: Option<String>,
2910    content: Option<String>,
2911}
2912
2913/// One row of the conversational scan. `text` is non-empty by
2914/// `IsNotNull("search_text")` pushdown (spec.md#search).
2915#[derive(Debug, Clone)]
2916pub struct ConversationalRow {
2917    pub session_id: String,
2918    pub message_id: String,
2919    pub role: Role,
2920    pub timestamp: DateTime<Utc>,
2921    pub text: SearchText,
2922}
2923
/// Count how many leading `items` fit on one page, given an item `limit`
/// (clamped to `1..=1000`) and a byte budget, with `size` measuring each
/// item. The first item is always emitted, so a single oversize item never
/// blocks its own page; after that the page ends at the first item that
/// would push the running total past `budget_bytes`. Returns 0 only for an
/// empty slice.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = &items[..items.len().min(limit.clamp(1, 1000))];
    let mut used = 0usize;
    window
        .iter()
        .enumerate()
        .take_while(|&(position, item)| {
            let projected = used.saturating_add(size(item));
            // Position 0 is exempt from the budget check.
            let fits = position == 0 || projected <= budget_bytes;
            if fits {
                used = projected;
            }
            fits
        })
        .count()
}
2942
2943fn role_from_str(value: &str) -> Result<Role> {
2944    match value {
2945        "system" => Ok(Role::System),
2946        "user" => Ok(Role::User),
2947        "assistant" => Ok(Role::Assistant),
2948        "tool" => Ok(Role::Tool),
2949        other => anyhow::bail!("unknown message role {other}"),
2950    }
2951}
2952
/// Scalar indexes on `messages` (spec.md#datasets): BTREE for high-cardinality
/// and range columns, BITMAP for low-cardinality columns. There is no index
/// on `embedding_model`: pond's invariant is one active model at a time
/// (a model swap goes through `pond embed --force` which drops the IVF_PQ,
/// clears stale rows, and re-bootstraps), so `embedding_model` is never a
/// query-time predicate - the only embedding-state filter is `vector IS NOT
/// NULL`. `id` lookups are rare and full-scan.
///
/// Each entry is `(column, index type, index name)`; every entry becomes one
/// always-on scalar `IndexIntent` in `pond_index_intents_with_vector_threshold`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];
2979
/// Scalar indexes on `parts`: `(session_id, message_id)` is the hot-path lookup key for
/// `parts_for_messages` (hydration on every `get` and grouped search).
///
/// Each entry is `(column, index type, index name)`. The two key columns are
/// indexed separately, one BTREE each.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
2994
/// Scalar index on `sessions`: `id` is filtered by `find_session` on every
/// `get` and every grouped search.
///
/// The single entry is `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2999
3000fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
3001    Predicate::In(
3002        column,
3003        values.iter().cloned().map(ScalarValue::String).collect(),
3004    )
3005}
3006
3007/// Combine the caller's filter with `vector IS NOT NULL` so the kNN scanner
3008/// never sees a null-vector row. Under the single-active-model invariant,
3009/// `vector IS NOT NULL` is equivalent to "row is currently embedded under
3010/// the configured model" - no per-row `embedding_model` filter needed.
3011fn embedded_scope(filter: &Predicate) -> Predicate {
3012    Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3013}
3014
3015fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
3016    let inserted = usize::try_from(inserted_rows)
3017        .unwrap_or(usize::MAX)
3018        .min(total);
3019    let mut statuses = Vec::with_capacity(total);
3020    statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
3021    statuses.extend(std::iter::repeat_n(
3022        UpsertStatus::Matched,
3023        total.saturating_sub(inserted),
3024    ));
3025    statuses
3026}
3027
// Bare logical table names: the lance-namespace Directory impl owns the
// `.lance` directory suffix (spec.md#lance-chokepoints-catalog). No consumer reconstructs
// a `.lance` path.
/// Logical name of the sessions table.
pub(crate) const SESSIONS: &str = "sessions";
/// Logical name of the messages table.
pub(crate) const MESSAGES: &str = "messages";
/// Logical name of the parts table.
pub(crate) const PARTS: &str = "parts";
3034
/// FTS index name on `messages.search_text`. Stable so status and index
/// creation name the same index. Registered as the FTS intent's `name` in
/// `pond_index_intents_with_vector_threshold`.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// IVF_PQ index name on `messages.vector` (spec.md#search). Stable so the
/// activation check and index creation name the same index. Registered as the
/// vector intent's `name` in `pond_index_intents_with_vector_threshold`.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
3042
/// IVF_PQ tuning constants (spec.md#search):
/// - num_bits = 8 (256 centroids per PQ subspace; needs >= 256 vectors)
/// - sub_vectors = embedding_dim / 8 (8-float PQ subspaces)
/// - max_iters = 15 (kmeans cap)
/// - cosine metric (e5 vectors are L2-normalized)
const IVF_PQ_NUM_BITS: u8 = 8;
/// Floats per PQ subspace; the vector intent divides `embedding_dim()` by this
/// (integer division) to get `sub_vectors`.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// Value passed as the vector intent's `max_iters`.
const IVF_PQ_MAX_ITERS: usize = 15;
3051
/// FTS tokenizer constants (spec.md#search-language-neutral-index): character ngrams
/// in `[3, 5]`. 4-5-grams discriminate, min=3 keeps 3-char tokens
/// (`FTS`, `OCC`) searchable.
const FTS_NGRAM_MIN: u32 = 3;
/// Upper ngram length, passed as `max` to the FTS intent's tokenizer params.
const FTS_NGRAM_MAX: u32 = 5;
3057
3058/// Pond's production IndexIntents: the per-table intent set
3059/// `Store::open_with_options` registers with the substrate.
3060pub fn pond_index_intents() -> IndexIntents {
3061    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3062}
3063
3064/// Same as [`pond_index_intents`] but with an overridable IVF_PQ activation
3065/// threshold. Used by tests that need to exercise the activation boundary
3066/// without writing 100k vectors.
3067pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3068    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3069    messages.push(IndexIntent {
3070        name: MESSAGES_FTS_INDEX,
3071        column: "search_text",
3072        trigger: IndexTrigger::OnAnyRows,
3073        params: IndexParamsKind::InvertedFtsNgram {
3074            min: FTS_NGRAM_MIN,
3075            max: FTS_NGRAM_MAX,
3076        },
3077    });
3078    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3079        messages.push(IndexIntent {
3080            name,
3081            column,
3082            trigger: IndexTrigger::OnAnyRows,
3083            params: IndexParamsKind::Scalar(kind.clone()),
3084        });
3085    }
3086    messages.push(IndexIntent {
3087        name: MESSAGES_VECTOR_INDEX,
3088        column: "vector",
3089        trigger: IndexTrigger::OnNonNullCount {
3090            column: "vector",
3091            threshold: vector_threshold,
3092        },
3093        params: IndexParamsKind::IvfPqCosine {
3094            sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3095            num_bits: IVF_PQ_NUM_BITS,
3096            max_iters: IVF_PQ_MAX_ITERS,
3097        },
3098    });
3099    let parts = PARTS_SCALAR_INDICES
3100        .iter()
3101        .map(|(column, kind, name)| IndexIntent {
3102            name,
3103            column,
3104            trigger: IndexTrigger::OnAnyRows,
3105            params: IndexParamsKind::Scalar(kind.clone()),
3106        })
3107        .collect();
3108    let sessions = SESSIONS_SCALAR_INDICES
3109        .iter()
3110        .map(|(column, kind, name)| IndexIntent {
3111            name,
3112            column,
3113            trigger: IndexTrigger::OnAnyRows,
3114            params: IndexParamsKind::Scalar(kind.clone()),
3115        })
3116        .collect();
3117    IndexIntents {
3118        sessions,
3119        messages,
3120        parts,
3121    }
3122}
3123
/// Width of the `messages.vector` embedding column when `[embeddings].dim` is
/// not configured (spec.md#search): 384, the dimension of
/// [`embed::DEFAULT_MODEL_ID`] (`intfloat/multilingual-e5-small`).
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector dimension, written at most once (at startup, from
/// `[embeddings].dim`) through [`init_embedding_dim`]. A `OnceLock` instead of
/// a `const` lets a temporary config select a model of another dimension
/// without editing every use site. While unset, readers fall back to
/// [`DEFAULT_EMBEDDING_DIM`], which keeps unit tests config-free.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// The active embedding dimension: the value [`init_embedding_dim`] installed,
/// or [`DEFAULT_EMBEDDING_DIM`] if none has been installed yet.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&installed) => installed,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Seed [`embedding_dim`] from config. Only the first call has an effect;
/// later calls leave the installed value untouched.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails only when a value is already present; dropping that error
    // is exactly the "first call wins" contract.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3149
3150/// Initial-`CREATE` write params for the namespace-mediated path. The
3151/// substrate seam stamps in `session`, `mode`, and `store_params`.
3152/// `auto_cleanup` is short; long-term recovery is `pond export` snapshots
3153/// plus deferred Lance tags (spec.md#session-durable-copy). `skip_auto_cleanup`
3154/// suppresses the per-commit hook so cleanup stays operator-driven via
3155/// `pond index optimize` (one LIST per command instead of per write).
3156pub(crate) fn write_params_for_create() -> WriteParams {
3157    WriteParams {
3158        data_storage_version: Some(LanceFileVersion::V2_1),
3159        enable_v2_manifest_paths: true,
3160        enable_stable_row_ids: true,
3161        auto_cleanup: Some(AutoCleanupParams {
3162            interval: 20,
3163            older_than: chrono::TimeDelta::days(1),
3164        }),
3165        skip_auto_cleanup: true,
3166        ..WriteParams::default()
3167    }
3168}
3169
3170fn export_schema(table: Table) -> Arc<Schema> {
3171    match table {
3172        Table::Sessions => session_schema(),
3173        Table::Messages => message_schema(),
3174        Table::Parts => part_schema(),
3175    }
3176}
3177
3178fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3179    let expected = export_schema(table);
3180    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3181    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3182    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3183    if actual_names != expected_names {
3184        anyhow::bail!(
3185            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3186            table.as_str(),
3187        );
3188    }
3189    Ok(())
3190}
3191
3192async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3193    let source_uri = source
3194        .to_str()
3195        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3196    let dataset = Dataset::open(source_uri)
3197        .await
3198        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3199    ensure_schema_matches_archive(&dataset, table)?;
3200    Ok(dataset)
3201}
3202
3203pub(crate) fn session_schema() -> Arc<Schema> {
3204    Arc::new(Schema::new(vec![
3205        primary_field("id", DataType::Utf8, false),
3206        Field::new("parent_session_id", DataType::Utf8, true),
3207        Field::new("parent_message_id", DataType::Utf8, true),
3208        Field::new("source_agent", DataType::Utf8, false),
3209        Field::new(
3210            "created_at",
3211            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3212            false,
3213        ),
3214        Field::new("project", DataType::Utf8, false),
3215        json_field("options", false),
3216    ]))
3217}
3218
3219pub(crate) fn message_schema() -> Arc<Schema> {
3220    Arc::new(Schema::new(vec![
3221        primary_field("session_id", DataType::Utf8, false),
3222        primary_field("id", DataType::Utf8, false),
3223        Field::new(
3224            "timestamp",
3225            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3226            false,
3227        ),
3228        Field::new("role", DataType::Utf8, false),
3229        Field::new("source_agent", DataType::Utf8, false),
3230        Field::new("project", DataType::Utf8, false),
3231        Field::new("content", DataType::Utf8, true),
3232        Field::new("search_text", DataType::Utf8, true),
3233        // The message's derived embedding (spec.md#session-embed-from-canonical):
3234        // both null until `pond embed` fills them, set together thereafter.
3235        Field::new("vector", embedding_vector_type(), true),
3236        Field::new("embedding_model", DataType::Utf8, true),
3237        json_field("options", false),
3238    ]))
3239}
3240
3241pub(crate) fn part_schema() -> Arc<Schema> {
3242    Arc::new(Schema::new(vec![
3243        primary_field("session_id", DataType::Utf8, false),
3244        primary_field("message_id", DataType::Utf8, false),
3245        primary_field("id", DataType::Utf8, false),
3246        Field::new("ordinal", DataType::Int32, false),
3247        Field::new("type", DataType::Utf8, false),
3248        // spec.md#model-part-provenance: conversation vs harness-injected; search
3249        // reads this column to exclude injected scaffolding.
3250        Field::new("provenance", DataType::Utf8, false),
3251        json_field("variant_data", false),
3252        legacy_blob_field("data", true),
3253        json_field("options", false),
3254    ]))
3255}
3256
3257pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3258    let arrays = schema
3259        .fields()
3260        .iter()
3261        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3262        .collect();
3263    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3264}
3265
3266pub(crate) fn empty_reader(
3267    schema: Arc<Schema>,
3268) -> Result<
3269    RecordBatchIterator<
3270        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3271    >,
3272> {
3273    let batch = empty_batch(schema.clone())?;
3274    Ok(RecordBatchIterator::new(
3275        vec![Ok(batch)].into_iter(),
3276        schema,
3277    ))
3278}
3279
/// Borrowed inputs for one `messages` row, as consumed by `messages_batches`:
/// the message itself plus the per-row values that are stored alongside it
/// but are not read from the message.
pub(crate) struct MessageBatchRow<'a> {
    /// The message supplying `session_id`, `id`, `timestamp`, `role`,
    /// `content` and `options`.
    pub message: &'a Message,
    /// Value written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Value written to the `project` column.
    pub project: &'a str,
    /// Value written to the nullable `search_text` column; `None` is stored
    /// as a null cell.
    pub search_text: Option<&'a str>,
}
3286
3287// Lance v7.0.0-beta.16's IVF_PQ build path (`rust/lance/src/index/vector/utils.rs`
3288// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
3289// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
3290// BFloat16 as a future-supported embedding type; until the Rust IVF_PQ build
3291// path catches up, store as Float16 (half-precision, also 2 bytes/element).
3292fn embedding_vector_type() -> DataType {
3293    DataType::FixedSizeList(
3294        Arc::new(Field::new("item", DataType::Float16, true)),
3295        embedding_dim() as i32,
3296    )
3297}
3298
3299/// The partial-schema source for the embedding column update: the `messages`
3300/// primary key plus the two columns `pond embed` fills. The field definitions
3301/// match `message_schema` exactly so Lance accepts it as a subset upsert.
3302fn embedding_update_schema() -> Arc<Schema> {
3303    Arc::new(Schema::new(vec![
3304        primary_field("session_id", DataType::Utf8, false),
3305        primary_field("id", DataType::Utf8, false),
3306        Field::new("vector", embedding_vector_type(), true),
3307        Field::new("embedding_model", DataType::Utf8, true),
3308    ]))
3309}
3310
3311/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
3312/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
3313pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3314    let dim = embedding_dim();
3315    let mut flat = Vec::with_capacity(rows.len() * dim);
3316    for row in rows {
3317        if row.vector.len() != dim {
3318            anyhow::bail!(
3319                "embedding for message {} has dim {}, expected {dim}",
3320                row.id,
3321                row.vector.len(),
3322            );
3323        }
3324        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3325    }
3326    let values = Float16Array::from(flat);
3327    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3328    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3329        .context("failed to build embedding vector column")?;
3330
3331    RecordBatch::try_new(
3332        embedding_update_schema(),
3333        vec![
3334            Arc::new(StringArray::from(
3335                rows.iter()
3336                    .map(|row| row.session_id.as_str())
3337                    .collect::<Vec<_>>(),
3338            )),
3339            Arc::new(StringArray::from(
3340                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3341            )),
3342            Arc::new(vectors),
3343            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3344        ],
3345    )
3346    .context("failed to build embedding update batch")
3347}
3348
/// The runtime backstop against Arrow's 2 GiB `i32` offset wall: a flush batch
/// is split once the running total of its text columns would exceed this, and
/// a single cell at or above it is rejected rather than left to panic inside
/// `StringArray::from` (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Split rows into contiguous ranges whose summed byte cost stays within
/// `COLUMN_BYTE_BUDGET`.
///
/// `cells[i]` is row `i`'s byte cost summed across every budgeted column.
/// Budgeting the all-column total bounds every individual column too, since
/// no single column's total can exceed it. A row that is over budget on its
/// own still gets a chunk (of just that row): a chunk is never empty.
///
/// Returns the ranges in order; together they cover `0..cells.len()` exactly.
/// Empty input yields no ranges.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        // Saturating: an unchecked `running + row` would panic in debug
        // builds and wrap in release builds, and a wrapped total could slip
        // an oversize chunk past the budget check.
        if index > start && running.saturating_add(row) > COLUMN_BYTE_BUDGET {
            chunks.push(start..index);
            start = index;
            running = 0;
        }
        running = running.saturating_add(row);
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
3376
3377fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3378    if bytes >= COLUMN_BYTE_BUDGET {
3379        anyhow::bail!(
3380            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3381             overflow Arrow's i32 offset buffer"
3382        );
3383    }
3384    Ok(())
3385}
3386
3387async fn merge_insert_chunks(
3388    handle: &Handle,
3389    table: Table,
3390    batches: Vec<RecordBatch>,
3391) -> Result<u64> {
3392    let mut inserted = 0u64;
3393    for batch in batches {
3394        let rows = batch.num_rows();
3395        inserted += handle.merge_insert(table, batch, rows).await?;
3396    }
3397    Ok(inserted)
3398}
3399
3400pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3401    let options = sessions
3402        .iter()
3403        .map(|session| json_bytes(&session.options))
3404        .collect::<Result<Vec<_>>>()?;
3405    let mut cells = Vec::with_capacity(sessions.len());
3406    for (session, encoded) in sessions.iter().zip(&options) {
3407        let columns = [
3408            session.id.len(),
3409            session.parent_session_id.as_deref().map_or(0, str::len),
3410            session.parent_message_id.as_deref().map_or(0, str::len),
3411            session.source_agent.len(),
3412            session.project.as_str().len(),
3413            encoded.len(),
3414        ];
3415        for bytes in columns {
3416            guard_cell("sessions", &session.id, bytes)?;
3417        }
3418        cells.push(columns.iter().sum());
3419    }
3420    chunk_ranges(&cells)
3421        .into_iter()
3422        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3423        .collect()
3424}
3425
3426fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3427    let schema = session_schema();
3428    RecordBatch::try_new(
3429        schema.clone(),
3430        vec![
3431            Arc::new(StringArray::from(
3432                sessions
3433                    .iter()
3434                    .map(|session| session.id.as_str())
3435                    .collect::<Vec<_>>(),
3436            )),
3437            Arc::new(StringArray::from(
3438                sessions
3439                    .iter()
3440                    .map(|session| session.parent_session_id.as_deref())
3441                    .collect::<Vec<_>>(),
3442            )),
3443            Arc::new(StringArray::from(
3444                sessions
3445                    .iter()
3446                    .map(|session| session.parent_message_id.as_deref())
3447                    .collect::<Vec<_>>(),
3448            )),
3449            Arc::new(StringArray::from(
3450                sessions
3451                    .iter()
3452                    .map(|session| session.source_agent.as_str())
3453                    .collect::<Vec<_>>(),
3454            )),
3455            Arc::new(
3456                TimestampMicrosecondArray::from(
3457                    sessions
3458                        .iter()
3459                        .map(|session| micros(session.created_at))
3460                        .collect::<Vec<_>>(),
3461                )
3462                .with_timezone("UTC"),
3463            ),
3464            Arc::new(StringArray::from(
3465                sessions
3466                    .iter()
3467                    .map(|session| session.project.as_str())
3468                    .collect::<Vec<_>>(),
3469            )),
3470            Arc::new(LargeBinaryArray::from_iter_values(
3471                options.iter().map(Vec::as_slice),
3472            )),
3473        ],
3474    )
3475    .context("failed to build session batch")
3476}
3477
3478pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3479    let options = rows
3480        .iter()
3481        .map(|row| json_bytes(row.message.options()))
3482        .collect::<Result<Vec<_>>>()?;
3483    let mut cells = Vec::with_capacity(rows.len());
3484    for (row, encoded) in rows.iter().zip(&options) {
3485        let columns = [
3486            row.message.session_id().len(),
3487            row.message.id().len(),
3488            row.message.role().as_str().len(),
3489            row.source_agent.len(),
3490            row.project.len(),
3491            row.message.system_content().map_or(0, str::len),
3492            row.search_text.map_or(0, str::len),
3493            encoded.len(),
3494        ];
3495        for bytes in columns {
3496            guard_cell("messages", row.message.id(), bytes)?;
3497        }
3498        cells.push(columns.iter().sum());
3499    }
3500    chunk_ranges(&cells)
3501        .into_iter()
3502        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3503        .collect()
3504}
3505
3506fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3507    let schema = message_schema();
3508    RecordBatch::try_new(
3509        schema.clone(),
3510        vec![
3511            Arc::new(StringArray::from(
3512                rows.iter()
3513                    .map(|row| row.message.session_id())
3514                    .collect::<Vec<_>>(),
3515            )),
3516            Arc::new(StringArray::from(
3517                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3518            )),
3519            Arc::new(
3520                TimestampMicrosecondArray::from(
3521                    rows.iter()
3522                        .map(|row| micros(row.message.timestamp()))
3523                        .collect::<Vec<_>>(),
3524                )
3525                .with_timezone("UTC"),
3526            ),
3527            Arc::new(StringArray::from(
3528                rows.iter()
3529                    .map(|row| row.message.role().as_str())
3530                    .collect::<Vec<_>>(),
3531            )),
3532            Arc::new(StringArray::from(
3533                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3534            )),
3535            Arc::new(StringArray::from(
3536                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3537            )),
3538            Arc::new(StringArray::from(
3539                rows.iter()
3540                    .map(|row| row.message.system_content())
3541                    .collect::<Vec<_>>(),
3542            )),
3543            Arc::new(StringArray::from(
3544                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3545            )),
3546            // `vector` / `embedding_model` are written null at ingest; every
3547            // message starts un-embedded and `pond embed` fills them later
3548            // (spec.md#session-embed-from-canonical).
3549            new_null_array(&embedding_vector_type(), rows.len()),
3550            new_null_array(&DataType::Utf8, rows.len()),
3551            Arc::new(LargeBinaryArray::from_iter_values(
3552                options.iter().map(Vec::as_slice),
3553            )),
3554        ],
3555    )
3556    .context("failed to build message batch")
3557}
3558
3559pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3560    let variant_data = parts
3561        .iter()
3562        .map(|part| part_variant_json(&part.kind))
3563        .collect::<Result<Vec<_>>>()?;
3564    let options = parts
3565        .iter()
3566        .map(|part| json_bytes(&part.options))
3567        .collect::<Result<Vec<_>>>()?;
3568    let mut cells = Vec::with_capacity(parts.len());
3569    // The blob column is a BinaryArray, exempt from the text-column bound
3570    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
3571    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3572        let columns = [
3573            part.session_id.len(),
3574            part.message_id.len(),
3575            part.id.len(),
3576            part.kind.type_name().len(),
3577            part.provenance.as_str().len(),
3578            variant.len(),
3579            encoded.len(),
3580        ];
3581        for bytes in columns {
3582            guard_cell("parts", &part.id, bytes)?;
3583        }
3584        cells.push(columns.iter().sum());
3585    }
3586    chunk_ranges(&cells)
3587        .into_iter()
3588        .map(|range| {
3589            parts_chunk(
3590                &parts[range.clone()],
3591                &variant_data[range.clone()],
3592                &options[range],
3593            )
3594        })
3595        .collect()
3596}
3597
3598fn parts_chunk(
3599    parts: &[Part],
3600    variant_data: &[Vec<u8>],
3601    options: &[Vec<u8>],
3602) -> Result<RecordBatch> {
3603    let schema = part_schema();
3604    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
3605    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
3606    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
3607    let blob_payloads: Vec<Option<&[u8]>> = parts
3608        .iter()
3609        .map(|part| match &part.kind {
3610            PartKind::File { data, .. } => Some(match data {
3611                FileData::String(value) => value.as_bytes(),
3612                FileData::Bytes(value) => value.as_slice(),
3613                FileData::Url(value) => value.as_bytes(),
3614            }),
3615            PartKind::Text { .. }
3616            | PartKind::Reasoning { .. }
3617            | PartKind::ToolCall { .. }
3618            | PartKind::ToolResult { .. }
3619            | PartKind::ToolApprovalRequest { .. }
3620            | PartKind::ToolApprovalResponse { .. } => None,
3621        })
3622        .collect();
3623    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3624
3625    RecordBatch::try_new(
3626        schema.clone(),
3627        vec![
3628            Arc::new(StringArray::from(
3629                parts
3630                    .iter()
3631                    .map(|part| part.session_id.as_str())
3632                    .collect::<Vec<_>>(),
3633            )),
3634            Arc::new(StringArray::from(
3635                parts
3636                    .iter()
3637                    .map(|part| part.message_id.as_str())
3638                    .collect::<Vec<_>>(),
3639            )),
3640            Arc::new(StringArray::from(
3641                parts
3642                    .iter()
3643                    .map(|part| part.id.as_str())
3644                    .collect::<Vec<_>>(),
3645            )),
3646            Arc::new(Int32Array::from(
3647                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3648            )),
3649            Arc::new(StringArray::from(
3650                parts
3651                    .iter()
3652                    .map(|part| part.kind.type_name())
3653                    .collect::<Vec<_>>(),
3654            )),
3655            Arc::new(StringArray::from(
3656                parts
3657                    .iter()
3658                    .map(|part| part.provenance.as_str())
3659                    .collect::<Vec<_>>(),
3660            )),
3661            Arc::new(LargeBinaryArray::from_iter_values(
3662                variant_data.iter().map(Vec::as_slice),
3663            )),
3664            Arc::new(blob_array),
3665            Arc::new(LargeBinaryArray::from_iter_values(
3666                options.iter().map(Vec::as_slice),
3667            )),
3668        ],
3669    )
3670    .context("failed to build parts batch")
3671}
3672
3673pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3674    Ok(Session {
3675        id: string(batch, "id", row)?.context("session id is null")?,
3676        parent_session_id: string(batch, "parent_session_id", row)?,
3677        parent_message_id: string(batch, "parent_message_id", row)?,
3678        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3679        created_at: datetime(batch, "created_at", row)?,
3680        project: crate::adapter::Extracted::from_stored(
3681            string(batch, "project", row)?.context("project is null")?,
3682        ),
3683        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3684    })
3685}
3686
3687pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3688    let id = string(batch, "id", row)?.context("message id is null")?;
3689    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3690    let timestamp = datetime(batch, "timestamp", row)?;
3691    let options =
3692        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3693
3694    match string(batch, "role", row)?
3695        .context("message role is null")?
3696        .as_str()
3697    {
3698        "system" => Ok(Message::System {
3699            id,
3700            session_id,
3701            timestamp,
3702            // `content` is nullable in the schema; preserve the distinction
3703            // between "no content row stored" (`None`) and "empty string
3704            // stored" (`Some(extracted_empty)`). The value originally
3705            // came from a `Source` extraction at ingest time; rewrap via
3706            // the storage-internal `from_stored` so the type-system seal
3707            // for adapters stays intact.
3708            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3709            options,
3710        }),
3711        "user" => Ok(Message::User {
3712            id,
3713            session_id,
3714            timestamp,
3715            options,
3716        }),
3717        "assistant" => Ok(Message::Assistant {
3718            id,
3719            session_id,
3720            timestamp,
3721            options,
3722        }),
3723        "tool" => Ok(Message::Tool {
3724            id,
3725            session_id,
3726            timestamp,
3727            options,
3728        }),
3729        other => anyhow::bail!("unknown message role {other}"),
3730    }
3731}
3732
3733pub(crate) fn part_from_batch(
3734    batch: &RecordBatch,
3735    row: usize,
3736    file_data: Option<FileData>,
3737) -> Result<Part> {
3738    let type_name = string(batch, "type", row)?.context("part type is null")?;
3739    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3740    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3741    Ok(Part {
3742        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3743        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3744        id: string(batch, "id", row)?.context("part id is null")?,
3745        ordinal: int32(batch, "ordinal", row)?,
3746        provenance: provenance_from_str(&provenance)?,
3747        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3748        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3749    })
3750}
3751
3752fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3753    match value {
3754        "conversational" => Ok(crate::wire::Provenance::Conversational),
3755        "injected" => Ok(crate::wire::Provenance::Injected),
3756        other => anyhow::bail!("unknown part provenance {other}"),
3757    }
3758}
3759
3760fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3761    let kind = file_data_kind(variant_data)?;
3762    match kind.as_str() {
3763        "string" => {
3764            let text = std::str::from_utf8(bytes)
3765                .context("file string payload is not UTF-8")?
3766                .to_owned();
3767            Ok(FileData::String(text))
3768        }
3769        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3770        "url" => Ok(FileData::Url(
3771            std::str::from_utf8(bytes)
3772                .context("file URL payload is not UTF-8")?
3773                .to_owned(),
3774        )),
3775        other => anyhow::bail!("unknown file data_kind {other}"),
3776    }
3777}
3778
3779fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3780    let value = json_parse::<Value>(variant_data)?;
3781    value
3782        .get("data_kind")
3783        .and_then(Value::as_str)
3784        .map(str::to_owned)
3785        .context("file part variant_data missing data_kind")
3786}
3787
3788fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3789    batch
3790        .column_by_name(name)
3791        .with_context(|| format!("missing column {name}"))?
3792        .as_any()
3793        .downcast_ref::<UInt64Array>()
3794        .with_context(|| format!("column {name} is not UInt64"))
3795}
3796
3797pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3798    let array = batch
3799        .column_by_name(name)
3800        .with_context(|| format!("missing column {name}"))?
3801        .as_any()
3802        .downcast_ref::<StringArray>()
3803        .with_context(|| format!("column {name} is not Utf8"))?;
3804    if array.is_null(row) {
3805        Ok(None)
3806    } else {
3807        Ok(Some(array.value(row).to_owned()))
3808    }
3809}
3810
3811fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3812    // Lance can return a `lance.json` column either as raw JSONB bytes
3813    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
3814    // LargeUtf8), depending on the read path. Handle both.
3815    let column = batch
3816        .column_by_name(name)
3817        .with_context(|| format!("missing column {name}"))?;
3818    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3819        return if array.is_null(row) {
3820            Ok(None)
3821        } else {
3822            Ok(Some(
3823                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3824            ))
3825        };
3826    }
3827    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3828        return if array.is_null(row) {
3829            Ok(None)
3830        } else {
3831            Ok(Some(array.value(row).as_bytes().to_vec()))
3832        };
3833    }
3834    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3835        return if array.is_null(row) {
3836            Ok(None)
3837        } else {
3838            Ok(Some(array.value(row).as_bytes().to_vec()))
3839        };
3840    }
3841    anyhow::bail!("column {name} is not a JSON-compatible array")
3842}
3843
3844fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3845    let array = batch
3846        .column_by_name(name)
3847        .with_context(|| format!("missing column {name}"))?
3848        .as_any()
3849        .downcast_ref::<Int32Array>()
3850        .with_context(|| format!("column {name} is not Int32"))?;
3851    Ok(array.value(row))
3852}
3853
3854pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3855    let array = batch
3856        .column_by_name(name)
3857        .with_context(|| format!("missing column {name}"))?
3858        .as_any()
3859        .downcast_ref::<Float32Array>()
3860        .with_context(|| format!("column {name} is not Float32"))?;
3861    Ok(array.value(row))
3862}
3863
3864pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3865    let array = batch
3866        .column_by_name(name)
3867        .with_context(|| format!("missing column {name}"))?
3868        .as_any()
3869        .downcast_ref::<TimestampMicrosecondArray>()
3870        .with_context(|| format!("column {name} is not timestamp_micros"))?;
3871    Utc.timestamp_micros(array.value(row))
3872        .single()
3873        .context("timestamp is out of range")
3874}
3875
3876fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3877    Field::new(name, data_type, nullable).with_metadata(
3878        [(
3879            "lance-schema:unenforced-primary-key".to_owned(),
3880            "true".to_owned(),
3881        )]
3882        .into(),
3883    )
3884}
3885
3886// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
3887// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
3888// is marked unstable in Lance docs (`format/file/versioning.md`) and at
3889// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
3890// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
3891// allocated only one column_info, so the decoder's second `expect_next()`
3892// fires `"there were more fields in the schema than provided column
3893// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
3894// handles correctly (covered by Lance's own `test_compact_blob_columns`).
3895fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3896    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3897        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3898            .into_iter()
3899            .collect(),
3900    )
3901}
3902
3903fn json_field(name: &str, nullable: bool) -> Field {
3904    lance_arrow::json::json_field(name, nullable)
3905}
3906
3907fn micros(timestamp: DateTime<Utc>) -> i64 {
3908    timestamp.timestamp_micros()
3909}
3910
3911fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3912    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
3913    // matches the `lance.json` extension contract. Lance's compact path
3914    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
3915    // applies `decode_json -> encode_json` on this column; with proper JSONB
3916    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
3917    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
3918    // `try_into_dfstream`; compact still goes through the adapter).
3919    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3920    lance_arrow::json::encode_json(&text)
3921        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3922}
3923
3924fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3925    serde_json::from_slice(value).context("failed to parse JSON field")
3926}
3927
3928fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3929    if let PartKind::File {
3930        media_type,
3931        file_name,
3932        data,
3933    } = kind
3934    {
3935        let data_kind = match data {
3936            FileData::String(_) => "string",
3937            FileData::Bytes(_) => "bytes",
3938            FileData::Url(_) => "url",
3939        };
3940        return json_bytes(&serde_json::json!({
3941            "media_type": media_type,
3942            "file_name": file_name,
3943            "data_kind": data_kind,
3944        }));
3945    }
3946    let value = serde_json::to_value(kind)?;
3947    let mut object = value
3948        .as_object()
3949        .cloned()
3950        .context("part variant did not serialize to an object")?;
3951    object.remove("type");
3952    json_bytes(&object)
3953}
3954
3955fn part_kind_from_json(
3956    type_name: &str,
3957    variant_data: &[u8],
3958    file_data: Option<FileData>,
3959) -> Result<PartKind> {
3960    let mut value = json_parse::<Value>(variant_data)?;
3961    let object = value
3962        .as_object_mut()
3963        .context("part variant data is not an object")?;
3964    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3965    if let Some(data) = file_data {
3966        object.remove("data_kind");
3967        object.insert("data".to_owned(), serde_json::to_value(data)?);
3968    }
3969    serde_json::from_value(value).context("failed to parse part kind")
3970}
3971
3972#[cfg(test)]
3973mod tests {
3974    #![allow(clippy::expect_used, clippy::unwrap_used)]
3975
3976    use super::*;
3977    use crate::{
3978        adapter::Extracted,
3979        handlers::ingest_events,
3980        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3981    };
3982    use chrono::Utc;
3983    use serde_json::json;
3984    use tempfile::TempDir;
3985
3986    fn synthetic_session(id: &str) -> Session {
3987        Session {
3988            id: id.to_owned(),
3989            parent_session_id: None,
3990            parent_message_id: None,
3991            source_agent: "claude-code".to_owned(),
3992            created_at: Utc::now(),
3993            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
3994            options: ProviderOptions::new(),
3995        }
3996    }
3997
3998    #[test]
3999    fn search_text_excludes_injected_parts() {
4000        use crate::wire::Provenance;
4001        let message = Message::User {
4002            id: "m1".to_owned(),
4003            session_id: "s1".to_owned(),
4004            timestamp: Utc::now(),
4005            options: ProviderOptions::new(),
4006        };
4007        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4008            session_id: "s1".to_owned(),
4009            id: id.to_owned(),
4010            message_id: "m1".to_owned(),
4011            ordinal: 0,
4012            provenance,
4013            options: ProviderOptions::new(),
4014            kind: PartKind::Text {
4015                text: Some(Extracted::from_test_value(text.to_owned())),
4016            },
4017        };
4018
4019        // A conversational part contributes; an injected one is excluded
4020        // (spec.md#search).
4021        let conversational = search_text(
4022            &message,
4023            &[text_part(
4024                "p1",
4025                "real human prompt",
4026                Provenance::Conversational,
4027            )],
4028        );
4029        assert_eq!(conversational.as_deref(), Some("real human prompt"));
4030
4031        let injected = search_text(
4032            &message,
4033            &[text_part(
4034                "p2",
4035                "<task-notification>...</task-notification>",
4036                Provenance::Injected,
4037            )],
4038        );
4039        assert!(
4040            injected.is_none(),
4041            "a message whose only part is injected has null search_text"
4042        );
4043    }
4044
4045    #[test]
4046    fn chunk_ranges_splits_on_byte_budget() {
4047        assert!(chunk_ranges(&[]).is_empty());
4048        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4049
4050        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4051        assert_eq!(
4052            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4053            vec![0..1, 1..2, 2..3],
4054        );
4055
4056        // An oversized single row gets its own chunk, never an infinite loop.
4057        assert_eq!(
4058            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4059            vec![0..1, 1..2, 2..3],
4060        );
4061    }
4062
4063    #[tokio::test]
4064    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4065        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
4066        // Message is dropped on the spot, with one Error outcome surfaced. The
4067        // rest of the substream continues normally - subsequent valid messages
4068        // and parts get written.
4069        let temp = TempDir::new()?;
4070        let store = Store::open_local(temp.path()).await?;
4071        let session = synthetic_session("ordering");
4072        let orphan_part = Part {
4073            session_id: session.id.clone(),
4074            id: "orphan-part".to_owned(),
4075            message_id: "missing-message".to_owned(),
4076            ordinal: 0,
4077            provenance: crate::wire::Provenance::Conversational,
4078            options: ProviderOptions::new(),
4079            kind: PartKind::Text {
4080                text: Some(Extracted::from_test_value("orphan".to_owned())),
4081            },
4082        };
4083        let valid_message = Message::User {
4084            id: "valid-message".to_owned(),
4085            session_id: session.id.clone(),
4086            timestamp: Utc::now(),
4087            options: ProviderOptions::new(),
4088        };
4089        let valid_part = Part {
4090            session_id: session.id.clone(),
4091            id: "valid-part".to_owned(),
4092            message_id: valid_message.id().to_owned(),
4093            ordinal: 0,
4094            provenance: crate::wire::Provenance::Conversational,
4095            options: ProviderOptions::new(),
4096            kind: PartKind::Text {
4097                text: Some(Extracted::from_test_value("kept".to_owned())),
4098            },
4099        };
4100
4101        let mut validator = IngestValidator::default();
4102        validator
4103            .push(&store, 0, IngestEvent::Session(session.clone()))
4104            .await?;
4105        let part_outcomes = validator
4106            .push(&store, 1, IngestEvent::Part(orphan_part))
4107            .await?;
4108        assert_eq!(part_outcomes.len(), 1);
4109        assert_eq!(part_outcomes[0].kind, "part");
4110        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4111        assert!(
4112            part_outcomes[0]
4113                .error
4114                .as_ref()
4115                .map(|e| e.message.contains("part event appeared before a message"))
4116                .unwrap_or(false),
4117            "error message must explain the ordering violation: {part_outcomes:?}"
4118        );
4119        validator
4120            .push(&store, 2, IngestEvent::Message(valid_message))
4121            .await?;
4122        validator
4123            .push(&store, 3, IngestEvent::Part(valid_part))
4124            .await?;
4125        validator.finish(&store).await?;
4126
4127        let (sessions, messages, parts) = store.row_counts().await?;
4128        assert_eq!(sessions, 1, "session committed despite the orphan part");
4129        assert_eq!(messages, 1, "valid message committed");
4130        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4131
4132        Ok(())
4133    }
4134
4135    #[tokio::test]
4136    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4137        // Per-event drop: a duplicate message id within a substream drops the
4138        // *duplicate* and surfaces an Error outcome for it. The first wins; the
4139        // session still commits.
4140        let temp = TempDir::new()?;
4141        let store = Store::open_local(temp.path()).await?;
4142        let session = synthetic_session("duplicate-message");
4143        let first = Message::User {
4144            id: "message-1".to_owned(),
4145            session_id: session.id.clone(),
4146            timestamp: Utc::now(),
4147            options: ProviderOptions::new(),
4148        };
4149        let second = Message::Assistant {
4150            id: "message-1".to_owned(),
4151            session_id: session.id.clone(),
4152            timestamp: Utc::now(),
4153            options: ProviderOptions::new(),
4154        };
4155
4156        let mut validator = IngestValidator::default();
4157        validator
4158            .push(&store, 0, IngestEvent::Session(session.clone()))
4159            .await?;
4160        validator
4161            .push(&store, 1, IngestEvent::Message(first))
4162            .await?;
4163        let dup_outcomes = validator
4164            .push(&store, 2, IngestEvent::Message(second))
4165            .await?;
4166        assert_eq!(dup_outcomes.len(), 1);
4167        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4168        assert!(
4169            dup_outcomes[0]
4170                .error
4171                .as_ref()
4172                .map(|e| e.message.contains("duplicate message id message-1"))
4173                .unwrap_or(false),
4174            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4175        );
4176
4177        validator.finish(&store).await?;
4178        let (sessions, messages, _) = store.row_counts().await?;
4179        assert_eq!(sessions, 1, "session committed");
4180        assert_eq!(messages, 1, "only the first message committed");
4181
4182        Ok(())
4183    }
4184
4185    /// Regression: compact_files on `parts` with the blob column tripped a
4186    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
4187    /// batches give compact fragments to merge; every `FileData` variant
4188    /// exercises the blob round-trip. All-File batches sidestep a debug-only
4189    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
4190    /// write batch mixes null + valid rows in the blob column - benign in
4191    /// release, irrelevant to this regression's scope.
4192    #[tokio::test(flavor = "multi_thread")]
4193    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4194        use crate::wire::{FileData, PartKind, Provenance};
4195        let temp = TempDir::new()?;
4196        let store = Store::open_local(temp.path()).await?;
4197
4198        let session = synthetic_session("compact-blob");
4199        store
4200            .upsert_sessions(std::slice::from_ref(&session))
4201            .await?;
4202
4203        let make_part = |idx: usize, kind: PartKind| Part {
4204            session_id: session.id.clone(),
4205            message_id: format!("msg-{idx}"),
4206            id: format!("part-{idx}"),
4207            ordinal: 0,
4208            provenance: Provenance::Conversational,
4209            options: ProviderOptions::new(),
4210            kind,
4211        };
4212
4213        let batch_a = vec![
4214            make_part(
4215                0,
4216                PartKind::File {
4217                    media_type: Some("text/plain".to_owned()),
4218                    file_name: Some("a.txt".to_owned()),
4219                    data: FileData::Bytes(b"alpha".to_vec()),
4220                },
4221            ),
4222            make_part(
4223                1,
4224                PartKind::File {
4225                    media_type: Some("text/plain".to_owned()),
4226                    file_name: Some("b.txt".to_owned()),
4227                    data: FileData::String("beta".to_owned()),
4228                },
4229            ),
4230        ];
4231        store.upsert_parts(&batch_a).await?;
4232
4233        let batch_b = vec![
4234            make_part(
4235                2,
4236                PartKind::File {
4237                    media_type: Some("application/octet-stream".to_owned()),
4238                    file_name: None,
4239                    data: FileData::Url("https://example.com/file".to_owned()),
4240                },
4241            ),
4242            make_part(
4243                3,
4244                PartKind::File {
4245                    media_type: Some("image/png".to_owned()),
4246                    file_name: Some("c.png".to_owned()),
4247                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4248                },
4249            ),
4250        ];
4251        store.upsert_parts(&batch_b).await?;
4252
4253        store.optimize_indices(None, None).await?.into_result()?;
4254
4255        Ok(())
4256    }
4257
4258    #[tokio::test]
4259    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4260        let temp = TempDir::new()?;
4261        let store = Store::open_local(temp.path()).await?;
4262        let session = synthetic_session("blob");
4263        let message = Message::User {
4264            id: "message-1".to_owned(),
4265            session_id: session.id.clone(),
4266            timestamp: Utc::now(),
4267            options: ProviderOptions::new(),
4268        };
4269        let part = Part {
4270            session_id: session.id.clone(),
4271            id: "part-1".to_owned(),
4272            message_id: message.id().to_owned(),
4273            ordinal: 0,
4274            provenance: crate::wire::Provenance::Conversational,
4275            options: ProviderOptions::new(),
4276            kind: PartKind::File {
4277                media_type: Some("text/plain".to_owned()),
4278                file_name: Some("payload.txt".to_owned()),
4279                data: FileData::Bytes(b"pond".to_vec()),
4280            },
4281        };
4282
4283        let mut validator = IngestValidator::default();
4284        validator
4285            .push(&store, 0, IngestEvent::Session(session.clone()))
4286            .await?;
4287        validator
4288            .push(&store, 1, IngestEvent::Message(message.clone()))
4289            .await?;
4290        validator
4291            .push(&store, 2, IngestEvent::Part(part.clone()))
4292            .await?;
4293        validator.finish(&store).await?;
4294
4295        let stored = store
4296            .get_session(&session.id)
4297            .await?
4298            .expect("session should exist");
4299        let stored_part = &stored.messages[0].parts[0];
4300        assert_eq!(stored_part, &part);
4301
4302        Ok(())
4303    }
4304
4305    //
4306    // `Session.source_agent` and `Session.project` are immutable
4307    // post-first-write because `messages` denormalizes them at first
4308    // ingest; a silent overwrite would desync the denormalized
4309    // copies. pond core's `IngestValidator` probes the existing session
4310    // before the merge_insert and emits a per-row `validation_failed`
4311    // outcome with the typed field name when either changes. Other Session
4312    // fields (options, parent_session_id, created_at, parent_message_id)
4313    // re-write idempotently via merge_insert.
4314
4315    fn base_session() -> Session {
4316        Session {
4317            id: "01HXY00000000001".to_owned(),
4318            parent_session_id: None,
4319            parent_message_id: None,
4320            source_agent: "claude-code".to_owned(),
4321            created_at: Utc::now(),
4322            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4323            options: ProviderOptions::new(),
4324        }
4325    }
4326
4327    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4328        outcomes
4329            .iter()
4330            .filter(|outcome| outcome.status == target)
4331            .count()
4332    }
4333
4334    #[tokio::test(flavor = "multi_thread")]
4335    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4336    -> anyhow::Result<()> {
4337        let temp = TempDir::new()?;
4338        let store = Store::open_local(temp.path()).await?;
4339
4340        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4341        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4342
4343        let mut again = base_session();
4344        again.options.insert("title".to_owned(), json!("renamed"));
4345        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4346        assert_eq!(
4347            count_status(&second, OutcomeStatus::Error),
4348            0,
4349            "options is mutable; the re-ingest must not surface an error: {second:?}",
4350        );
4351        assert_eq!(
4352            count_status(&second, OutcomeStatus::Matched),
4353            1,
4354            "unchanged immutable fields must match-insert via merge_insert",
4355        );
4356
4357        Ok(())
4358    }
4359
4360    #[tokio::test(flavor = "multi_thread")]
4361    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4362        let temp = TempDir::new()?;
4363        let store = Store::open_local(temp.path()).await?;
4364
4365        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4366        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4367
4368        let mut tampered = base_session();
4369        tampered.source_agent = "codex-cli".to_owned();
4370        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4371        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4372        let err_row = second
4373            .iter()
4374            .find(|outcome| outcome.status == OutcomeStatus::Error)
4375            .expect("error outcome present");
4376        let err = err_row.error.as_ref().expect("error body present");
4377        assert_eq!(err.field, Some("source_agent"));
4378        assert_eq!(err.reason, Some("immutable"));
4379
4380        // The stored row stayed on the original adapter - no silent rewrite.
4381        let stored = store
4382            .get_session(&base_session().id)
4383            .await?
4384            .expect("session row survives the rejected re-ingest");
4385        assert_eq!(stored.session.source_agent, "claude-code");
4386
4387        Ok(())
4388    }
4389
4390    #[tokio::test(flavor = "multi_thread")]
4391    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4392        let temp = TempDir::new()?;
4393        let store = Store::open_local(temp.path()).await?;
4394
4395        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4396        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4397
4398        let mut tampered = base_session();
4399        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4400        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4401        let err_row = second
4402            .iter()
4403            .find(|outcome| outcome.status == OutcomeStatus::Error)
4404            .expect("project change must surface an error outcome");
4405        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4406
4407        let stored = store
4408            .get_session(&base_session().id)
4409            .await?
4410            .expect("session row survives");
4411        assert_eq!(
4412            stored.session.project.as_str(),
4413            "/home/me/proj",
4414            "stored project must remain the original",
4415        );
4416
4417        Ok(())
4418    }
4419
4420    #[tokio::test(flavor = "multi_thread")]
4421    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
4422        // Regression guard: re-ingesting an existing session with NEW
4423        // messages must surface as sessions_inserted=0, messages_inserted_*>0
4424        // on `BatchCounts`, and per-row outcomes must mark the new message
4425        // rows `Inserted` while the session row is `Matched`. The prior
4426        // implementation derived all per-row statuses from the batch-level
4427        // session inserted count, which silently flipped the new messages
4428        // into `Matched` (visible as "up to date" in the CLI bar tail).
4429        use crate::wire::Provenance;
4430        let temp = TempDir::new()?;
4431        let store = Store::open_local(temp.path()).await?;
4432        let session = base_session();
4433
4434        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
4435            session_id: session.id.clone(),
4436            id: part_id.to_owned(),
4437            message_id: message_id.to_owned(),
4438            ordinal: 0,
4439            provenance: Provenance::Conversational,
4440            options: ProviderOptions::new(),
4441            kind: PartKind::Text {
4442                text: Some(Extracted::from_test_value(body.to_owned())),
4443            },
4444        };
4445        let user_message = |id: &str| Message::User {
4446            id: id.to_owned(),
4447            session_id: session.id.clone(),
4448            timestamp: Utc::now(),
4449            options: ProviderOptions::new(),
4450        };
4451
4452        // First pass: 2 messages land fresh.
4453        let mut validator = IngestValidator::default();
4454        validator
4455            .push(&store, 0, IngestEvent::Session(session.clone()))
4456            .await?;
4457        validator
4458            .push(&store, 1, IngestEvent::Message(user_message("m1")))
4459            .await?;
4460        validator
4461            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
4462            .await?;
4463        validator
4464            .push(&store, 3, IngestEvent::Message(user_message("m2")))
4465            .await?;
4466        validator
4467            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
4468            .await?;
4469        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
4470        assert_eq!(first_counts.sessions_inserted, 1);
4471        assert_eq!(first_counts.messages_inserted_total, 2);
4472        assert_eq!(first_counts.messages_inserted_searchable, 2);
4473
4474        // Second pass: same session id, 3 NEW messages.
4475        let mut validator = IngestValidator::default();
4476        validator
4477            .push(&store, 0, IngestEvent::Session(session.clone()))
4478            .await?;
4479        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
4480            let pid = format!("p{}", idx + 3);
4481            validator
4482                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
4483                .await?;
4484            validator
4485                .push(
4486                    &store,
4487                    idx * 2 + 2,
4488                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
4489                )
4490                .await?;
4491        }
4492        let (second_outcomes, second_counts) = validator.finish(&store).await?;
4493
4494        assert_eq!(
4495            second_counts.sessions_inserted, 0,
4496            "existing session row must report as Matched, not Inserted",
4497        );
4498        assert_eq!(second_counts.sessions_matched, 1);
4499        assert_eq!(
4500            second_counts.messages_inserted_total, 3,
4501            "the three NEW messages must register as Inserted in BatchCounts",
4502        );
4503        assert_eq!(
4504            second_counts.messages_inserted_searchable, 3,
4505            "all three new messages carry conversational text -> searchable",
4506        );
4507        assert_eq!(second_counts.messages_matched_total, 0);
4508        assert_eq!(second_counts.parts_inserted, 3);
4509        assert_eq!(second_counts.parts_matched, 0);
4510
4511        // Per-row outcomes mirror the BatchCounts shape: the session row is
4512        // Matched, every new message + part row is Inserted.
4513        let session_outcome = second_outcomes
4514            .iter()
4515            .find(|outcome| outcome.kind == "session")
4516            .expect("session-row outcome present");
4517        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
4518        for outcome in &second_outcomes {
4519            if outcome.kind == "message" || outcome.kind == "part" {
4520                assert_eq!(
4521                    outcome.status,
4522                    OutcomeStatus::Inserted,
4523                    "new row must be Inserted, got: {outcome:?}",
4524                );
4525            }
4526        }
4527        Ok(())
4528    }
4529
4530    /// Ingest `count` synthetic messages spread across a handful of sessions
4531    /// and projects, each with conversational `search_text`. Returns the store
4532    /// and the message keys in `msg-{i}` order; every `vector` starts null.
4533    async fn store_with_messages(
4534        temp: &TempDir,
4535        count: usize,
4536    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4537        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4538    }
4539
4540    /// Same as [`store_with_messages`] but tests optimize with a custom
4541    /// IVF_PQ activation threshold.
4542    async fn store_with_messages_at_threshold(
4543        temp: &TempDir,
4544        count: usize,
4545        _vector_threshold: usize,
4546    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4547        let store = Store::open_local(temp.path()).await?;
4548        let sessions = 8.min(count.max(1));
4549        let mut events = Vec::new();
4550        for s in 0..sessions {
4551            events.push(IngestEvent::Session(Session {
4552                id: format!("session-{s}"),
4553                parent_session_id: None,
4554                parent_message_id: None,
4555                source_agent: "claude-code".to_owned(),
4556                created_at: Utc::now(),
4557                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4558                options: ProviderOptions::new(),
4559            }));
4560            for i in (s..count).step_by(sessions) {
4561                let message_id = format!("msg-{i}");
4562                events.push(IngestEvent::Message(Message::User {
4563                    id: message_id.clone(),
4564                    session_id: format!("session-{s}"),
4565                    timestamp: Utc::now(),
4566                    options: ProviderOptions::new(),
4567                }));
4568                events.push(IngestEvent::Part(Part {
4569                    session_id: format!("session-{s}"),
4570                    id: format!("{message_id}-part"),
4571                    message_id,
4572                    ordinal: 0,
4573                    provenance: crate::wire::Provenance::Conversational,
4574                    options: ProviderOptions::new(),
4575                    kind: PartKind::Text {
4576                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4577                    },
4578                }));
4579            }
4580        }
4581        ingest_events(&store, events).await?;
4582        let keys = (0..count)
4583            .map(|i| MessageKey {
4584                session_id: format!("session-{}", i % sessions),
4585                message_id: format!("msg-{i}"),
4586            })
4587            .collect();
4588        Ok((store, keys))
4589    }
4590
4591    /// A deterministic pseudo-random vector of the production dimension.
4592    fn synthetic_vector(seed: usize) -> Vec<f32> {
4593        let mut state = (seed as u64)
4594            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4595            .wrapping_add(1);
4596        (0..embedding_dim())
4597            .map(|_| {
4598                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4599                #[allow(clippy::cast_precision_loss)]
4600                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4601                unit - 1.0
4602            })
4603            .collect()
4604    }
4605
4606    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
4607    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4608        keys.iter()
4609            .enumerate()
4610            .map(|(seed, key)| EmbeddedMessage {
4611                session_id: key.session_id.clone(),
4612                id: key.message_id.clone(),
4613                vector: synthetic_vector(seed),
4614            })
4615            .collect()
4616    }
4617
4618    fn embedding_update_batch_with_model(
4619        rows: &[EmbeddedMessage],
4620        model: &str,
4621    ) -> Result<RecordBatch> {
4622        let mut batch = embedding_update_batch(rows)?;
4623        let columns = batch
4624            .columns()
4625            .iter()
4626            .take(3)
4627            .cloned()
4628            .chain(std::iter::once(
4629                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4630            ))
4631            .collect::<Vec<_>>();
4632        batch = RecordBatch::try_new(batch.schema(), columns)?;
4633        Ok(batch)
4634    }
4635
4636    #[tokio::test]
4637    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4638        let temp = TempDir::new()?;
4639        // 4 messages cycle session-0..session-3, so `session-3` is a real
4640        // partition. Scalar-index pushdown is volume-independent: the planner
4641        // emits `ScalarIndexQuery` whenever the index exists.
4642        let (store, keys) = store_with_messages(&temp, 4).await?;
4643        store.write_embeddings(&embedded(&keys)).await?;
4644        store.optimize_indices(None, None).await?.into_result()?;
4645
4646        let query = vec![0.01_f32; embedding_dim()];
4647        let plan = store
4648            .explain_vector_plan(
4649                &query,
4650                10,
4651                &Predicate::Eq("session_id", "session-3".into()),
4652                None,
4653            )
4654            .await?;
4655
4656        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
4657        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
4658        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
4659        // and unrelated.)
4660        assert!(
4661            plan.contains("ScalarIndexQuery"),
4662            "expected a ScalarIndexQuery node in the plan:\n{plan}",
4663        );
4664        let predicate_postfiltered = plan
4665            .lines()
4666            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4667        assert!(
4668            !predicate_postfiltered,
4669            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4670        );
4671        Ok(())
4672    }
4673
4674    #[tokio::test]
4675    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4676        let temp = TempDir::new()?;
4677        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4678
4679        // First batch: 255 vectors, one below threshold. Optimize does not
4680        // create the IVF_PQ because the trigger is not met.
4681        store.write_embeddings(&embedded(&keys[..255])).await?;
4682        store
4683            .optimize_indices_with_vector_threshold(256)
4684            .await?
4685            .into_result()?;
4686        assert!(
4687            !store
4688                .handle
4689                .messages_index_names()
4690                .await?
4691                .iter()
4692                .any(|name| name == MESSAGES_VECTOR_INDEX),
4693            "IVF_PQ must not exist below the activation threshold",
4694        );
4695
4696        // Next batch: one more vector. Total reaches 256; optimize creates
4697        // the IVF_PQ.
4698        store.write_embeddings(&embedded(&keys[255..256])).await?;
4699        store
4700            .optimize_indices_with_vector_threshold(256)
4701            .await?
4702            .into_result()?;
4703        assert!(
4704            store
4705                .handle
4706                .messages_index_names()
4707                .await?
4708                .iter()
4709                .any(|name| name == MESSAGES_VECTOR_INDEX),
4710            "optimize must create the IVF_PQ once the threshold is crossed",
4711        );
4712
4713        // The remaining 44 rows stay un-embedded; the IVF_PQ trains over the
4714        // non-null subset and a planted vector is retrievable.
4715        let hits = store
4716            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4717            .await?;
4718        assert!(
4719            hits.iter().any(|(key, _)| key == &keys[0]),
4720            "an embedded row is retrievable via the index",
4721        );
4722        Ok(())
4723    }
4724
4725    #[tokio::test]
4726    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4727    {
4728        let temp = TempDir::new()?;
4729        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4730        let old_rows = embedded(&keys);
4731        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4732        store
4733            .handle
4734            .merge_update(Table::Messages, old_batch, old_rows.len())
4735            .await?;
4736        store
4737            .optimize_indices_with_vector_threshold(256)
4738            .await?
4739            .into_result()?;
4740        assert!(
4741            store
4742                .handle
4743                .messages_index_names()
4744                .await?
4745                .iter()
4746                .any(|name| name == MESSAGES_VECTOR_INDEX),
4747            "IVF_PQ must exist before a model swap",
4748        );
4749        assert_eq!(store.stale_embedding_count().await?, keys.len());
4750
4751        store.drop_vector_index().await?;
4752        let mut pending = Vec::new();
4753        let stream = store.pending_or_stale_messages();
4754        tokio::pin!(stream);
4755        while let Some(row) = stream.next().await {
4756            pending.push(row?);
4757        }
4758        assert_eq!(
4759            pending.len(),
4760            keys.len(),
4761            "force stream should see stale rows"
4762        );
4763        store.write_embeddings(&embedded(&keys)).await?;
4764        assert_eq!(store.stale_embedding_count().await?, 0);
4765        store
4766            .optimize_indices_with_vector_threshold(256)
4767            .await?
4768            .into_result()?;
4769        assert!(
4770            store
4771                .handle
4772                .messages_index_names()
4773                .await?
4774                .iter()
4775                .any(|name| name == MESSAGES_VECTOR_INDEX),
4776            "optimize must rebuild IVF_PQ after force re-embed",
4777        );
4778
4779        let stream = store.pending_or_stale_messages();
4780        tokio::pin!(stream);
4781        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4782        Ok(())
4783    }
4784
4785    #[tokio::test]
4786    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4787        // Regression: `_row_last_updated_at_version` can point at a Lance
4788        // manifest version that `cleanup_old_versions` or the auto_cleanup
4789        // hook has since dropped from `Dataset::versions()`. The old code
4790        // silently dropped any session whose row-version was not in the
4791        // visible list, collapsing the staleness-skip map down to recent
4792        // commits and forcing `pond sync` to re-touch every file. The fix
4793        // falls back to the oldest still-visible commit timestamp - a
4794        // sound upper bound on the row's true ingest time.
4795        let temp = TempDir::new()?;
4796        let (store, _keys) = store_with_messages(&temp, 4).await?;
4797
4798        // Produce several distinct manifest versions on `sessions` so the
4799        // older ones become eligible for cleanup.
4800        for tag in 0..3 {
4801            let extra = synthetic_session(&format!("extra-{tag}"));
4802            store.upsert_sessions(&[extra]).await?;
4803        }
4804
4805        // Prune everything older than ~now, leaving only the latest manifest.
4806        // `delete_unverified=None` and `error_if_tagged=Some(false)` mirror
4807        // Lance's auto-cleanup hook semantics. The chrono 0-duration is fine:
4808        // Lance's `delete_unverified` floor still protects in-flight files.
4809        let dataset = store.handle.dataset(Table::Sessions).await?;
4810        dataset
4811            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4812            .await
4813            .context("cleanup_old_versions failed")?;
4814
4815        let map = store.session_last_ingested_at().await?;
4816        let session_count = store.row_counts().await?.0;
4817        assert!(
4818            map.len() >= session_count,
4819            "watermark map ({}) must still cover every session ({}) after \
4820             version cleanup; an empty fallback regresses pond sync to a \
4821             full re-scan",
4822            map.len(),
4823            session_count,
4824        );
4825        Ok(())
4826    }
4827
4828    #[tokio::test]
4829    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4830        let temp = TempDir::new()?;
4831        let (store, keys) = store_with_messages(&temp, 10).await?;
4832
4833        let before = store.embedding_progress().await?;
4834        assert_eq!(before.embedded, 0);
4835        assert_eq!(before.total, 10);
4836        assert_eq!(before.model, crate::embed::model_id());
4837
4838        store.write_embeddings(&embedded(&keys[..4])).await?;
4839        let partial = store.embedding_progress().await?;
4840        assert_eq!(partial.embedded, 4);
4841        assert_eq!(partial.total, 10);
4842
4843        store.write_embeddings(&embedded(&keys[4..])).await?;
4844        let full = store.embedding_progress().await?;
4845        assert_eq!(full.embedded, 10);
4846        assert_eq!(full.total, 10);
4847        Ok(())
4848    }
4849}