Skip to main content

pond/
sessions.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    path::Path,
4    sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15    UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25    config, embed,
26    substrate::{
27        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
28        OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
29        TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
30    },
31    wire::{
32        FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33        SessionFrom,
34    },
35};
36use url::Url;
37
/// Session store over the `sessions`, `messages`, and `parts` tables.
///
/// The methods visible in this file reach storage through the wrapped
/// substrate [`Handle`].
#[derive(Debug)]
pub struct Store {
    /// Substrate handle obtained from `Handle::open` or
    /// `Handle::open_with_options` when the store is constructed.
    handle: Handle,
}
42
/// Row counts for the three archive tables, one field per table.
///
/// Used both for "rows processed" and for "rows inserted" inside
/// [`LanceArchiveExport`] and [`LanceArchiveImport`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Rows counted for the `sessions` table.
    pub sessions: usize,
    /// Rows counted for the `messages` table.
    pub messages: usize,
    /// Rows counted for the `parts` table.
    pub parts: usize,
}
49
/// Dataset version ids of the three source tables, one field per table.
///
/// `Store::export_clean_lance_datasets` fills these from
/// `Dataset::version_id()` on each source dataset, read before that table is
/// scanned.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the source `sessions` dataset.
    pub sessions: u64,
    /// Version id of the source `messages` dataset.
    pub messages: u64,
    /// Version id of the source `parts` dataset.
    pub parts: u64,
}
56
/// Summary returned by `Store::export_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Rows read from each source table and written to the archive.
    pub rows: LanceArchiveCounts,
    /// Version id each source dataset had when its export started.
    pub source_versions: LanceArchiveVersions,
}
62
/// Summary returned by `Store::import_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Rows read from each archive table.
    pub rows: LanceArchiveCounts,
    /// Per-table sum of the counts returned by the substrate `merge_insert`
    /// calls made during the import.
    pub inserted: LanceArchiveCounts,
}
68
/// Index intents grouped by the table they apply to.
#[derive(Debug, Clone, Default)]
pub struct IndexIntents {
    /// Intents for the `sessions` table.
    pub sessions: Vec<IndexIntent>,
    /// Intents for the `messages` table.
    pub messages: Vec<IndexIntent>,
    /// Intents for the `parts` table.
    pub parts: Vec<IndexIntent>,
}
75
76impl IndexIntents {
77    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78        [
79            (Table::Sessions, &self.sessions),
80            (Table::Messages, &self.messages),
81            (Table::Parts, &self.parts),
82        ]
83    }
84}
85
/// A message awaiting embedding: its primary key plus the `search_text` to
/// embed. The vector lives on the same `messages` row, so no denormalized
/// filter columns are needed (spec.md#session-embed-from-canonical).
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Id of the session the message belongs to (first half of the key).
    pub session_id: String,
    /// Message id (second half of the key).
    pub id: String,
    /// Text that will be embedded for this message.
    pub search_text: String,
}
95
/// One embedded message: a primary key and the vector to store. `pond embed`
/// writes a batch of these into `messages.vector` keyed on `(session_id, id)`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Id of the session the message belongs to (first half of the key).
    pub session_id: String,
    /// Message id (second half of the key).
    pub id: String,
    /// Embedding vector to store on the message row.
    pub vector: Vec<f32>,
}
104
/// Message metadata used to hydrate search hits after retriever ranking.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Id of the message.
    pub message_id: String,
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Message role, carried as a plain string.
    pub role: String,
    /// Project recorded for the message.
    pub project: String,
    /// Source agent recorded for the message.
    pub source_agent: String,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Search text stored for the message.
    pub search_text: String,
}
116
/// Identifies one message by its session id and message id.
///
/// Derives `Eq`, `Ord`, and `Hash`, so it can serve as a key in hashed or
/// ordered collections. The derived ordering compares `session_id` first,
/// then `message_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message within that session.
    pub message_id: String,
}
122
/// Per-row outcome reported by the `Store::upsert_*` methods.
///
/// NOTE(review): the exact attribution is decided by `statuses_from_inserted`,
/// which is not visible in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row was counted as newly inserted.
    Inserted,
    /// The row was counted as matching a row that was already stored.
    Matched,
}
128
/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
/// results so the CLI can render compaction-skipped (under writer contention)
/// distinctly from index-build failure (real problem).
#[derive(Debug, Default)]
pub struct OptimizeOutcome {
    /// Per-table outcomes; each carries the table plus its `indices` and
    /// `compaction` phase results.
    pub tables: Vec<TableOptimizeOutcome>,
}
137
138impl OptimizeOutcome {
139    /// True if any table's indices phase reported a non-conflict failure.
140    /// `SkippedConflict` is expected under contention and does not count.
141    pub fn any_indices_failed(&self) -> bool {
142        self.tables.iter().any(|t| t.indices.is_failed())
143    }
144
145    /// Treat any `Failed` phase as an error. Tests that don't run under
146    /// contention use this to keep their existing `.await?` style: a real
147    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
148    pub fn into_result(self) -> Result<Self> {
149        for table in &self.tables {
150            if let PhaseOutcome::Failed(error) = &table.indices {
151                anyhow::bail!(
152                    "indices phase failed on {}: {error:#}",
153                    table.table.as_str()
154                );
155            }
156            if let PhaseOutcome::Failed(error) = &table.compaction {
157                anyhow::bail!(
158                    "compaction phase failed on {}: {error:#}",
159                    table.table.as_str()
160                );
161            }
162        }
163        Ok(self)
164    }
165}
166
/// What `pond status` reports: where the data lives, total rows per table,
/// and a per-(adapter, project) breakdown built from one `messages` scan.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location of the data the statistics were computed from.
    pub data_url: Url,
    /// Total row counts per table.
    pub totals: RowTotals,
    /// One entry per adapter present in the corpus. When `include_subagents`
    /// is false (the CLI default), sub-agent rows (`source_agent` containing
    /// `/`) are filtered out so only the main-agent sessions appear. When
    /// true, each distinct `source_agent` (e.g. `claude-code/general-purpose`)
    /// gets its own entry. Always in alphabetical order; the CLI re-sorts
    /// this into registry order at render time so the tree matches the
    /// discovery picker.
    pub adapters: Vec<AdapterStats>,
    /// Whether the rollup includes sub-agent sessions. The renderer prints a
    /// hint about `--include-subagents` when this is false so users know the
    /// `totals` row above counts sessions that aren't broken down below.
    pub include_subagents: bool,
}
186
/// Total row counts, one field per table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Total rows in the `sessions` table.
    pub sessions: u64,
    /// Total rows in the `messages` table.
    pub messages: u64,
    /// Total rows in the `parts` table.
    pub parts: u64,
}
193
/// Embedding coverage for `pond status` / `pond embed`. `total` is the count of
/// `messages` rows that carry `search_text` (i.e. are eligible to embed); rows
/// without `search_text` produce no vector. `embedded` is the subset of those
/// already carrying a vector under the current [`embed::model_id()`]. The pending
/// backlog is `total - embedded`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Eligible rows that already carry a vector for the current model.
    pub embedded: usize,
    /// Rows eligible for embedding (those carrying `search_text`).
    pub total: usize,
    /// Model identifier the counts refer to; presumably the value of
    /// `embed::model_id()` - confirm at the construction site.
    pub model: &'static str,
}
205
/// Statistics for one adapter entry in [`CorpusStats`], with its per-project
/// breakdown.
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// Either the main-agent name (`claude-code`) when sub-agents are filtered
    /// out, or the full `source_agent` (`claude-code/general-purpose`) when
    /// `include_subagents` is on.
    pub adapter: String,
    /// Session count for this adapter.
    pub sessions: u64,
    /// Message count for this adapter.
    pub messages: u64,
    /// Projects under this adapter, sorted by message count desc, then by
    /// project name asc.
    pub projects: Vec<ProjectStats>,
}
218
/// Statistics for one project listed under an [`AdapterStats`] entry.
#[derive(Debug, Clone)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Session count for this project.
    pub sessions: u64,
    /// Message count for this project.
    pub messages: u64,
}
225
/// Running tally for one group: a message counter plus the set of session ids
/// seen so far.
///
/// NOTE(review): presumably used while building the per-(adapter, project)
/// breakdown for [`CorpusStats`]; the call site is not visible here.
#[derive(Default)]
struct GroupAccumulator {
    /// Number of messages tallied for the group.
    messages: u64,
    /// Session ids observed for the group; a set, so each id is held once.
    session_ids: HashSet<String>,
}
231
/// Borrowed description of one message to write: the message, its parts, and
/// the optional search text to store alongside it.
///
/// `Store::upsert_messages` reads `message` and `search_text` from each entry.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message to write.
    pub message: &'a Message,
    /// Parts belonging to the message.
    pub parts: &'a [Part],
    /// Search text for the message, if it has any.
    pub search_text: Option<&'a str>,
}
238
239impl Store {
240    /// Open against a local filesystem URL or a remote one for which the
241    /// caller has no extra options to pass (env vars suffice). CLI verbs
242    /// that load `[storage]` from config should call
243    /// [`Store::open_with_options`] instead so the same options flow into
244    /// every dataset open and write.
245    pub async fn open(location: &Url) -> Result<Self> {
246        Ok(Self {
247            handle: Handle::open(location).await?,
248        })
249    }
250
251    /// Open with object-store options (S3 creds, region, endpoint, ...)
252    /// threaded through Lance verbatim. Keys are the standard `object_store`
253    /// config names; pond does not parse them. Empty options + default caps
254    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
255    /// config block via [`crate::substrate::RuntimeCaps`].
256    pub async fn open_with_options(
257        location: &Url,
258        storage_options: std::collections::HashMap<String, String>,
259        caps: crate::substrate::RuntimeCaps,
260    ) -> Result<Self> {
261        Ok(Self {
262            handle: Handle::open_with_options(location, storage_options, caps).await?,
263        })
264    }
265
266    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
267    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
268    /// through [`Store::open_with_options`] so the production policy is
269    /// applied; tests get the backend-aware local-FS defaults.
270    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
271        let url = config::url_for_path(path)?;
272        Self::open_with_options(
273            &url,
274            std::collections::HashMap::new(),
275            crate::substrate::RuntimeCaps::default(),
276        )
277        .await
278    }
279
280    /// Export clean, index-free Lance datasets into `dest`.
281    ///
282    /// This rewrites the visible rows of each table instead of copying the
283    /// dataset roots. The resulting manifests therefore contain no references
284    /// to the source store's `_indices`, while `messages.vector` and
285    /// `messages.embedding_model` remain ordinary data columns and are
286    /// preserved.
287    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
288        std::fs::create_dir_all(dest)
289            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
290        let (sessions, sessions_version) = self
291            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
292            .await?;
293        let (messages, messages_version) = self
294            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
295            .await?;
296        let (parts, parts_version) = self
297            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
298            .await?;
299        Ok(LanceArchiveExport {
300            rows: LanceArchiveCounts {
301                sessions,
302                messages,
303                parts,
304            },
305            source_versions: LanceArchiveVersions {
306                sessions: sessions_version,
307                messages: messages_version,
308                parts: parts_version,
309            },
310        })
311    }
312
313    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
314        let sessions_dataset =
315            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
316        let messages_dataset =
317            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
318        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
319        let (sessions, sessions_inserted) = self
320            .import_clean_table(Table::Sessions, sessions_dataset)
321            .await?;
322        let (messages, messages_inserted) = self
323            .import_clean_table(Table::Messages, messages_dataset)
324            .await?;
325        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
326        Ok(LanceArchiveImport {
327            rows: LanceArchiveCounts {
328                sessions,
329                messages,
330                parts,
331            },
332            inserted: LanceArchiveCounts {
333                sessions: sessions_inserted,
334                messages: messages_inserted,
335                parts: parts_inserted,
336            },
337        })
338    }
339
340    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
341        let dataset = self.handle.dataset(table).await?;
342        let source_version = dataset.version_id();
343        let schema = export_schema(table);
344        let mut stream = dataset
345            .scan()
346            .try_into_stream()
347            .await
348            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
349        let dest_uri = dest
350            .to_str()
351            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
352
353        let mut rows = 0usize;
354        let mut wrote = false;
355        while let Some(batch) = stream.next().await {
356            let batch = batch
357                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
358            rows += batch.num_rows();
359            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
360            let mut params = write_params_for_create();
361            if wrote {
362                params.mode = WriteMode::Append;
363            }
364            Dataset::write(reader, dest_uri, Some(params))
365                .await
366                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
367            wrote = true;
368        }
369
370        if !wrote {
371            let batch = RecordBatch::new_empty(schema.clone());
372            let reader = RecordBatchIterator::new([Ok(batch)], schema);
373            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
374                .await
375                .with_context(|| {
376                    format!("failed to write empty {} archive table", table.as_str())
377                })?;
378        }
379        Ok((rows, source_version))
380    }
381
382    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
383        let mut stream = dataset
384            .scan()
385            .try_into_stream()
386            .await
387            .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
388        let mut rows = 0usize;
389        let mut inserted = 0usize;
390        while let Some(batch) = stream.next().await {
391            let batch = batch
392                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
393            let row_count = batch.num_rows();
394            rows += row_count;
395            inserted += self
396                .handle
397                .merge_insert(table, batch, row_count)
398                .await
399                .with_context(|| format!("failed to import {} archive table", table.as_str()))?
400                as usize;
401        }
402        Ok((rows, inserted))
403    }
404
405    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
406        if sessions.is_empty() {
407            return Ok(Vec::new());
408        }
409        let batches = sessions_batches(sessions)?;
410        let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
411        // Per-row outcomes; no fold here (see `pond index optimize`).
412        Ok(statuses_from_inserted(sessions.len(), inserted))
413    }
414
    /// Batched write path used by the adapter ingest loop and by the wire
    /// handler's final flush. Receives N completed substreams from the
    /// validator and, in this order:
    ///
    ///   1. Deduplicates in-batch at the substream level: when two substreams
    ///      in the same batch share a `session_id` (Claude Code's subagent
    ///      files reuse their parent's id), the first occurrence wins. The
    ///      second is either *merged* (same `source_agent` + `project`:
    ///      messages/parts append, no duplicate rows) or *rejected*
    ///      (different `source_agent` or `project` - the subagent-vs-parent
    ///      case). Row-level duplicates that slip past here are caught
    ///      downstream by Lance's `SourceDedupeBehavior::FirstSeen` in
    ///      `substrate::merge_insert` (invariant 17): this layer's job is
    ///      preserving substream merge semantics, not policing the PK
    ///      uniqueness Lance handles itself.
    ///   2. Runs one pre-existence scan per table (sessions, messages, parts)
    ///      keyed on the batch's session ids.
    ///   3. Runs the immutable-fields check (spec.md#protocol) for every
    ///      session that is already stored. Sessions that fail produce Error
    ///      outcomes and are excluded from the write batch.
    ///   4. Builds the record batches for each table (sessions, messages,
    ///      parts) across every valid substream.
    ///   5. Fires the three `merge_insert` calls in parallel via
    ///      `tokio::try_join!`. Cross-table mutex on `CachedDataset` is
    ///      independent, so these proceed concurrently.
    ///   6. Composes per-session [`RowOutcome`]s and sorts them by `index`,
    ///      so they come back in original substream order.
    ///
    /// Returns the outcomes together with the [`BatchCounts`] accumulated
    /// while composing the success outcomes. An empty input returns empty
    /// outcomes and default counts without touching storage.
    ///
    /// # Errors
    ///
    /// Fails if a pre-existence scan fails, a scanned key column is null, a
    /// record batch cannot be built, or any of the three merges fails.
    /// Immutable-field violations are not errors: they are reported as
    /// outcomes.
    async fn upsert_session_batch(
        &self,
        substreams: Vec<CompletedSubstream>,
    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        if substreams.is_empty() {
            return Ok((Vec::new(), BatchCounts::default()));
        }

        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
        let mut counts = BatchCounts::default();

        // In-batch dedup. First occurrence of each session_id wins; later
        // occurrences either merge or get rejected. Iteration order preserves
        // original substream order so outcomes index correctly.
        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
        // session_id -> position of its first occurrence inside `merged`.
        let mut by_session_id: std::collections::HashMap<String, usize> =
            std::collections::HashMap::with_capacity(substreams.len());
        for substream in substreams {
            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
                let existing = &merged[existing_idx];
                if existing.session.source_agent != substream.session.source_agent
                    || existing.session.project != substream.session.project
                {
                    // Subagent-vs-parent class. The first occurrence's
                    // metadata stays authoritative; this substream is
                    // rejected on the same immutable-field axis as the
                    // storage-side check.
                    // `source_agent` is reported in preference to `project`
                    // when both differ.
                    let reason = if existing.session.source_agent != substream.session.source_agent
                    {
                        IngestError::ImmutableField {
                            field: "source_agent",
                            session_id: substream.session.id.clone(),
                            stored: existing.session.source_agent.clone(),
                            attempted: substream.session.source_agent.clone(),
                        }
                    } else {
                        IngestError::ImmutableField {
                            field: "project",
                            session_id: substream.session.id.clone(),
                            stored: (*existing.session.project).clone(),
                            attempted: (*substream.session.project).clone(),
                        }
                    };
                    let field = match &reason {
                        IngestError::ImmutableField { field, .. } => Some(*field),
                    };
                    // Map the offending field onto its drop-reason key.
                    let reason_key = match field {
                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                        _ => DROP_REASON_UNCATEGORIZED,
                    };
                    outcomes.extend(error_outcomes_for_substream(
                        substream.session_index,
                        &substream.session,
                        &substream.messages,
                        reason.to_string(),
                        field,
                        reason_key,
                    ));
                    continue;
                }
                // Same session, same metadata: merge messages. Dedup message
                // ids defensively (within one batch, the validator's seen
                // sets are per-substream so cross-substream dups can happen
                // legally if both files re-emit the same row).
                // NOTE(review): a merged duplicate gets no outcome entry of
                // its own here; its messages are later attributed under the
                // first occurrence's `session_index` - confirm this is the
                // intended wire shape.
                let existing = &mut merged[existing_idx];
                let mut seen: std::collections::HashSet<String> = existing
                    .messages
                    .iter()
                    .map(|m| m.message.id().to_owned())
                    .collect();
                for msg in substream.messages {
                    if seen.insert(msg.message.id().to_owned()) {
                        existing.messages.push(msg);
                    }
                }
                continue;
            }
            by_session_id.insert(substream.session.id.clone(), merged.len());
            merged.push(substream);
        }

        // Pre-existence sweep: one scan per table keyed on the batch's
        // session_ids, capped at the substream count. Replaces the prior
        // N-sequential `find_session` calls and gives us honest per-row
        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
        //
        // `merged` cannot be empty here (the input was non-empty and the
        // first substream is always pushed), so the `is_empty` guards below
        // are purely defensive.
        let session_id_values: Vec<ScalarValue> = merged
            .iter()
            .map(|substream| ScalarValue::String(substream.session.id.clone()))
            .collect();
        // Stored sessions for this batch, keyed by session id.
        let existing_sessions: std::collections::HashMap<String, Session> =
            if session_id_values.is_empty() {
                std::collections::HashMap::new()
            } else {
                let batch = self
                    .handle
                    .scan_batch(
                        Table::Sessions,
                        Some(&Predicate::In("id", session_id_values.clone())),
                        &[],
                    )
                    .await?;
                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
                for row in 0..batch.num_rows() {
                    let session = session_from_batch(&batch, row)?;
                    map.insert(session.id.clone(), session);
                }
                map
            };
        // `(session_id, id)` of every message already stored for these sessions.
        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Messages,
                    Some(&Predicate::In("session_id", session_id_values.clone())),
                    &["session_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "id", row)?.context("message id is null")?;
                set.insert((sid, mid));
            }
            set
        };
        // `(session_id, message_id, id)` of every part already stored for
        // these sessions. This is the last use of `session_id_values`, so it
        // is moved instead of cloned.
        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Parts,
                    Some(&Predicate::In("session_id", session_id_values)),
                    &["session_id", "message_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
                let pid = string(&batch, "id", row)?.context("part id is null")?;
                set.insert((sid, mid, pid));
            }
            set
        };

        // Storage-side immutable-field check: only sessions that already
        // exist can conflict; brand-new sessions go straight to `writeable`.
        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
        for substream in merged {
            if let Some(existing) = existing_sessions.get(&substream.session.id)
                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
            {
                let field = match &failure {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                // Same field -> drop-reason mapping as the in-batch rejection.
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    failure.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            writeable.push(substream);
        }

        // Everything was rejected: report the errors, write nothing. `counts`
        // is still at its default because only the success path updates it.
        if writeable.is_empty() {
            outcomes.sort_by_key(|outcome| outcome.index);
            return Ok((outcomes, counts));
        }

        // Flatten the surviving substreams into per-table row lists.
        let sessions_owned: Vec<Session> = writeable
            .iter()
            .map(|substream| substream.session.clone())
            .collect();
        // Message rows borrow from `writeable` and carry the owning session's
        // `source_agent` and `project` alongside each message.
        let message_rows: Vec<MessageBatchRow<'_>> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().map(|buffered| MessageBatchRow {
                    message: &buffered.message,
                    source_agent: &substream.session.source_agent,
                    project: &substream.session.project,
                    search_text: buffered.search_text.as_deref(),
                })
            })
            .collect();
        let part_rows: Vec<Part> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().flat_map(|buffered| {
                    buffered
                        .parts
                        .iter()
                        .map(|buffered_part| buffered_part.part.clone())
                })
            })
            .collect();

        let session_batches = sessions_batches(&sessions_owned)?;
        let message_batches = messages_batches(&message_rows)?;
        let part_batches = parts_batches(&part_rows)?;

        // Merge_insert returns a batch-level inserted count which we cross-
        // check against our pre-existence sets, but for per-row truth we
        // attribute through the sets themselves (next loop). Under
        // single-writer the two agree exactly; under a hostile concurrent
        // writer the sets are authoritative for THIS request's wire shape -
        // matched-no-op (spec.md#adapter-integrity-additive-sync) makes the
        // distinction informational, not behavioral.
        // NOTE(review): the three returned counts are bound to `_`-prefixed
        // names and not read afterwards in this function.
        let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
        )?;

        // Per-row attribution from the pre-existence maps/sets; this is also
        // where `counts` gets updated.
        for substream in &writeable {
            outcomes.extend(success_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                &existing_sessions,
                &existing_message_pks,
                &existing_part_pks,
                &mut counts,
            ));
        }

        // Error outcomes were pushed earlier than success outcomes, so sort
        // to interleave them back into index order.
        outcomes.sort_by_key(|outcome| outcome.index);
        Ok((outcomes, counts))
    }
677
678    pub async fn upsert_messages(
679        &self,
680        session: &Session,
681        messages: &[MessageWrite<'_>],
682    ) -> Result<Vec<UpsertStatus>> {
683        if messages.is_empty() {
684            return Ok(Vec::new());
685        }
686
687        let rows = messages
688            .iter()
689            .map(|write| MessageBatchRow {
690                message: write.message,
691                source_agent: &session.source_agent,
692                project: &session.project,
693                search_text: write.search_text,
694            })
695            .collect::<Vec<_>>();
696        let batches = messages_batches(&rows)?;
697        let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
698        Ok(statuses_from_inserted(messages.len(), inserted))
699    }
700
701    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
702        if parts.is_empty() {
703            return Ok(Vec::new());
704        }
705        let batches = parts_batches(parts)?;
706        let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
707        Ok(statuses_from_inserted(parts.len(), inserted))
708    }
709
710    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
711        let Some(session) = self.find_session(session_id).await? else {
712            return Ok(None);
713        };
714        let messages = self.messages_for_session(session_id).await?;
715        Ok(Some(SessionWithMessages { session, messages }))
716    }
717
718    /// Every session id currently in the store, unsorted.
719    pub async fn session_ids(&self) -> Result<Vec<String>> {
720        let batch = self
721            .handle
722            .scan_batch(Table::Sessions, None, &["id"])
723            .await?;
724        let mut ids = Vec::with_capacity(batch.num_rows());
725        for row in 0..batch.num_rows() {
726            if let Some(id) = string(&batch, "id", row)? {
727                ids.push(id);
728            }
729        }
730        Ok(ids)
731    }
732
733    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
734        let batch = self
735            .handle
736            .scan_batch(
737                Table::Sessions,
738                Some(&Predicate::Eq(
739                    "parent_session_id",
740                    parent_session_id.into(),
741                )),
742                &[
743                    "id",
744                    "parent_session_id",
745                    "parent_message_id",
746                    "source_agent",
747                    "created_at",
748                    "project",
749                    "options",
750                ],
751            )
752            .await?;
753        let mut sessions = Vec::with_capacity(batch.num_rows());
754        for row in 0..batch.num_rows() {
755            sessions.push(session_from_batch(&batch, row)?);
756        }
757        sessions.sort_by(|left, right| left.id.cmp(&right.id));
758        Ok(sessions)
759    }
760
761    /// `session_id -> wall-clock time of the Lance manifest version that
762    /// last wrote the row` for the per-session staleness skip
763    /// (spec.md#adapter-integrity-event-ordering). Reads Lance's `_row_last_updated_at_version` system
764    /// column (available because pond enables stable row ids per spec.md#lance-table-creation-stable-row-ids)
765    /// and joins it against `Dataset::versions()` for commit timestamps.
766    pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
767        use lance::deps::arrow_array::UInt64Array;
768
769        let dataset = self.handle.dataset(Table::Sessions).await?;
770        let version_list = dataset.versions().await?;
771        let versions: HashMap<u64, DateTime<Utc>> = version_list
772            .iter()
773            .map(|v| (v.version, v.timestamp))
774            .collect();
775        // `Dataset::cleanup_old_versions` (and the auto_cleanup hook) drops
776        // pruned versions from the manifest list, leaving rows whose
777        // `_row_last_updated_at_version` points at a version that no longer
778        // resolves. Those rows are still real and were ingested at some time
779        // <= the oldest still-visible version's commit timestamp - so falling
780        // back to that bound preserves a sound `mtime <= ingested` upper edge
781        // and keeps the staleness skip working after cleanup.
782        let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
783
784        let scanner = self
785            .handle
786            .scan(
787                Table::Sessions,
788                ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
789            )
790            .await?;
791        let mut stream = scanner.try_into_stream().await?;
792        let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
793        while let Some(batch) = stream.next().await {
794            let batch = batch?;
795            let version_array = batch
796                .column_by_name("_row_last_updated_at_version")
797                .context("missing _row_last_updated_at_version column")?
798                .as_any()
799                .downcast_ref::<UInt64Array>()
800                .context("_row_last_updated_at_version is not UInt64")?;
801            for row in 0..batch.num_rows() {
802                let Some(id) = string(&batch, "id", row)? else {
803                    continue;
804                };
805                if version_array.is_null(row) {
806                    continue;
807                }
808                let version = version_array.value(row);
809                let ts = versions.get(&version).copied().or(oldest_visible_ts);
810                if let Some(ts) = ts {
811                    out.insert(id, ts);
812                }
813            }
814        }
815        Ok(out)
816    }
817
818    /// Whole-session view for `pond_get` session mode (spec.md#protocol).
819    /// Conversational filters to `search_text IS NOT NULL`; Complete and
820    /// Verbatim scan every message. Every mode attaches compact part summaries;
821    /// Verbatim additionally inlines full parts. `after_id` is an exclusive
822    /// lower bound (a message id); the page is bounded by `limit` and a byte
823    /// budget and never cuts mid-message.
824    pub async fn session_view(
825        &self,
826        session_id: &str,
827        params: SessionViewParams<'_>,
828    ) -> Result<GetLookup<SessionPage>> {
829        let Some(session) = self.find_session(session_id).await? else {
830            return Ok(GetLookup::NotFound);
831        };
832
833        let mut rows = match params.mode {
834            ResponseMode::Conversational => self
835                .scan_conversational_messages(session_id)
836                .await?
837                .into_iter()
838                .map(|row| ScanRow {
839                    id: row.message_id,
840                    role: row.role,
841                    timestamp: row.timestamp,
842                    text: Some(row.text.into_inner()),
843                    content: None,
844                })
845                .collect(),
846            ResponseMode::Complete | ResponseMode::Verbatim => {
847                self.scan_all_messages(session_id).await?
848            }
849        };
850        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
851
852        let start_at = match params.after_id {
853            // Append-only stream: a real anchor never vanishes, so an unknown
854            // `after_id` is a stale/mistyped client cursor, not "start over".
855            Some(after) => match rows.iter().position(|row| row.id == after) {
856                Some(idx) => idx + 1,
857                None => return Ok(GetLookup::UnknownAfterId),
858            },
859            None => 0,
860        };
861        let remaining = rows.get(start_at..).unwrap_or(&[]);
862        let (emitted, messages_remaining) = match params.session_from {
863            SessionFrom::Start => {
864                let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
865                    row.text.as_deref().map_or(0, str::len)
866                });
867                (&remaining[..n], remaining.len() - n)
868            }
869            // Tail: the newest messages that fit `limit` and the byte budget,
870            // dropping oldest first; the newest is always kept and the page
871            // stays chronological so the agent reads the flow forward.
872            SessionFrom::End => {
873                let mut bytes = 0usize;
874                let mut start = remaining.len();
875                for row in remaining.iter().rev() {
876                    if remaining.len() - start >= params.limit {
877                        break;
878                    }
879                    let size = row.text.as_deref().map_or(0, str::len);
880                    if start < remaining.len() && bytes + size > params.budget_bytes {
881                        break;
882                    }
883                    bytes += size;
884                    start -= 1;
885                }
886                (&remaining[start..], start)
887            }
888        };
889        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
890
891        // Conversational/Complete only summarize parts; Verbatim inlines every
892        // part (blobs included).
893        let mut parts_by_message = match params.mode {
894            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
895            ResponseMode::Conversational | ResponseMode::Complete => {
896                self.summary_parts_for_messages(session_id, &ids).await?
897            }
898        };
899        let messages = emitted
900            .iter()
901            .map(|row| RetrievedMessage {
902                id: row.id.clone(),
903                role: row.role,
904                timestamp: row.timestamp,
905                text: row.text.clone(),
906                content: row.content.clone(),
907                parts: parts_by_message
908                    .remove(&(session_id.to_owned(), row.id.clone()))
909                    .unwrap_or_default(),
910            })
911            .collect();
912
913        Ok(GetLookup::Found(SessionPage {
914            session,
915            messages,
916            messages_remaining,
917        }))
918    }
919
920    /// Message-scope retrieval for `pond_get` message mode (spec.md#protocol):
921    /// the target with its full parts (paginated by `after_id` over part
922    /// ordinals, then budget) plus up to `2*context_depth` siblings around it.
923    /// `None` when no stored message carries `message_id`. Sibling parts are
924    /// carried for summarizing; the target's parts ride `target_parts`.
925    pub async fn message_view(
926        &self,
927        message_id: &str,
928        params: MessageViewParams<'_>,
929    ) -> Result<GetLookup<MessagePage>> {
930        let Some(session_id) = self.session_id_for_message(message_id).await? else {
931            return Ok(GetLookup::NotFound);
932        };
933        let Some(session) = self.find_session(&session_id).await? else {
934            return Ok(GetLookup::NotFound);
935        };
936        let mut rows = self.scan_all_messages(&session_id).await?;
937        // spec.md#protocol: context siblings follow the response mode, and the
938        // default is the conversational view - in carrier-heavy sessions the
939        // system/tool rows would otherwise fill the whole +-depth window and
940        // push the actual conversation out of it. The target stays regardless
941        // of its own role: the caller asked for that message.
942        if matches!(params.mode, ResponseMode::Conversational) {
943            rows.retain(|row| row.text.is_some() || row.id == message_id);
944        }
945        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
946        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
947            return Ok(GetLookup::NotFound);
948        };
949
950        let start = target_pos.saturating_sub(params.context_depth);
951        let end = (target_pos + params.context_depth + 1).min(rows.len());
952        let window = &rows[start..end];
953        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
954        // The target's full parts (blobs included) ride the response; siblings
955        // are only summarized, but they share this one window scan.
956        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
957
958        let all_parts = parts_by_message
959            .remove(&(session_id.clone(), message_id.to_owned()))
960            .unwrap_or_default();
961        let start_part = match params.after_id {
962            // Exclusive over ordinals: parts are ordinal-sorted, so the first
963            // part past the anchor's ordinal is the page start. An anchor absent
964            // from the target's parts is a stale/mistyped client cursor.
965            Some(after) => match all_parts.iter().find(|part| part.id == after) {
966                Some(anchor) => all_parts
967                    .iter()
968                    .position(|part| part.ordinal > anchor.ordinal)
969                    .unwrap_or(all_parts.len()),
970                None => return Ok(GetLookup::UnknownAfterId),
971            },
972            None => 0,
973        };
974        let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
975        let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
976            serde_json::to_string(part).map_or(0, |json| json.len())
977        });
978        let target_parts = remaining_parts[..part_count].to_vec();
979        let target_parts_remaining = remaining_parts.len() - part_count;
980
981        let target_row = &rows[target_pos];
982        let target = RetrievedMessage {
983            id: target_row.id.clone(),
984            role: target_row.role,
985            timestamp: target_row.timestamp,
986            text: target_row.text.clone(),
987            content: target_row.content.clone(),
988            // Target structure is carried in full by `target_parts`.
989            parts: Vec::new(),
990        };
991        let siblings = window
992            .iter()
993            .enumerate()
994            .filter(|(idx, _)| start + idx != target_pos)
995            .map(|(_, row)| RetrievedMessage {
996                id: row.id.clone(),
997                role: row.role,
998                timestamp: row.timestamp,
999                text: row.text.clone(),
1000                content: row.content.clone(),
1001                parts: parts_by_message
1002                    .get(&(session_id.clone(), row.id.clone()))
1003                    .cloned()
1004                    .unwrap_or_default(),
1005            })
1006            .collect();
1007
1008        Ok(GetLookup::Found(MessagePage {
1009            session,
1010            target,
1011            target_parts,
1012            target_parts_remaining,
1013            siblings,
1014        }))
1015    }
1016
1017    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1018        let batch = self
1019            .handle
1020            .scan_batch(
1021                Table::Messages,
1022                Some(&Predicate::Eq("session_id", session_id.into())),
1023                &["id", "timestamp", "role", "search_text", "content"],
1024            )
1025            .await?;
1026        let mut rows = Vec::with_capacity(batch.num_rows());
1027        for row in 0..batch.num_rows() {
1028            let id = string(&batch, "id", row)?.context("message id is null")?;
1029            let role =
1030                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1031            let timestamp = datetime(&batch, "timestamp", row)?;
1032            rows.push(ScanRow {
1033                id,
1034                role,
1035                timestamp,
1036                text: string(&batch, "search_text", row)?,
1037                content: string(&batch, "content", row)?,
1038            });
1039        }
1040        Ok(rows)
1041    }
1042
1043    /// Conversational scan over one session: rows ordered by
1044    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1045    /// read seam (spec.md#search-prefilter-pushdown).
1046    pub async fn scan_conversational_messages(
1047        &self,
1048        session_id: &str,
1049    ) -> Result<Vec<ConversationalRow>> {
1050        let filter = Predicate::And(vec![
1051            Predicate::Eq("session_id", session_id.into()),
1052            Predicate::IsNotNull("search_text"),
1053        ]);
1054        let batch = self
1055            .handle
1056            .scan_batch(
1057                Table::Messages,
1058                Some(&filter),
1059                &["id", "timestamp", "role", "search_text"],
1060            )
1061            .await?;
1062
1063        let mut rows = Vec::with_capacity(batch.num_rows());
1064        for row in 0..batch.num_rows() {
1065            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1066            let role =
1067                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1068            let timestamp = datetime(&batch, "timestamp", row)?;
1069            let text_str = string(&batch, "search_text", row)?.context(
1070                "search_text null after IsNotNull pushdown - storage invariant violated",
1071            )?;
1072            rows.push(ConversationalRow {
1073                session_id: session_id.to_owned(),
1074                message_id,
1075                role,
1076                timestamp,
1077                text: SearchText(text_str),
1078            });
1079        }
1080        rows.sort_by(|a, b| {
1081            a.timestamp
1082                .cmp(&b.timestamp)
1083                .then_with(|| a.message_id.cmp(&b.message_id))
1084        });
1085        Ok(rows)
1086    }
1087
1088    /// Locate the session id for a stored message. Cheap when only the routing
1089    /// hint is needed - callers that need the messages use `scan_all_messages`.
1090    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1091        let batch = self
1092            .handle
1093            .scan_batch(
1094                Table::Messages,
1095                Some(&Predicate::Eq("id", message_id.into())),
1096                &["session_id"],
1097            )
1098            .await?;
1099        if batch.num_rows() == 0 {
1100            return Ok(None);
1101        }
1102        string(&batch, "session_id", 0)
1103    }
1104
    /// Row counts as reported by the underlying handle.
    ///
    /// `corpus_stats` reads the tuple as `(sessions, messages, parts)`.
    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
        self.handle.row_counts().await
    }
1108
1109    /// A point-in-time `Arc<Dataset>` for `table`, for registering as a
1110    /// DataFusion `LanceTableProvider` in `pond_sql_query`. Goes through the
1111    /// handle's freshness gate, so each query sees a current snapshot.
1112    pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1113        Ok(Arc::new(self.handle.dataset(table).await?))
1114    }
1115
    /// Write a `pond_sql_query` export artifact.
    ///
    /// Stores `bytes` under `name` by delegating to the handle; where the
    /// artifact lands is decided there.
    pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
        self.handle.export_write(name, bytes).await
    }
1120
    /// Read a `pond_sql_query` export artifact back.
    ///
    /// Returns the artifact's bytes for `name` by delegating to the handle.
    pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
        self.handle.export_read(name).await
    }
1125
    /// Local filesystem path of an export artifact on `file://` installs.
    ///
    /// NOTE(review): presumably `None` means the store is not backed by a
    /// local filesystem - confirm against `Handle::export_local_path`.
    pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
        self.handle.export_local_path(name)
    }
1130
1131    /// Compute the per-adapter / per-project rollup that drives
1132    /// `pond status`. One scan over `messages` projecting the three
1133    /// columns the rollup keys on (`source_agent`, `project`, `session_id`),
1134    /// aggregated in-memory. Bounded by the cross product of adapters and
1135    /// projects, which stays small on real corpora.
1136    pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1137        let scanner = self
1138            .handle
1139            .scan(
1140                Table::Messages,
1141                ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1142            )
1143            .await?;
1144        let mut stream = scanner.try_into_stream().await?;
1145        let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1146        while let Some(batch) = stream.next().await {
1147            let batch = batch?;
1148            for row in 0..batch.num_rows() {
1149                let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1150                let project = string(&batch, "project", row)?.unwrap_or_default();
1151                let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1152                let is_subagent = source_agent.contains('/');
1153                if is_subagent && !include_subagents {
1154                    continue;
1155                }
1156                let entry = groups.entry((source_agent, project)).or_default();
1157                entry.messages += 1;
1158                entry.session_ids.insert(session_id);
1159            }
1160        }
1161
1162        let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1163        let totals = RowTotals {
1164            sessions: totals_sessions as u64,
1165            messages: totals_messages as u64,
1166            parts: totals_parts as u64,
1167        };
1168
1169        let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1170        for ((adapter, project), acc) in groups {
1171            by_adapter.entry(adapter).or_default().push(ProjectStats {
1172                project,
1173                sessions: acc.session_ids.len() as u64,
1174                messages: acc.messages,
1175            });
1176        }
1177
1178        let mut adapters = Vec::with_capacity(by_adapter.len());
1179        for (adapter, mut projects) in by_adapter {
1180            projects.sort_by(|a, b| {
1181                b.messages
1182                    .cmp(&a.messages)
1183                    .then_with(|| a.project.cmp(&b.project))
1184            });
1185            let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1186            let messages: u64 = projects.iter().map(|p| p.messages).sum();
1187            adapters.push(AdapterStats {
1188                adapter,
1189                sessions,
1190                messages,
1191                projects,
1192            });
1193        }
1194
1195        Ok(CorpusStats {
1196            data_url: self.handle.location().clone(),
1197            totals,
1198            adapters,
1199            include_subagents,
1200        })
1201    }
1202
1203    /// Write a batch of embeddings into `messages`: set `vector` and
1204    /// `embedding_model` on each row by `(session_id, id)`
1205    /// (spec.md#session-embed-from-canonical). The column update goes through the
1206    /// write seam and lands as a new manifest version (`append-only`).
1207    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1208        if rows.is_empty() {
1209            return Ok(());
1210        }
1211        let batch = embedding_update_batch(rows)?;
1212        self.handle
1213            .merge_update(Table::Messages, batch, rows.len())
1214            .await?;
1215        Ok(())
1216    }
1217
1218    /// Stream the backlog of messages needing embedding: rows with `search_text`
1219    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1220    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1221        try_stream! {
1222            let filter = Predicate::And(vec![
1223                Predicate::IsNull("vector"),
1224                Predicate::IsNotNull("search_text"),
1225            ]);
1226            let projection: &[&str] = &["session_id", "id", "search_text"];
1227            let scanner = self
1228                .handle
1229                .scan(
1230                    Table::Messages,
1231                    ScanOpts::with_predicate_and_projection(&filter, projection),
1232                )
1233                .await?;
1234            let mut batches = scanner
1235                .try_into_stream()
1236                .await
1237                .context("failed to open messages stream")?;
1238            while let Some(batch) = batches.next().await {
1239                let batch = batch?;
1240                for row in 0..batch.num_rows() {
1241                    yield PendingMessage {
1242                        session_id: string(&batch, "session_id", row)?
1243                            .context("session_id is null")?,
1244                        id: string(&batch, "id", row)?.context("message id is null")?,
1245                        search_text: string(&batch, "search_text", row)?
1246                            .context("search_text is null")?,
1247                    };
1248                }
1249            }
1250        }
1251    }
1252
1253    /// Stream messages that are either never embedded or stale under the
1254    /// current model. `pond embed --force` feeds this to the same unconditional
1255    /// merge_update as the normal backlog; the filter makes that semantically
1256    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1257    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1258        try_stream! {
1259            let filter = Predicate::And(vec![
1260                Predicate::IsNotNull("search_text"),
1261                Predicate::Or(vec![
1262                    Predicate::IsNull("vector"),
1263                    Predicate::Ne("embedding_model", embed::model_id().into()),
1264                ]),
1265            ]);
1266            let projection: &[&str] = &["session_id", "id", "search_text"];
1267            let scanner = self
1268                .handle
1269                .scan(
1270                    Table::Messages,
1271                    ScanOpts::with_predicate_and_projection(&filter, projection),
1272                )
1273                .await?;
1274            let mut batches = scanner
1275                .try_into_stream()
1276                .await
1277                .context("failed to open pending-or-stale messages stream")?;
1278            while let Some(batch) = batches.next().await {
1279                let batch = batch?;
1280                for row in 0..batch.num_rows() {
1281                    yield PendingMessage {
1282                        session_id: string(&batch, "session_id", row)?
1283                            .context("session_id is null")?,
1284                        id: string(&batch, "id", row)?.context("message id is null")?,
1285                        search_text: string(&batch, "search_text", row)?
1286                            .context("search_text is null")?,
1287                    };
1288                }
1289            }
1290        }
1291    }
1292
1293    /// BM25 full-text retriever over `messages.search_text`.
1294    pub async fn fts_search(
1295        &self,
1296        query: &str,
1297        limit: usize,
1298        filter: &Predicate,
1299    ) -> Result<Vec<(MessageKey, f32)>> {
1300        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1301        scanner.full_text_search(
1302            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1303        )?;
1304        // Lance ships an autoprojection that silently appends `_score` to FTS
1305        // output when the projection omits it. That behavior is going away;
1306        // we opt into the future explicit-projection contract here so the
1307        // scanner stops emitting a per-call deprecation warning, and we list
1308        // `_score` ourselves since the loop below reads it.
1309        scanner.disable_scoring_autoprojection();
1310        scanner.project(&["session_id", "id", "_score"])?;
1311        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1312        let batch = scanner.try_into_batch().await?;
1313        let mut hits = Vec::with_capacity(batch.num_rows());
1314        for row in 0..batch.num_rows() {
1315            let key = MessageKey {
1316                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1317                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1318            };
1319            hits.push((key, float32(&batch, "_score", row)?));
1320        }
1321        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1322        // order, which varies between runs and across calls with different pool
1323        // sizes (the hybrid arm's `pool=100` and FTS-only's `limit=20` produce
1324        // different orderings at the same tied score). Without an explicit
1325        // tiebreak the downstream RRF dedup-rank for a tied target session can
1326        // flip session-to-session, making fusion outcomes nondeterministic.
1327        // Sort by `score desc`, then `(session_id, message_id)` asc.
1328        hits.sort_by(|left, right| {
1329            right
1330                .1
1331                .partial_cmp(&left.1)
1332                .unwrap_or(std::cmp::Ordering::Equal)
1333                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1334                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1335        });
1336        Ok(hits)
1337    }
1338
1339    /// Count of searchable messages (non-null `search_text`) inside the
1340    /// caller's filter scope - the universe a search actually ran over.
1341    /// Powers the response's absence honesty (spec.md#search): "no relevant
1342    /// hits" only means something relative to how many messages were
1343    /// searchable at all, and 0 tells the caller their filters excluded
1344    /// everything before retrieval even started.
1345    pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
1346        let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
1347        let dataset = self.handle.dataset(Table::Messages).await?;
1348        dataset
1349            .count_rows(Some(scope.to_lance()))
1350            .await
1351            .map_err(Into::into)
1352    }
1353
1354    /// Whether any `messages` row carries a vector (spec.md#search) - the
1355    /// signal that flips search from FTS-only to hybrid. The single-active-
1356    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
1357    /// vector belongs to the current model.
1358    pub async fn has_embeddings(&self) -> Result<bool> {
1359        let scope = Predicate::IsNotNull("vector");
1360        let mut scanner = self
1361            .handle
1362            .scan(
1363                Table::Messages,
1364                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1365            )
1366            .await?;
1367        scanner.limit(Some(1), None)?;
1368        let batch = scanner.try_into_batch().await?;
1369        Ok(batch.num_rows() > 0)
1370    }
1371
1372    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
1373    /// scalar predicate (spec.md#search-prefilter-pushdown). Combines the caller's
1374    /// filter with `vector IS NOT NULL` to exclude un-embedded rows from the
1375    /// scan; the brute-force kNN path requires this (the IVF_PQ path would
1376    /// skip them anyway). The single-active-model invariant lets pond drop
1377    /// the per-row model filter: every non-null vector belongs to the current
1378    /// model.
1379    pub async fn vector_search(
1380        &self,
1381        query: &[f32],
1382        limit: usize,
1383        filter: &Predicate,
1384        search: Option<&config::SearchConfig>,
1385    ) -> Result<Vec<(MessageKey, f32)>> {
1386        let scope = embedded_scope(filter);
1387        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1388        let key = Float32Array::from(query.to_vec());
1389        scanner.nearest("vector", &key, limit)?;
1390        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1391            scanner.nprobes(nprobes);
1392        }
1393        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1394            scanner.refine(refine_factor);
1395        }
1396        // Mirror the explicit-projection contract from `fts_search`: opt out
1397        // of `_distance` autoprojection and list it ourselves since the loop
1398        // below reads it.
1399        scanner.disable_scoring_autoprojection();
1400        scanner.project(&["session_id", "id", "_distance"])?;
1401        let batch = scanner.try_into_batch().await?;
1402        let mut hits = Vec::with_capacity(batch.num_rows());
1403        for row in 0..batch.num_rows() {
1404            let key = MessageKey {
1405                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1406                message_id: string(&batch, "id", row)?.context("message id is null")?,
1407            };
1408            hits.push((key, float32(&batch, "_distance", row)?));
1409        }
1410        // Stable secondary sort: same reasoning as `fts_search` - IVF_PQ can
1411        // emit hits with effectively identical `_distance` in fragment-dependent
1412        // order, which makes RRF dedup-ranks nondeterministic for tied
1413        // neighbors. Sort by distance asc (smaller = more similar), then by
1414        // `(session_id, message_id)` asc.
1415        hits.sort_by(|left, right| {
1416            left.1
1417                .partial_cmp(&right.1)
1418                .unwrap_or(std::cmp::Ordering::Equal)
1419                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1420                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1421        });
1422        Ok(hits)
1423    }
1424
1425    /// The DataFusion plan string for a filtered vector scan - the
1426    /// `search-prefilter-pushdown` regression guard reads it.
1427    pub async fn explain_vector_plan(
1428        &self,
1429        query: &[f32],
1430        limit: usize,
1431        filter: &Predicate,
1432        search: Option<&config::SearchConfig>,
1433    ) -> Result<String> {
1434        let scope = embedded_scope(filter);
1435        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1436        let key = Float32Array::from(query.to_vec());
1437        scanner.nearest("vector", &key, limit)?;
1438        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1439            scanner.nprobes(nprobes);
1440        }
1441        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1442            scanner.refine(refine_factor);
1443        }
1444        scanner
1445            .explain_plan(true)
1446            .await
1447            .context("explain_plan failed")
1448    }
1449
1450    pub async fn explain_fts_plan(
1451        &self,
1452        query: &str,
1453        limit: usize,
1454        filter: &Predicate,
1455    ) -> Result<String> {
1456        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1457        scanner.full_text_search(
1458            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1459        )?;
1460        scanner.project(&["session_id", "id"])?;
1461        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1462        scanner
1463            .explain_plan(true)
1464            .await
1465            .context("explain_plan failed")
1466    }
1467
1468    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
1469    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1470        if keys.is_empty() {
1471            return Ok(Vec::new());
1472        }
1473        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1474        let session_ids = keys
1475            .iter()
1476            .map(|key| key.session_id.clone())
1477            .collect::<Vec<_>>();
1478        let message_ids = keys
1479            .iter()
1480            .map(|key| key.message_id.clone())
1481            .collect::<Vec<_>>();
1482        let predicate = Predicate::And(vec![
1483            in_predicate("session_id", &session_ids),
1484            in_predicate("id", &message_ids),
1485        ]);
1486        let batch = self
1487            .handle
1488            .scan_batch(
1489                Table::Messages,
1490                Some(&predicate),
1491                &[
1492                    "id",
1493                    "session_id",
1494                    "role",
1495                    "project",
1496                    "source_agent",
1497                    "timestamp",
1498                    "search_text",
1499                ],
1500            )
1501            .await?;
1502        let mut metas = Vec::with_capacity(batch.num_rows());
1503        for row in 0..batch.num_rows() {
1504            let message_id = string(&batch, "id", row)?.context("id is null")?;
1505            let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1506            if !wanted.contains(&MessageKey {
1507                session_id: session_id.clone(),
1508                message_id: message_id.clone(),
1509            }) {
1510                continue;
1511            }
1512            metas.push(MessageMeta {
1513                message_id,
1514                session_id,
1515                role: string(&batch, "role", row)?.context("role is null")?,
1516                project: string(&batch, "project", row)?.context("project is null")?,
1517                source_agent: string(&batch, "source_agent", row)?
1518                    .context("source_agent is null")?,
1519                timestamp: datetime(&batch, "timestamp", row)?,
1520                search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1521            });
1522        }
1523        Ok(metas)
1524    }
1525
1526    /// Total message count per session, for search session summaries.
1527    pub async fn session_message_counts(
1528        &self,
1529        session_ids: &[String],
1530    ) -> Result<BTreeMap<String, usize>> {
1531        if session_ids.is_empty() {
1532            return Ok(BTreeMap::new());
1533        }
1534        let dataset = self.handle.dataset(Table::Messages).await?;
1535        let mut tasks = tokio::task::JoinSet::new();
1536        for session_id in session_ids {
1537            let dataset = dataset.clone();
1538            let session_id = session_id.clone();
1539            tasks.spawn(async move {
1540                let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1541                let count = dataset.count_rows(Some(filter)).await?;
1542                anyhow::Ok((session_id, count))
1543            });
1544        }
1545        let mut counts = BTreeMap::new();
1546        while let Some(joined) = tasks.join_next().await {
1547            let (session_id, count) = joined.context("session count task panicked")??;
1548            counts.insert(session_id, count);
1549        }
1550        Ok(counts)
1551    }
1552
1553    /// Rows appended to `messages` since the FTS index was last optimized.
1554    /// A missing index reports the whole table; the query is manifest-only.
1555    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1556        self.handle
1557            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1558            .await
1559    }
1560
1561    /// Rows added or rewritten in `messages` since the IVF_PQ vector index
1562    /// was last optimized. Below
1563    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
1564    /// must read [`embedding_progress`](Self::embedding_progress) too and
1565    /// distinguish "index not built yet" from "index trails data".
1566    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1567        self.handle
1568            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1569            .await
1570    }
1571
1572    /// Embedding coverage: how many `messages` rows carry a vector and how
1573    /// many are still eligible. Drives the `pond status` embeddings line and
1574    /// the `pond embed` progress bar's known total. `embedded` reads the
1575    /// `vector IS NOT NULL` count directly - the single-active-model invariant
1576    /// (see `MESSAGE_SCALAR_INDICES`) means there is no need to scope by the
1577    /// `embedding_model` column.
1578    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1579        let dataset = self.handle.dataset(Table::Messages).await?;
1580        let embedded = dataset
1581            .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1582            .await?;
1583        let total = dataset
1584            .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1585            .await?;
1586        Ok(EmbeddingProgress {
1587            embedded,
1588            total,
1589            model: embed::model_id(),
1590        })
1591    }
1592
1593    /// Count rows whose `embedding_model` is not the currently configured
1594    /// model AND whose `vector` is still populated - the signal `pond embed`
1595    /// uses to detect a model swap and require `--force`.
1596    pub async fn stale_embedding_count(&self) -> Result<usize> {
1597        let dataset = self.handle.dataset(Table::Messages).await?;
1598        dataset
1599            .count_rows(Some(
1600                Predicate::And(vec![
1601                    Predicate::IsNotNull("vector"),
1602                    Predicate::Ne("embedding_model", embed::model_id().into()),
1603                ])
1604                .to_lance(),
1605            ))
1606            .await
1607            .map_err(Into::into)
1608    }
1609
1610    /// Run the per-table maintenance cycle (compact + indices) across every
1611    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
1612    /// and compaction commit independently, so a hot writer that starves
1613    /// compaction on one table does not abort the index work the operator
1614    /// asked for on other tables (or even on the same table).
1615    pub async fn optimize_indices(
1616        &self,
1617        progress: Option<OptimizeProgressFn>,
1618        maintenance: &MaintenancePolicy,
1619    ) -> Result<OptimizeOutcome> {
1620        let intents = pond_index_intents();
1621        let mut tables = Vec::with_capacity(3);
1622        for (table, intents) in intents.all() {
1623            let outcome = self
1624                .handle
1625                .optimize_table(table, intents, progress.as_ref(), maintenance)
1626                .await;
1627            tables.push(outcome);
1628        }
1629        Ok(OptimizeOutcome { tables })
1630    }
1631
1632    /// Fold trailing fragments into existing indices across every table,
1633    /// without running compaction. Used by `pond embed`'s tail so newly
1634    /// written vectors land in the FTS / IVF_PQ / btree / bitmap indices
1635    /// without paying the compaction retry budget while embed itself may
1636    /// still be writing in a sibling process.
1637    pub async fn build_indices_only(
1638        &self,
1639        progress: Option<OptimizeProgressFn>,
1640    ) -> Result<OptimizeOutcome> {
1641        let policy = pond_index_intents();
1642        let mut tables = Vec::with_capacity(3);
1643        for (table, intents) in policy.all() {
1644            let indices = self
1645                .handle
1646                .optimize_table_indices_only(table, intents, progress.as_ref())
1647                .await;
1648            tables.push(TableOptimizeOutcome {
1649                table,
1650                indices,
1651                compaction: PhaseOutcome::NotAttempted,
1652            });
1653        }
1654        Ok(OptimizeOutcome { tables })
1655    }
1656
1657    #[cfg(test)]
1658    async fn optimize_indices_with_vector_threshold(
1659        &self,
1660        vector_threshold: usize,
1661    ) -> Result<OptimizeOutcome> {
1662        let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1663        let policy = MaintenancePolicy::always_compact();
1664        let mut tables = Vec::with_capacity(3);
1665        for (table, intents) in intents.all() {
1666            let outcome = self
1667                .handle
1668                .optimize_table(table, intents, None, &policy)
1669                .await;
1670            tables.push(outcome);
1671        }
1672        Ok(OptimizeOutcome { tables })
1673    }
1674
1675    pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1676        let policy = pond_index_intents();
1677        let mut matched = false;
1678        for (table, intents) in policy.all() {
1679            for intent in intents {
1680                if intent_name.is_none_or(|name| name == intent.name) {
1681                    matched = true;
1682                    self.handle.rebuild_index(table, intent).await?;
1683                }
1684            }
1685        }
1686        if let Some(name) = intent_name
1687            && !matched
1688        {
1689            anyhow::bail!("unknown index intent {name:?}");
1690        }
1691        Ok(())
1692    }
1693
1694    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1695        let policy = pond_index_intents();
1696        let mut statuses = Vec::new();
1697        for (table, intents) in policy.all() {
1698            statuses.extend(self.handle.index_status(table, intents).await?);
1699        }
1700        Ok(statuses)
1701    }
1702
1703    /// Drop the IVF_PQ index on `messages.vector`. Used by `pond embed
1704    /// --force` before re-bootstrapping under a different model. Silent
1705    /// when the index does not exist.
1706    pub async fn drop_vector_index(&self) -> Result<()> {
1707        match self
1708            .handle
1709            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1710            .await
1711        {
1712            Ok(()) => Ok(()),
1713            Err(error) => {
1714                let msg = error.to_string();
1715                if msg.contains("not found") || msg.contains("does not exist") {
1716                    Ok(())
1717                } else {
1718                    Err(error)
1719                }
1720            }
1721        }
1722    }
1723
1724    /// On-disk byte totals per dataset, sized through Lance's object store
1725    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
1726    pub async fn table_sizes(&self) -> Result<TableSizes> {
1727        self.handle.table_sizes().await
1728    }
1729
1730    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1731        let batch = self
1732            .handle
1733            .scan_batch(
1734                Table::Sessions,
1735                Some(&Predicate::Eq("id", session_id.into())),
1736                &[],
1737            )
1738            .await?;
1739        if batch.num_rows() == 0 {
1740            Ok(None)
1741        } else {
1742            Ok(Some(session_from_batch(&batch, 0)?))
1743        }
1744    }
1745
1746    /// Fetch the stored vector for one message, for `similar_to`-style
1747    /// retrieval. Returns `Ok(None)` when the message exists but is not yet
1748    /// embedded, or when no message has that id. Vectors are stored Float16
1749    /// (`embedding_vector_type`); the search path takes `&[f32]`, so we
1750    /// widen here. Cheap rare op - one predicate scan, no index.
1751    pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1752        let batch = self
1753            .handle
1754            .scan_batch(
1755                Table::Messages,
1756                Some(&Predicate::Eq("id", message_id.into())),
1757                &["vector"],
1758            )
1759            .await?;
1760        if batch.num_rows() == 0 {
1761            return Ok(None);
1762        }
1763        let column = batch
1764            .column(0)
1765            .as_any()
1766            .downcast_ref::<FixedSizeListArray>();
1767        let Some(list) = column else {
1768            return Ok(None);
1769        };
1770        if list.is_null(0) {
1771            return Ok(None);
1772        }
1773        let values = list.value(0);
1774        let halves = values
1775            .as_any()
1776            .downcast_ref::<Float16Array>()
1777            .context("messages.vector inner array is not Float16")?;
1778        let widened = (0..halves.len())
1779            .map(|i| halves.value(i).to_f32())
1780            .collect();
1781        Ok(Some(widened))
1782    }
1783
1784    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1785        let batch = self
1786            .handle
1787            .scan_batch(
1788                Table::Messages,
1789                Some(&Predicate::Eq("session_id", session_id.into())),
1790                &[
1791                    "session_id",
1792                    "id",
1793                    "timestamp",
1794                    "role",
1795                    "content",
1796                    "options",
1797                ],
1798            )
1799            .await?;
1800        let mut messages = Vec::with_capacity(batch.num_rows());
1801        for row in 0..batch.num_rows() {
1802            messages.push(message_from_batch(&batch, row)?);
1803        }
1804        messages.sort_by(|left, right| {
1805            left.timestamp()
1806                .cmp(&right.timestamp())
1807                .then_with(|| left.id().cmp(right.id()))
1808        });
1809
1810        let message_ids = messages
1811            .iter()
1812            .map(|message| message.id().to_owned())
1813            .collect::<Vec<_>>();
1814        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1815
1816        Ok(messages
1817            .into_iter()
1818            .map(|message| {
1819                let key = (message.session_id().to_owned(), message.id().to_owned());
1820                let parts = parts_by_message.remove(&key).unwrap_or_default();
1821                MessageWithParts { message, parts }
1822            })
1823            .collect())
1824    }
1825
1826    /// Every part of these messages, full fidelity (file blobs included). The
1827    /// canonical read primitive - restore/export, verbatim mode, and the
1828    /// message-mode target all need the complete set.
1829    pub async fn parts_for_messages(
1830        &self,
1831        session_id: &str,
1832        message_ids: &[String],
1833    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1834        self.scan_parts(session_id, message_ids, None).await
1835    }
1836
1837    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
1838    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
1839    /// nothing. For the summary-only reads (conversational/complete session
1840    /// views, search hits) - it never feeds restore/export.
1841    pub async fn summary_parts_for_messages(
1842        &self,
1843        session_id: &str,
1844        message_ids: &[String],
1845    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1846        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1847            .await
1848    }
1849
1850    async fn scan_parts(
1851        &self,
1852        session_id: &str,
1853        message_ids: &[String],
1854        part_types: Option<&[&str]>,
1855    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1856        if message_ids.is_empty() {
1857            return Ok(BTreeMap::new());
1858        }
1859        let mut clauses = vec![
1860            Predicate::Eq("session_id", session_id.into()),
1861            in_predicate("message_id", message_ids),
1862        ];
1863        if let Some(types) = part_types {
1864            clauses.push(Predicate::In(
1865                "type",
1866                types.iter().map(|&t| t.into()).collect(),
1867            ));
1868        }
1869        let predicate = Predicate::And(clauses);
1870        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1871        let mut scanner = self
1872            .handle
1873            .scan(
1874                Table::Parts,
1875                ScanOpts::with_predicate_and_projection(
1876                    &predicate,
1877                    &[
1878                        "session_id",
1879                        "message_id",
1880                        "id",
1881                        "ordinal",
1882                        "type",
1883                        "provenance",
1884                        "variant_data",
1885                        "options",
1886                    ],
1887                ),
1888            )
1889            .await?;
1890        scanner.with_row_address();
1891        let batch = scanner.try_into_batch().await.context("scan failed")?;
1892        let row_addresses = uint64(&batch, "_rowaddr")?;
1893        let mut file_payloads = BTreeMap::<usize, FileData>::new();
1894        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1895        for row in 0..batch.num_rows() {
1896            if string(&batch, "type", row)?.as_deref() == Some("file") {
1897                let variant_data =
1898                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1899                file_rows.push((row, row_addresses.value(row), variant_data));
1900            }
1901        }
1902        if !file_rows.is_empty() {
1903            let addresses = file_rows
1904                .iter()
1905                .map(|(_, address, _)| *address)
1906                .collect::<Vec<_>>();
1907            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1908            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1909                // Legacy blob (lance-encoding:blob): payload is bytes; the
1910                // url variant stored its URL as UTF-8 bytes, recovered via
1911                // `file_data_from_blob`'s `data_kind = "url"` branch.
1912                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1913                file_payloads.insert(row, payload);
1914            }
1915        }
1916        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1917        for row in 0..batch.num_rows() {
1918            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1919            parts_by_message
1920                .entry((part.session_id.clone(), part.message_id.clone()))
1921                .or_default()
1922                .push(part);
1923        }
1924        for parts in parts_by_message.values_mut() {
1925            parts.sort_by_key(|part| part.ordinal);
1926        }
1927        Ok(parts_by_message)
1928    }
1929}
1930
/// One item of an ingest stream: a session, a message, or a part.
///
/// Serialized adjacently tagged: the snake_case variant name goes under
/// `"kind"` and the payload under `"data"`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// A session record.
    Session(Session),
    /// A message record.
    Message(Message),
    /// A part record.
    Part(Part),
}
1938
/// Aggregate accounting for an ingest pass (CLI sync, adapter-driven).
/// The wire layer (`pond_ingest`) instead returns per-row results; the
/// aggregate is derived from those at the wire boundary.
///
/// Fields are bucketed by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape is set by spec.md#adapter-integrity-event-ordering.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows actually written to Lance, summed across all three tables.
    /// Use the per-table fields below for user-facing counts; this stays
    /// for `accepted()` and existing wire callers.
    pub inserted: usize,
    /// Rows that already existed (merge_insert no-op match), summed across
    /// all three tables.
    pub matched: usize,
    /// Session rows inserted this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted this pass (total - includes tool calls,
    /// tool results, and other non-searchable messages).
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` whose `search_text` is non-null
    /// (eligible for FTS + semantic indexing). The user-facing "messages"
    /// count in `pond sync` / `pond status` reads this field.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted this pass.
    pub parts_inserted: usize,
    /// Session rows already-present (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows already-present (merge_insert matched), total.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` with `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows already-present.
    pub parts_matched: usize,
    /// Events the validator dropped under per-event-drop policy (ordering
    /// violation, orphan part, mismatched parent, adapter parse failure,
    /// duplicate-id collision, ...). Counted by event, not by session: a
    /// session with one bad part stays in this bucket as 1, not as "the
    /// whole substream." Per spec.md#adapter-integrity-dedup, adapters SHOULD
    /// dedupe their own emissions upstream when source replay is expected;
    /// the validator's in-batch HashSet is a safety net, not a feature
    /// adapters may rely on. If this bucket grows on a clean adapter,
    /// inspect `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions whose Session-level invariants (immutable `source_agent` /
    /// `project` against the stored row) failed at flush time and
    /// whose substream got rejected wholesale. Always small relative to
    /// `inserted`; if not, there's a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter couldn't decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    // NOTE(review): "empty `.jsonl`" and the unextractable-header case are
    // also listed under `skipped_empty` below - confirm which bucket each
    // case actually lands in and tighten both descriptions.
    pub skipped_files: usize,
    /// Files that produced no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `-vv` (debug) verbosity.
    pub skipped_empty: usize,
    /// Sessions short-circuited via the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): file `mtime` was at or
    /// before the wall-clock time pond last wrote that session's row, so
    /// re-decode was bypassed.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries were exhausted (commit
    /// conflicts, transient IO that didn't recover). Hard zero on healthy
    /// runs.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys for the combined `dropped_events +
    /// dropped_sessions` populations. Keys are `&'static str` (see the
    /// `DROP_REASON_*` constants) so consumers can match by identity.
    /// Empty on a clean run. Used by `pond sync` to print the top reasons
    /// and by `benches/ingest_bench.rs` to bucket Partial drops by cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
2014
/// Stable reason keys for the `IngestSummary::drop_reasons` histogram and
/// the per-row `RowError::reason_key`. `&'static str` so consumers can
/// match by identity rather than prose. Adding a new variant: pick a short
/// snake_case identifier, route it from the validator/adapter, and update
/// the per-row outcome docs in `docs/spec.md#adapter-integrity-event-ordering`.
///
/// This comment describes the whole `DROP_REASON_*` group; rustdoc attaches
/// it to the first constant only.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// Drop-reason key `"duplicate_part_key"`.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// Drop-reason key `"message_before_session"`.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// Drop-reason key `"message_session_mismatch"`.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// Drop-reason key `"part_before_message"`.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// Drop-reason key `"part_message_mismatch"`.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// Drop-reason key `"empty_source_agent"`.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// Drop-reason key `"parent_message_without_session"`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Drop-reason key `"immutable_project"`.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Drop-reason key `"immutable_source_agent"`.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key used when an error outcome carries no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
2031
/// Honest per-table outcome of one batched flush. Built from `merge_insert`'s
/// returned counts together with the pre-existence sets captured by
/// `upsert_session_batch`. Folded into a per-sync summary via
/// [`IngestSummary::add_batch`]. spec.md#adapter-integrity-additive-sync: matched
/// is a no-op write, so the inserted/matched split is informational - we still
/// surface it because both `pond sync` and `pond_ingest` clients reconcile
/// against "which rows landed this call."
///
/// Each field feeds the `IngestSummary` field of the same name.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows inserted by this flush.
    pub sessions_inserted: usize,
    /// Session rows that were already present.
    pub sessions_matched: usize,
    /// Message rows inserted by this flush, searchable or not.
    pub messages_inserted_total: usize,
    /// Searchable subset of `messages_inserted_total`.
    pub messages_inserted_searchable: usize,
    /// Message rows that were already present, searchable or not.
    pub messages_matched_total: usize,
    /// Searchable subset of `messages_matched_total`.
    pub messages_matched_searchable: usize,
    /// Part rows inserted by this flush.
    pub parts_inserted: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
}
2050
2051impl IngestSummary {
2052    pub fn accepted(&self) -> usize {
2053        self.inserted + self.matched
2054    }
2055
2056    /// Sole writer of the per-table counters on the CLI batched flush path.
2057    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
2058    /// both for the same rows would double-count.
2059    pub fn add_batch(&mut self, counts: &BatchCounts) {
2060        self.sessions_inserted += counts.sessions_inserted;
2061        self.sessions_matched += counts.sessions_matched;
2062        self.messages_inserted_total += counts.messages_inserted_total;
2063        self.messages_inserted_searchable += counts.messages_inserted_searchable;
2064        self.messages_matched_total += counts.messages_matched_total;
2065        self.messages_matched_searchable += counts.messages_matched_searchable;
2066        self.parts_inserted += counts.parts_inserted;
2067        self.parts_matched += counts.parts_matched;
2068        self.inserted +=
2069            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2070        self.matched +=
2071            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2072    }
2073
2074    /// Sum every counter from `other` into `self`. Used by the multi-source
2075    /// `pond sync` loop so adding a new field to this struct doesn't silently
2076    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
2077    pub fn merge(&mut self, other: &Self) {
2078        self.inserted += other.inserted;
2079        self.matched += other.matched;
2080        self.sessions_inserted += other.sessions_inserted;
2081        self.messages_inserted_total += other.messages_inserted_total;
2082        self.messages_inserted_searchable += other.messages_inserted_searchable;
2083        self.parts_inserted += other.parts_inserted;
2084        self.sessions_matched += other.sessions_matched;
2085        self.messages_matched_total += other.messages_matched_total;
2086        self.messages_matched_searchable += other.messages_matched_searchable;
2087        self.parts_matched += other.parts_matched;
2088        self.dropped_events += other.dropped_events;
2089        self.dropped_sessions += other.dropped_sessions;
2090        self.skipped_files += other.skipped_files;
2091        self.skipped_empty += other.skipped_empty;
2092        self.skipped_fresh += other.skipped_fresh;
2093        self.storage_errors += other.storage_errors;
2094        self.truncated_values += other.truncated_values;
2095        for (key, value) in &other.drop_reasons {
2096            *self.drop_reasons.entry(key).or_insert(0) += value;
2097        }
2098    }
2099
2100    /// Same dispatch as [`Self::add_outcomes`] but ignores
2101    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
2102    /// via [`Self::add_batch`] and uses this method to attribute per-row
2103    /// `Error` outcomes from the same flush.
2104    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2105        for outcome in outcomes {
2106            if !matches!(outcome.status, OutcomeStatus::Error) {
2107                continue;
2108            }
2109            if outcome.kind == "session" {
2110                self.dropped_sessions += 1;
2111            } else {
2112                self.dropped_events += 1;
2113            }
2114            let reason = outcome
2115                .error
2116                .as_ref()
2117                .and_then(|error| error.reason_key)
2118                .unwrap_or(DROP_REASON_UNCATEGORIZED);
2119            *self.drop_reasons.entry(reason).or_insert(0) += 1;
2120        }
2121    }
2122
2123    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2124        for outcome in outcomes {
2125            match outcome.status {
2126                OutcomeStatus::Inserted => {
2127                    self.inserted += 1;
2128                    match outcome.kind {
2129                        "session" => self.sessions_inserted += 1,
2130                        "message" => {
2131                            self.messages_inserted_total += 1;
2132                            if outcome.searchable {
2133                                self.messages_inserted_searchable += 1;
2134                            }
2135                        }
2136                        "part" => self.parts_inserted += 1,
2137                        _ => {}
2138                    }
2139                }
2140                OutcomeStatus::Matched => {
2141                    self.matched += 1;
2142                    match outcome.kind {
2143                        "session" => self.sessions_matched += 1,
2144                        "message" => {
2145                            self.messages_matched_total += 1;
2146                            if outcome.searchable {
2147                                self.messages_matched_searchable += 1;
2148                            }
2149                        }
2150                        "part" => self.parts_matched += 1,
2151                        _ => {}
2152                    }
2153                }
2154                OutcomeStatus::Error => {
2155                    // Session-level rejection: exactly one session-kind Error
2156                    // outcome (see `error_outcomes_for_substream`). Per-event
2157                    // drop: one Error per message/part. The two populations
2158                    // are counted separately so the operator can tell a
2159                    // structural reject from a row-level skip.
2160                    if outcome.kind == "session" {
2161                        self.dropped_sessions += 1;
2162                    } else {
2163                        self.dropped_events += 1;
2164                    }
2165                    let reason = outcome
2166                        .error
2167                        .as_ref()
2168                        .and_then(|e| e.reason_key)
2169                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
2170                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
2171                }
2172            }
2173        }
2174    }
2175}
2176
/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol). One
/// row per input event from the request's `events` array. The validator
/// returns these in array order so the wire layer can pack them directly
/// into [`crate::wire::IngestResult`] entries.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Position of the originating event in the input `events` array.
    pub index: usize,
    /// Row family. The producers in this module emit `"session"`,
    /// `"message"` or `"part"`.
    pub kind: &'static str,
    /// Primary key of the row: a JSON string (session id) for sessions, a
    /// JSON array `[session_id, message_id]` for messages and
    /// `[session_id, message_id, part_id]` for parts.
    pub pk: Value,
    /// Whether the row was inserted, matched an existing row, or was dropped.
    pub status: OutcomeStatus,
    /// Error body; the helpers in this module set it for `Error` outcomes
    /// and leave it `None` for successful ones.
    pub error: Option<RowError>,
    /// True iff `kind == "message"` AND the underlying row carries
    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
    /// so the CLI can show "searchable" message deltas distinct from raw
    /// inserts. Always false for session/part rows.
    pub searchable: bool,
}
2194
/// Disposition of a single ingested row, as reported in [`RowOutcome`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row's primary key was not present before the write.
    Inserted,
    /// The row's primary key was already present before the write.
    Matched,
    /// The row was rejected; details ride in [`RowOutcome::error`].
    Error,
}
2201
/// Structured per-row error body. Mirrors the wire shape so the handler
/// can pass it straight through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of why the row was rejected.
    pub message: String,
    /// Name of the offending field, when the rejection is tied to one.
    pub field: Option<&'static str>,
    /// Short prose qualifier. The helpers in this module only ever set
    /// `"immutable"` (session-level rejection with a named field) or `None`.
    pub reason: Option<&'static str>,
    /// Stable key for histogramming - see `DROP_REASON_*` constants. The
    /// `reason` field above is human-prose; `reason_key` is the machine
    /// bucket. `None` means uncategorized; consumers attribute to
    /// `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
2215
/// Buffered session events tagged with their input array index, so the
/// per-row outcomes can be re-attributed once `merge_insert` returns its
/// per-row Inserted/Matched stats.
#[derive(Debug)]
struct BufferedSession {
    /// Input array index of the Session event.
    index: usize,
    /// The Session payload (its `source_agent` is already trimmed by
    /// `IngestValidator::push_session` before it is buffered).
    session: Session,
}
2224
/// A buffered Message event together with the parts that were accepted for
/// it and the search text derived from them.
#[derive(Debug)]
struct BufferedMessage {
    /// Input array index of the Message event.
    index: usize,
    /// The Message payload.
    message: Message,
    /// Parts accepted for this message; filled in when the message is
    /// flushed (empty while the message is still the current one).
    parts: Vec<BufferedPart>,
    /// Result of [`search_text`] over the message and its parts; `None`
    /// until the message is flushed, and `None` afterwards when there is no
    /// searchable text.
    search_text: Option<String>,
}
2232
/// A buffered Part event tagged with its input array index.
#[derive(Debug)]
struct BufferedPart {
    /// Input array index of the Part event.
    index: usize,
    /// The Part payload.
    part: Part,
}
2238
/// State machine that turns the `events: Vec<IngestEvent>` array into a
/// flat `Vec<RowOutcome>` matching the array's index space. Buffers a whole
/// session substream so `merge_insert` runs once per substream (three
/// batches: sessions, messages, parts). A validation error on a single event
/// drops *that event* (one [`OutcomeStatus::Error`] outcome) and the substream
/// continues; only Session-level invariants (immutable source_agent / project
/// on re-write) drop the whole substream (spec.md#adapter-integrity-event-ordering).
///
/// Writes are batched at flush time. As complete substreams arrive (a new
/// `Session` event closes out the current one), they accumulate in
/// `completed` rather than each one calling `merge_insert` immediately.
/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
/// at which point one batched 3-parallel-merge-insert covers all pending
/// substreams. This is the load-bearing perf change: per-substream commit
/// overhead dominated the ingest profile (see `benches/ingest_bench.rs`),
/// and amortizing it across N sessions cuts wall time materially.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// Session header of the substream currently being buffered. `None`
    /// before the first accepted Session event, after a rejected one, and
    /// after the substream has been closed.
    session: Option<BufferedSession>,
    /// Most recently accepted message; incoming parts are validated against
    /// it until the next message arrives or the substream closes.
    current_message: Option<BufferedMessage>,
    /// Parts accepted so far for `current_message`.
    current_parts: Vec<BufferedPart>,
    /// Messages of the in-flight substream that have already been flushed
    /// (parts attached, search text computed).
    messages: Vec<BufferedMessage>,
    /// Message ids already buffered in the current substream. Duplicate ids
    /// drop the offending event in-line rather than failing the whole batch
    /// downstream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` keys already buffered in the current
    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
    seen_part_keys: HashSet<(String, String)>,
    /// Substreams whose end-of-stream boundary has been observed but whose
    /// rows haven't been written yet. Flushed in batched mode by
    /// [`Self::flush`].
    completed: Vec<CompletedSubstream>,
}
2273
/// One closed substream ready for the batched flush path.
#[derive(Debug)]
struct CompletedSubstream {
    /// Input array index of the Session event that opened the substream.
    session_index: usize,
    /// The substream's Session payload.
    session: Session,
    /// Every message buffered for the substream, each carrying its parts.
    messages: Vec<BufferedMessage>,
}
2281
2282/// Ingest host provenance (`options.pond`, spec.md#model-pond-options),
2283/// computed once per process. An audit fact - "the process that inserted this
2284/// row" - not identity. Fallible lookups are omitted, never synthesized as
2285/// placeholders.
2286fn ingest_host_stamp() -> Option<&'static Value> {
2287    static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
2288    STAMP
2289        .get_or_init(|| {
2290            let mut host = serde_json::Map::new();
2291            if let Ok(username) = whoami::username() {
2292                host.insert("username".to_owned(), username.into());
2293            }
2294            if let Ok(hostname) = whoami::hostname() {
2295                host.insert("hostname".to_owned(), hostname.into());
2296            }
2297            if let Ok(devicename) = whoami::devicename() {
2298                host.insert("device_name".to_owned(), devicename.into());
2299            }
2300            (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
2301        })
2302        .as_ref()
2303}
2304
2305impl IngestValidator {
2306    /// Drive one input event through the validator. Returns the per-row
2307    /// outcomes the event triggered: empty when the event is just buffered,
2308    /// or N entries when a session substream just flushed (success or
2309    /// failure). `Err` is reserved for catastrophic storage failures that
2310    /// should fail the whole `pond_ingest` request.
2311    pub async fn push(
2312        &mut self,
2313        store: &Store,
2314        index: usize,
2315        event: IngestEvent,
2316    ) -> Result<Vec<RowOutcome>> {
2317        match event {
2318            IngestEvent::Session(session) => self.push_session(store, index, session).await,
2319            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2320            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2321        }
2322    }
2323
2324    /// Final flush at end-of-batch. Closes the in-flight substream and
2325    /// drains the pending-flush buffer. Returns the per-row outcomes (for
2326    /// the wire layer) alongside the honest per-table counts (for
2327    /// `IngestSummary::add_batch`).
2328    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2329        self.close_current_substream();
2330        self.flush(store).await
2331    }
2332
2333    /// Drain every completed substream into batched 3-parallel-merge_insert
2334    /// writes. Caller invokes this periodically (every N completed
2335    /// substreams) to keep memory bounded; in adapter-driven sync that
2336    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
2337    /// in-flight substream stays buffered - close it explicitly via
2338    /// [`Self::finish`] or by feeding the next Session event.
2339    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2340        if self.completed.is_empty() {
2341            return Ok((Vec::new(), BatchCounts::default()));
2342        }
2343        let completed = std::mem::take(&mut self.completed);
2344        store.upsert_session_batch(completed).await
2345    }
2346
2347    /// Number of fully-buffered substreams awaiting batched write. Used by
2348    /// the adapter caller to decide when to call [`Self::flush`].
2349    pub fn pending_substreams(&self) -> usize {
2350        self.completed.len()
2351    }
2352
2353    async fn push_session(
2354        &mut self,
2355        _store: &Store,
2356        index: usize,
2357        mut session: Session,
2358    ) -> Result<Vec<RowOutcome>> {
2359        // Close out the current substream (if any) - move it to the pending
2360        // buffer instead of writing immediately. The actual write happens
2361        // when the caller invokes `flush` / `finish`.
2362        self.close_current_substream();
2363
2364        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
2365        // if empty after trim. A Session event with empty source_agent is
2366        // dropped on the spot - the substream that would follow has nothing
2367        // to anchor on, so subsequent message/part events will also drop.
2368        let trimmed = session.source_agent.trim();
2369        if trimmed.is_empty() {
2370            return Ok(vec![RowOutcome {
2371                index,
2372                kind: "session",
2373                pk: Value::String(session.id.clone()),
2374                status: OutcomeStatus::Error,
2375                error: Some(RowError {
2376                    message: format!("session {} has empty source_agent after trim", session.id),
2377                    field: Some("source_agent"),
2378                    reason: None,
2379                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2380                }),
2381                searchable: false,
2382            }]);
2383        }
2384        if trimmed.len() != session.source_agent.len() {
2385            session.source_agent = trimmed.to_owned();
2386        }
2387
2388        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2389            return Ok(vec![RowOutcome {
2390                index,
2391                kind: "session",
2392                pk: Value::String(session.id.clone()),
2393                status: OutcomeStatus::Error,
2394                error: Some(RowError {
2395                    message: format!(
2396                        "session {} has parent_message_id without parent_session_id",
2397                        session.id,
2398                    ),
2399                    field: Some("parent_message_id"),
2400                    reason: None,
2401                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2402                }),
2403                searchable: false,
2404            }]);
2405        }
2406
2407        self.seen_message_ids.clear();
2408        self.seen_part_keys.clear();
2409        self.session = Some(BufferedSession { index, session });
2410        Ok(Vec::new())
2411    }
2412
2413    fn close_current_substream(&mut self) {
2414        self.flush_current_message();
2415        let Some(BufferedSession {
2416            index: session_index,
2417            session,
2418        }) = self.session.take()
2419        else {
2420            return;
2421        };
2422        let messages = std::mem::take(&mut self.messages);
2423        self.seen_message_ids.clear();
2424        self.seen_part_keys.clear();
2425        self.completed.push(CompletedSubstream {
2426            session_index,
2427            session,
2428            messages,
2429        });
2430    }
2431
2432    fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
2433        let pk = Value::Array(vec![
2434            Value::String(message.session_id().to_owned()),
2435            Value::String(message.id().to_owned()),
2436        ]);
2437        let Some(session) = &self.session else {
2438            return vec![error_outcome(
2439                index,
2440                "message",
2441                pk,
2442                "first event in a session stream must be Session",
2443                None,
2444                DROP_REASON_MESSAGE_BEFORE_SESSION,
2445            )];
2446        };
2447        if message.session_id() != session.session.id {
2448            let msg = format!(
2449                "message {} references session {}, expected {}",
2450                message.id(),
2451                message.session_id(),
2452                session.session.id
2453            );
2454            return vec![error_outcome(
2455                index,
2456                "message",
2457                pk,
2458                &msg,
2459                Some("session_id"),
2460                DROP_REASON_MESSAGE_SESSION_MISMATCH,
2461            )];
2462        }
2463        if !self.seen_message_ids.insert(message.id().to_owned()) {
2464            // Keep same-substream duplicate ids visible in `dropped_events`;
2465            // adapters are expected to dedupe upstream (see claude-code's
2466            // per-file `seen_uuids`), so a hit here is worth investigating.
2467            let msg = format!("duplicate message id {} in session substream", message.id());
2468            return vec![error_outcome(
2469                index,
2470                "message",
2471                pk,
2472                &msg,
2473                None,
2474                DROP_REASON_DUPLICATE_MESSAGE_ID,
2475            )];
2476        }
2477        // `options.pond` is core-owned (spec.md#model-pond-options): stripped
2478        // and restamped at ingest so neither adapters nor wire clients can
2479        // spoof provenance. Matched rows are merge_insert no-ops, so re-ingest
2480        // never restamps stored rows.
2481        match ingest_host_stamp() {
2482            Some(stamp) => {
2483                message
2484                    .options_mut()
2485                    .insert("pond".to_owned(), stamp.clone());
2486            }
2487            None => {
2488                message.options_mut().remove("pond");
2489            }
2490        }
2491        self.flush_current_message();
2492        self.current_message = Some(BufferedMessage {
2493            index,
2494            message,
2495            parts: Vec::new(),
2496            search_text: None,
2497        });
2498        Vec::new()
2499    }
2500
2501    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2502        let pk = Value::Array(vec![
2503            Value::String(part.session_id.clone()),
2504            Value::String(part.message_id.clone()),
2505            Value::String(part.id.clone()),
2506        ]);
2507        let Some(current) = &self.current_message else {
2508            return vec![error_outcome(
2509                index,
2510                "part",
2511                pk,
2512                "part event appeared before a message",
2513                None,
2514                DROP_REASON_PART_BEFORE_MESSAGE,
2515            )];
2516        };
2517        if part.session_id != current.message.session_id() {
2518            let msg = format!(
2519                "part {} references session {}, expected {}",
2520                part.id,
2521                part.session_id,
2522                current.message.session_id()
2523            );
2524            return vec![error_outcome(
2525                index,
2526                "part",
2527                pk,
2528                &msg,
2529                Some("session_id"),
2530                DROP_REASON_PART_MESSAGE_MISMATCH,
2531            )];
2532        }
2533        if part.message_id != current.message.id() {
2534            let msg = format!(
2535                "part {} references message {}, expected {}",
2536                part.id,
2537                part.message_id,
2538                current.message.id()
2539            );
2540            return vec![error_outcome(
2541                index,
2542                "part",
2543                pk,
2544                &msg,
2545                Some("message_id"),
2546                DROP_REASON_PART_MESSAGE_MISMATCH,
2547            )];
2548        }
2549        let part_key = (part.message_id.clone(), part.id.clone());
2550        if !self.seen_part_keys.insert(part_key) {
2551            let msg = format!(
2552                "duplicate part id {} for message {} in session substream",
2553                part.id, part.message_id
2554            );
2555            return vec![error_outcome(
2556                index,
2557                "part",
2558                pk,
2559                &msg,
2560                None,
2561                DROP_REASON_DUPLICATE_PART_KEY,
2562            )];
2563        }
2564        self.current_parts.push(BufferedPart { index, part });
2565        Vec::new()
2566    }
2567
2568    fn flush_current_message(&mut self) {
2569        let Some(mut buffered) = self.current_message.take() else {
2570            return;
2571        };
2572        let parts = std::mem::take(&mut self.current_parts);
2573        let mut canonical_parts = Vec::with_capacity(parts.len());
2574        for part in &parts {
2575            canonical_parts.push(part.part.clone());
2576        }
2577        buffered.search_text = search_text(&buffered.message, &canonical_parts);
2578        buffered.parts = parts;
2579        self.messages.push(buffered);
2580    }
2581}
2582
2583fn error_outcome(
2584    index: usize,
2585    kind: &'static str,
2586    pk: Value,
2587    message: &str,
2588    field: Option<&'static str>,
2589    reason_key: &'static str,
2590) -> RowOutcome {
2591    RowOutcome {
2592        index,
2593        kind,
2594        pk,
2595        status: OutcomeStatus::Error,
2596        error: Some(RowError {
2597            message: message.to_owned(),
2598            field,
2599            reason: None,
2600            reason_key: Some(reason_key),
2601        }),
2602        searchable: false,
2603    }
2604}
2605
2606/// Session-level rejection (immutable `source_agent` / `project` violation):
2607/// emit exactly one Error outcome on the Session row. The buffered messages
2608/// and parts of this substream are *not* surfaced as per-row errors - their
2609/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
2610fn error_outcomes_for_substream(
2611    session_index: usize,
2612    session: &Session,
2613    _messages: &[BufferedMessage],
2614    message: impl Into<String>,
2615    field: Option<&'static str>,
2616    reason_key: &'static str,
2617) -> Vec<RowOutcome> {
2618    let reason = field.map(|_| "immutable");
2619    vec![RowOutcome {
2620        index: session_index,
2621        kind: "session",
2622        pk: Value::String(session.id.clone()),
2623        status: OutcomeStatus::Error,
2624        error: Some(RowError {
2625            message: message.into(),
2626            field,
2627            reason,
2628            reason_key: Some(reason_key),
2629        }),
2630        searchable: false,
2631    }]
2632}
2633
2634/// Batched-path success helper. Each row's Inserted/Matched status is read
2635/// from the pre-existence sets captured by `upsert_session_batch` before its
2636/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
2637/// Also accumulates the per-table totals into `counts` so the CLI summary
2638/// gets the same truth without re-walking the outcomes.
2639fn success_outcomes_for_substream(
2640    session_index: usize,
2641    session: &Session,
2642    messages: &[BufferedMessage],
2643    existing_sessions: &std::collections::HashMap<String, Session>,
2644    existing_message_pks: &HashSet<(String, String)>,
2645    existing_part_pks: &HashSet<(String, String, String)>,
2646    counts: &mut BatchCounts,
2647) -> Vec<RowOutcome> {
2648    let session_was_present = existing_sessions.contains_key(&session.id);
2649    let session_status = if session_was_present {
2650        counts.sessions_matched += 1;
2651        UpsertStatus::Matched
2652    } else {
2653        counts.sessions_inserted += 1;
2654        UpsertStatus::Inserted
2655    };
2656
2657    let mut outcomes = Vec::with_capacity(1 + messages.len());
2658    outcomes.push(success_outcome(
2659        session_index,
2660        "session",
2661        Value::String(session.id.clone()),
2662        session_status,
2663        false,
2664    ));
2665    for buffered in messages {
2666        let key = (
2667            buffered.message.session_id().to_owned(),
2668            buffered.message.id().to_owned(),
2669        );
2670        let searchable = buffered.search_text.is_some();
2671        let message_status = if existing_message_pks.contains(&key) {
2672            counts.messages_matched_total += 1;
2673            if searchable {
2674                counts.messages_matched_searchable += 1;
2675            }
2676            UpsertStatus::Matched
2677        } else {
2678            counts.messages_inserted_total += 1;
2679            if searchable {
2680                counts.messages_inserted_searchable += 1;
2681            }
2682            UpsertStatus::Inserted
2683        };
2684        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2685        outcomes.push(success_outcome(
2686            buffered.index,
2687            "message",
2688            pk,
2689            message_status,
2690            searchable,
2691        ));
2692        for part in &buffered.parts {
2693            let part_key = (
2694                part.part.session_id.clone(),
2695                part.part.message_id.clone(),
2696                part.part.id.clone(),
2697            );
2698            let part_status = if existing_part_pks.contains(&part_key) {
2699                counts.parts_matched += 1;
2700                UpsertStatus::Matched
2701            } else {
2702                counts.parts_inserted += 1;
2703                UpsertStatus::Inserted
2704            };
2705            let part_pk = Value::Array(vec![
2706                Value::String(part_key.0),
2707                Value::String(part_key.1),
2708                Value::String(part_key.2),
2709            ]);
2710            outcomes.push(success_outcome(
2711                part.index,
2712                "part",
2713                part_pk,
2714                part_status,
2715                false,
2716            ));
2717        }
2718    }
2719    outcomes
2720}
2721
2722fn success_outcome(
2723    index: usize,
2724    kind: &'static str,
2725    pk: Value,
2726    status: UpsertStatus,
2727    searchable: bool,
2728) -> RowOutcome {
2729    let status = match status {
2730        UpsertStatus::Inserted => OutcomeStatus::Inserted,
2731        UpsertStatus::Matched => OutcomeStatus::Matched,
2732    };
2733    RowOutcome {
2734        index,
2735        kind,
2736        pk,
2737        status,
2738        error: None,
2739        searchable,
2740    }
2741}
2742
/// Session-level ingest failures detected before a substream is written.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// spec.md#protocol: `Session.source_agent` and `Session.project` are
    /// immutable post-first-write because the denormalized copies on
    /// `messages` were stamped from the prior Session at first ingest.
    /// A re-write that changes either would silently desync.
    ImmutableField {
        /// Name of the immutable field that differs.
        field: &'static str,
        /// Id of the session whose re-write was rejected.
        session_id: String,
        /// Value currently stored for `field`.
        stored: String,
        /// Value the incoming row tried to write.
        attempted: String,
    },
}
2756
2757impl std::fmt::Display for IngestError {
2758    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2759        match self {
2760            Self::ImmutableField {
2761                field,
2762                session_id,
2763                stored,
2764                attempted,
2765            } => write!(
2766                formatter,
2767                "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
2768            ),
2769        }
2770    }
2771}
2772
// Marker impl: `Debug` is derived and `Display` is implemented for this type,
// so the default `Error` methods are used as-is.
impl std::error::Error for IngestError {}
2774
2775/// Compare an incoming Session row against the stored row on the two
2776/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
2777/// counts a NULL-vs-non-NULL change as a mismatch.
2778fn ensure_immutable_match(
2779    existing: &Session,
2780    incoming: &Session,
2781) -> std::result::Result<(), IngestError> {
2782    if existing.source_agent != incoming.source_agent {
2783        return Err(IngestError::ImmutableField {
2784            field: "source_agent",
2785            session_id: incoming.id.clone(),
2786            stored: existing.source_agent.clone(),
2787            attempted: incoming.source_agent.clone(),
2788        });
2789    }
2790    if existing.project != incoming.project {
2791        return Err(IngestError::ImmutableField {
2792            field: "project",
2793            session_id: incoming.id.clone(),
2794            stored: (*existing.project).clone(),
2795            attempted: (*incoming.project).clone(),
2796        });
2797    }
2798    Ok(())
2799}
2800
2801pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2802    use crate::wire::Provenance;
2803    let mut chunks: Vec<String> = Vec::new();
2804    for part in parts {
2805        // spec.md#search: only conversational parts contribute to the indexed
2806        // text; harness-injected scaffolding is excluded from search.
2807        if part.provenance != Provenance::Conversational {
2808            continue;
2809        }
2810        match (message.role(), &part.kind) {
2811            (Role::User | Role::Assistant, PartKind::Text { text }) => {
2812                if let Some(text) = text {
2813                    chunks.push(text.to_string());
2814                }
2815            }
2816            (
2817                Role::User | Role::Assistant,
2818                PartKind::File {
2819                    media_type,
2820                    file_name,
2821                    data,
2822                },
2823            ) => {
2824                if let Some(file_name) = file_name {
2825                    chunks.push(file_name.clone());
2826                }
2827                if let Some(media_type) = media_type {
2828                    chunks.push(media_type.clone());
2829                }
2830                if let FileData::Url(uri) = data {
2831                    chunks.push(uri.clone());
2832                }
2833            }
2834            (
2835                Role::System | Role::Tool,
2836                PartKind::Text { .. }
2837                | PartKind::Reasoning { .. }
2838                | PartKind::File { .. }
2839                | PartKind::ToolCall { .. }
2840                | PartKind::ToolResult { .. }
2841                | PartKind::ToolApprovalRequest { .. }
2842                | PartKind::ToolApprovalResponse { .. },
2843            )
2844            | (
2845                Role::User | Role::Assistant,
2846                PartKind::Reasoning { .. }
2847                | PartKind::ToolCall { .. }
2848                | PartKind::ToolResult { .. }
2849                | PartKind::ToolApprovalRequest { .. }
2850                | PartKind::ToolApprovalResponse { .. },
2851            ) => {}
2852        }
2853    }
2854
2855    let text = chunks
2856        .into_iter()
2857        .filter(|chunk| !chunk.trim().is_empty())
2858        .collect::<Vec<_>>()
2859        .join("\n");
2860    if text.is_empty() { None } else { Some(text) }
2861}
2862
/// Non-empty conversational text (spec.md#search).
///
/// Newtype over `String` with a private field, so values can only be built
/// inside this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);
2866
2867impl SearchText {
2868    pub fn as_str(&self) -> &str {
2869        &self.0
2870    }
2871
2872    pub fn into_inner(self) -> String {
2873        self.0
2874    }
2875}
2876
2877impl AsRef<str> for SearchText {
2878    fn as_ref(&self) -> &str {
2879        &self.0
2880    }
2881}
2882
/// A message bundled with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    /// The message itself.
    pub message: Message,
    /// The parts carried alongside `message`.
    pub parts: Vec<Part>,
}
2888
/// A session bundled with its messages, each carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    /// The session itself.
    pub session: Session,
    /// The messages carried alongside `session`.
    pub messages: Vec<MessageWithParts>,
}
2894
/// Caller-supplied knobs for a session view.
///
/// NOTE(review): the consumer is not visible in this chunk; presumably the
/// `pond_get` session-mode lookup - confirm.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Response mode requested by the caller.
    pub mode: ResponseMode,
    /// Optional pagination anchor, borrowed from the request.
    /// NOTE(review): presumably a message id - confirm against the caller.
    pub after_id: Option<&'a str>,
    /// NOTE(review): presumably the maximum number of messages per page - confirm.
    pub limit: usize,
    /// NOTE(review): presumably a cap on the page's payload size in bytes - confirm.
    pub budget_bytes: usize,
    /// NOTE(review): semantics of `SessionFrom` are defined in `crate::wire` - confirm.
    pub session_from: SessionFrom,
}
2903
/// Caller-supplied knobs for a message view.
///
/// NOTE(review): the consumer is not visible in this chunk; presumably the
/// `pond_get` message-mode lookup - confirm.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// NOTE(review): presumably how many sibling messages to include around
    /// the target - confirm.
    pub context_depth: usize,
    /// Which siblings fill the context window: conversational (default)
    /// keeps the window on the human/model exchange; complete/verbatim
    /// include system/tool carriers.
    pub mode: ResponseMode,
    /// Optional pagination anchor, borrowed from the request.
    /// NOTE(review): presumably a part id of the target message - confirm.
    pub after_id: Option<&'a str>,
    /// NOTE(review): presumably the maximum number of parts per page - confirm.
    pub limit: usize,
    /// NOTE(review): presumably a cap on the page's payload size in bytes - confirm.
    pub budget_bytes: usize,
}
2915
/// Outcome of a `pond_get` lookup. Separates a missing target (the handler
/// maps it to `not_found`) from a stale/unknown `after_id` (mapped to
/// `validation_failed`): the message/part stream is append-only, so an anchor
/// that was ever valid never disappears - an unknown one is always a client
/// error, never a reason to silently restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested target does not exist.
    NotFound,
    /// The target exists but the supplied `after_id` anchor is unknown.
    UnknownAfterId,
    /// The lookup succeeded and carries its result.
    Found(T),
}
2927
2928/// Canonical retrieval result for `pond_get` session mode: the stored session
2929/// plus the page of messages (each with its `Part`s) and a remaining count.
2930/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
2931#[derive(Debug, Clone, PartialEq)]
2932pub struct SessionPage {
2933    pub session: Session,
2934    pub messages: Vec<RetrievedMessage>,
2935    pub messages_remaining: usize,
2936}
2937
2938/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
2939/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
2940/// their parts so the handler can summarize them.
2941#[derive(Debug, Clone, PartialEq)]
2942pub struct MessagePage {
2943    pub session: Session,
2944    pub target: RetrievedMessage,
2945    pub target_parts: Vec<Part>,
2946    pub target_parts_remaining: usize,
2947    pub siblings: Vec<RetrievedMessage>,
2948}
2949
2950#[derive(Debug, Clone, PartialEq)]
2951pub struct RetrievedMessage {
2952    pub id: String,
2953    pub role: Role,
2954    pub timestamp: DateTime<Utc>,
2955    pub text: Option<String>,
2956    pub content: Option<String>,
2957    pub parts: Vec<Part>,
2958}
2959
2960#[derive(Debug, Clone)]
2961struct ScanRow {
2962    id: String,
2963    role: Role,
2964    timestamp: DateTime<Utc>,
2965    text: Option<String>,
2966    content: Option<String>,
2967}
2968
2969/// One row of the conversational scan. `text` is non-empty by
2970/// `IsNotNull("search_text")` pushdown (spec.md#search).
2971#[derive(Debug, Clone)]
2972pub struct ConversationalRow {
2973    pub session_id: String,
2974    pub message_id: String,
2975    pub role: Role,
2976    pub timestamp: DateTime<Utc>,
2977    pub text: SearchText,
2978}
2979
/// Counts how many leading `items` make up one page.
///
/// At most `limit` items are considered, with `limit` clamped into
/// `1..=1000`. Items are sized with `size` and accumulated (saturating)
/// against `budget_bytes`; the page ends just before the first item that
/// would push the running total over the budget. The first item is exempt
/// from the budget, so a single oversize item never blocks its own page.
/// An empty `items` slice yields `0`.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = limit.clamp(1, 1000);
    let mut used = 0usize;
    for (index, item) in items.iter().take(window).enumerate() {
        let total = used.saturating_add(size(item));
        // `index` equals the number of items accepted so far.
        if index > 0 && total > budget_bytes {
            return index;
        }
        used = total;
    }
    // Nothing overflowed the budget: the whole window fits.
    items.len().min(window)
}
2998
2999fn role_from_str(value: &str) -> Result<Role> {
3000    match value {
3001        "system" => Ok(Role::System),
3002        "user" => Ok(Role::User),
3003        "assistant" => Ok(Role::Assistant),
3004        "tool" => Ok(Role::Tool),
3005        other => anyhow::bail!("unknown message role {other}"),
3006    }
3007}
3008
3009/// Scalar indexes on `messages` (spec.md#datasets): BTREE for high-cardinality
3010/// and range columns, BITMAP for low-cardinality columns. There is no index
3011/// on `embedding_model`: pond's invariant is one active model at a time
3012/// (a model swap goes through `pond embed --force` which drops the IVF_PQ,
3013/// clears stale rows, and re-bootstraps), so `embedding_model` is never a
3014/// query-time predicate - the only embedding-state filter is `vector IS NOT
3015/// NULL`. `id` lookups are rare and full-scan.
3016const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
3017    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
3018    (
3019        "session_id",
3020        BuiltinIndexType::BTree,
3021        "messages_session_id_btree",
3022    ),
3023    (
3024        "timestamp",
3025        BuiltinIndexType::BTree,
3026        "messages_timestamp_btree",
3027    ),
3028    (
3029        "source_agent",
3030        BuiltinIndexType::Bitmap,
3031        "messages_source_agent_bitmap",
3032    ),
3033    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
3034];
3035
3036/// Scalar indexes on `parts`: `(session_id, message_id)` is the hot-path lookup key for
3037/// `parts_for_messages` (hydration on every `get` and grouped search).
3038const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
3039    (
3040        "session_id",
3041        BuiltinIndexType::BTree,
3042        "parts_session_id_btree",
3043    ),
3044    (
3045        "message_id",
3046        BuiltinIndexType::BTree,
3047        "parts_message_id_btree",
3048    ),
3049];
3050
3051/// Scalar index on `sessions`: `id` is filtered by `find_session` on every
3052/// `get` and every grouped search.
3053const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
3054    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
3055
3056fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
3057    Predicate::In(
3058        column,
3059        values.iter().cloned().map(ScalarValue::String).collect(),
3060    )
3061}
3062
3063/// Combine the caller's filter with `vector IS NOT NULL` so the kNN scanner
3064/// never sees a null-vector row. Under the single-active-model invariant,
3065/// `vector IS NOT NULL` is equivalent to "row is currently embedded under
3066/// the configured model" - no per-row `embedding_model` filter needed.
3067fn embedded_scope(filter: &Predicate) -> Predicate {
3068    Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3069}
3070
3071fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
3072    let inserted = usize::try_from(inserted_rows)
3073        .unwrap_or(usize::MAX)
3074        .min(total);
3075    let mut statuses = Vec::with_capacity(total);
3076    statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
3077    statuses.extend(std::iter::repeat_n(
3078        UpsertStatus::Matched,
3079        total.saturating_sub(inserted),
3080    ));
3081    statuses
3082}
3083
// Logical table names, deliberately bare: the `.lance` directory suffix is
// owned by the lance-namespace Directory impl
// (spec.md#lance-chokepoints-catalog), and no consumer rebuilds a `.lance`
// path from these.
pub(crate) const SESSIONS: &str = "sessions";
pub(crate) const MESSAGES: &str = "messages";
pub(crate) const PARTS: &str = "parts";
3090
/// Name of the full-text index over `messages.search_text`. Kept stable so
/// that status reporting and index creation refer to the same index.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";
3094
/// Name of the IVF_PQ index over `messages.vector` (spec.md#search). Kept
/// stable so that the activation check and index creation refer to the same
/// index.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
3098
// IVF_PQ tuning (spec.md#search). The index is built with the cosine metric
// because e5 vectors are L2-normalized.

/// Bits per PQ code: 8, i.e. 256 centroids per PQ subspace (training needs at
/// least 256 vectors).
const IVF_PQ_NUM_BITS: u8 = 8;
/// Floats per PQ subspace; the sub-vector count is `embedding_dim / 8`.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// Upper bound on kmeans iterations.
const IVF_PQ_MAX_ITERS: usize = 15;
3107
// FTS tokenizer bounds (spec.md#search-language-neutral-index): character
// ngrams of length 3 through 5.

/// Shortest ngram. Three keeps 3-character tokens such as `FTS` or `OCC`
/// searchable.
const FTS_NGRAM_MIN: u32 = 3;
/// Longest ngram. The 4- and 5-grams are the ones that discriminate.
const FTS_NGRAM_MAX: u32 = 5;
3113
3114/// Pond's production IndexIntents: the per-table intent set
3115/// `Store::open_with_options` registers with the substrate.
3116pub fn pond_index_intents() -> IndexIntents {
3117    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3118}
3119
3120/// Same as [`pond_index_intents`] but with an overridable IVF_PQ activation
3121/// threshold. Used by tests that need to exercise the activation boundary
3122/// without writing 100k vectors.
3123pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3124    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3125    messages.push(IndexIntent {
3126        name: MESSAGES_FTS_INDEX,
3127        column: "search_text",
3128        trigger: IndexTrigger::OnAnyRows,
3129        params: IndexParamsKind::InvertedFtsNgram {
3130            min: FTS_NGRAM_MIN,
3131            max: FTS_NGRAM_MAX,
3132        },
3133    });
3134    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3135        messages.push(IndexIntent {
3136            name,
3137            column,
3138            trigger: IndexTrigger::OnAnyRows,
3139            params: IndexParamsKind::Scalar(kind.clone()),
3140        });
3141    }
3142    messages.push(IndexIntent {
3143        name: MESSAGES_VECTOR_INDEX,
3144        column: "vector",
3145        trigger: IndexTrigger::OnNonNullCount {
3146            column: "vector",
3147            threshold: vector_threshold,
3148        },
3149        params: IndexParamsKind::IvfPqCosine {
3150            sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3151            num_bits: IVF_PQ_NUM_BITS,
3152            max_iters: IVF_PQ_MAX_ITERS,
3153        },
3154    });
3155    let parts = PARTS_SCALAR_INDICES
3156        .iter()
3157        .map(|(column, kind, name)| IndexIntent {
3158            name,
3159            column,
3160            trigger: IndexTrigger::OnAnyRows,
3161            params: IndexParamsKind::Scalar(kind.clone()),
3162        })
3163        .collect();
3164    let sessions = SESSIONS_SCALAR_INDICES
3165        .iter()
3166        .map(|(column, kind, name)| IndexIntent {
3167            name,
3168            column,
3169            trigger: IndexTrigger::OnAnyRows,
3170            params: IndexParamsKind::Scalar(kind.clone()),
3171        })
3172        .collect();
3173    IndexIntents {
3174        sessions,
3175        messages,
3176        parts,
3177    }
3178}
3179
/// Width of the `messages.vector` embedding column when `[embeddings].dim` is
/// not configured (spec.md#search). It matches [`embed::DEFAULT_MODEL_ID`]
/// (`intfloat/multilingual-e5-small`, 384 dimensions).
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector dimension, installed at most once via
/// [`init_embedding_dim`] (at startup, from `[embeddings].dim`). A `OnceLock`
/// rather than a `const` so that a config file can select a model with a
/// different dimension without every use site changing. While it is unset,
/// readers fall back to [`DEFAULT_EMBEDDING_DIM`], which keeps unit tests
/// config-free.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the active embedding dimension: the value installed by
/// [`init_embedding_dim`], or [`DEFAULT_EMBEDDING_DIM`] if none was installed.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&installed) => installed,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Installs the dimension reported by [`embedding_dim`]. Only the first call
/// has any effect; later calls are ignored.
pub fn init_embedding_dim(dim: usize) {
    // `set` only errs when a value is already present; discarding that error
    // is exactly the "first call wins" rule.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3205
3206/// Initial-`CREATE` write params for the namespace-mediated path. The
3207/// substrate seam stamps in `session`, `mode`, and `store_params`.
3208/// `auto_cleanup` is short; long-term recovery is `pond export` snapshots
3209/// plus deferred Lance tags (spec.md#session-durable-copy). `skip_auto_cleanup`
3210/// suppresses the per-commit hook so cleanup stays operator-driven via
3211/// `pond index optimize` (one LIST per command instead of per write).
3212pub(crate) fn write_params_for_create() -> WriteParams {
3213    WriteParams {
3214        data_storage_version: Some(LanceFileVersion::V2_1),
3215        enable_v2_manifest_paths: true,
3216        enable_stable_row_ids: true,
3217        auto_cleanup: Some(AutoCleanupParams {
3218            interval: 20,
3219            older_than: chrono::TimeDelta::days(1),
3220        }),
3221        skip_auto_cleanup: true,
3222        ..WriteParams::default()
3223    }
3224}
3225
3226fn export_schema(table: Table) -> Arc<Schema> {
3227    match table {
3228        Table::Sessions => session_schema(),
3229        Table::Messages => message_schema(),
3230        Table::Parts => part_schema(),
3231    }
3232}
3233
3234fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3235    let expected = export_schema(table);
3236    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3237    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3238    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3239    if actual_names != expected_names {
3240        anyhow::bail!(
3241            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3242            table.as_str(),
3243        );
3244    }
3245    Ok(())
3246}
3247
3248async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3249    let source_uri = source
3250        .to_str()
3251        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3252    let dataset = Dataset::open(source_uri)
3253        .await
3254        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3255    ensure_schema_matches_archive(&dataset, table)?;
3256    Ok(dataset)
3257}
3258
3259pub(crate) fn session_schema() -> Arc<Schema> {
3260    Arc::new(Schema::new(vec![
3261        primary_field("id", DataType::Utf8, false),
3262        Field::new("parent_session_id", DataType::Utf8, true),
3263        Field::new("parent_message_id", DataType::Utf8, true),
3264        Field::new("source_agent", DataType::Utf8, false),
3265        Field::new(
3266            "created_at",
3267            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3268            false,
3269        ),
3270        Field::new("project", DataType::Utf8, false),
3271        json_field("options", false),
3272    ]))
3273}
3274
3275pub(crate) fn message_schema() -> Arc<Schema> {
3276    Arc::new(Schema::new(vec![
3277        primary_field("session_id", DataType::Utf8, false),
3278        primary_field("id", DataType::Utf8, false),
3279        Field::new(
3280            "timestamp",
3281            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3282            false,
3283        ),
3284        Field::new("role", DataType::Utf8, false),
3285        Field::new("source_agent", DataType::Utf8, false),
3286        Field::new("project", DataType::Utf8, false),
3287        Field::new("content", DataType::Utf8, true),
3288        Field::new("search_text", DataType::Utf8, true),
3289        // The message's derived embedding (spec.md#session-embed-from-canonical):
3290        // both null until `pond embed` fills them, set together thereafter.
3291        Field::new("vector", embedding_vector_type(), true),
3292        Field::new("embedding_model", DataType::Utf8, true),
3293        json_field("options", false),
3294    ]))
3295}
3296
3297pub(crate) fn part_schema() -> Arc<Schema> {
3298    Arc::new(Schema::new(vec![
3299        primary_field("session_id", DataType::Utf8, false),
3300        primary_field("message_id", DataType::Utf8, false),
3301        primary_field("id", DataType::Utf8, false),
3302        Field::new("ordinal", DataType::Int32, false),
3303        Field::new("type", DataType::Utf8, false),
3304        // spec.md#model-part-provenance: conversation vs harness-injected; search
3305        // reads this column to exclude injected scaffolding.
3306        Field::new("provenance", DataType::Utf8, false),
3307        json_field("variant_data", false),
3308        legacy_blob_field("data", true),
3309        json_field("options", false),
3310    ]))
3311}
3312
3313pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3314    let arrays = schema
3315        .fields()
3316        .iter()
3317        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3318        .collect();
3319    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3320}
3321
3322pub(crate) fn empty_reader(
3323    schema: Arc<Schema>,
3324) -> Result<
3325    RecordBatchIterator<
3326        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3327    >,
3328> {
3329    let batch = empty_batch(schema.clone())?;
3330    Ok(RecordBatchIterator::new(
3331        vec![Ok(batch)].into_iter(),
3332        schema,
3333    ))
3334}
3335
3336pub(crate) struct MessageBatchRow<'a> {
3337    pub message: &'a Message,
3338    pub source_agent: &'a str,
3339    pub project: &'a str,
3340    pub search_text: Option<&'a str>,
3341}
3342
3343// Lance v7.0.0-beta.16's IVF_PQ build path (`rust/lance/src/index/vector/utils.rs`
3344// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
3345// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
3346// BFloat16 as a future-supported embedding type; until the Rust IVF_PQ build
3347// path catches up, store as Float16 (half-precision, also 2 bytes/element).
3348fn embedding_vector_type() -> DataType {
3349    DataType::FixedSizeList(
3350        Arc::new(Field::new("item", DataType::Float16, true)),
3351        embedding_dim() as i32,
3352    )
3353}
3354
3355/// The partial-schema source for the embedding column update: the `messages`
3356/// primary key plus the two columns `pond embed` fills. The field definitions
3357/// match `message_schema` exactly so Lance accepts it as a subset upsert.
3358fn embedding_update_schema() -> Arc<Schema> {
3359    Arc::new(Schema::new(vec![
3360        primary_field("session_id", DataType::Utf8, false),
3361        primary_field("id", DataType::Utf8, false),
3362        Field::new("vector", embedding_vector_type(), true),
3363        Field::new("embedding_model", DataType::Utf8, true),
3364    ]))
3365}
3366
3367/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
3368/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
3369pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3370    let dim = embedding_dim();
3371    let mut flat = Vec::with_capacity(rows.len() * dim);
3372    for row in rows {
3373        if row.vector.len() != dim {
3374            anyhow::bail!(
3375                "embedding for message {} has dim {}, expected {dim}",
3376                row.id,
3377                row.vector.len(),
3378            );
3379        }
3380        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3381    }
3382    let values = Float16Array::from(flat);
3383    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3384    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3385        .context("failed to build embedding vector column")?;
3386
3387    RecordBatch::try_new(
3388        embedding_update_schema(),
3389        vec![
3390            Arc::new(StringArray::from(
3391                rows.iter()
3392                    .map(|row| row.session_id.as_str())
3393                    .collect::<Vec<_>>(),
3394            )),
3395            Arc::new(StringArray::from(
3396                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3397            )),
3398            Arc::new(vectors),
3399            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3400        ],
3401    )
3402    .context("failed to build embedding update batch")
3403}
3404
/// Runtime backstop against Arrow's 2 GiB `i32` offset wall
/// (spec.md#adapter-bounded-values). A flush batch is split before the running
/// total of its text columns would exceed this value, and a single cell at or
/// above it is rejected up front instead of being left to panic inside
/// `StringArray::from`.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits rows into contiguous ranges by byte cost. `cells[i]` is row `i`'s
/// cost summed across every text column. A range is closed as soon as adding
/// the next row would take its total past `COLUMN_BYTE_BUDGET`. Budgeting the
/// all-column total also bounds each individual column, since no single
/// column can cost more than the sum.
///
/// A range always holds at least one row, so a row that alone exceeds the
/// budget gets a range to itself. Empty input yields no ranges.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut ranges = Vec::new();
    let mut open_at = 0usize;
    let mut spent = 0usize;
    for (row, cost) in cells.iter().copied().enumerate() {
        let chunk_has_rows = row > open_at;
        if chunk_has_rows && spent + cost > COLUMN_BYTE_BUDGET {
            // Close the current range and start the next one with this row.
            ranges.push(open_at..row);
            open_at = row;
            spent = cost;
        } else {
            spent += cost;
        }
    }
    if open_at < cells.len() {
        ranges.push(open_at..cells.len());
    }
    ranges
}
3432
3433fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3434    if bytes >= COLUMN_BYTE_BUDGET {
3435        anyhow::bail!(
3436            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3437             overflow Arrow's i32 offset buffer"
3438        );
3439    }
3440    Ok(())
3441}
3442
3443async fn merge_insert_chunks(
3444    handle: &Handle,
3445    table: Table,
3446    batches: Vec<RecordBatch>,
3447) -> Result<u64> {
3448    let mut inserted = 0u64;
3449    for batch in batches {
3450        let rows = batch.num_rows();
3451        inserted += handle.merge_insert(table, batch, rows).await?;
3452    }
3453    Ok(inserted)
3454}
3455
3456pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3457    let options = sessions
3458        .iter()
3459        .map(|session| json_bytes(&session.options))
3460        .collect::<Result<Vec<_>>>()?;
3461    let mut cells = Vec::with_capacity(sessions.len());
3462    for (session, encoded) in sessions.iter().zip(&options) {
3463        let columns = [
3464            session.id.len(),
3465            session.parent_session_id.as_deref().map_or(0, str::len),
3466            session.parent_message_id.as_deref().map_or(0, str::len),
3467            session.source_agent.len(),
3468            session.project.as_str().len(),
3469            encoded.len(),
3470        ];
3471        for bytes in columns {
3472            guard_cell("sessions", &session.id, bytes)?;
3473        }
3474        cells.push(columns.iter().sum());
3475    }
3476    chunk_ranges(&cells)
3477        .into_iter()
3478        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3479        .collect()
3480}
3481
3482fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3483    let schema = session_schema();
3484    RecordBatch::try_new(
3485        schema.clone(),
3486        vec![
3487            Arc::new(StringArray::from(
3488                sessions
3489                    .iter()
3490                    .map(|session| session.id.as_str())
3491                    .collect::<Vec<_>>(),
3492            )),
3493            Arc::new(StringArray::from(
3494                sessions
3495                    .iter()
3496                    .map(|session| session.parent_session_id.as_deref())
3497                    .collect::<Vec<_>>(),
3498            )),
3499            Arc::new(StringArray::from(
3500                sessions
3501                    .iter()
3502                    .map(|session| session.parent_message_id.as_deref())
3503                    .collect::<Vec<_>>(),
3504            )),
3505            Arc::new(StringArray::from(
3506                sessions
3507                    .iter()
3508                    .map(|session| session.source_agent.as_str())
3509                    .collect::<Vec<_>>(),
3510            )),
3511            Arc::new(
3512                TimestampMicrosecondArray::from(
3513                    sessions
3514                        .iter()
3515                        .map(|session| micros(session.created_at))
3516                        .collect::<Vec<_>>(),
3517                )
3518                .with_timezone("UTC"),
3519            ),
3520            Arc::new(StringArray::from(
3521                sessions
3522                    .iter()
3523                    .map(|session| session.project.as_str())
3524                    .collect::<Vec<_>>(),
3525            )),
3526            Arc::new(LargeBinaryArray::from_iter_values(
3527                options.iter().map(Vec::as_slice),
3528            )),
3529        ],
3530    )
3531    .context("failed to build session batch")
3532}
3533
3534pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3535    let options = rows
3536        .iter()
3537        .map(|row| json_bytes(row.message.options()))
3538        .collect::<Result<Vec<_>>>()?;
3539    let mut cells = Vec::with_capacity(rows.len());
3540    for (row, encoded) in rows.iter().zip(&options) {
3541        let columns = [
3542            row.message.session_id().len(),
3543            row.message.id().len(),
3544            row.message.role().as_str().len(),
3545            row.source_agent.len(),
3546            row.project.len(),
3547            row.message.system_content().map_or(0, str::len),
3548            row.search_text.map_or(0, str::len),
3549            encoded.len(),
3550        ];
3551        for bytes in columns {
3552            guard_cell("messages", row.message.id(), bytes)?;
3553        }
3554        cells.push(columns.iter().sum());
3555    }
3556    chunk_ranges(&cells)
3557        .into_iter()
3558        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3559        .collect()
3560}
3561
3562fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3563    let schema = message_schema();
3564    RecordBatch::try_new(
3565        schema.clone(),
3566        vec![
3567            Arc::new(StringArray::from(
3568                rows.iter()
3569                    .map(|row| row.message.session_id())
3570                    .collect::<Vec<_>>(),
3571            )),
3572            Arc::new(StringArray::from(
3573                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3574            )),
3575            Arc::new(
3576                TimestampMicrosecondArray::from(
3577                    rows.iter()
3578                        .map(|row| micros(row.message.timestamp()))
3579                        .collect::<Vec<_>>(),
3580                )
3581                .with_timezone("UTC"),
3582            ),
3583            Arc::new(StringArray::from(
3584                rows.iter()
3585                    .map(|row| row.message.role().as_str())
3586                    .collect::<Vec<_>>(),
3587            )),
3588            Arc::new(StringArray::from(
3589                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3590            )),
3591            Arc::new(StringArray::from(
3592                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3593            )),
3594            Arc::new(StringArray::from(
3595                rows.iter()
3596                    .map(|row| row.message.system_content())
3597                    .collect::<Vec<_>>(),
3598            )),
3599            Arc::new(StringArray::from(
3600                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3601            )),
3602            // `vector` / `embedding_model` are written null at ingest; every
3603            // message starts un-embedded and `pond embed` fills them later
3604            // (spec.md#session-embed-from-canonical).
3605            new_null_array(&embedding_vector_type(), rows.len()),
3606            new_null_array(&DataType::Utf8, rows.len()),
3607            Arc::new(LargeBinaryArray::from_iter_values(
3608                options.iter().map(Vec::as_slice),
3609            )),
3610        ],
3611    )
3612    .context("failed to build message batch")
3613}
3614
3615pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3616    let variant_data = parts
3617        .iter()
3618        .map(|part| part_variant_json(&part.kind))
3619        .collect::<Result<Vec<_>>>()?;
3620    let options = parts
3621        .iter()
3622        .map(|part| json_bytes(&part.options))
3623        .collect::<Result<Vec<_>>>()?;
3624    let mut cells = Vec::with_capacity(parts.len());
3625    // The blob column is a BinaryArray, exempt from the text-column bound
3626    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
3627    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3628        let columns = [
3629            part.session_id.len(),
3630            part.message_id.len(),
3631            part.id.len(),
3632            part.kind.type_name().len(),
3633            part.provenance.as_str().len(),
3634            variant.len(),
3635            encoded.len(),
3636        ];
3637        for bytes in columns {
3638            guard_cell("parts", &part.id, bytes)?;
3639        }
3640        cells.push(columns.iter().sum());
3641    }
3642    chunk_ranges(&cells)
3643        .into_iter()
3644        .map(|range| {
3645            parts_chunk(
3646                &parts[range.clone()],
3647                &variant_data[range.clone()],
3648                &options[range],
3649            )
3650        })
3651        .collect()
3652}
3653
3654fn parts_chunk(
3655    parts: &[Part],
3656    variant_data: &[Vec<u8>],
3657    options: &[Vec<u8>],
3658) -> Result<RecordBatch> {
3659    let schema = part_schema();
3660    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
3661    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
3662    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
3663    let blob_payloads: Vec<Option<&[u8]>> = parts
3664        .iter()
3665        .map(|part| match &part.kind {
3666            PartKind::File { data, .. } => Some(match data {
3667                FileData::String(value) => value.as_bytes(),
3668                FileData::Bytes(value) => value.as_slice(),
3669                FileData::Url(value) => value.as_bytes(),
3670            }),
3671            PartKind::Text { .. }
3672            | PartKind::Reasoning { .. }
3673            | PartKind::ToolCall { .. }
3674            | PartKind::ToolResult { .. }
3675            | PartKind::ToolApprovalRequest { .. }
3676            | PartKind::ToolApprovalResponse { .. } => None,
3677        })
3678        .collect();
3679    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3680
3681    RecordBatch::try_new(
3682        schema.clone(),
3683        vec![
3684            Arc::new(StringArray::from(
3685                parts
3686                    .iter()
3687                    .map(|part| part.session_id.as_str())
3688                    .collect::<Vec<_>>(),
3689            )),
3690            Arc::new(StringArray::from(
3691                parts
3692                    .iter()
3693                    .map(|part| part.message_id.as_str())
3694                    .collect::<Vec<_>>(),
3695            )),
3696            Arc::new(StringArray::from(
3697                parts
3698                    .iter()
3699                    .map(|part| part.id.as_str())
3700                    .collect::<Vec<_>>(),
3701            )),
3702            Arc::new(Int32Array::from(
3703                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3704            )),
3705            Arc::new(StringArray::from(
3706                parts
3707                    .iter()
3708                    .map(|part| part.kind.type_name())
3709                    .collect::<Vec<_>>(),
3710            )),
3711            Arc::new(StringArray::from(
3712                parts
3713                    .iter()
3714                    .map(|part| part.provenance.as_str())
3715                    .collect::<Vec<_>>(),
3716            )),
3717            Arc::new(LargeBinaryArray::from_iter_values(
3718                variant_data.iter().map(Vec::as_slice),
3719            )),
3720            Arc::new(blob_array),
3721            Arc::new(LargeBinaryArray::from_iter_values(
3722                options.iter().map(Vec::as_slice),
3723            )),
3724        ],
3725    )
3726    .context("failed to build parts batch")
3727}
3728
3729pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3730    Ok(Session {
3731        id: string(batch, "id", row)?.context("session id is null")?,
3732        parent_session_id: string(batch, "parent_session_id", row)?,
3733        parent_message_id: string(batch, "parent_message_id", row)?,
3734        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3735        created_at: datetime(batch, "created_at", row)?,
3736        project: crate::adapter::Extracted::from_stored(
3737            string(batch, "project", row)?.context("project is null")?,
3738        ),
3739        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3740    })
3741}
3742
3743pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3744    let id = string(batch, "id", row)?.context("message id is null")?;
3745    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3746    let timestamp = datetime(batch, "timestamp", row)?;
3747    let options =
3748        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3749
3750    match string(batch, "role", row)?
3751        .context("message role is null")?
3752        .as_str()
3753    {
3754        "system" => Ok(Message::System {
3755            id,
3756            session_id,
3757            timestamp,
3758            // `content` is nullable in the schema; preserve the distinction
3759            // between "no content row stored" (`None`) and "empty string
3760            // stored" (`Some(extracted_empty)`). The value originally
3761            // came from a `Source` extraction at ingest time; rewrap via
3762            // the storage-internal `from_stored` so the type-system seal
3763            // for adapters stays intact.
3764            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3765            options,
3766        }),
3767        "user" => Ok(Message::User {
3768            id,
3769            session_id,
3770            timestamp,
3771            options,
3772        }),
3773        "assistant" => Ok(Message::Assistant {
3774            id,
3775            session_id,
3776            timestamp,
3777            options,
3778        }),
3779        "tool" => Ok(Message::Tool {
3780            id,
3781            session_id,
3782            timestamp,
3783            options,
3784        }),
3785        other => anyhow::bail!("unknown message role {other}"),
3786    }
3787}
3788
3789pub(crate) fn part_from_batch(
3790    batch: &RecordBatch,
3791    row: usize,
3792    file_data: Option<FileData>,
3793) -> Result<Part> {
3794    let type_name = string(batch, "type", row)?.context("part type is null")?;
3795    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3796    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3797    Ok(Part {
3798        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3799        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3800        id: string(batch, "id", row)?.context("part id is null")?,
3801        ordinal: int32(batch, "ordinal", row)?,
3802        provenance: provenance_from_str(&provenance)?,
3803        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3804        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3805    })
3806}
3807
3808fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3809    match value {
3810        "conversational" => Ok(crate::wire::Provenance::Conversational),
3811        "injected" => Ok(crate::wire::Provenance::Injected),
3812        other => anyhow::bail!("unknown part provenance {other}"),
3813    }
3814}
3815
3816fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3817    let kind = file_data_kind(variant_data)?;
3818    match kind.as_str() {
3819        "string" => {
3820            let text = std::str::from_utf8(bytes)
3821                .context("file string payload is not UTF-8")?
3822                .to_owned();
3823            Ok(FileData::String(text))
3824        }
3825        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3826        "url" => Ok(FileData::Url(
3827            std::str::from_utf8(bytes)
3828                .context("file URL payload is not UTF-8")?
3829                .to_owned(),
3830        )),
3831        other => anyhow::bail!("unknown file data_kind {other}"),
3832    }
3833}
3834
3835fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3836    let value = json_parse::<Value>(variant_data)?;
3837    value
3838        .get("data_kind")
3839        .and_then(Value::as_str)
3840        .map(str::to_owned)
3841        .context("file part variant_data missing data_kind")
3842}
3843
3844fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3845    batch
3846        .column_by_name(name)
3847        .with_context(|| format!("missing column {name}"))?
3848        .as_any()
3849        .downcast_ref::<UInt64Array>()
3850        .with_context(|| format!("column {name} is not UInt64"))
3851}
3852
3853pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3854    let array = batch
3855        .column_by_name(name)
3856        .with_context(|| format!("missing column {name}"))?
3857        .as_any()
3858        .downcast_ref::<StringArray>()
3859        .with_context(|| format!("column {name} is not Utf8"))?;
3860    if array.is_null(row) {
3861        Ok(None)
3862    } else {
3863        Ok(Some(array.value(row).to_owned()))
3864    }
3865}
3866
3867fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3868    // Lance can return a `lance.json` column either as raw JSONB bytes
3869    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
3870    // LargeUtf8), depending on the read path. Handle both.
3871    let column = batch
3872        .column_by_name(name)
3873        .with_context(|| format!("missing column {name}"))?;
3874    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3875        return if array.is_null(row) {
3876            Ok(None)
3877        } else {
3878            Ok(Some(
3879                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3880            ))
3881        };
3882    }
3883    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3884        return if array.is_null(row) {
3885            Ok(None)
3886        } else {
3887            Ok(Some(array.value(row).as_bytes().to_vec()))
3888        };
3889    }
3890    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3891        return if array.is_null(row) {
3892            Ok(None)
3893        } else {
3894            Ok(Some(array.value(row).as_bytes().to_vec()))
3895        };
3896    }
3897    anyhow::bail!("column {name} is not a JSON-compatible array")
3898}
3899
3900fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3901    let array = batch
3902        .column_by_name(name)
3903        .with_context(|| format!("missing column {name}"))?
3904        .as_any()
3905        .downcast_ref::<Int32Array>()
3906        .with_context(|| format!("column {name} is not Int32"))?;
3907    Ok(array.value(row))
3908}
3909
3910pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3911    let array = batch
3912        .column_by_name(name)
3913        .with_context(|| format!("missing column {name}"))?
3914        .as_any()
3915        .downcast_ref::<Float32Array>()
3916        .with_context(|| format!("column {name} is not Float32"))?;
3917    Ok(array.value(row))
3918}
3919
3920pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3921    let array = batch
3922        .column_by_name(name)
3923        .with_context(|| format!("missing column {name}"))?
3924        .as_any()
3925        .downcast_ref::<TimestampMicrosecondArray>()
3926        .with_context(|| format!("column {name} is not timestamp_micros"))?;
3927    Utc.timestamp_micros(array.value(row))
3928        .single()
3929        .context("timestamp is out of range")
3930}
3931
3932fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3933    Field::new(name, data_type, nullable).with_metadata(
3934        [(
3935            "lance-schema:unenforced-primary-key".to_owned(),
3936            "true".to_owned(),
3937        )]
3938        .into(),
3939    )
3940}
3941
3942// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
3943// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
3944// is marked unstable in Lance docs (`format/file/versioning.md`) and at
3945// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
3946// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
3947// allocated only one column_info, so the decoder's second `expect_next()`
3948// fires `"there were more fields in the schema than provided column
3949// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
3950// handles correctly (covered by Lance's own `test_compact_blob_columns`).
3951fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3952    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3953        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3954            .into_iter()
3955            .collect(),
3956    )
3957}
3958
3959fn json_field(name: &str, nullable: bool) -> Field {
3960    lance_arrow::json::json_field(name, nullable)
3961}
3962
3963fn micros(timestamp: DateTime<Utc>) -> i64 {
3964    timestamp.timestamp_micros()
3965}
3966
3967fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3968    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
3969    // matches the `lance.json` extension contract. Lance's compact path
3970    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
3971    // applies `decode_json -> encode_json` on this column; with proper JSONB
3972    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
3973    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
3974    // `try_into_dfstream`; compact still goes through the adapter).
3975    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3976    lance_arrow::json::encode_json(&text)
3977        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3978}
3979
3980fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3981    serde_json::from_slice(value).context("failed to parse JSON field")
3982}
3983
3984fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3985    if let PartKind::File {
3986        media_type,
3987        file_name,
3988        data,
3989    } = kind
3990    {
3991        let data_kind = match data {
3992            FileData::String(_) => "string",
3993            FileData::Bytes(_) => "bytes",
3994            FileData::Url(_) => "url",
3995        };
3996        return json_bytes(&serde_json::json!({
3997            "media_type": media_type,
3998            "file_name": file_name,
3999            "data_kind": data_kind,
4000        }));
4001    }
4002    let value = serde_json::to_value(kind)?;
4003    let mut object = value
4004        .as_object()
4005        .cloned()
4006        .context("part variant did not serialize to an object")?;
4007    object.remove("type");
4008    json_bytes(&object)
4009}
4010
4011fn part_kind_from_json(
4012    type_name: &str,
4013    variant_data: &[u8],
4014    file_data: Option<FileData>,
4015) -> Result<PartKind> {
4016    let mut value = json_parse::<Value>(variant_data)?;
4017    let object = value
4018        .as_object_mut()
4019        .context("part variant data is not an object")?;
4020    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
4021    if let Some(data) = file_data {
4022        object.remove("data_kind");
4023        object.insert("data".to_owned(), serde_json::to_value(data)?);
4024    }
4025    serde_json::from_value(value).context("failed to parse part kind")
4026}
4027
4028#[cfg(test)]
4029mod tests {
4030    #![allow(clippy::expect_used, clippy::unwrap_used)]
4031
4032    use super::*;
4033    use crate::{
4034        adapter::Extracted,
4035        handlers::ingest_events,
4036        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
4037    };
4038    use chrono::Utc;
4039    use serde_json::json;
4040    use tempfile::TempDir;
4041
4042    fn synthetic_session(id: &str) -> Session {
4043        Session {
4044            id: id.to_owned(),
4045            parent_session_id: None,
4046            parent_message_id: None,
4047            source_agent: "claude-code".to_owned(),
4048            created_at: Utc::now(),
4049            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
4050            options: ProviderOptions::new(),
4051        }
4052    }
4053
4054    #[test]
4055    fn search_text_excludes_injected_parts() {
4056        use crate::wire::Provenance;
4057        let message = Message::User {
4058            id: "m1".to_owned(),
4059            session_id: "s1".to_owned(),
4060            timestamp: Utc::now(),
4061            options: ProviderOptions::new(),
4062        };
4063        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4064            session_id: "s1".to_owned(),
4065            id: id.to_owned(),
4066            message_id: "m1".to_owned(),
4067            ordinal: 0,
4068            provenance,
4069            options: ProviderOptions::new(),
4070            kind: PartKind::Text {
4071                text: Some(Extracted::from_test_value(text.to_owned())),
4072            },
4073        };
4074
4075        // A conversational part contributes; an injected one is excluded
4076        // (spec.md#search).
4077        let conversational = search_text(
4078            &message,
4079            &[text_part(
4080                "p1",
4081                "real human prompt",
4082                Provenance::Conversational,
4083            )],
4084        );
4085        assert_eq!(conversational.as_deref(), Some("real human prompt"));
4086
4087        let injected = search_text(
4088            &message,
4089            &[text_part(
4090                "p2",
4091                "<task-notification>...</task-notification>",
4092                Provenance::Injected,
4093            )],
4094        );
4095        assert!(
4096            injected.is_none(),
4097            "a message whose only part is injected has null search_text"
4098        );
4099    }
4100
4101    #[test]
4102    fn chunk_ranges_splits_on_byte_budget() {
4103        assert!(chunk_ranges(&[]).is_empty());
4104        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4105
4106        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4107        assert_eq!(
4108            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4109            vec![0..1, 1..2, 2..3],
4110        );
4111
4112        // An oversized single row gets its own chunk, never an infinite loop.
4113        assert_eq!(
4114            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4115            vec![0..1, 1..2, 2..3],
4116        );
4117    }
4118
4119    #[tokio::test]
4120    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4121        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
4122        // Message is dropped on the spot, with one Error outcome surfaced. The
4123        // rest of the substream continues normally - subsequent valid messages
4124        // and parts get written.
4125        let temp = TempDir::new()?;
4126        let store = Store::open_local(temp.path()).await?;
4127        let session = synthetic_session("ordering");
4128        let orphan_part = Part {
4129            session_id: session.id.clone(),
4130            id: "orphan-part".to_owned(),
4131            message_id: "missing-message".to_owned(),
4132            ordinal: 0,
4133            provenance: crate::wire::Provenance::Conversational,
4134            options: ProviderOptions::new(),
4135            kind: PartKind::Text {
4136                text: Some(Extracted::from_test_value("orphan".to_owned())),
4137            },
4138        };
4139        let valid_message = Message::User {
4140            id: "valid-message".to_owned(),
4141            session_id: session.id.clone(),
4142            timestamp: Utc::now(),
4143            options: ProviderOptions::new(),
4144        };
4145        let valid_part = Part {
4146            session_id: session.id.clone(),
4147            id: "valid-part".to_owned(),
4148            message_id: valid_message.id().to_owned(),
4149            ordinal: 0,
4150            provenance: crate::wire::Provenance::Conversational,
4151            options: ProviderOptions::new(),
4152            kind: PartKind::Text {
4153                text: Some(Extracted::from_test_value("kept".to_owned())),
4154            },
4155        };
4156
4157        let mut validator = IngestValidator::default();
4158        validator
4159            .push(&store, 0, IngestEvent::Session(session.clone()))
4160            .await?;
4161        let part_outcomes = validator
4162            .push(&store, 1, IngestEvent::Part(orphan_part))
4163            .await?;
4164        assert_eq!(part_outcomes.len(), 1);
4165        assert_eq!(part_outcomes[0].kind, "part");
4166        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4167        assert!(
4168            part_outcomes[0]
4169                .error
4170                .as_ref()
4171                .map(|e| e.message.contains("part event appeared before a message"))
4172                .unwrap_or(false),
4173            "error message must explain the ordering violation: {part_outcomes:?}"
4174        );
4175        validator
4176            .push(&store, 2, IngestEvent::Message(valid_message))
4177            .await?;
4178        validator
4179            .push(&store, 3, IngestEvent::Part(valid_part))
4180            .await?;
4181        validator.finish(&store).await?;
4182
4183        let (sessions, messages, parts) = store.row_counts().await?;
4184        assert_eq!(sessions, 1, "session committed despite the orphan part");
4185        assert_eq!(messages, 1, "valid message committed");
4186        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4187
4188        Ok(())
4189    }
4190
4191    #[tokio::test]
4192    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4193        // Per-event drop: a duplicate message id within a substream drops the
4194        // *duplicate* and surfaces an Error outcome for it. The first wins; the
4195        // session still commits.
4196        let temp = TempDir::new()?;
4197        let store = Store::open_local(temp.path()).await?;
4198        let session = synthetic_session("duplicate-message");
4199        let first = Message::User {
4200            id: "message-1".to_owned(),
4201            session_id: session.id.clone(),
4202            timestamp: Utc::now(),
4203            options: ProviderOptions::new(),
4204        };
4205        let second = Message::Assistant {
4206            id: "message-1".to_owned(),
4207            session_id: session.id.clone(),
4208            timestamp: Utc::now(),
4209            options: ProviderOptions::new(),
4210        };
4211
4212        let mut validator = IngestValidator::default();
4213        validator
4214            .push(&store, 0, IngestEvent::Session(session.clone()))
4215            .await?;
4216        validator
4217            .push(&store, 1, IngestEvent::Message(first))
4218            .await?;
4219        let dup_outcomes = validator
4220            .push(&store, 2, IngestEvent::Message(second))
4221            .await?;
4222        assert_eq!(dup_outcomes.len(), 1);
4223        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4224        assert!(
4225            dup_outcomes[0]
4226                .error
4227                .as_ref()
4228                .map(|e| e.message.contains("duplicate message id message-1"))
4229                .unwrap_or(false),
4230            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4231        );
4232
4233        validator.finish(&store).await?;
4234        let (sessions, messages, _) = store.row_counts().await?;
4235        assert_eq!(sessions, 1, "session committed");
4236        assert_eq!(messages, 1, "only the first message committed");
4237
4238        Ok(())
4239    }
4240
4241    #[tokio::test]
4242    async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
4243    -> anyhow::Result<()> {
4244        // spec.md#model-pond-options: `options.pond` is core-owned. A stored
4245        // message carries the process's host stamp (when resolvable) and never
4246        // a client-supplied value; session and part options stay untouched.
4247        let temp = TempDir::new()?;
4248        let store = Store::open_local(temp.path()).await?;
4249        let session = synthetic_session("host-provenance");
4250        let mut spoofed = ProviderOptions::new();
4251        spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
4252        let message = Message::User {
4253            id: "message-1".to_owned(),
4254            session_id: session.id.clone(),
4255            timestamp: Utc::now(),
4256            options: spoofed,
4257        };
4258        let part = Part {
4259            session_id: session.id.clone(),
4260            id: "part-1".to_owned(),
4261            message_id: "message-1".to_owned(),
4262            ordinal: 0,
4263            provenance: crate::wire::Provenance::Conversational,
4264            options: ProviderOptions::new(),
4265            kind: PartKind::Text {
4266                text: Some(Extracted::from_test_value("hello".to_owned())),
4267            },
4268        };
4269
4270        let mut validator = IngestValidator::default();
4271        validator
4272            .push(&store, 0, IngestEvent::Session(session.clone()))
4273            .await?;
4274        validator
4275            .push(&store, 1, IngestEvent::Message(message))
4276            .await?;
4277        validator.push(&store, 2, IngestEvent::Part(part)).await?;
4278        validator.finish(&store).await?;
4279
4280        let stored = store
4281            .get_session(&session.id)
4282            .await?
4283            .expect("ingested session is readable");
4284        assert!(
4285            !stored.session.options.contains_key("pond"),
4286            "session rows are not stamped (attribution derives from messages)"
4287        );
4288        let stored_message = &stored.messages[0].message;
4289        match ingest_host_stamp() {
4290            Some(stamp) => {
4291                assert_eq!(
4292                    stored_message.options().get("pond"),
4293                    Some(stamp),
4294                    "stored message carries the real stamp, never the spoof"
4295                );
4296                let host = stamp
4297                    .pointer("/ingest/host")
4298                    .and_then(Value::as_object)
4299                    .expect("stamp shape is {ingest: {host: {..}}}");
4300                assert!(!host.is_empty(), "an all-empty stamp must be None instead");
4301                assert!(
4302                    host.values()
4303                        .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
4304                    "stamp fields are omitted when unavailable, never empty: {host:?}"
4305                );
4306            }
4307            None => assert!(
4308                stored_message.options().get("pond").is_none(),
4309                "with no resolvable stamp the spoofed key is still stripped"
4310            ),
4311        }
4312        assert!(
4313            !stored.messages[0].parts[0].options.contains_key("pond"),
4314            "part rows are not stamped (covered by their message's stamp)"
4315        );
4316
4317        Ok(())
4318    }
4319
4320    /// Regression: compact_files on `parts` with the blob column tripped a
4321    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
4322    /// batches give compact fragments to merge; every `FileData` variant
4323    /// exercises the blob round-trip. All-File batches sidestep a debug-only
4324    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
4325    /// write batch mixes null + valid rows in the blob column - benign in
4326    /// release, irrelevant to this regression's scope.
4327    #[tokio::test(flavor = "multi_thread")]
4328    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4329        use crate::wire::{FileData, PartKind, Provenance};
4330        let temp = TempDir::new()?;
4331        let store = Store::open_local(temp.path()).await?;
4332
4333        let session = synthetic_session("compact-blob");
4334        store
4335            .upsert_sessions(std::slice::from_ref(&session))
4336            .await?;
4337
4338        let make_part = |idx: usize, kind: PartKind| Part {
4339            session_id: session.id.clone(),
4340            message_id: format!("msg-{idx}"),
4341            id: format!("part-{idx}"),
4342            ordinal: 0,
4343            provenance: Provenance::Conversational,
4344            options: ProviderOptions::new(),
4345            kind,
4346        };
4347
4348        let batch_a = vec![
4349            make_part(
4350                0,
4351                PartKind::File {
4352                    media_type: Some("text/plain".to_owned()),
4353                    file_name: Some("a.txt".to_owned()),
4354                    data: FileData::Bytes(b"alpha".to_vec()),
4355                },
4356            ),
4357            make_part(
4358                1,
4359                PartKind::File {
4360                    media_type: Some("text/plain".to_owned()),
4361                    file_name: Some("b.txt".to_owned()),
4362                    data: FileData::String("beta".to_owned()),
4363                },
4364            ),
4365        ];
4366        store.upsert_parts(&batch_a).await?;
4367
4368        let batch_b = vec![
4369            make_part(
4370                2,
4371                PartKind::File {
4372                    media_type: Some("application/octet-stream".to_owned()),
4373                    file_name: None,
4374                    data: FileData::Url("https://example.com/file".to_owned()),
4375                },
4376            ),
4377            make_part(
4378                3,
4379                PartKind::File {
4380                    media_type: Some("image/png".to_owned()),
4381                    file_name: Some("c.png".to_owned()),
4382                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4383                },
4384            ),
4385        ];
4386        store.upsert_parts(&batch_b).await?;
4387
4388        store
4389            .optimize_indices(None, &MaintenancePolicy::always_compact())
4390            .await?
4391            .into_result()?;
4392
4393        Ok(())
4394    }
4395
4396    #[tokio::test]
4397    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4398        let temp = TempDir::new()?;
4399        let store = Store::open_local(temp.path()).await?;
4400        let session = synthetic_session("blob");
4401        let message = Message::User {
4402            id: "message-1".to_owned(),
4403            session_id: session.id.clone(),
4404            timestamp: Utc::now(),
4405            options: ProviderOptions::new(),
4406        };
4407        let part = Part {
4408            session_id: session.id.clone(),
4409            id: "part-1".to_owned(),
4410            message_id: message.id().to_owned(),
4411            ordinal: 0,
4412            provenance: crate::wire::Provenance::Conversational,
4413            options: ProviderOptions::new(),
4414            kind: PartKind::File {
4415                media_type: Some("text/plain".to_owned()),
4416                file_name: Some("payload.txt".to_owned()),
4417                data: FileData::Bytes(b"pond".to_vec()),
4418            },
4419        };
4420
4421        let mut validator = IngestValidator::default();
4422        validator
4423            .push(&store, 0, IngestEvent::Session(session.clone()))
4424            .await?;
4425        validator
4426            .push(&store, 1, IngestEvent::Message(message.clone()))
4427            .await?;
4428        validator
4429            .push(&store, 2, IngestEvent::Part(part.clone()))
4430            .await?;
4431        validator.finish(&store).await?;
4432
4433        let stored = store
4434            .get_session(&session.id)
4435            .await?
4436            .expect("session should exist");
4437        let stored_part = &stored.messages[0].parts[0];
4438        assert_eq!(stored_part, &part);
4439
4440        Ok(())
4441    }
4442
4443    //
4444    // `Session.source_agent` and `Session.project` are immutable
4445    // post-first-write because `messages` denormalizes them at first
4446    // ingest; a silent overwrite would desync the denormalized
4447    // copies. pond core's `IngestValidator` probes the existing session
4448    // before the merge_insert and emits a per-row `validation_failed`
4449    // outcome with the typed field name when either changes. Other Session
4450    // fields (options, parent_session_id, created_at, parent_message_id)
4451    // re-write idempotently via merge_insert.
4452
4453    fn base_session() -> Session {
4454        Session {
4455            id: "01HXY00000000001".to_owned(),
4456            parent_session_id: None,
4457            parent_message_id: None,
4458            source_agent: "claude-code".to_owned(),
4459            created_at: Utc::now(),
4460            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4461            options: ProviderOptions::new(),
4462        }
4463    }
4464
4465    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4466        outcomes
4467            .iter()
4468            .filter(|outcome| outcome.status == target)
4469            .count()
4470    }
4471
4472    #[tokio::test(flavor = "multi_thread")]
4473    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4474    -> anyhow::Result<()> {
4475        let temp = TempDir::new()?;
4476        let store = Store::open_local(temp.path()).await?;
4477
4478        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4479        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4480
4481        let mut again = base_session();
4482        again.options.insert("title".to_owned(), json!("renamed"));
4483        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4484        assert_eq!(
4485            count_status(&second, OutcomeStatus::Error),
4486            0,
4487            "options is mutable; the re-ingest must not surface an error: {second:?}",
4488        );
4489        assert_eq!(
4490            count_status(&second, OutcomeStatus::Matched),
4491            1,
4492            "unchanged immutable fields must match-insert via merge_insert",
4493        );
4494
4495        Ok(())
4496    }
4497
4498    #[tokio::test(flavor = "multi_thread")]
4499    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4500        let temp = TempDir::new()?;
4501        let store = Store::open_local(temp.path()).await?;
4502
4503        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4504        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4505
4506        let mut tampered = base_session();
4507        tampered.source_agent = "codex-cli".to_owned();
4508        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4509        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4510        let err_row = second
4511            .iter()
4512            .find(|outcome| outcome.status == OutcomeStatus::Error)
4513            .expect("error outcome present");
4514        let err = err_row.error.as_ref().expect("error body present");
4515        assert_eq!(err.field, Some("source_agent"));
4516        assert_eq!(err.reason, Some("immutable"));
4517
4518        // The stored row stayed on the original adapter - no silent rewrite.
4519        let stored = store
4520            .get_session(&base_session().id)
4521            .await?
4522            .expect("session row survives the rejected re-ingest");
4523        assert_eq!(stored.session.source_agent, "claude-code");
4524
4525        Ok(())
4526    }
4527
4528    #[tokio::test(flavor = "multi_thread")]
4529    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4530        let temp = TempDir::new()?;
4531        let store = Store::open_local(temp.path()).await?;
4532
4533        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4534        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4535
4536        let mut tampered = base_session();
4537        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4538        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4539        let err_row = second
4540            .iter()
4541            .find(|outcome| outcome.status == OutcomeStatus::Error)
4542            .expect("project change must surface an error outcome");
4543        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4544
4545        let stored = store
4546            .get_session(&base_session().id)
4547            .await?
4548            .expect("session row survives");
4549        assert_eq!(
4550            stored.session.project.as_str(),
4551            "/home/me/proj",
4552            "stored project must remain the original",
4553        );
4554
4555        Ok(())
4556    }
4557
4558    #[tokio::test(flavor = "multi_thread")]
4559    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
4560        // Regression guard: re-ingesting an existing session with NEW
4561        // messages must surface as sessions_inserted=0, messages_inserted_*>0
4562        // on `BatchCounts`, and per-row outcomes must mark the new message
4563        // rows `Inserted` while the session row is `Matched`. The prior
4564        // implementation derived all per-row statuses from the batch-level
4565        // session inserted count, which silently flipped the new messages
4566        // into `Matched` (visible as "up to date" in the CLI bar tail).
4567        use crate::wire::Provenance;
4568        let temp = TempDir::new()?;
4569        let store = Store::open_local(temp.path()).await?;
4570        let session = base_session();
4571
4572        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
4573            session_id: session.id.clone(),
4574            id: part_id.to_owned(),
4575            message_id: message_id.to_owned(),
4576            ordinal: 0,
4577            provenance: Provenance::Conversational,
4578            options: ProviderOptions::new(),
4579            kind: PartKind::Text {
4580                text: Some(Extracted::from_test_value(body.to_owned())),
4581            },
4582        };
4583        let user_message = |id: &str| Message::User {
4584            id: id.to_owned(),
4585            session_id: session.id.clone(),
4586            timestamp: Utc::now(),
4587            options: ProviderOptions::new(),
4588        };
4589
4590        // First pass: 2 messages land fresh.
4591        let mut validator = IngestValidator::default();
4592        validator
4593            .push(&store, 0, IngestEvent::Session(session.clone()))
4594            .await?;
4595        validator
4596            .push(&store, 1, IngestEvent::Message(user_message("m1")))
4597            .await?;
4598        validator
4599            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
4600            .await?;
4601        validator
4602            .push(&store, 3, IngestEvent::Message(user_message("m2")))
4603            .await?;
4604        validator
4605            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
4606            .await?;
4607        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
4608        assert_eq!(first_counts.sessions_inserted, 1);
4609        assert_eq!(first_counts.messages_inserted_total, 2);
4610        assert_eq!(first_counts.messages_inserted_searchable, 2);
4611
4612        // Second pass: same session id, 3 NEW messages.
4613        let mut validator = IngestValidator::default();
4614        validator
4615            .push(&store, 0, IngestEvent::Session(session.clone()))
4616            .await?;
4617        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
4618            let pid = format!("p{}", idx + 3);
4619            validator
4620                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
4621                .await?;
4622            validator
4623                .push(
4624                    &store,
4625                    idx * 2 + 2,
4626                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
4627                )
4628                .await?;
4629        }
4630        let (second_outcomes, second_counts) = validator.finish(&store).await?;
4631
4632        assert_eq!(
4633            second_counts.sessions_inserted, 0,
4634            "existing session row must report as Matched, not Inserted",
4635        );
4636        assert_eq!(second_counts.sessions_matched, 1);
4637        assert_eq!(
4638            second_counts.messages_inserted_total, 3,
4639            "the three NEW messages must register as Inserted in BatchCounts",
4640        );
4641        assert_eq!(
4642            second_counts.messages_inserted_searchable, 3,
4643            "all three new messages carry conversational text -> searchable",
4644        );
4645        assert_eq!(second_counts.messages_matched_total, 0);
4646        assert_eq!(second_counts.parts_inserted, 3);
4647        assert_eq!(second_counts.parts_matched, 0);
4648
4649        // Per-row outcomes mirror the BatchCounts shape: the session row is
4650        // Matched, every new message + part row is Inserted.
4651        let session_outcome = second_outcomes
4652            .iter()
4653            .find(|outcome| outcome.kind == "session")
4654            .expect("session-row outcome present");
4655        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
4656        for outcome in &second_outcomes {
4657            if outcome.kind == "message" || outcome.kind == "part" {
4658                assert_eq!(
4659                    outcome.status,
4660                    OutcomeStatus::Inserted,
4661                    "new row must be Inserted, got: {outcome:?}",
4662                );
4663            }
4664        }
4665        Ok(())
4666    }
4667
4668    /// Ingest `count` synthetic messages spread across a handful of sessions
4669    /// and projects, each with conversational `search_text`. Returns the store
4670    /// and the message keys in `msg-{i}` order; every `vector` starts null.
4671    async fn store_with_messages(
4672        temp: &TempDir,
4673        count: usize,
4674    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4675        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4676    }
4677
4678    /// Same as [`store_with_messages`] but tests optimize with a custom
4679    /// IVF_PQ activation threshold.
4680    async fn store_with_messages_at_threshold(
4681        temp: &TempDir,
4682        count: usize,
4683        _vector_threshold: usize,
4684    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4685        let store = Store::open_local(temp.path()).await?;
4686        let sessions = 8.min(count.max(1));
4687        let mut events = Vec::new();
4688        for s in 0..sessions {
4689            events.push(IngestEvent::Session(Session {
4690                id: format!("session-{s}"),
4691                parent_session_id: None,
4692                parent_message_id: None,
4693                source_agent: "claude-code".to_owned(),
4694                created_at: Utc::now(),
4695                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4696                options: ProviderOptions::new(),
4697            }));
4698            for i in (s..count).step_by(sessions) {
4699                let message_id = format!("msg-{i}");
4700                events.push(IngestEvent::Message(Message::User {
4701                    id: message_id.clone(),
4702                    session_id: format!("session-{s}"),
4703                    timestamp: Utc::now(),
4704                    options: ProviderOptions::new(),
4705                }));
4706                events.push(IngestEvent::Part(Part {
4707                    session_id: format!("session-{s}"),
4708                    id: format!("{message_id}-part"),
4709                    message_id,
4710                    ordinal: 0,
4711                    provenance: crate::wire::Provenance::Conversational,
4712                    options: ProviderOptions::new(),
4713                    kind: PartKind::Text {
4714                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4715                    },
4716                }));
4717            }
4718        }
4719        ingest_events(&store, events).await?;
4720        let keys = (0..count)
4721            .map(|i| MessageKey {
4722                session_id: format!("session-{}", i % sessions),
4723                message_id: format!("msg-{i}"),
4724            })
4725            .collect();
4726        Ok((store, keys))
4727    }
4728
4729    /// A deterministic pseudo-random vector of the production dimension.
4730    fn synthetic_vector(seed: usize) -> Vec<f32> {
4731        let mut state = (seed as u64)
4732            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4733            .wrapping_add(1);
4734        (0..embedding_dim())
4735            .map(|_| {
4736                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4737                #[allow(clippy::cast_precision_loss)]
4738                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4739                unit - 1.0
4740            })
4741            .collect()
4742    }
4743
4744    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
4745    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4746        keys.iter()
4747            .enumerate()
4748            .map(|(seed, key)| EmbeddedMessage {
4749                session_id: key.session_id.clone(),
4750                id: key.message_id.clone(),
4751                vector: synthetic_vector(seed),
4752            })
4753            .collect()
4754    }
4755
4756    fn embedding_update_batch_with_model(
4757        rows: &[EmbeddedMessage],
4758        model: &str,
4759    ) -> Result<RecordBatch> {
4760        let mut batch = embedding_update_batch(rows)?;
4761        let columns = batch
4762            .columns()
4763            .iter()
4764            .take(3)
4765            .cloned()
4766            .chain(std::iter::once(
4767                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4768            ))
4769            .collect::<Vec<_>>();
4770        batch = RecordBatch::try_new(batch.schema(), columns)?;
4771        Ok(batch)
4772    }
4773
4774    #[tokio::test]
4775    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4776        let temp = TempDir::new()?;
4777        // 4 messages cycle session-0..session-3, so `session-3` is a real
4778        // partition. Scalar-index pushdown is volume-independent: the planner
4779        // emits `ScalarIndexQuery` whenever the index exists.
4780        let (store, keys) = store_with_messages(&temp, 4).await?;
4781        store.write_embeddings(&embedded(&keys)).await?;
4782        store
4783            .optimize_indices(None, &MaintenancePolicy::always_compact())
4784            .await?
4785            .into_result()?;
4786
4787        let query = vec![0.01_f32; embedding_dim()];
4788        let plan = store
4789            .explain_vector_plan(
4790                &query,
4791                10,
4792                &Predicate::Eq("session_id", "session-3".into()),
4793                None,
4794            )
4795            .await?;
4796
4797        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
4798        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
4799        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
4800        // and unrelated.)
4801        assert!(
4802            plan.contains("ScalarIndexQuery"),
4803            "expected a ScalarIndexQuery node in the plan:\n{plan}",
4804        );
4805        let predicate_postfiltered = plan
4806            .lines()
4807            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4808        assert!(
4809            !predicate_postfiltered,
4810            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4811        );
4812        Ok(())
4813    }
4814
4815    #[tokio::test]
4816    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4817        let temp = TempDir::new()?;
4818        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4819
4820        // First batch: 255 vectors, one below threshold. Optimize does not
4821        // create the IVF_PQ because the trigger is not met.
4822        store.write_embeddings(&embedded(&keys[..255])).await?;
4823        store
4824            .optimize_indices_with_vector_threshold(256)
4825            .await?
4826            .into_result()?;
4827        assert!(
4828            !store
4829                .handle
4830                .messages_index_names()
4831                .await?
4832                .iter()
4833                .any(|name| name == MESSAGES_VECTOR_INDEX),
4834            "IVF_PQ must not exist below the activation threshold",
4835        );
4836
4837        // Next batch: one more vector. Total reaches 256; optimize creates
4838        // the IVF_PQ.
4839        store.write_embeddings(&embedded(&keys[255..256])).await?;
4840        store
4841            .optimize_indices_with_vector_threshold(256)
4842            .await?
4843            .into_result()?;
4844        assert!(
4845            store
4846                .handle
4847                .messages_index_names()
4848                .await?
4849                .iter()
4850                .any(|name| name == MESSAGES_VECTOR_INDEX),
4851            "optimize must create the IVF_PQ once the threshold is crossed",
4852        );
4853
4854        // The remaining 44 rows stay un-embedded; the IVF_PQ trains over the
4855        // non-null subset and a planted vector is retrievable.
4856        let hits = store
4857            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4858            .await?;
4859        assert!(
4860            hits.iter().any(|(key, _)| key == &keys[0]),
4861            "an embedded row is retrievable via the index",
4862        );
4863        Ok(())
4864    }
4865
4866    #[tokio::test]
4867    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4868    {
4869        let temp = TempDir::new()?;
4870        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4871        let old_rows = embedded(&keys);
4872        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4873        store
4874            .handle
4875            .merge_update(Table::Messages, old_batch, old_rows.len())
4876            .await?;
4877        store
4878            .optimize_indices_with_vector_threshold(256)
4879            .await?
4880            .into_result()?;
4881        assert!(
4882            store
4883                .handle
4884                .messages_index_names()
4885                .await?
4886                .iter()
4887                .any(|name| name == MESSAGES_VECTOR_INDEX),
4888            "IVF_PQ must exist before a model swap",
4889        );
4890        assert_eq!(store.stale_embedding_count().await?, keys.len());
4891
4892        store.drop_vector_index().await?;
4893        let mut pending = Vec::new();
4894        let stream = store.pending_or_stale_messages();
4895        tokio::pin!(stream);
4896        while let Some(row) = stream.next().await {
4897            pending.push(row?);
4898        }
4899        assert_eq!(
4900            pending.len(),
4901            keys.len(),
4902            "force stream should see stale rows"
4903        );
4904        store.write_embeddings(&embedded(&keys)).await?;
4905        assert_eq!(store.stale_embedding_count().await?, 0);
4906        store
4907            .optimize_indices_with_vector_threshold(256)
4908            .await?
4909            .into_result()?;
4910        assert!(
4911            store
4912                .handle
4913                .messages_index_names()
4914                .await?
4915                .iter()
4916                .any(|name| name == MESSAGES_VECTOR_INDEX),
4917            "optimize must rebuild IVF_PQ after force re-embed",
4918        );
4919
4920        let stream = store.pending_or_stale_messages();
4921        tokio::pin!(stream);
4922        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4923        Ok(())
4924    }
4925
4926    #[tokio::test]
4927    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4928        // Regression: `_row_last_updated_at_version` can point at a Lance
4929        // manifest version that `cleanup_old_versions` or the auto_cleanup
4930        // hook has since dropped from `Dataset::versions()`. The old code
4931        // silently dropped any session whose row-version was not in the
4932        // visible list, collapsing the staleness-skip map down to recent
4933        // commits and forcing `pond sync` to re-touch every file. The fix
4934        // falls back to the oldest still-visible commit timestamp - a
4935        // sound upper bound on the row's true ingest time.
4936        let temp = TempDir::new()?;
4937        let (store, _keys) = store_with_messages(&temp, 4).await?;
4938
4939        // Produce several distinct manifest versions on `sessions` so the
4940        // older ones become eligible for cleanup.
4941        for tag in 0..3 {
4942            let extra = synthetic_session(&format!("extra-{tag}"));
4943            store.upsert_sessions(&[extra]).await?;
4944        }
4945
4946        // Prune everything older than ~now, leaving only the latest manifest.
4947        // `delete_unverified=None` and `error_if_tagged=Some(false)` mirror
4948        // Lance's auto-cleanup hook semantics. The chrono 0-duration is fine:
4949        // Lance's `delete_unverified` floor still protects in-flight files.
4950        let dataset = store.handle.dataset(Table::Sessions).await?;
4951        dataset
4952            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4953            .await
4954            .context("cleanup_old_versions failed")?;
4955
4956        let map = store.session_last_ingested_at().await?;
4957        let session_count = store.row_counts().await?.0;
4958        assert!(
4959            map.len() >= session_count,
4960            "watermark map ({}) must still cover every session ({}) after \
4961             version cleanup; an empty fallback regresses pond sync to a \
4962             full re-scan",
4963            map.len(),
4964            session_count,
4965        );
4966        Ok(())
4967    }
4968
4969    #[tokio::test]
4970    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4971        let temp = TempDir::new()?;
4972        let (store, keys) = store_with_messages(&temp, 10).await?;
4973
4974        let before = store.embedding_progress().await?;
4975        assert_eq!(before.embedded, 0);
4976        assert_eq!(before.total, 10);
4977        assert_eq!(before.model, crate::embed::model_id());
4978
4979        store.write_embeddings(&embedded(&keys[..4])).await?;
4980        let partial = store.embedding_progress().await?;
4981        assert_eq!(partial.embedded, 4);
4982        assert_eq!(partial.total, 10);
4983
4984        store.write_embeddings(&embedded(&keys[4..])).await?;
4985        let full = store.embedding_progress().await?;
4986        assert_eq!(full.embedded, 10);
4987        assert_eq!(full.total, 10);
4988        Ok(())
4989    }
4990}