Skip to main content

pond/
sessions.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    path::Path,
4    sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15    UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25    config, embed,
26    substrate::{
27        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, OptimizeProgressFn,
28        PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table, TableOptimizeOutcome, TableSizes,
29        VECTOR_INDEX_ACTIVATION_ROWS,
30    },
31    wire::{FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session},
32};
33use url::Url;
34
/// Session archive store over the `sessions`, `messages`, and `parts` Lance
/// tables. Table scans and merge-inserts go through the wrapped substrate
/// [`Handle`].
#[derive(Debug)]
pub struct Store {
    /// Substrate handle used for dataset access, scans, and merge-inserts.
    handle: Handle,
}
39
/// Per-table row counts reported by a Lance archive export or import.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Rows counted for the `sessions` table.
    pub sessions: usize,
    /// Rows counted for the `messages` table.
    pub messages: usize,
    /// Rows counted for the `parts` table.
    pub parts: usize,
}
46
/// Per-table source dataset version ids (`Dataset::version_id()`) captured
/// when each table was opened for an archive export.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version of the source `sessions` dataset that was exported.
    pub sessions: u64,
    /// Version of the source `messages` dataset that was exported.
    pub messages: u64,
    /// Version of the source `parts` dataset that was exported.
    pub parts: u64,
}
53
/// Result of [`Store::export_clean_lance_datasets`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Rows written into each archive table.
    pub rows: LanceArchiveCounts,
    /// Version each source table was read at. Tables are exported one after
    /// another, so the three versions are not taken as a single snapshot.
    pub source_versions: LanceArchiveVersions,
}
59
/// Result of [`Store::import_clean_lance_datasets`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Rows read from each archive table.
    pub rows: LanceArchiveCounts,
    /// Per-table count that the store's merge-insert reported as inserted
    /// (presumably excluding rows whose keys already existed - confirm
    /// against `Handle::merge_insert`).
    pub inserted: LanceArchiveCounts,
}
65
/// Index intents grouped by table. `IndexIntents::all` pairs each list with
/// its [`Table`].
#[derive(Debug, Clone, Default)]
pub struct IndexIntents {
    /// Intents for the `sessions` table.
    pub sessions: Vec<IndexIntent>,
    /// Intents for the `messages` table.
    pub messages: Vec<IndexIntent>,
    /// Intents for the `parts` table.
    pub parts: Vec<IndexIntent>,
}
72
73impl IndexIntents {
74    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
75        [
76            (Table::Sessions, &self.sessions),
77            (Table::Messages, &self.messages),
78            (Table::Parts, &self.parts),
79        ]
80    }
81}
82
/// A message awaiting embedding: its primary key plus the `search_text` to
/// embed. The vector lives on the same `messages` row, so no denormalized
/// filter columns are needed (spec.md#session-embed-from-canonical).
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Session half of the `(session_id, id)` message key.
    pub session_id: String,
    /// Message id, the second half of the key.
    pub id: String,
    /// Text to be embedded.
    pub search_text: String,
}
92
/// One embedded message: a primary key and the vector to store. `pond embed`
/// writes a batch of these into `messages.vector` keyed on `(session_id, id)`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Session half of the `(session_id, id)` message key.
    pub session_id: String,
    /// Message id, the second half of the key.
    pub id: String,
    /// Embedding vector to write into `messages.vector`.
    pub vector: Vec<f32>,
}
101
/// Message metadata used to hydrate search hits after retriever ranking.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Id of the matched message.
    pub message_id: String,
    /// Session the message belongs to.
    pub session_id: String,
    /// Message role as a string (NOTE(review): presumably the string form of
    /// `wire::Role` - confirm at the construction site).
    pub role: String,
    /// Project the owning session was recorded under.
    pub project: String,
    /// Agent that produced the session.
    pub source_agent: String,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// The `search_text` stored on the message row.
    pub search_text: String,
}
113
/// Composite key of a message row. The derived `Ord` compares `session_id`
/// first and then `message_id`, so the field order here is significant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Owning session id.
    pub session_id: String,
    /// Message id within the session.
    pub message_id: String,
}
119
/// Per-row result of an upsert (`Store::upsert_sessions`,
/// `Store::upsert_messages`, `Store::upsert_parts`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row was newly written.
    Inserted,
    /// The row's key already existed (presumably a matched no-op per
    /// spec.md#adapter-integrity-additive-sync - confirm against
    /// `statuses_from_inserted`).
    Matched,
}
125
/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
/// results so the CLI can render compaction-skipped (under writer contention)
/// distinctly from index-build failure (real problem).
#[derive(Debug, Default)]
pub struct OptimizeOutcome {
    /// One outcome per table the pass touched.
    pub tables: Vec<TableOptimizeOutcome>,
}
134
135impl OptimizeOutcome {
136    /// True if any table's indices phase reported a non-conflict failure.
137    /// `SkippedConflict` is expected under contention and does not count.
138    pub fn any_indices_failed(&self) -> bool {
139        self.tables.iter().any(|t| t.indices.is_failed())
140    }
141
142    /// Treat any `Failed` phase as an error. Tests that don't run under
143    /// contention use this to keep their existing `.await?` style: a real
144    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
145    pub fn into_result(self) -> Result<Self> {
146        for table in &self.tables {
147            if let PhaseOutcome::Failed(error) = &table.indices {
148                anyhow::bail!(
149                    "indices phase failed on {}: {error:#}",
150                    table.table.as_str()
151                );
152            }
153            if let PhaseOutcome::Failed(error) = &table.compaction {
154                anyhow::bail!(
155                    "compaction phase failed on {}: {error:#}",
156                    table.table.as_str()
157                );
158            }
159        }
160        Ok(self)
161    }
162}
163
164/// Default manifest-retention window for `pond index optimize`'s explicit
165/// cleanup pass. Matches LanceDB's recommended OSS-operator practice
166/// (lancedb docs: performance.mdx, tables/update.mdx). Note: with
167/// `delete_unverified=false` (default), Lance protects files newer than
168/// 7 days regardless of this value (`UNVERIFIED_THRESHOLD_DAYS` in
169/// lance/dataset/cleanup.rs).
170pub fn default_cleanup_older_than() -> chrono::Duration {
171    chrono::Duration::days(1)
172}
173
/// Cleanup parameters for the compaction phase of `pond index optimize`.
/// `delete_unverified=true` overrides Lance's 7-day in-progress safety
/// guard - required to reclaim files younger than 7 days, unsafe under
/// concurrent writers.
#[derive(Debug, Clone, Copy)]
pub struct CleanupConfig {
    /// Retention window for the cleanup pass; the `Default` impl uses
    /// `default_cleanup_older_than()`.
    pub older_than: chrono::Duration,
    /// Whether to bypass Lance's 7-day unverified-file guard (see above).
    pub delete_unverified: bool,
}
183
184impl Default for CleanupConfig {
185    fn default() -> Self {
186        Self {
187            older_than: default_cleanup_older_than(),
188            delete_unverified: false,
189        }
190    }
191}
192
/// What `pond status` reports: where the data lives, total rows per table,
/// and a per-(adapter, project) breakdown built from one `messages` scan.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location of the data the stats describe.
    pub data_url: Url,
    /// Total rows in each table.
    pub totals: RowTotals,
    /// One entry per adapter present in the corpus. When `include_subagents`
    /// is false (the CLI default), sub-agent rows (`source_agent` containing
    /// `/`) are filtered out so only the main-agent sessions appear. When
    /// true, each distinct `source_agent` (e.g. `claude-code/general-purpose`)
    /// gets its own entry. Always in alphabetical order; the CLI re-sorts
    /// this into registry order at render time so the tree matches the
    /// discovery picker.
    pub adapters: Vec<AdapterStats>,
    /// Whether the rollup includes sub-agent sessions. The renderer prints a
    /// hint about `--include-subagents` when this is false so users know the
    /// `totals` row above counts sessions that aren't broken down below.
    pub include_subagents: bool,
}
212
/// Total row count per table, as reported by `pond status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Rows in `sessions`.
    pub sessions: u64,
    /// Rows in `messages`.
    pub messages: u64,
    /// Rows in `parts`.
    pub parts: u64,
}
219
/// Embedding coverage for `pond status` / `pond embed`. `total` is the count of
/// `messages` rows that carry `search_text` (i.e. are eligible to embed); rows
/// without `search_text` produce no vector. `embedded` is the subset of those
/// already carrying a vector under the current [`embed::model_id()`]. The pending
/// backlog is `total - embedded`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Eligible rows that already carry a vector for the current model.
    pub embedded: usize,
    /// Rows with `search_text`, i.e. eligible for embedding.
    pub total: usize,
    /// Identifier of the model the counts refer to (presumably the value of
    /// [`embed::model_id()`] - confirm at the construction site).
    pub model: &'static str,
}
231
/// Per-adapter rollup row of [`CorpusStats`].
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// Either the main-agent name (`claude-code`) when sub-agents are filtered
    /// out, or the full `source_agent` (`claude-code/general-purpose`) when
    /// `include_subagents` is on.
    pub adapter: String,
    /// Sessions under this adapter (presumably distinct session ids - confirm
    /// against the rollup code).
    pub sessions: u64,
    /// Messages under this adapter.
    pub messages: u64,
    /// Projects under this adapter, sorted by message count desc, then by
    /// project name asc.
    pub projects: Vec<ProjectStats>,
}
244
/// Per-project rollup row nested under an [`AdapterStats`].
#[derive(Debug, Clone)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Sessions in this project (presumably distinct session ids - confirm).
    pub sessions: u64,
    /// Messages in this project.
    pub messages: u64,
}
251
/// Running totals for one stats group: a message count plus the set of
/// distinct session ids seen. NOTE(review): presumably folded into
/// `AdapterStats`/`ProjectStats` by the `pond status` rollup - confirm.
#[derive(Default)]
struct GroupAccumulator {
    /// Messages counted for this group.
    messages: u64,
    /// Distinct session ids seen for this group.
    session_ids: HashSet<String>,
}
257
/// One message to upsert via `Store::upsert_messages`, borrowed from the
/// caller.
///
/// NOTE(review): `Store::upsert_messages` reads only `message` and
/// `search_text`; `parts` is not written there. Callers presumably write
/// parts separately (e.g. `Store::upsert_parts`) - confirm.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message row to write.
    pub message: &'a Message,
    /// Parts belonging to `message`.
    pub parts: &'a [Part],
    /// Text stored in the message's `search_text` column, if any.
    pub search_text: Option<&'a str>,
}
264
265impl Store {
266    /// Open against a local filesystem URL or a remote one for which the
267    /// caller has no extra options to pass (env vars suffice). CLI verbs
268    /// that load `[storage]` from config should call
269    /// [`Store::open_with_options`] instead so the same options flow into
270    /// every dataset open and write.
271    pub async fn open(location: &Url) -> Result<Self> {
272        Ok(Self {
273            handle: Handle::open(location).await?,
274        })
275    }
276
277    /// Open with object-store options (S3 creds, region, endpoint, ...)
278    /// threaded through Lance verbatim. Keys are the standard `object_store`
279    /// config names; pond does not parse them. Empty options + default caps
280    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
281    /// config block via [`crate::substrate::RuntimeCaps`].
282    pub async fn open_with_options(
283        location: &Url,
284        storage_options: std::collections::HashMap<String, String>,
285        caps: crate::substrate::RuntimeCaps,
286    ) -> Result<Self> {
287        Ok(Self {
288            handle: Handle::open_with_options(location, storage_options, caps).await?,
289        })
290    }
291
292    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
293    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
294    /// through [`Store::open_with_options`] so the production policy is
295    /// applied; tests get the backend-aware local-FS defaults.
296    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
297        let url = config::url_for_path(path)?;
298        Self::open_with_options(
299            &url,
300            std::collections::HashMap::new(),
301            crate::substrate::RuntimeCaps::default(),
302        )
303        .await
304    }
305
306    /// Export clean, index-free Lance datasets into `dest`.
307    ///
308    /// This rewrites the visible rows of each table instead of copying the
309    /// dataset roots. The resulting manifests therefore contain no references
310    /// to the source store's `_indices`, while `messages.vector` and
311    /// `messages.embedding_model` remain ordinary data columns and are
312    /// preserved.
313    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
314        std::fs::create_dir_all(dest)
315            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
316        let (sessions, sessions_version) = self
317            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
318            .await?;
319        let (messages, messages_version) = self
320            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
321            .await?;
322        let (parts, parts_version) = self
323            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
324            .await?;
325        Ok(LanceArchiveExport {
326            rows: LanceArchiveCounts {
327                sessions,
328                messages,
329                parts,
330            },
331            source_versions: LanceArchiveVersions {
332                sessions: sessions_version,
333                messages: messages_version,
334                parts: parts_version,
335            },
336        })
337    }
338
339    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
340        let sessions_dataset =
341            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
342        let messages_dataset =
343            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
344        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
345        let (sessions, sessions_inserted) = self
346            .import_clean_table(Table::Sessions, sessions_dataset)
347            .await?;
348        let (messages, messages_inserted) = self
349            .import_clean_table(Table::Messages, messages_dataset)
350            .await?;
351        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
352        Ok(LanceArchiveImport {
353            rows: LanceArchiveCounts {
354                sessions,
355                messages,
356                parts,
357            },
358            inserted: LanceArchiveCounts {
359                sessions: sessions_inserted,
360                messages: messages_inserted,
361                parts: parts_inserted,
362            },
363        })
364    }
365
366    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
367        let dataset = self.handle.dataset(table).await?;
368        let source_version = dataset.version_id();
369        let schema = export_schema(table);
370        let mut stream = dataset
371            .scan()
372            .try_into_stream()
373            .await
374            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
375        let dest_uri = dest
376            .to_str()
377            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
378
379        let mut rows = 0usize;
380        let mut wrote = false;
381        while let Some(batch) = stream.next().await {
382            let batch = batch
383                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
384            rows += batch.num_rows();
385            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
386            let mut params = write_params_for_create();
387            if wrote {
388                params.mode = WriteMode::Append;
389            }
390            Dataset::write(reader, dest_uri, Some(params))
391                .await
392                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
393            wrote = true;
394        }
395
396        if !wrote {
397            let batch = RecordBatch::new_empty(schema.clone());
398            let reader = RecordBatchIterator::new([Ok(batch)], schema);
399            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
400                .await
401                .with_context(|| {
402                    format!("failed to write empty {} archive table", table.as_str())
403                })?;
404        }
405        Ok((rows, source_version))
406    }
407
408    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
409        let mut stream = dataset
410            .scan()
411            .try_into_stream()
412            .await
413            .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
414        let mut rows = 0usize;
415        let mut inserted = 0usize;
416        while let Some(batch) = stream.next().await {
417            let batch = batch
418                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
419            let row_count = batch.num_rows();
420            rows += row_count;
421            inserted += self
422                .handle
423                .merge_insert(table, batch, row_count)
424                .await
425                .with_context(|| format!("failed to import {} archive table", table.as_str()))?
426                as usize;
427        }
428        Ok((rows, inserted))
429    }
430
431    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
432        if sessions.is_empty() {
433            return Ok(Vec::new());
434        }
435        let batches = sessions_batches(sessions)?;
436        let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
437        // Per-row outcomes; no fold here (see `pond index optimize`).
438        Ok(statuses_from_inserted(sessions.len(), inserted))
439    }
440
441    /// Batched write path used by the adapter ingest loop and by the wire
442    /// handler's final flush. Receives N completed substreams from the
443    /// validator and:
444    ///
445    ///   1. Runs the immutable-fields check (spec.md#protocol) against the stored row
446    ///      per session, sequentially. Sessions that fail produce one Error
447    ///      outcome and are excluded from the write batch.
448    ///   2. Deduplicates in-batch at the substream level: when two substreams
449    ///      in the same batch share a `session_id` (Claude Code's subagent
450    ///      files reuse their parent's id), the first occurrence wins. The
451    ///      second is either *merged* (same `source_agent` + `project`:
452    ///      messages/parts append, no duplicate rows) or *rejected*
453    ///      (different `project` - the subagent-vs-parent case). Row-level
454    ///      duplicates that slip past here are caught downstream by Lance's
455    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
456    ///      (invariant 17): this layer's job is preserving substream merge
457    ///      semantics, not policing the PK uniqueness Lance handles itself.
458    ///   3. Builds one combined `RecordBatch` per table (sessions, messages,
459    ///      parts) across every valid substream.
460    ///   4. Fires the three `merge_insert` calls in parallel via
461    ///      `tokio::try_join!`. Cross-table mutex on `CachedDataset` is
462    ///      independent, so these proceed concurrently.
463    ///   5. Composes per-session [`RowOutcome`]s in original substream order.
464    async fn upsert_session_batch(
465        &self,
466        substreams: Vec<CompletedSubstream>,
467    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
468        if substreams.is_empty() {
469            return Ok((Vec::new(), BatchCounts::default()));
470        }
471
472        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
473        let mut counts = BatchCounts::default();
474
475        // In-batch dedup. First occurrence of each session_id wins; later
476        // occurrences either merge or get rejected. Iteration order preserves
477        // original substream order so outcomes index correctly.
478        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
479        let mut by_session_id: std::collections::HashMap<String, usize> =
480            std::collections::HashMap::with_capacity(substreams.len());
481        for substream in substreams {
482            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
483                let existing = &merged[existing_idx];
484                if existing.session.source_agent != substream.session.source_agent
485                    || existing.session.project != substream.session.project
486                {
487                    // Subagent-vs-parent class. The first occurrence's
488                    // metadata stays authoritative; this substream is
489                    // rejected on the same immutable-field axis as the
490                    // storage-side check.
491                    let reason = if existing.session.source_agent != substream.session.source_agent
492                    {
493                        IngestError::ImmutableField {
494                            field: "source_agent",
495                            session_id: substream.session.id.clone(),
496                            stored: existing.session.source_agent.clone(),
497                            attempted: substream.session.source_agent.clone(),
498                        }
499                    } else {
500                        IngestError::ImmutableField {
501                            field: "project",
502                            session_id: substream.session.id.clone(),
503                            stored: (*existing.session.project).clone(),
504                            attempted: (*substream.session.project).clone(),
505                        }
506                    };
507                    let field = match &reason {
508                        IngestError::ImmutableField { field, .. } => Some(*field),
509                    };
510                    let reason_key = match field {
511                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
512                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
513                        _ => DROP_REASON_UNCATEGORIZED,
514                    };
515                    outcomes.extend(error_outcomes_for_substream(
516                        substream.session_index,
517                        &substream.session,
518                        &substream.messages,
519                        reason.to_string(),
520                        field,
521                        reason_key,
522                    ));
523                    continue;
524                }
525                // Same session, same metadata: merge messages. Dedup message
526                // ids defensively (within one batch, the validator's seen
527                // sets are per-substream so cross-substream dups can happen
528                // legally if both files re-emit the same row).
529                let existing = &mut merged[existing_idx];
530                let mut seen: std::collections::HashSet<String> = existing
531                    .messages
532                    .iter()
533                    .map(|m| m.message.id().to_owned())
534                    .collect();
535                for msg in substream.messages {
536                    if seen.insert(msg.message.id().to_owned()) {
537                        existing.messages.push(msg);
538                    }
539                }
540                continue;
541            }
542            by_session_id.insert(substream.session.id.clone(), merged.len());
543            merged.push(substream);
544        }
545
546        // Pre-existence sweep: one scan per table keyed on the batch's
547        // session_ids, capped at the substream count. Replaces the prior
548        // N-sequential `find_session` calls and gives us honest per-row
549        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
550        let session_id_values: Vec<ScalarValue> = merged
551            .iter()
552            .map(|substream| ScalarValue::String(substream.session.id.clone()))
553            .collect();
554        let existing_sessions: std::collections::HashMap<String, Session> =
555            if session_id_values.is_empty() {
556                std::collections::HashMap::new()
557            } else {
558                let batch = self
559                    .handle
560                    .scan_batch(
561                        Table::Sessions,
562                        Some(&Predicate::In("id", session_id_values.clone())),
563                        &[],
564                    )
565                    .await?;
566                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
567                for row in 0..batch.num_rows() {
568                    let session = session_from_batch(&batch, row)?;
569                    map.insert(session.id.clone(), session);
570                }
571                map
572            };
573        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
574            HashSet::new()
575        } else {
576            let batch = self
577                .handle
578                .scan_batch(
579                    Table::Messages,
580                    Some(&Predicate::In("session_id", session_id_values.clone())),
581                    &["session_id", "id"],
582                )
583                .await?;
584            let mut set = HashSet::with_capacity(batch.num_rows());
585            for row in 0..batch.num_rows() {
586                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
587                let mid = string(&batch, "id", row)?.context("message id is null")?;
588                set.insert((sid, mid));
589            }
590            set
591        };
592        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
593            HashSet::new()
594        } else {
595            let batch = self
596                .handle
597                .scan_batch(
598                    Table::Parts,
599                    Some(&Predicate::In("session_id", session_id_values)),
600                    &["session_id", "message_id", "id"],
601                )
602                .await?;
603            let mut set = HashSet::with_capacity(batch.num_rows());
604            for row in 0..batch.num_rows() {
605                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
606                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
607                let pid = string(&batch, "id", row)?.context("part id is null")?;
608                set.insert((sid, mid, pid));
609            }
610            set
611        };
612
613        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
614        for substream in merged {
615            if let Some(existing) = existing_sessions.get(&substream.session.id)
616                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
617            {
618                let field = match &failure {
619                    IngestError::ImmutableField { field, .. } => Some(*field),
620                };
621                let reason_key = match field {
622                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
623                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
624                    _ => DROP_REASON_UNCATEGORIZED,
625                };
626                outcomes.extend(error_outcomes_for_substream(
627                    substream.session_index,
628                    &substream.session,
629                    &substream.messages,
630                    failure.to_string(),
631                    field,
632                    reason_key,
633                ));
634                continue;
635            }
636            writeable.push(substream);
637        }
638
639        if writeable.is_empty() {
640            outcomes.sort_by_key(|outcome| outcome.index);
641            return Ok((outcomes, counts));
642        }
643
644        let sessions_owned: Vec<Session> = writeable
645            .iter()
646            .map(|substream| substream.session.clone())
647            .collect();
648        let message_rows: Vec<MessageBatchRow<'_>> = writeable
649            .iter()
650            .flat_map(|substream| {
651                substream.messages.iter().map(|buffered| MessageBatchRow {
652                    message: &buffered.message,
653                    source_agent: &substream.session.source_agent,
654                    project: &substream.session.project,
655                    search_text: buffered.search_text.as_deref(),
656                })
657            })
658            .collect();
659        let part_rows: Vec<Part> = writeable
660            .iter()
661            .flat_map(|substream| {
662                substream.messages.iter().flat_map(|buffered| {
663                    buffered
664                        .parts
665                        .iter()
666                        .map(|buffered_part| buffered_part.part.clone())
667                })
668            })
669            .collect();
670
671        let session_batches = sessions_batches(&sessions_owned)?;
672        let message_batches = messages_batches(&message_rows)?;
673        let part_batches = parts_batches(&part_rows)?;
674
675        // Merge_insert returns a batch-level inserted count which we cross-
676        // check against our pre-existence sets, but for per-row truth we
677        // attribute through the sets themselves (next loop). Under
678        // single-writer the two agree exactly; under a hostile concurrent
679        // writer the sets are authoritative for THIS request's wire shape -
680        // matched-no-op (spec.md#adapter-integrity-additive-sync) makes the
681        // distinction informational, not behavioral.
682        let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
683            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
684            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
685            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
686        )?;
687
688        for substream in &writeable {
689            outcomes.extend(success_outcomes_for_substream(
690                substream.session_index,
691                &substream.session,
692                &substream.messages,
693                &existing_sessions,
694                &existing_message_pks,
695                &existing_part_pks,
696                &mut counts,
697            ));
698        }
699
700        outcomes.sort_by_key(|outcome| outcome.index);
701        Ok((outcomes, counts))
702    }
703
704    pub async fn upsert_messages(
705        &self,
706        session: &Session,
707        messages: &[MessageWrite<'_>],
708    ) -> Result<Vec<UpsertStatus>> {
709        if messages.is_empty() {
710            return Ok(Vec::new());
711        }
712
713        let rows = messages
714            .iter()
715            .map(|write| MessageBatchRow {
716                message: write.message,
717                source_agent: &session.source_agent,
718                project: &session.project,
719                search_text: write.search_text,
720            })
721            .collect::<Vec<_>>();
722        let batches = messages_batches(&rows)?;
723        let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
724        Ok(statuses_from_inserted(messages.len(), inserted))
725    }
726
727    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
728        if parts.is_empty() {
729            return Ok(Vec::new());
730        }
731        let batches = parts_batches(parts)?;
732        let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
733        Ok(statuses_from_inserted(parts.len(), inserted))
734    }
735
736    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
737        let Some(session) = self.find_session(session_id).await? else {
738            return Ok(None);
739        };
740        let messages = self.messages_for_session(session_id).await?;
741        Ok(Some(SessionWithMessages { session, messages }))
742    }
743
744    /// Every session id currently in the store, unsorted.
745    pub async fn session_ids(&self) -> Result<Vec<String>> {
746        let batch = self
747            .handle
748            .scan_batch(Table::Sessions, None, &["id"])
749            .await?;
750        let mut ids = Vec::with_capacity(batch.num_rows());
751        for row in 0..batch.num_rows() {
752            if let Some(id) = string(&batch, "id", row)? {
753                ids.push(id);
754            }
755        }
756        Ok(ids)
757    }
758
759    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
760        let batch = self
761            .handle
762            .scan_batch(
763                Table::Sessions,
764                Some(&Predicate::Eq(
765                    "parent_session_id",
766                    parent_session_id.into(),
767                )),
768                &[
769                    "id",
770                    "parent_session_id",
771                    "parent_message_id",
772                    "source_agent",
773                    "created_at",
774                    "project",
775                    "options",
776                ],
777            )
778            .await?;
779        let mut sessions = Vec::with_capacity(batch.num_rows());
780        for row in 0..batch.num_rows() {
781            sessions.push(session_from_batch(&batch, row)?);
782        }
783        sessions.sort_by(|left, right| left.id.cmp(&right.id));
784        Ok(sessions)
785    }
786
787    /// `session_id -> wall-clock time of the Lance manifest version that
788    /// last wrote the row` for the per-session staleness skip
789    /// (spec.md#adapter-integrity-event-ordering). Reads Lance's `_row_last_updated_at_version` system
790    /// column (available because pond enables stable row ids per spec.md#lance-table-creation-stable-row-ids)
791    /// and joins it against `Dataset::versions()` for commit timestamps.
792    pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
793        use lance::deps::arrow_array::UInt64Array;
794
795        let dataset = self.handle.dataset(Table::Sessions).await?;
796        let version_list = dataset.versions().await?;
797        let versions: HashMap<u64, DateTime<Utc>> = version_list
798            .iter()
799            .map(|v| (v.version, v.timestamp))
800            .collect();
801        // `Dataset::cleanup_old_versions` (and the auto_cleanup hook) drops
802        // pruned versions from the manifest list, leaving rows whose
803        // `_row_last_updated_at_version` points at a version that no longer
804        // resolves. Those rows are still real and were ingested at some time
805        // <= the oldest still-visible version's commit timestamp - so falling
806        // back to that bound preserves a sound `mtime <= ingested` upper edge
807        // and keeps the staleness skip working after cleanup.
808        let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
809
810        let scanner = self
811            .handle
812            .scan(
813                Table::Sessions,
814                ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
815            )
816            .await?;
817        let mut stream = scanner.try_into_stream().await?;
818        let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
819        while let Some(batch) = stream.next().await {
820            let batch = batch?;
821            let version_array = batch
822                .column_by_name("_row_last_updated_at_version")
823                .context("missing _row_last_updated_at_version column")?
824                .as_any()
825                .downcast_ref::<UInt64Array>()
826                .context("_row_last_updated_at_version is not UInt64")?;
827            for row in 0..batch.num_rows() {
828                let Some(id) = string(&batch, "id", row)? else {
829                    continue;
830                };
831                if version_array.is_null(row) {
832                    continue;
833                }
834                let version = version_array.value(row);
835                let ts = versions.get(&version).copied().or(oldest_visible_ts);
836                if let Some(ts) = ts {
837                    out.insert(id, ts);
838                }
839            }
840        }
841        Ok(out)
842    }
843
844    /// Whole-session view for `pond_get` session mode (spec.md#protocol).
845    /// Conversational filters to `search_text IS NOT NULL`; Complete and
846    /// Verbatim scan every message. Every mode attaches compact part summaries;
847    /// Verbatim additionally inlines full parts. `after_id` is an exclusive
848    /// lower bound (a message id); the page is bounded by `limit` and a byte
849    /// budget and never cuts mid-message.
850    pub async fn session_view(
851        &self,
852        session_id: &str,
853        params: SessionViewParams<'_>,
854    ) -> Result<GetLookup<SessionPage>> {
855        let Some(session) = self.find_session(session_id).await? else {
856            return Ok(GetLookup::NotFound);
857        };
858
859        let mut rows = match params.mode {
860            ResponseMode::Conversational => self
861                .scan_conversational_messages(session_id)
862                .await?
863                .into_iter()
864                .map(|row| ScanRow {
865                    id: row.message_id,
866                    role: row.role,
867                    timestamp: row.timestamp,
868                    text: Some(row.text.into_inner()),
869                    content: None,
870                })
871                .collect(),
872            ResponseMode::Complete | ResponseMode::Verbatim => {
873                self.scan_all_messages(session_id).await?
874            }
875        };
876        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
877
878        let start_at = match params.after_id {
879            // Append-only stream: a real anchor never vanishes, so an unknown
880            // `after_id` is a stale/mistyped client cursor, not "start over".
881            Some(after) => match rows.iter().position(|row| row.id == after) {
882                Some(idx) => idx + 1,
883                None => return Ok(GetLookup::UnknownAfterId),
884            },
885            None => 0,
886        };
887        let remaining = rows.get(start_at..).unwrap_or(&[]);
888        let emitted_count = page_by(remaining, params.limit, params.budget_bytes, |row| {
889            row.text.as_deref().map_or(0, str::len)
890        });
891        let emitted = &remaining[..emitted_count];
892        let messages_remaining = remaining.len() - emitted_count;
893        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
894
895        // Conversational/Complete only summarize parts; Verbatim inlines every
896        // part (blobs included).
897        let mut parts_by_message = match params.mode {
898            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
899            ResponseMode::Conversational | ResponseMode::Complete => {
900                self.summary_parts_for_messages(session_id, &ids).await?
901            }
902        };
903        let messages = emitted
904            .iter()
905            .map(|row| RetrievedMessage {
906                id: row.id.clone(),
907                role: row.role,
908                timestamp: row.timestamp,
909                text: row.text.clone(),
910                content: row.content.clone(),
911                parts: parts_by_message
912                    .remove(&(session_id.to_owned(), row.id.clone()))
913                    .unwrap_or_default(),
914            })
915            .collect();
916
917        Ok(GetLookup::Found(SessionPage {
918            session,
919            messages,
920            messages_remaining,
921        }))
922    }
923
924    /// Message-scope retrieval for `pond_get` message mode (spec.md#protocol):
925    /// the target with its full parts (paginated by `after_id` over part
926    /// ordinals, then budget) plus up to `2*context_depth` siblings around it.
927    /// `None` when no stored message carries `message_id`. Sibling parts are
928    /// carried for summarizing; the target's parts ride `target_parts`.
929    pub async fn message_view(
930        &self,
931        message_id: &str,
932        params: MessageViewParams<'_>,
933    ) -> Result<GetLookup<MessagePage>> {
934        let Some(session_id) = self.session_id_for_message(message_id).await? else {
935            return Ok(GetLookup::NotFound);
936        };
937        let Some(session) = self.find_session(&session_id).await? else {
938            return Ok(GetLookup::NotFound);
939        };
940        let mut rows = self.scan_all_messages(&session_id).await?;
941        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
942        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
943            return Ok(GetLookup::NotFound);
944        };
945
946        let start = target_pos.saturating_sub(params.context_depth);
947        let end = (target_pos + params.context_depth + 1).min(rows.len());
948        let window = &rows[start..end];
949        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
950        // The target's full parts (blobs included) ride the response; siblings
951        // are only summarized, but they share this one window scan.
952        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
953
954        let all_parts = parts_by_message
955            .remove(&(session_id.clone(), message_id.to_owned()))
956            .unwrap_or_default();
957        let start_part = match params.after_id {
958            // Exclusive over ordinals: parts are ordinal-sorted, so the first
959            // part past the anchor's ordinal is the page start. An anchor absent
960            // from the target's parts is a stale/mistyped client cursor.
961            Some(after) => match all_parts.iter().find(|part| part.id == after) {
962                Some(anchor) => all_parts
963                    .iter()
964                    .position(|part| part.ordinal > anchor.ordinal)
965                    .unwrap_or(all_parts.len()),
966                None => return Ok(GetLookup::UnknownAfterId),
967            },
968            None => 0,
969        };
970        let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
971        let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
972            serde_json::to_string(part).map_or(0, |json| json.len())
973        });
974        let target_parts = remaining_parts[..part_count].to_vec();
975        let target_parts_remaining = remaining_parts.len() - part_count;
976
977        let target_row = &rows[target_pos];
978        let target = RetrievedMessage {
979            id: target_row.id.clone(),
980            role: target_row.role,
981            timestamp: target_row.timestamp,
982            text: target_row.text.clone(),
983            content: target_row.content.clone(),
984            // Target structure is carried in full by `target_parts`.
985            parts: Vec::new(),
986        };
987        let siblings = window
988            .iter()
989            .enumerate()
990            .filter(|(idx, _)| start + idx != target_pos)
991            .map(|(_, row)| RetrievedMessage {
992                id: row.id.clone(),
993                role: row.role,
994                timestamp: row.timestamp,
995                text: row.text.clone(),
996                content: row.content.clone(),
997                parts: parts_by_message
998                    .get(&(session_id.clone(), row.id.clone()))
999                    .cloned()
1000                    .unwrap_or_default(),
1001            })
1002            .collect();
1003
1004        Ok(GetLookup::Found(MessagePage {
1005            session,
1006            target,
1007            target_parts,
1008            target_parts_remaining,
1009            siblings,
1010        }))
1011    }
1012
1013    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1014        let batch = self
1015            .handle
1016            .scan_batch(
1017                Table::Messages,
1018                Some(&Predicate::Eq("session_id", session_id.into())),
1019                &["id", "timestamp", "role", "search_text", "content"],
1020            )
1021            .await?;
1022        let mut rows = Vec::with_capacity(batch.num_rows());
1023        for row in 0..batch.num_rows() {
1024            let id = string(&batch, "id", row)?.context("message id is null")?;
1025            let role =
1026                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1027            let timestamp = datetime(&batch, "timestamp", row)?;
1028            rows.push(ScanRow {
1029                id,
1030                role,
1031                timestamp,
1032                text: string(&batch, "search_text", row)?,
1033                content: string(&batch, "content", row)?,
1034            });
1035        }
1036        Ok(rows)
1037    }
1038
1039    /// Conversational scan over one session: rows ordered by
1040    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1041    /// read seam (spec.md#search-prefilter-pushdown).
1042    pub async fn scan_conversational_messages(
1043        &self,
1044        session_id: &str,
1045    ) -> Result<Vec<ConversationalRow>> {
1046        let filter = Predicate::And(vec![
1047            Predicate::Eq("session_id", session_id.into()),
1048            Predicate::IsNotNull("search_text"),
1049        ]);
1050        let batch = self
1051            .handle
1052            .scan_batch(
1053                Table::Messages,
1054                Some(&filter),
1055                &["id", "timestamp", "role", "search_text"],
1056            )
1057            .await?;
1058
1059        let mut rows = Vec::with_capacity(batch.num_rows());
1060        for row in 0..batch.num_rows() {
1061            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1062            let role =
1063                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1064            let timestamp = datetime(&batch, "timestamp", row)?;
1065            let text_str = string(&batch, "search_text", row)?.context(
1066                "search_text null after IsNotNull pushdown - storage invariant violated",
1067            )?;
1068            rows.push(ConversationalRow {
1069                session_id: session_id.to_owned(),
1070                message_id,
1071                role,
1072                timestamp,
1073                text: SearchText(text_str),
1074            });
1075        }
1076        rows.sort_by(|a, b| {
1077            a.timestamp
1078                .cmp(&b.timestamp)
1079                .then_with(|| a.message_id.cmp(&b.message_id))
1080        });
1081        Ok(rows)
1082    }
1083
1084    /// Locate the session id for a stored message. Cheap when only the routing
1085    /// hint is needed - callers that need the messages use `scan_all_messages`.
1086    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1087        let batch = self
1088            .handle
1089            .scan_batch(
1090                Table::Messages,
1091                Some(&Predicate::Eq("id", message_id.into())),
1092                &["session_id"],
1093            )
1094            .await?;
1095        if batch.num_rows() == 0 {
1096            return Ok(None);
1097        }
1098        string(&batch, "session_id", 0)
1099    }
1100
1101    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1102        self.handle.row_counts().await
1103    }
1104
1105    /// Compute the per-adapter / per-project rollup that drives
1106    /// `pond status`. One scan over `messages` projecting the three
1107    /// columns the rollup keys on (`source_agent`, `project`, `session_id`),
1108    /// aggregated in-memory. Bounded by the cross product of adapters and
1109    /// projects, which stays small on real corpora.
1110    pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1111        let scanner = self
1112            .handle
1113            .scan(
1114                Table::Messages,
1115                ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1116            )
1117            .await?;
1118        let mut stream = scanner.try_into_stream().await?;
1119        let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1120        while let Some(batch) = stream.next().await {
1121            let batch = batch?;
1122            for row in 0..batch.num_rows() {
1123                let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1124                let project = string(&batch, "project", row)?.unwrap_or_default();
1125                let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1126                let is_subagent = source_agent.contains('/');
1127                if is_subagent && !include_subagents {
1128                    continue;
1129                }
1130                let entry = groups.entry((source_agent, project)).or_default();
1131                entry.messages += 1;
1132                entry.session_ids.insert(session_id);
1133            }
1134        }
1135
1136        let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1137        let totals = RowTotals {
1138            sessions: totals_sessions as u64,
1139            messages: totals_messages as u64,
1140            parts: totals_parts as u64,
1141        };
1142
1143        let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1144        for ((adapter, project), acc) in groups {
1145            by_adapter.entry(adapter).or_default().push(ProjectStats {
1146                project,
1147                sessions: acc.session_ids.len() as u64,
1148                messages: acc.messages,
1149            });
1150        }
1151
1152        let mut adapters = Vec::with_capacity(by_adapter.len());
1153        for (adapter, mut projects) in by_adapter {
1154            projects.sort_by(|a, b| {
1155                b.messages
1156                    .cmp(&a.messages)
1157                    .then_with(|| a.project.cmp(&b.project))
1158            });
1159            let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1160            let messages: u64 = projects.iter().map(|p| p.messages).sum();
1161            adapters.push(AdapterStats {
1162                adapter,
1163                sessions,
1164                messages,
1165                projects,
1166            });
1167        }
1168
1169        Ok(CorpusStats {
1170            data_url: self.handle.location().clone(),
1171            totals,
1172            adapters,
1173            include_subagents,
1174        })
1175    }
1176
1177    /// Write a batch of embeddings into `messages`: set `vector` and
1178    /// `embedding_model` on each row by `(session_id, id)`
1179    /// (spec.md#session-embed-from-canonical). The column update goes through the
1180    /// write seam and lands as a new manifest version (`append-only`).
1181    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1182        if rows.is_empty() {
1183            return Ok(());
1184        }
1185        let batch = embedding_update_batch(rows)?;
1186        self.handle
1187            .merge_update(Table::Messages, batch, rows.len())
1188            .await?;
1189        Ok(())
1190    }
1191
1192    /// Stream the backlog of messages needing embedding: rows with `search_text`
1193    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1194    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1195        try_stream! {
1196            let filter = Predicate::And(vec![
1197                Predicate::IsNull("vector"),
1198                Predicate::IsNotNull("search_text"),
1199            ]);
1200            let projection: &[&str] = &["session_id", "id", "search_text"];
1201            let scanner = self
1202                .handle
1203                .scan(
1204                    Table::Messages,
1205                    ScanOpts::with_predicate_and_projection(&filter, projection),
1206                )
1207                .await?;
1208            let mut batches = scanner
1209                .try_into_stream()
1210                .await
1211                .context("failed to open messages stream")?;
1212            while let Some(batch) = batches.next().await {
1213                let batch = batch?;
1214                for row in 0..batch.num_rows() {
1215                    yield PendingMessage {
1216                        session_id: string(&batch, "session_id", row)?
1217                            .context("session_id is null")?,
1218                        id: string(&batch, "id", row)?.context("message id is null")?,
1219                        search_text: string(&batch, "search_text", row)?
1220                            .context("search_text is null")?,
1221                    };
1222                }
1223            }
1224        }
1225    }
1226
1227    /// Stream messages that are either never embedded or stale under the
1228    /// current model. `pond embed --force` feeds this to the same unconditional
1229    /// merge_update as the normal backlog; the filter makes that semantically
1230    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1231    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1232        try_stream! {
1233            let filter = Predicate::And(vec![
1234                Predicate::IsNotNull("search_text"),
1235                Predicate::Or(vec![
1236                    Predicate::IsNull("vector"),
1237                    Predicate::Ne("embedding_model", embed::model_id().into()),
1238                ]),
1239            ]);
1240            let projection: &[&str] = &["session_id", "id", "search_text"];
1241            let scanner = self
1242                .handle
1243                .scan(
1244                    Table::Messages,
1245                    ScanOpts::with_predicate_and_projection(&filter, projection),
1246                )
1247                .await?;
1248            let mut batches = scanner
1249                .try_into_stream()
1250                .await
1251                .context("failed to open pending-or-stale messages stream")?;
1252            while let Some(batch) = batches.next().await {
1253                let batch = batch?;
1254                for row in 0..batch.num_rows() {
1255                    yield PendingMessage {
1256                        session_id: string(&batch, "session_id", row)?
1257                            .context("session_id is null")?,
1258                        id: string(&batch, "id", row)?.context("message id is null")?,
1259                        search_text: string(&batch, "search_text", row)?
1260                            .context("search_text is null")?,
1261                    };
1262                }
1263            }
1264        }
1265    }
1266
1267    /// BM25 full-text retriever over `messages.search_text`.
1268    pub async fn fts_search(
1269        &self,
1270        query: &str,
1271        limit: usize,
1272        filter: &Predicate,
1273    ) -> Result<Vec<(MessageKey, f32)>> {
1274        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1275        scanner.full_text_search(
1276            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1277        )?;
1278        // Lance ships an autoprojection that silently appends `_score` to FTS
1279        // output when the projection omits it. That behavior is going away;
1280        // we opt into the future explicit-projection contract here so the
1281        // scanner stops emitting a per-call deprecation warning, and we list
1282        // `_score` ourselves since the loop below reads it.
1283        scanner.disable_scoring_autoprojection();
1284        scanner.project(&["session_id", "id", "_score"])?;
1285        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1286        let batch = scanner.try_into_batch().await?;
1287        let mut hits = Vec::with_capacity(batch.num_rows());
1288        for row in 0..batch.num_rows() {
1289            let key = MessageKey {
1290                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1291                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1292            };
1293            hits.push((key, float32(&batch, "_score", row)?));
1294        }
1295        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1296        // order, which varies between runs and across calls with different pool
1297        // sizes (the hybrid arm's `pool=100` and FTS-only's `limit=20` produce
1298        // different orderings at the same tied score). Without an explicit
1299        // tiebreak the downstream RRF dedup-rank for a tied target session can
1300        // flip session-to-session, making fusion outcomes nondeterministic.
1301        // Sort by `score desc`, then `(session_id, message_id)` asc.
1302        hits.sort_by(|left, right| {
1303            right
1304                .1
1305                .partial_cmp(&left.1)
1306                .unwrap_or(std::cmp::Ordering::Equal)
1307                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1308                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1309        });
1310        Ok(hits)
1311    }
1312
1313    /// Whether any `messages` row carries a vector (spec.md#search) - the
1314    /// signal that flips search from FTS-only to hybrid. The single-active-
1315    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
1316    /// vector belongs to the current model.
1317    pub async fn has_embeddings(&self) -> Result<bool> {
1318        let scope = Predicate::IsNotNull("vector");
1319        let mut scanner = self
1320            .handle
1321            .scan(
1322                Table::Messages,
1323                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1324            )
1325            .await?;
1326        scanner.limit(Some(1), None)?;
1327        let batch = scanner.try_into_batch().await?;
1328        Ok(batch.num_rows() > 0)
1329    }
1330
1331    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
1332    /// scalar predicate (spec.md#search-prefilter-pushdown). Combines the caller's
1333    /// filter with `vector IS NOT NULL` to exclude un-embedded rows from the
1334    /// scan; the brute-force kNN path requires this (the IVF_PQ path would
1335    /// skip them anyway). The single-active-model invariant lets pond drop
1336    /// the per-row model filter: every non-null vector belongs to the current
1337    /// model.
1338    pub async fn vector_search(
1339        &self,
1340        query: &[f32],
1341        limit: usize,
1342        filter: &Predicate,
1343        search: Option<&config::SearchConfig>,
1344    ) -> Result<Vec<(MessageKey, f32)>> {
1345        let scope = embedded_scope(filter);
1346        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1347        let key = Float32Array::from(query.to_vec());
1348        scanner.nearest("vector", &key, limit)?;
1349        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1350            scanner.nprobes(nprobes);
1351        }
1352        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1353            scanner.refine(refine_factor);
1354        }
1355        // Mirror the explicit-projection contract from `fts_search`: opt out
1356        // of `_distance` autoprojection and list it ourselves since the loop
1357        // below reads it.
1358        scanner.disable_scoring_autoprojection();
1359        scanner.project(&["session_id", "id", "_distance"])?;
1360        let batch = scanner.try_into_batch().await?;
1361        let mut hits = Vec::with_capacity(batch.num_rows());
1362        for row in 0..batch.num_rows() {
1363            let key = MessageKey {
1364                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1365                message_id: string(&batch, "id", row)?.context("message id is null")?,
1366            };
1367            hits.push((key, float32(&batch, "_distance", row)?));
1368        }
1369        // Stable secondary sort: same reasoning as `fts_search` - IVF_PQ can
1370        // emit hits with effectively identical `_distance` in fragment-dependent
1371        // order, which makes RRF dedup-ranks nondeterministic for tied
1372        // neighbors. Sort by distance asc (smaller = more similar), then by
1373        // `(session_id, message_id)` asc.
1374        hits.sort_by(|left, right| {
1375            left.1
1376                .partial_cmp(&right.1)
1377                .unwrap_or(std::cmp::Ordering::Equal)
1378                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1379                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1380        });
1381        Ok(hits)
1382    }
1383
1384    /// The DataFusion plan string for a filtered vector scan - the
1385    /// `search-prefilter-pushdown` regression guard reads it.
1386    pub async fn explain_vector_plan(
1387        &self,
1388        query: &[f32],
1389        limit: usize,
1390        filter: &Predicate,
1391        search: Option<&config::SearchConfig>,
1392    ) -> Result<String> {
1393        let scope = embedded_scope(filter);
1394        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1395        let key = Float32Array::from(query.to_vec());
1396        scanner.nearest("vector", &key, limit)?;
1397        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1398            scanner.nprobes(nprobes);
1399        }
1400        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1401            scanner.refine(refine_factor);
1402        }
1403        scanner
1404            .explain_plan(true)
1405            .await
1406            .context("explain_plan failed")
1407    }
1408
1409    pub async fn explain_fts_plan(
1410        &self,
1411        query: &str,
1412        limit: usize,
1413        filter: &Predicate,
1414    ) -> Result<String> {
1415        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1416        scanner.full_text_search(
1417            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1418        )?;
1419        scanner.project(&["session_id", "id"])?;
1420        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1421        scanner
1422            .explain_plan(true)
1423            .await
1424            .context("explain_plan failed")
1425    }
1426
1427    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
1428    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1429        if keys.is_empty() {
1430            return Ok(Vec::new());
1431        }
1432        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1433        let session_ids = keys
1434            .iter()
1435            .map(|key| key.session_id.clone())
1436            .collect::<Vec<_>>();
1437        let message_ids = keys
1438            .iter()
1439            .map(|key| key.message_id.clone())
1440            .collect::<Vec<_>>();
1441        let predicate = Predicate::And(vec![
1442            in_predicate("session_id", &session_ids),
1443            in_predicate("id", &message_ids),
1444        ]);
1445        let batch = self
1446            .handle
1447            .scan_batch(
1448                Table::Messages,
1449                Some(&predicate),
1450                &[
1451                    "id",
1452                    "session_id",
1453                    "role",
1454                    "project",
1455                    "source_agent",
1456                    "timestamp",
1457                    "search_text",
1458                ],
1459            )
1460            .await?;
1461        let mut metas = Vec::with_capacity(batch.num_rows());
1462        for row in 0..batch.num_rows() {
1463            let message_id = string(&batch, "id", row)?.context("id is null")?;
1464            let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1465            if !wanted.contains(&MessageKey {
1466                session_id: session_id.clone(),
1467                message_id: message_id.clone(),
1468            }) {
1469                continue;
1470            }
1471            metas.push(MessageMeta {
1472                message_id,
1473                session_id,
1474                role: string(&batch, "role", row)?.context("role is null")?,
1475                project: string(&batch, "project", row)?.context("project is null")?,
1476                source_agent: string(&batch, "source_agent", row)?
1477                    .context("source_agent is null")?,
1478                timestamp: datetime(&batch, "timestamp", row)?,
1479                search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1480            });
1481        }
1482        Ok(metas)
1483    }
1484
1485    /// Total message count per session, for search session summaries.
1486    pub async fn session_message_counts(
1487        &self,
1488        session_ids: &[String],
1489    ) -> Result<BTreeMap<String, usize>> {
1490        if session_ids.is_empty() {
1491            return Ok(BTreeMap::new());
1492        }
1493        let dataset = self.handle.dataset(Table::Messages).await?;
1494        let mut tasks = tokio::task::JoinSet::new();
1495        for session_id in session_ids {
1496            let dataset = dataset.clone();
1497            let session_id = session_id.clone();
1498            tasks.spawn(async move {
1499                let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1500                let count = dataset.count_rows(Some(filter)).await?;
1501                anyhow::Ok((session_id, count))
1502            });
1503        }
1504        let mut counts = BTreeMap::new();
1505        while let Some(joined) = tasks.join_next().await {
1506            let (session_id, count) = joined.context("session count task panicked")??;
1507            counts.insert(session_id, count);
1508        }
1509        Ok(counts)
1510    }
1511
1512    /// Rows appended to `messages` since the FTS index was last optimized.
1513    /// A missing index reports the whole table; the query is manifest-only.
1514    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1515        self.handle
1516            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1517            .await
1518    }
1519
1520    /// Rows added or rewritten in `messages` since the IVF_PQ vector index
1521    /// was last optimized. Below
1522    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
1523    /// must read [`embedding_progress`](Self::embedding_progress) too and
1524    /// distinguish "index not built yet" from "index trails data".
1525    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1526        self.handle
1527            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1528            .await
1529    }
1530
1531    /// Embedding coverage: how many `messages` rows carry a vector and how
1532    /// many are still eligible. Drives the `pond status` embeddings line and
1533    /// the `pond embed` progress bar's known total. `embedded` reads the
1534    /// `vector IS NOT NULL` count directly - the single-active-model invariant
1535    /// (see `MESSAGE_SCALAR_INDICES`) means there is no need to scope by the
1536    /// `embedding_model` column.
1537    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1538        let dataset = self.handle.dataset(Table::Messages).await?;
1539        let embedded = dataset
1540            .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1541            .await?;
1542        let total = dataset
1543            .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1544            .await?;
1545        Ok(EmbeddingProgress {
1546            embedded,
1547            total,
1548            model: embed::model_id(),
1549        })
1550    }
1551
1552    /// Count rows whose `embedding_model` is not the currently configured
1553    /// model AND whose `vector` is still populated - the signal `pond embed`
1554    /// uses to detect a model swap and require `--force`.
1555    pub async fn stale_embedding_count(&self) -> Result<usize> {
1556        let dataset = self.handle.dataset(Table::Messages).await?;
1557        dataset
1558            .count_rows(Some(
1559                Predicate::And(vec![
1560                    Predicate::IsNotNull("vector"),
1561                    Predicate::Ne("embedding_model", embed::model_id().into()),
1562                ])
1563                .to_lance(),
1564            ))
1565            .await
1566            .map_err(Into::into)
1567    }
1568
1569    /// Run the per-table maintenance cycle (compact + indices) across every
1570    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
1571    /// and compaction commit independently, so a hot writer that starves
1572    /// compaction on one table does not abort the index work the operator
1573    /// asked for on other tables (or even on the same table).
1574    pub async fn optimize_indices(
1575        &self,
1576        progress: Option<OptimizeProgressFn>,
1577        cleanup: Option<CleanupConfig>,
1578    ) -> Result<OptimizeOutcome> {
1579        let cleanup = cleanup.unwrap_or_default();
1580        let policy = pond_index_intents();
1581        let mut tables = Vec::with_capacity(3);
1582        for (table, intents) in policy.all() {
1583            let outcome = self
1584                .handle
1585                .optimize_table(table, intents, progress.as_ref(), cleanup)
1586                .await;
1587            tables.push(outcome);
1588        }
1589        Ok(OptimizeOutcome { tables })
1590    }
1591
1592    /// Fold trailing fragments into existing indices across every table,
1593    /// without running compaction. Used by `pond embed`'s tail so newly
1594    /// written vectors land in the FTS / IVF_PQ / btree / bitmap indices
1595    /// without paying the compaction retry budget while embed itself may
1596    /// still be writing in a sibling process.
1597    pub async fn build_indices_only(
1598        &self,
1599        progress: Option<OptimizeProgressFn>,
1600    ) -> Result<OptimizeOutcome> {
1601        let policy = pond_index_intents();
1602        let mut tables = Vec::with_capacity(3);
1603        for (table, intents) in policy.all() {
1604            let indices = self
1605                .handle
1606                .optimize_table_indices_only(table, intents, progress.as_ref())
1607                .await;
1608            tables.push(TableOptimizeOutcome {
1609                table,
1610                indices,
1611                compaction: PhaseOutcome::NotAttempted,
1612            });
1613        }
1614        Ok(OptimizeOutcome { tables })
1615    }
1616
1617    #[cfg(test)]
1618    async fn optimize_indices_with_vector_threshold(
1619        &self,
1620        vector_threshold: usize,
1621    ) -> Result<OptimizeOutcome> {
1622        let cleanup = CleanupConfig::default();
1623        let policy = pond_index_intents_with_vector_threshold(vector_threshold);
1624        let mut tables = Vec::with_capacity(3);
1625        for (table, intents) in policy.all() {
1626            let outcome = self
1627                .handle
1628                .optimize_table(table, intents, None, cleanup)
1629                .await;
1630            tables.push(outcome);
1631        }
1632        Ok(OptimizeOutcome { tables })
1633    }
1634
1635    pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1636        let policy = pond_index_intents();
1637        let mut matched = false;
1638        for (table, intents) in policy.all() {
1639            for intent in intents {
1640                if intent_name.is_none_or(|name| name == intent.name) {
1641                    matched = true;
1642                    self.handle.rebuild_index(table, intent).await?;
1643                }
1644            }
1645        }
1646        if let Some(name) = intent_name
1647            && !matched
1648        {
1649            anyhow::bail!("unknown index intent {name:?}");
1650        }
1651        Ok(())
1652    }
1653
1654    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1655        let policy = pond_index_intents();
1656        let mut statuses = Vec::new();
1657        for (table, intents) in policy.all() {
1658            statuses.extend(self.handle.index_status(table, intents).await?);
1659        }
1660        Ok(statuses)
1661    }
1662
1663    /// Drop the IVF_PQ index on `messages.vector`. Used by `pond embed
1664    /// --force` before re-bootstrapping under a different model. Silent
1665    /// when the index does not exist.
1666    pub async fn drop_vector_index(&self) -> Result<()> {
1667        match self
1668            .handle
1669            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1670            .await
1671        {
1672            Ok(()) => Ok(()),
1673            Err(error) => {
1674                let msg = error.to_string();
1675                if msg.contains("not found") || msg.contains("does not exist") {
1676                    Ok(())
1677                } else {
1678                    Err(error)
1679                }
1680            }
1681        }
1682    }
1683
1684    /// On-disk byte totals per dataset, sized through Lance's object store
1685    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
1686    pub async fn table_sizes(&self) -> Result<TableSizes> {
1687        self.handle.table_sizes().await
1688    }
1689
1690    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1691        let batch = self
1692            .handle
1693            .scan_batch(
1694                Table::Sessions,
1695                Some(&Predicate::Eq("id", session_id.into())),
1696                &[],
1697            )
1698            .await?;
1699        if batch.num_rows() == 0 {
1700            Ok(None)
1701        } else {
1702            Ok(Some(session_from_batch(&batch, 0)?))
1703        }
1704    }
1705
1706    /// Fetch the stored vector for one message, for `similar_to`-style
1707    /// retrieval. Returns `Ok(None)` when the message exists but is not yet
1708    /// embedded, or when no message has that id. Vectors are stored Float16
1709    /// (`embedding_vector_type`); the search path takes `&[f32]`, so we
1710    /// widen here. Cheap rare op - one predicate scan, no index.
1711    pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1712        let batch = self
1713            .handle
1714            .scan_batch(
1715                Table::Messages,
1716                Some(&Predicate::Eq("id", message_id.into())),
1717                &["vector"],
1718            )
1719            .await?;
1720        if batch.num_rows() == 0 {
1721            return Ok(None);
1722        }
1723        let column = batch
1724            .column(0)
1725            .as_any()
1726            .downcast_ref::<FixedSizeListArray>();
1727        let Some(list) = column else {
1728            return Ok(None);
1729        };
1730        if list.is_null(0) {
1731            return Ok(None);
1732        }
1733        let values = list.value(0);
1734        let halves = values
1735            .as_any()
1736            .downcast_ref::<Float16Array>()
1737            .context("messages.vector inner array is not Float16")?;
1738        let widened = (0..halves.len())
1739            .map(|i| halves.value(i).to_f32())
1740            .collect();
1741        Ok(Some(widened))
1742    }
1743
1744    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1745        let batch = self
1746            .handle
1747            .scan_batch(
1748                Table::Messages,
1749                Some(&Predicate::Eq("session_id", session_id.into())),
1750                &[
1751                    "session_id",
1752                    "id",
1753                    "timestamp",
1754                    "role",
1755                    "content",
1756                    "options",
1757                ],
1758            )
1759            .await?;
1760        let mut messages = Vec::with_capacity(batch.num_rows());
1761        for row in 0..batch.num_rows() {
1762            messages.push(message_from_batch(&batch, row)?);
1763        }
1764        messages.sort_by(|left, right| {
1765            left.timestamp()
1766                .cmp(&right.timestamp())
1767                .then_with(|| left.id().cmp(right.id()))
1768        });
1769
1770        let message_ids = messages
1771            .iter()
1772            .map(|message| message.id().to_owned())
1773            .collect::<Vec<_>>();
1774        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1775
1776        Ok(messages
1777            .into_iter()
1778            .map(|message| {
1779                let key = (message.session_id().to_owned(), message.id().to_owned());
1780                let parts = parts_by_message.remove(&key).unwrap_or_default();
1781                MessageWithParts { message, parts }
1782            })
1783            .collect())
1784    }
1785
1786    /// Every part of these messages, full fidelity (file blobs included). The
1787    /// canonical read primitive - restore/export, verbatim mode, and the
1788    /// message-mode target all need the complete set.
1789    pub async fn parts_for_messages(
1790        &self,
1791        session_id: &str,
1792        message_ids: &[String],
1793    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1794        self.scan_parts(session_id, message_ids, None).await
1795    }
1796
1797    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
1798    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
1799    /// nothing. For the summary-only reads (conversational/complete session
1800    /// views, search hits) - it never feeds restore/export.
1801    pub async fn summary_parts_for_messages(
1802        &self,
1803        session_id: &str,
1804        message_ids: &[String],
1805    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1806        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1807            .await
1808    }
1809
    /// Shared reader behind [`Self::parts_for_messages`] and
    /// [`Self::summary_parts_for_messages`].
    ///
    /// Scans `parts` for rows with this `session_id` whose `message_id` is in
    /// `message_ids`. When `part_types` is `Some`, rows are further limited
    /// to `type IN part_types`. For `file` parts, the `data` blob is fetched
    /// by row address and decoded together with `variant_data` via
    /// `file_data_from_blob`.
    ///
    /// Returns parts grouped by `(session_id, message_id)`. Each group is
    /// sorted by `ordinal`; the sort is stable, so equal ordinals keep scan
    /// order. Empty `message_ids` returns an empty map without opening the
    /// table.
    ///
    /// NOTE(review): `dataset`, used for `take_blobs_by_addresses`, is opened
    /// separately from the scan that yields `_rowaddr`. The two might observe
    /// different table versions, e.g. if a compaction commits in between; the
    /// addresses may then not resolve against `dataset`. Confirm that
    /// `Handle::dataset` and `Handle::scan` pin the same version.
    async fn scan_parts(
        &self,
        session_id: &str,
        message_ids: &[String],
        part_types: Option<&[&str]>,
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        if message_ids.is_empty() {
            // Nothing can match; skip opening the table entirely.
            return Ok(BTreeMap::new());
        }
        let mut clauses = vec![
            Predicate::Eq("session_id", session_id.into()),
            in_predicate("message_id", message_ids),
        ];
        if let Some(types) = part_types {
            // Optional type filter (the summary path passes SUMMARY_PART_TYPES).
            clauses.push(Predicate::In(
                "type",
                types.iter().map(|&t| t.into()).collect(),
            ));
        }
        let predicate = Predicate::And(clauses);
        // Wrapped in an `Arc` for the blob fetch below (presumably Lance's
        // blob-take API is defined on `Arc<Dataset>` - verify if changing).
        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
        let mut scanner = self
            .handle
            .scan(
                Table::Parts,
                ScanOpts::with_predicate_and_projection(
                    &predicate,
                    &[
                        "session_id",
                        "message_id",
                        "id",
                        "ordinal",
                        "type",
                        "provenance",
                        "variant_data",
                        "options",
                    ],
                ),
            )
            .await?;
        // Ask for `_rowaddr` alongside the projection so `file` rows can be
        // re-addressed for the blob fetch.
        scanner.with_row_address();
        let batch = scanner.try_into_batch().await.context("scan failed")?;
        let row_addresses = uint64(&batch, "_rowaddr")?;
        // Batch row index -> decoded file payload; filled only for `file` parts.
        let mut file_payloads = BTreeMap::<usize, FileData>::new();
        // (batch row index, row address, raw `variant_data`) per `file` part.
        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
        for row in 0..batch.num_rows() {
            if string(&batch, "type", row)?.as_deref() == Some("file") {
                let variant_data =
                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
                file_rows.push((row, row_addresses.value(row), variant_data));
            }
        }
        if !file_rows.is_empty() {
            let addresses = file_rows
                .iter()
                .map(|(_, address, _)| *address)
                .collect::<Vec<_>>();
            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
            // NOTE(review): pairing assumes `blobs` comes back in `addresses`
            // order. `zip` would also silently truncate on a length mismatch;
            // confirm the Lance contract.
            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
                // Legacy blob (lance-encoding:blob): payload is bytes; the
                // url variant stored its URL as UTF-8 bytes, recovered via
                // `file_data_from_blob`'s `data_kind = "url"` branch.
                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
                file_payloads.insert(row, payload);
            }
        }
        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
        for row in 0..batch.num_rows() {
            // Non-file rows have no entry in `file_payloads` and get `None`.
            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
            parts_by_message
                .entry((part.session_id.clone(), part.message_id.clone()))
                .or_default()
                .push(part);
        }
        for parts in parts_by_message.values_mut() {
            parts.sort_by_key(|part| part.ordinal);
        }
        Ok(parts_by_message)
    }
1889}
1890
/// One item of an ingest stream: a session, message, or part row.
///
/// Serde encodes it adjacently tagged (`tag = "kind"`, `content = "data"`)
/// with snake_case variant names. The JSON looks like
/// `{"kind": "session" | "message" | "part", "data": <payload>}`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// A session row (`"kind": "session"`).
    Session(Session),
    /// A message row (`"kind": "message"`).
    Message(Message),
    /// A part row (`"kind": "part"`).
    Part(Part),
}
1898
/// Aggregate accounting for an ingest pass (CLI sync, adapter-driven).
/// The wire layer (`pond_ingest`) instead returns per-row results; the
/// aggregate is derived from those at the wire boundary.
///
/// Fields are bucketed by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape is set by spec.md#adapter-integrity-event-ordering.
/// `Default` is the all-zero summary with an empty `drop_reasons` map.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows actually written to Lance, summed across all three tables
    /// (`add_batch` adds sessions + total messages + parts).
    /// Use the per-table fields below for user-facing counts; this stays
    /// for `accepted()` and existing wire callers.
    pub inserted: usize,
    /// Rows that already existed (merge_insert no-op match), summed across
    /// all three tables in the same way as `inserted`.
    pub matched: usize,
    /// Session rows inserted this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted this pass (total - includes tool calls,
    /// tool results, and other non-searchable messages).
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` whose `search_text` is non-null
    /// (eligible for FTS + semantic indexing). The user-facing "messages"
    /// count in `pond sync` / `pond status` reads this field.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted this pass.
    pub parts_inserted: usize,
    /// Session rows already-present (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows already-present (merge_insert matched), total.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` with `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows already-present.
    pub parts_matched: usize,
    /// Events the validator dropped under the per-event-drop policy (ordering
    /// violation, orphan part, mismatched parent, adapter parse failure,
    /// duplicate-id collision, ...). Counted by event, not by session: a
    /// session with one bad part adds 1 to this bucket, not "the whole
    /// substream." Per spec.md#adapter-integrity-dedup, adapters SHOULD dedupe their
    /// own emissions upstream when source replay is expected; the
    /// validator's in-batch HashSet is a safety net, not a feature
    /// adapters may rely on. If this bucket grows on a clean adapter,
    /// inspect `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions whose Session-level invariants (immutable `source_agent` /
    /// `project` against the stored row) failed at flush time and
    /// whose substream got rejected wholesale. Always small relative to
    /// `inserted`; if not, there's a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter couldn't decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    ///
    /// NOTE(review): this overlaps with `skipped_empty`, which also lists an
    /// empty `.jsonl` and an unextractable header. Confirm which bucket each
    /// case actually lands in and narrow one of the two descriptions.
    pub skipped_files: usize,
    /// Files that produced no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `-vv` (debug) verbosity.
    pub skipped_empty: usize,
    /// Sessions short-circuited via the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): file `mtime` was at or before the wall-clock time
    /// pond last wrote that session's row, so re-decode was bypassed.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries were exhausted (commit
    /// conflicts, transient IO that didn't recover). Hard zero on healthy
    /// runs.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys for the combined `dropped_events +
    /// dropped_sessions` populations. Keys are `&'static str` (see the
    /// `DROP_REASON_*` constants) so consumers can match by identity.
    /// Empty on a clean run. Used by `pond sync` to print the top reasons
    /// and by `benches/ingest_bench.rs` to bucket Partial drops by cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1974
/// Stable reason keys for the `IngestSummary::drop_reasons` histogram and
/// the per-row `RowError::reason_key`. `&'static str` so consumers can
/// match by identity rather than prose. Adding a new variant: pick a short
/// snake_case identifier, route it from the validator/adapter, and update
/// the per-row outcome docs in `docs/spec.md#adapter-integrity-event-ordering`.
///
/// (As a `///` comment this renders as the docs of the first constant
/// below, but it describes the whole group.)
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Session-level invariant key: the session's immutable `project` disagreed
/// with the stored row (see `IngestSummary::dropped_sessions`).
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Session-level invariant key: the session's immutable `source_agent`
/// disagreed with the stored row (see `IngestSummary::dropped_sessions`).
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key tallied when an `Error` outcome carries no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
1991
/// Honest per-table outcome of one batched flush. Built from `merge_insert`'s
/// returned counts together with the pre-existence sets captured by
/// `upsert_session_batch`. Folded into a per-sync summary via
/// [`IngestSummary::add_batch`]. spec.md#adapter-integrity-additive-sync: matched
/// is a no-op write, so the inserted/matched split is informational - we still
/// surface it because both `pond sync` and `pond_ingest` clients reconcile
/// against "which rows landed this call."
///
/// Each field is added to the same-named `IngestSummary` field. The
/// `*_searchable` message counts are subsets of the `*_total` counts and do
/// not contribute to `IngestSummary::inserted` / `matched`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows newly written by this flush.
    pub sessions_inserted: usize,
    /// Session rows that already existed (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows newly written, searchable or not.
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` carrying `search_text`.
    pub messages_inserted_searchable: usize,
    /// Message rows that already existed, searchable or not.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` carrying `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows newly written.
    pub parts_inserted: usize,
    /// Part rows that already existed.
    pub parts_matched: usize,
}
2010
2011impl IngestSummary {
2012    pub fn accepted(&self) -> usize {
2013        self.inserted + self.matched
2014    }
2015
2016    /// Sole writer of the per-table counters on the CLI batched flush path.
2017    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
2018    /// both for the same rows would double-count.
2019    pub fn add_batch(&mut self, counts: &BatchCounts) {
2020        self.sessions_inserted += counts.sessions_inserted;
2021        self.sessions_matched += counts.sessions_matched;
2022        self.messages_inserted_total += counts.messages_inserted_total;
2023        self.messages_inserted_searchable += counts.messages_inserted_searchable;
2024        self.messages_matched_total += counts.messages_matched_total;
2025        self.messages_matched_searchable += counts.messages_matched_searchable;
2026        self.parts_inserted += counts.parts_inserted;
2027        self.parts_matched += counts.parts_matched;
2028        self.inserted +=
2029            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2030        self.matched +=
2031            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2032    }
2033
2034    /// Sum every counter from `other` into `self`. Used by the multi-source
2035    /// `pond sync` loop so adding a new field to this struct doesn't silently
2036    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
2037    pub fn merge(&mut self, other: &Self) {
2038        self.inserted += other.inserted;
2039        self.matched += other.matched;
2040        self.sessions_inserted += other.sessions_inserted;
2041        self.messages_inserted_total += other.messages_inserted_total;
2042        self.messages_inserted_searchable += other.messages_inserted_searchable;
2043        self.parts_inserted += other.parts_inserted;
2044        self.sessions_matched += other.sessions_matched;
2045        self.messages_matched_total += other.messages_matched_total;
2046        self.messages_matched_searchable += other.messages_matched_searchable;
2047        self.parts_matched += other.parts_matched;
2048        self.dropped_events += other.dropped_events;
2049        self.dropped_sessions += other.dropped_sessions;
2050        self.skipped_files += other.skipped_files;
2051        self.skipped_empty += other.skipped_empty;
2052        self.skipped_fresh += other.skipped_fresh;
2053        self.storage_errors += other.storage_errors;
2054        self.truncated_values += other.truncated_values;
2055        for (key, value) in &other.drop_reasons {
2056            *self.drop_reasons.entry(key).or_insert(0) += value;
2057        }
2058    }
2059
2060    /// Same dispatch as [`Self::add_outcomes`] but ignores
2061    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
2062    /// via [`Self::add_batch`] and uses this method to attribute per-row
2063    /// `Error` outcomes from the same flush.
2064    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2065        for outcome in outcomes {
2066            if !matches!(outcome.status, OutcomeStatus::Error) {
2067                continue;
2068            }
2069            if outcome.kind == "session" {
2070                self.dropped_sessions += 1;
2071            } else {
2072                self.dropped_events += 1;
2073            }
2074            let reason = outcome
2075                .error
2076                .as_ref()
2077                .and_then(|error| error.reason_key)
2078                .unwrap_or(DROP_REASON_UNCATEGORIZED);
2079            *self.drop_reasons.entry(reason).or_insert(0) += 1;
2080        }
2081    }
2082
2083    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2084        for outcome in outcomes {
2085            match outcome.status {
2086                OutcomeStatus::Inserted => {
2087                    self.inserted += 1;
2088                    match outcome.kind {
2089                        "session" => self.sessions_inserted += 1,
2090                        "message" => {
2091                            self.messages_inserted_total += 1;
2092                            if outcome.searchable {
2093                                self.messages_inserted_searchable += 1;
2094                            }
2095                        }
2096                        "part" => self.parts_inserted += 1,
2097                        _ => {}
2098                    }
2099                }
2100                OutcomeStatus::Matched => {
2101                    self.matched += 1;
2102                    match outcome.kind {
2103                        "session" => self.sessions_matched += 1,
2104                        "message" => {
2105                            self.messages_matched_total += 1;
2106                            if outcome.searchable {
2107                                self.messages_matched_searchable += 1;
2108                            }
2109                        }
2110                        "part" => self.parts_matched += 1,
2111                        _ => {}
2112                    }
2113                }
2114                OutcomeStatus::Error => {
2115                    // Session-level rejection: exactly one session-kind Error
2116                    // outcome (see `error_outcomes_for_substream`). Per-event
2117                    // drop: one Error per message/part. The two populations
2118                    // are counted separately so the operator can tell a
2119                    // structural reject from a row-level skip.
2120                    if outcome.kind == "session" {
2121                        self.dropped_sessions += 1;
2122                    } else {
2123                        self.dropped_events += 1;
2124                    }
2125                    let reason = outcome
2126                        .error
2127                        .as_ref()
2128                        .and_then(|e| e.reason_key)
2129                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
2130                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
2131                }
2132            }
2133        }
2134    }
2135}
2136
/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol).
///
/// Each outcome carries the `index` of the input event it describes in the
/// request's `events` array, so the wire layer can pack it into a
/// [`crate::wire::IngestResult`] entry. Outcomes are emitted as soon as they
/// are known, not in array order:
///
/// - per-event drops come back from [`IngestValidator::push`] immediately;
/// - buffered rows are reported later, when pending substreams are flushed.
///
/// Callers that need array order must reorder by `index`. A session-level
/// rejection reports only the Session row; the buffered message/part rows of
/// that substream get no outcome of their own.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Position of the originating event in the request's `events` array.
    pub index: usize,
    /// Row kind: `"session"`, `"message"`, or `"part"`.
    pub kind: &'static str,
    /// Primary key as JSON. Sessions use the id string, messages use
    /// `[session_id, message_id]`, and parts use
    /// `[session_id, message_id, part_id]`.
    pub pk: Value,
    pub status: OutcomeStatus,
    /// Error body. The constructors in this module set it for
    /// `OutcomeStatus::Error` rows and leave it `None` otherwise.
    pub error: Option<RowError>,
    /// True iff `kind == "message"` AND the underlying row carries
    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
    /// so the CLI can show "searchable" message deltas distinct from raw
    /// inserts. Always false for session/part rows.
    pub searchable: bool,
}

/// Result of ingesting one row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row's primary key was not present before the write.
    Inserted,
    /// The row's primary key already existed before the write.
    Matched,
    /// The row was dropped or rejected; see [`RowOutcome::error`].
    Error,
}

/// Structured per-row error body. Mirrors the wire shape so the handler
/// can pass it straight through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the failure.
    pub message: String,
    /// Name of the offending input field, when one can be singled out.
    pub field: Option<&'static str>,
    /// Optional short qualifier, e.g. `"immutable"` for session-level
    /// immutable-field rejections.
    pub reason: Option<&'static str>,
    /// Stable key for histogramming - see `DROP_REASON_*` constants.
    /// `reason_key` is the machine bucket. `None` means uncategorized;
    /// consumers attribute it to `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
2175
/// A validated Session event tagged with its input array index. The index
/// lets per-row outcomes be attributed back to the right event after the
/// batched write.
#[derive(Debug)]
struct BufferedSession {
    index: usize,
    session: Session,
}

/// A validated Message event plus the parts attached to it.
///
/// `parts` and `search_text` start empty. They are filled in by
/// `IngestValidator::flush_current_message` when the message is closed.
#[derive(Debug)]
struct BufferedMessage {
    index: usize,
    message: Message,
    parts: Vec<BufferedPart>,
    /// Conversational text derived from `parts`; `None` when there is none.
    search_text: Option<String>,
}

/// A validated Part event tagged with its input array index.
#[derive(Debug)]
struct BufferedPart {
    index: usize,
    part: Part,
}
2198
/// State machine that turns the `events: Vec<IngestEvent>` array into a
/// flat `Vec<RowOutcome>` in the array's index space.
///
/// Events are grouped into session substreams: a `Session` event opens one,
/// and the following Message/Part events belong to it. A validation error on
/// a single event drops *that event* (one [`OutcomeStatus::Error`] outcome)
/// and the substream continues. A Session event with an empty `source_agent`
/// or an orphan `parent_message_id` is rejected on arrival. Only
/// Session-level invariants checked at write time (immutable source_agent /
/// project on re-write) drop the whole substream
/// (spec.md#adapter-integrity-event-ordering).
///
/// Writes are batched at flush time. As complete substreams arrive (a new
/// `Session` event closes out the current one), they accumulate in
/// `completed` rather than each one calling `merge_insert` immediately.
/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`].
/// At that point one batched 3-parallel-merge-insert (sessions, messages,
/// parts) covers all pending substreams. This is the load-bearing perf
/// change: per-substream commit overhead dominated the ingest profile (see
/// `benches/ingest_bench.rs`), and amortizing it across N sessions cuts wall
/// time materially.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// Session event of the open substream, if any.
    session: Option<BufferedSession>,
    /// Message currently accepting parts. Its parts collect in
    /// `current_parts` until the message is closed.
    current_message: Option<BufferedMessage>,
    current_parts: Vec<BufferedPart>,
    /// Closed messages of the open substream.
    messages: Vec<BufferedMessage>,
    /// Message ids already buffered in the current substream. Duplicate ids
    /// drop the offending event in-line rather than failing the whole batch
    /// downstream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` keys already buffered in the current
    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
    seen_part_keys: HashSet<(String, String)>,
    /// Substreams whose end-of-stream boundary has been observed but whose
    /// rows haven't been written yet. Flushed in batched mode by
    /// [`Self::flush`].
    completed: Vec<CompletedSubstream>,
}

/// One closed substream ready for the batched flush path.
#[derive(Debug)]
struct CompletedSubstream {
    /// Input array index of the substream's Session event.
    session_index: usize,
    session: Session,
    /// Closed messages, each with its parts and computed `search_text`.
    messages: Vec<BufferedMessage>,
}
2241
impl IngestValidator {
    /// Drive one input event through the validator and return the per-row
    /// outcomes the event produced immediately:
    ///
    /// - empty when the event was accepted into the buffer. Nothing is
    ///   written here; buffered rows are reported later by [`Self::flush`] /
    ///   [`Self::finish`].
    /// - exactly one [`OutcomeStatus::Error`] outcome when the event was
    ///   dropped or a Session event was rejected.
    ///
    /// `Err` is reserved for catastrophic storage failures that should fail
    /// the whole `pond_ingest` request. The current per-event paths perform
    /// no storage I/O and always return `Ok`.
    pub async fn push(
        &mut self,
        store: &Store,
        index: usize,
        event: IngestEvent,
    ) -> Result<Vec<RowOutcome>> {
        match event {
            IngestEvent::Session(session) => self.push_session(store, index, session).await,
            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
        }
    }

    /// Final flush at end-of-batch. Closes the in-flight substream and
    /// drains the pending-flush buffer. Returns the per-row outcomes (for
    /// the wire layer) alongside the honest per-table counts (for
    /// `IngestSummary::add_batch`).
    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        self.close_current_substream();
        self.flush(store).await
    }

    /// Drain every completed substream into batched 3-parallel-merge_insert
    /// writes. Caller invokes this periodically (every N completed
    /// substreams) to keep memory bounded; in adapter-driven sync that
    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
    /// in-flight substream stays buffered - close it explicitly via
    /// [`Self::finish`] or by feeding the next Session event.
    ///
    /// The pending substreams are moved out of the validator before the
    /// write is awaited. If the write returns `Err`, they are not restored.
    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        // Nothing pending: skip the batch write entirely and report zeros.
        if self.completed.is_empty() {
            return Ok((Vec::new(), BatchCounts::default()));
        }
        let completed = std::mem::take(&mut self.completed);
        store.upsert_session_batch(completed).await
    }

    /// Number of fully-buffered substreams awaiting batched write. Used by
    /// the adapter caller to decide when to call [`Self::flush`].
    pub fn pending_substreams(&self) -> usize {
        self.completed.len()
    }

    /// Open a new substream for `session`, first closing any in-flight
    /// substream into the pending buffer.
    ///
    /// Rejects the Session (one Error outcome) when `source_agent` is empty
    /// after trimming, or when `parent_message_id` is set without
    /// `parent_session_id`. After a rejection no substream is open, so the
    /// following Message/Part events are dropped individually. `_store` is
    /// unused because writes are deferred to [`Self::flush`].
    async fn push_session(
        &mut self,
        _store: &Store,
        index: usize,
        mut session: Session,
    ) -> Result<Vec<RowOutcome>> {
        // Close out the current substream (if any) - move it to the pending
        // buffer instead of writing immediately. The actual write happens
        // when the caller invokes `flush` / `finish`.
        self.close_current_substream();

        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
        // if empty after trim. A Session event with empty source_agent is
        // dropped on the spot - the substream that would follow has nothing
        // to anchor on, so subsequent message/part events will also drop.
        let trimmed = session.source_agent.trim();
        if trimmed.is_empty() {
            return Ok(vec![RowOutcome {
                index,
                kind: "session",
                pk: Value::String(session.id.clone()),
                status: OutcomeStatus::Error,
                error: Some(RowError {
                    message: format!("session {} has empty source_agent after trim", session.id),
                    field: Some("source_agent"),
                    reason: None,
                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
                }),
                searchable: false,
            }]);
        }
        // Only reallocate when trimming actually removed whitespace.
        if trimmed.len() != session.source_agent.len() {
            session.source_agent = trimmed.to_owned();
        }

        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
            return Ok(vec![RowOutcome {
                index,
                kind: "session",
                pk: Value::String(session.id.clone()),
                status: OutcomeStatus::Error,
                error: Some(RowError {
                    message: format!(
                        "session {} has parent_message_id without parent_session_id",
                        session.id,
                    ),
                    field: Some("parent_message_id"),
                    reason: None,
                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
                }),
                searchable: false,
            }]);
        }

        self.seen_message_ids.clear();
        self.seen_part_keys.clear();
        self.session = Some(BufferedSession { index, session });
        Ok(Vec::new())
    }

    /// Move the open substream (Session plus its closed messages) into
    /// `completed` and reset the per-substream dedupe sets.
    ///
    /// The current message is closed first so its parts are included.
    /// Returns early without touching `messages` when no session is open.
    fn close_current_substream(&mut self) {
        self.flush_current_message();
        let Some(BufferedSession {
            index: session_index,
            session,
        }) = self.session.take()
        else {
            return;
        };
        let messages = std::mem::take(&mut self.messages);
        self.seen_message_ids.clear();
        self.seen_part_keys.clear();
        self.completed.push(CompletedSubstream {
            session_index,
            session,
            messages,
        });
    }

    /// Validate a Message event and make it the current message.
    ///
    /// The message is dropped (one Error outcome) when:
    /// - no session is open;
    /// - its `session_id` does not match the open session;
    /// - its id repeats within the substream.
    ///
    /// Validation runs before the previous message is closed, so a dropped
    /// message leaves the current message (and its part buffer) open.
    fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
        let pk = Value::Array(vec![
            Value::String(message.session_id().to_owned()),
            Value::String(message.id().to_owned()),
        ]);
        let Some(session) = &self.session else {
            return vec![error_outcome(
                index,
                "message",
                pk,
                "first event in a session stream must be Session",
                None,
                DROP_REASON_MESSAGE_BEFORE_SESSION,
            )];
        };
        if message.session_id() != session.session.id {
            let msg = format!(
                "message {} references session {}, expected {}",
                message.id(),
                message.session_id(),
                session.session.id
            );
            return vec![error_outcome(
                index,
                "message",
                pk,
                &msg,
                Some("session_id"),
                DROP_REASON_MESSAGE_SESSION_MISMATCH,
            )];
        }
        if !self.seen_message_ids.insert(message.id().to_owned()) {
            // Keep same-substream duplicate ids visible in `dropped_events`;
            // adapters are expected to dedupe upstream (see claude-code's
            // per-file `seen_uuids`), so a hit here is worth investigating.
            let msg = format!("duplicate message id {} in session substream", message.id());
            return vec![error_outcome(
                index,
                "message",
                pk,
                &msg,
                None,
                DROP_REASON_DUPLICATE_MESSAGE_ID,
            )];
        }
        // Accepted: close the previous message before opening this one.
        self.flush_current_message();
        self.current_message = Some(BufferedMessage {
            index,
            message,
            parts: Vec::new(),
            search_text: None,
        });
        Vec::new()
    }

    /// Validate a Part event and buffer it under the current message.
    ///
    /// The part is dropped (one Error outcome) when:
    /// - no message is open;
    /// - its `session_id` or `message_id` disagrees with the current message;
    /// - its `(message_id, part_id)` key repeats within the substream.
    ///
    /// NOTE(review): a `session_id` mismatch is bucketed under
    /// `DROP_REASON_PART_MESSAGE_MISMATCH`, the same key as a `message_id`
    /// mismatch. Confirm that is intended.
    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
        let pk = Value::Array(vec![
            Value::String(part.session_id.clone()),
            Value::String(part.message_id.clone()),
            Value::String(part.id.clone()),
        ]);
        let Some(current) = &self.current_message else {
            return vec![error_outcome(
                index,
                "part",
                pk,
                "part event appeared before a message",
                None,
                DROP_REASON_PART_BEFORE_MESSAGE,
            )];
        };
        if part.session_id != current.message.session_id() {
            let msg = format!(
                "part {} references session {}, expected {}",
                part.id,
                part.session_id,
                current.message.session_id()
            );
            return vec![error_outcome(
                index,
                "part",
                pk,
                &msg,
                Some("session_id"),
                DROP_REASON_PART_MESSAGE_MISMATCH,
            )];
        }
        if part.message_id != current.message.id() {
            let msg = format!(
                "part {} references message {}, expected {}",
                part.id,
                part.message_id,
                current.message.id()
            );
            return vec![error_outcome(
                index,
                "part",
                pk,
                &msg,
                Some("message_id"),
                DROP_REASON_PART_MESSAGE_MISMATCH,
            )];
        }
        let part_key = (part.message_id.clone(), part.id.clone());
        if !self.seen_part_keys.insert(part_key) {
            let msg = format!(
                "duplicate part id {} for message {} in session substream",
                part.id, part.message_id
            );
            return vec![error_outcome(
                index,
                "part",
                pk,
                &msg,
                None,
                DROP_REASON_DUPLICATE_PART_KEY,
            )];
        }
        self.current_parts.push(BufferedPart { index, part });
        Vec::new()
    }

    /// Close the current message: attach its buffered parts, compute its
    /// `search_text`, and append it to `messages`. No-op when no message is
    /// open.
    fn flush_current_message(&mut self) {
        let Some(mut buffered) = self.current_message.take() else {
            return;
        };
        let parts = std::mem::take(&mut self.current_parts);
        // `search_text` takes `&[Part]`, so the parts are cloned out of their
        // index-tagged wrappers into a plain slice.
        let mut canonical_parts = Vec::with_capacity(parts.len());
        for part in &parts {
            canonical_parts.push(part.part.clone());
        }
        buffered.search_text = search_text(&buffered.message, &canonical_parts);
        buffered.parts = parts;
        self.messages.push(buffered);
    }
}
2505
2506fn error_outcome(
2507    index: usize,
2508    kind: &'static str,
2509    pk: Value,
2510    message: &str,
2511    field: Option<&'static str>,
2512    reason_key: &'static str,
2513) -> RowOutcome {
2514    RowOutcome {
2515        index,
2516        kind,
2517        pk,
2518        status: OutcomeStatus::Error,
2519        error: Some(RowError {
2520            message: message.to_owned(),
2521            field,
2522            reason: None,
2523            reason_key: Some(reason_key),
2524        }),
2525        searchable: false,
2526    }
2527}
2528
2529/// Session-level rejection (immutable `source_agent` / `project` violation):
2530/// emit exactly one Error outcome on the Session row. The buffered messages
2531/// and parts of this substream are *not* surfaced as per-row errors - their
2532/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
2533fn error_outcomes_for_substream(
2534    session_index: usize,
2535    session: &Session,
2536    _messages: &[BufferedMessage],
2537    message: impl Into<String>,
2538    field: Option<&'static str>,
2539    reason_key: &'static str,
2540) -> Vec<RowOutcome> {
2541    let reason = field.map(|_| "immutable");
2542    vec![RowOutcome {
2543        index: session_index,
2544        kind: "session",
2545        pk: Value::String(session.id.clone()),
2546        status: OutcomeStatus::Error,
2547        error: Some(RowError {
2548            message: message.into(),
2549            field,
2550            reason,
2551            reason_key: Some(reason_key),
2552        }),
2553        searchable: false,
2554    }]
2555}
2556
2557/// Batched-path success helper. Each row's Inserted/Matched status is read
2558/// from the pre-existence sets captured by `upsert_session_batch` before its
2559/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
2560/// Also accumulates the per-table totals into `counts` so the CLI summary
2561/// gets the same truth without re-walking the outcomes.
2562fn success_outcomes_for_substream(
2563    session_index: usize,
2564    session: &Session,
2565    messages: &[BufferedMessage],
2566    existing_sessions: &std::collections::HashMap<String, Session>,
2567    existing_message_pks: &HashSet<(String, String)>,
2568    existing_part_pks: &HashSet<(String, String, String)>,
2569    counts: &mut BatchCounts,
2570) -> Vec<RowOutcome> {
2571    let session_was_present = existing_sessions.contains_key(&session.id);
2572    let session_status = if session_was_present {
2573        counts.sessions_matched += 1;
2574        UpsertStatus::Matched
2575    } else {
2576        counts.sessions_inserted += 1;
2577        UpsertStatus::Inserted
2578    };
2579
2580    let mut outcomes = Vec::with_capacity(1 + messages.len());
2581    outcomes.push(success_outcome(
2582        session_index,
2583        "session",
2584        Value::String(session.id.clone()),
2585        session_status,
2586        false,
2587    ));
2588    for buffered in messages {
2589        let key = (
2590            buffered.message.session_id().to_owned(),
2591            buffered.message.id().to_owned(),
2592        );
2593        let searchable = buffered.search_text.is_some();
2594        let message_status = if existing_message_pks.contains(&key) {
2595            counts.messages_matched_total += 1;
2596            if searchable {
2597                counts.messages_matched_searchable += 1;
2598            }
2599            UpsertStatus::Matched
2600        } else {
2601            counts.messages_inserted_total += 1;
2602            if searchable {
2603                counts.messages_inserted_searchable += 1;
2604            }
2605            UpsertStatus::Inserted
2606        };
2607        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2608        outcomes.push(success_outcome(
2609            buffered.index,
2610            "message",
2611            pk,
2612            message_status,
2613            searchable,
2614        ));
2615        for part in &buffered.parts {
2616            let part_key = (
2617                part.part.session_id.clone(),
2618                part.part.message_id.clone(),
2619                part.part.id.clone(),
2620            );
2621            let part_status = if existing_part_pks.contains(&part_key) {
2622                counts.parts_matched += 1;
2623                UpsertStatus::Matched
2624            } else {
2625                counts.parts_inserted += 1;
2626                UpsertStatus::Inserted
2627            };
2628            let part_pk = Value::Array(vec![
2629                Value::String(part_key.0),
2630                Value::String(part_key.1),
2631                Value::String(part_key.2),
2632            ]);
2633            outcomes.push(success_outcome(
2634                part.index,
2635                "part",
2636                part_pk,
2637                part_status,
2638                false,
2639            ));
2640        }
2641    }
2642    outcomes
2643}
2644
2645fn success_outcome(
2646    index: usize,
2647    kind: &'static str,
2648    pk: Value,
2649    status: UpsertStatus,
2650    searchable: bool,
2651) -> RowOutcome {
2652    let status = match status {
2653        UpsertStatus::Inserted => OutcomeStatus::Inserted,
2654        UpsertStatus::Matched => OutcomeStatus::Matched,
2655    };
2656    RowOutcome {
2657        index,
2658        kind,
2659        pk,
2660        status,
2661        error: None,
2662        searchable,
2663    }
2664}
2665
/// Session-level ingest failures detected at write time.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// spec.md#protocol: `Session.source_agent` and `Session.project` are
    /// immutable post-first-write because the denormalized copies on
    /// `messages` were stamped from the prior Session at first ingest.
    /// A re-write that changes either would silently desync.
    ImmutableField {
        field: &'static str,
        session_id: String,
        stored: String,
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders e.g. `session s1 project is immutable: stored "a", attempted "b"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            f,
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        )
    }
}

impl std::error::Error for IngestError {}
2697
2698/// Compare an incoming Session row against the stored row on the two
2699/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
2700/// counts a NULL-vs-non-NULL change as a mismatch.
2701fn ensure_immutable_match(
2702    existing: &Session,
2703    incoming: &Session,
2704) -> std::result::Result<(), IngestError> {
2705    if existing.source_agent != incoming.source_agent {
2706        return Err(IngestError::ImmutableField {
2707            field: "source_agent",
2708            session_id: incoming.id.clone(),
2709            stored: existing.source_agent.clone(),
2710            attempted: incoming.source_agent.clone(),
2711        });
2712    }
2713    if existing.project != incoming.project {
2714        return Err(IngestError::ImmutableField {
2715            field: "project",
2716            session_id: incoming.id.clone(),
2717            stored: (*existing.project).clone(),
2718            attempted: (*incoming.project).clone(),
2719        });
2720    }
2721    Ok(())
2722}
2723
2724pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2725    use crate::wire::Provenance;
2726    let mut chunks: Vec<String> = Vec::new();
2727    for part in parts {
2728        // spec.md#search: only conversational parts contribute to the indexed
2729        // text; harness-injected scaffolding is excluded from search.
2730        if part.provenance != Provenance::Conversational {
2731            continue;
2732        }
2733        match (message.role(), &part.kind) {
2734            (Role::User | Role::Assistant, PartKind::Text { text }) => {
2735                if let Some(text) = text {
2736                    chunks.push(text.to_string());
2737                }
2738            }
2739            (
2740                Role::User | Role::Assistant,
2741                PartKind::File {
2742                    media_type,
2743                    file_name,
2744                    data,
2745                },
2746            ) => {
2747                if let Some(file_name) = file_name {
2748                    chunks.push(file_name.clone());
2749                }
2750                if let Some(media_type) = media_type {
2751                    chunks.push(media_type.clone());
2752                }
2753                if let FileData::Url(uri) = data {
2754                    chunks.push(uri.clone());
2755                }
2756            }
2757            (
2758                Role::System | Role::Tool,
2759                PartKind::Text { .. }
2760                | PartKind::Reasoning { .. }
2761                | PartKind::File { .. }
2762                | PartKind::ToolCall { .. }
2763                | PartKind::ToolResult { .. }
2764                | PartKind::ToolApprovalRequest { .. }
2765                | PartKind::ToolApprovalResponse { .. },
2766            )
2767            | (
2768                Role::User | Role::Assistant,
2769                PartKind::Reasoning { .. }
2770                | PartKind::ToolCall { .. }
2771                | PartKind::ToolResult { .. }
2772                | PartKind::ToolApprovalRequest { .. }
2773                | PartKind::ToolApprovalResponse { .. },
2774            ) => {}
2775        }
2776    }
2777
2778    let text = chunks
2779        .into_iter()
2780        .filter(|chunk| !chunk.trim().is_empty())
2781        .collect::<Vec<_>>()
2782        .join("\n");
2783    if text.is_empty() { None } else { Some(text) }
2784}
2785
/// Conversational message text used for search (spec.md#search).
///
/// Intended to hold non-empty text. This type does not check that itself;
/// the invariant is up to whoever constructs it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrow the text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Unwrap into the owned `String`.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2805
/// A message together with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    pub message: Message,
    pub parts: Vec<Part>,
}

/// A session together with its messages, each carrying its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    pub session: Session,
    pub messages: Vec<MessageWithParts>,
}

/// Request parameters for a `pond_get` session-mode lookup.
/// NOTE(review): field semantics below are inferred from names; confirm
/// against the handler.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    pub mode: ResponseMode,
    /// Pagination anchor; an unknown id surfaces as
    /// [`GetLookup::UnknownAfterId`].
    pub after_id: Option<&'a str>,
    /// Page-size cap (presumably applied via `page_by`, which clamps it to
    /// 1..=1000).
    pub limit: usize,
    /// Presumably the byte budget passed to `page_by`.
    pub budget_bytes: usize,
}

/// Request parameters for a `pond_get` message-mode lookup.
/// NOTE(review): field semantics below are inferred from names; confirm
/// against the handler.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// Presumably how many sibling messages to include around the target.
    pub context_depth: usize,
    /// Pagination anchor; an unknown id surfaces as
    /// [`GetLookup::UnknownAfterId`].
    pub after_id: Option<&'a str>,
    pub limit: usize,
    pub budget_bytes: usize,
}
2833
/// Outcome of a `pond_get` lookup. Separates a missing target (the handler
/// maps it to `not_found`) from a stale/unknown `after_id` (mapped to
/// `validation_failed`). The message/part stream is append-only, so an
/// anchor that was ever valid never disappears. An unknown one is therefore
/// always a client error, never a reason to silently restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested session/message does not exist.
    NotFound,
    /// The target exists, but the supplied `after_id` anchor does not.
    UnknownAfterId,
    /// The lookup succeeded.
    Found(T),
}
2845
/// Canonical retrieval result for `pond_get` session mode: the stored session,
/// one page of messages (each with its `Part`s), and the count of messages
/// left after this page. Protocol-shaping into `GetResult`/`MessageView`
/// happens in the handler.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    pub session: Session,
    pub messages: Vec<RetrievedMessage>,
    /// Messages not included in this page.
    pub messages_remaining: usize,
}

/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
/// empty - the target's parts ride `target_parts` (paginated). `siblings`
/// carry their parts so the handler can summarize them.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    pub session: Session,
    pub target: RetrievedMessage,
    /// One page of the target message's parts.
    pub target_parts: Vec<Part>,
    /// Target parts not included in this page.
    pub target_parts_remaining: usize,
    pub siblings: Vec<RetrievedMessage>,
}

/// A message as returned by retrieval, with its parts attached.
/// NOTE(review): `text` vs `content` semantics are set where this is built
/// (not visible here); confirm before relying on either.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    pub id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: Option<String>,
    pub content: Option<String>,
    pub parts: Vec<Part>,
}
2877
/// Internal per-message row read by retrieval scans, before parts are
/// attached. NOTE(review): populated outside this view; field meanings are
/// inferred from their names and mirror [`RetrievedMessage`].
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}

/// One row of the conversational scan. `text` is guaranteed non-empty:
/// the `IsNotNull("search_text")` pushdown drops NULL rows, and
/// `search_text()` never produces an empty string (spec.md#search).
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    pub session_id: String,
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    pub text: SearchText,
}
2897
/// Count how many leading `items` fit in one page.
///
/// The page holds at most `limit` items, with `limit` clamped to `1..=1000`
/// (so `0` behaves like `1`). It also stops before the first item whose
/// cumulative `size` would exceed `budget_bytes`. The first item is always
/// emitted when `items` is non-empty, so a single oversize item never
/// blocks its own page. Empty input yields `0`. Sizes are summed with
/// saturating addition.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let max_items = limit.clamp(1, 1000);
    let mut spent = 0usize;
    items
        .iter()
        .take(max_items)
        .enumerate()
        .take_while(|&(position, item)| {
            let total = spent.saturating_add(size(item));
            let fits = position == 0 || total <= budget_bytes;
            if fits {
                spent = total;
            }
            fits
        })
        .count()
}
2916
2917fn role_from_str(value: &str) -> Result<Role> {
2918    match value {
2919        "system" => Ok(Role::System),
2920        "user" => Ok(Role::User),
2921        "assistant" => Ok(Role::Assistant),
2922        "tool" => Ok(Role::Tool),
2923        other => anyhow::bail!("unknown message role {other}"),
2924    }
2925}
2926
2927/// Scalar indexes on `messages` (spec.md#datasets): BTREE for high-cardinality
2928/// and range columns, BITMAP for low-cardinality columns. There is no index
2929/// on `embedding_model`: pond's invariant is one active model at a time
2930/// (a model swap goes through `pond embed --force` which drops the IVF_PQ,
2931/// clears stale rows, and re-bootstraps), so `embedding_model` is never a
2932/// query-time predicate - the only embedding-state filter is `vector IS NOT
2933/// NULL`. `id` lookups are rare and full-scan.
2934const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
2935    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
2936    (
2937        "session_id",
2938        BuiltinIndexType::BTree,
2939        "messages_session_id_btree",
2940    ),
2941    (
2942        "timestamp",
2943        BuiltinIndexType::BTree,
2944        "messages_timestamp_btree",
2945    ),
2946    (
2947        "source_agent",
2948        BuiltinIndexType::Bitmap,
2949        "messages_source_agent_bitmap",
2950    ),
2951    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
2952];
2953
/// Scalar indexes on `parts`, one `(column, index kind, index name)` tuple per
/// index. `session_id` and `message_id` each get their own BTREE (not one
/// composite index). Together they cover the hot-path lookup key for
/// `parts_for_messages` (hydration on every `get` and grouped search).
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
2968
/// Scalar index on `sessions`, as a `(column, index kind, index name)` tuple:
/// a BTREE on `id`, which `find_session` filters on every `get` and every
/// grouped search.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2973
2974fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
2975    Predicate::In(
2976        column,
2977        values.iter().cloned().map(ScalarValue::String).collect(),
2978    )
2979}
2980
2981/// Combine the caller's filter with `vector IS NOT NULL` so the kNN scanner
2982/// never sees a null-vector row. Under the single-active-model invariant,
2983/// `vector IS NOT NULL` is equivalent to "row is currently embedded under
2984/// the configured model" - no per-row `embedding_model` filter needed.
2985fn embedded_scope(filter: &Predicate) -> Predicate {
2986    Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
2987}
2988
2989fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
2990    let inserted = usize::try_from(inserted_rows)
2991        .unwrap_or(usize::MAX)
2992        .min(total);
2993    let mut statuses = Vec::with_capacity(total);
2994    statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
2995    statuses.extend(std::iter::repeat_n(
2996        UpsertStatus::Matched,
2997        total.saturating_sub(inserted),
2998    ));
2999    statuses
3000}
3001
// Bare logical table names. The lance-namespace Directory impl owns the
// `.lance` directory suffix (spec.md#lance-chokepoints-catalog), and no
// consumer reconstructs a `.lance` path.
/// Logical name of the sessions table.
pub(crate) const SESSIONS: &str = "sessions";
/// Logical name of the messages table.
pub(crate) const MESSAGES: &str = "messages";
/// Logical name of the parts table.
pub(crate) const PARTS: &str = "parts";
3008
/// FTS index name on `messages.search_text`. It is a shared constant so that
/// status reporting and index creation refer to the same index.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// IVF_PQ index name on `messages.vector` (spec.md#search). It is a shared
/// constant so that the activation check and index creation refer to the
/// same index.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
3016
// IVF_PQ tuning constants (spec.md#search). The cosine metric is not one of
// these constants: it is chosen by `IndexParamsKind::IvfPqCosine` in
// `pond_index_intents_with_vector_threshold` (e5 vectors are L2-normalized).
/// PQ code width: 8 bits = 256 centroids per PQ subspace, so the index needs
/// >= 256 vectors to train.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Floats per PQ subspace. `sub_vectors = embedding_dim() / 8` (integer
/// division, so any remainder of a dim not divisible by 8 is dropped).
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// kmeans iteration cap, passed as `max_iters`.
const IVF_PQ_MAX_ITERS: usize = 15;
3025
// FTS tokenizer constants (spec.md#search-language-neutral-index): character
// ngrams in `[3, 5]` over `messages.search_text`. 4-5-grams discriminate;
// min=3 keeps 3-char tokens (`FTS`, `OCC`) searchable.
/// Shortest character ngram the `search_text` FTS tokenizer emits.
const FTS_NGRAM_MIN: u32 = 3;
/// Longest character ngram the `search_text` FTS tokenizer emits.
const FTS_NGRAM_MAX: u32 = 5;
3031
3032/// Pond's production IndexIntents: the per-table intent set
3033/// `Store::open_with_options` registers with the substrate.
3034pub fn pond_index_intents() -> IndexIntents {
3035    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3036}
3037
3038/// Same as [`pond_index_intents`] but with an overridable IVF_PQ activation
3039/// threshold. Used by tests that need to exercise the activation boundary
3040/// without writing 100k vectors.
3041pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3042    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3043    messages.push(IndexIntent {
3044        name: MESSAGES_FTS_INDEX,
3045        column: "search_text",
3046        trigger: IndexTrigger::OnAnyRows,
3047        params: IndexParamsKind::InvertedFtsNgram {
3048            min: FTS_NGRAM_MIN,
3049            max: FTS_NGRAM_MAX,
3050        },
3051    });
3052    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3053        messages.push(IndexIntent {
3054            name,
3055            column,
3056            trigger: IndexTrigger::OnAnyRows,
3057            params: IndexParamsKind::Scalar(kind.clone()),
3058        });
3059    }
3060    messages.push(IndexIntent {
3061        name: MESSAGES_VECTOR_INDEX,
3062        column: "vector",
3063        trigger: IndexTrigger::OnNonNullCount {
3064            column: "vector",
3065            threshold: vector_threshold,
3066        },
3067        params: IndexParamsKind::IvfPqCosine {
3068            sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3069            num_bits: IVF_PQ_NUM_BITS,
3070            max_iters: IVF_PQ_MAX_ITERS,
3071        },
3072    });
3073    let parts = PARTS_SCALAR_INDICES
3074        .iter()
3075        .map(|(column, kind, name)| IndexIntent {
3076            name,
3077            column,
3078            trigger: IndexTrigger::OnAnyRows,
3079            params: IndexParamsKind::Scalar(kind.clone()),
3080        })
3081        .collect();
3082    let sessions = SESSIONS_SCALAR_INDICES
3083        .iter()
3084        .map(|(column, kind, name)| IndexIntent {
3085            name,
3086            column,
3087            trigger: IndexTrigger::OnAnyRows,
3088            params: IndexParamsKind::Scalar(kind.clone()),
3089        })
3090        .collect();
3091    IndexIntents {
3092        sessions,
3093        messages,
3094        parts,
3095    }
3096}
3097
/// Default width of the `messages.vector` embedding column (spec.md#search).
/// It matches [`embed::DEFAULT_MODEL_ID`] (`intfloat/multilingual-e5-small`,
/// 384). Used when `[embeddings].dim` is absent, and returned by
/// [`embedding_dim`] whenever [`init_embedding_dim`] has not run.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;
3102
/// Process-wide vector dimension, seeded once at startup from `[embeddings].dim`
/// via [`init_embedding_dim`]. It is a `OnceLock` (not a `const`) so a
/// temporary config file can select a model whose width differs from
/// [`DEFAULT_EMBEDDING_DIM`] (e.g. for an experiment) without touching every
/// site. Uninitialized -> [`DEFAULT_EMBEDDING_DIM`], which keeps unit tests
/// config-free.
///
/// Reads through [`embedding_dim`] do not initialize the lock. Any read made
/// before [`init_embedding_dim`] runs therefore observes the default, not the
/// configured width.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
3109
3110/// The active embedding dimension. Returns whatever [`init_embedding_dim`]
3111/// installed, or [`DEFAULT_EMBEDDING_DIM`] when nothing has installed one.
3112pub fn embedding_dim() -> usize {
3113    EMBEDDING_DIM_RUNTIME
3114        .get()
3115        .copied()
3116        .unwrap_or(DEFAULT_EMBEDDING_DIM)
3117}
3118
3119/// Seed [`embedding_dim`] from config. First call wins.
3120pub fn init_embedding_dim(dim: usize) {
3121    EMBEDDING_DIM_RUNTIME.get_or_init(|| dim);
3122}
3123
3124/// Initial-`CREATE` write params for the namespace-mediated path. The
3125/// substrate seam stamps in `session`, `mode`, and `store_params`.
3126/// `auto_cleanup` is short; long-term recovery is `pond export` snapshots
3127/// plus deferred Lance tags (spec.md#session-durable-copy). `skip_auto_cleanup`
3128/// suppresses the per-commit hook so cleanup stays operator-driven via
3129/// `pond index optimize` (one LIST per command instead of per write).
3130pub(crate) fn write_params_for_create() -> WriteParams {
3131    WriteParams {
3132        data_storage_version: Some(LanceFileVersion::V2_1),
3133        enable_v2_manifest_paths: true,
3134        enable_stable_row_ids: true,
3135        auto_cleanup: Some(AutoCleanupParams {
3136            interval: 20,
3137            older_than: chrono::TimeDelta::days(1),
3138        }),
3139        skip_auto_cleanup: true,
3140        ..WriteParams::default()
3141    }
3142}
3143
3144fn export_schema(table: Table) -> Arc<Schema> {
3145    match table {
3146        Table::Sessions => session_schema(),
3147        Table::Messages => message_schema(),
3148        Table::Parts => part_schema(),
3149    }
3150}
3151
3152fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3153    let expected = export_schema(table);
3154    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3155    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3156    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3157    if actual_names != expected_names {
3158        anyhow::bail!(
3159            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3160            table.as_str(),
3161        );
3162    }
3163    Ok(())
3164}
3165
3166async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3167    let source_uri = source
3168        .to_str()
3169        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3170    let dataset = Dataset::open(source_uri)
3171        .await
3172        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3173    ensure_schema_matches_archive(&dataset, table)?;
3174    Ok(dataset)
3175}
3176
3177pub(crate) fn session_schema() -> Arc<Schema> {
3178    Arc::new(Schema::new(vec![
3179        primary_field("id", DataType::Utf8, false),
3180        Field::new("parent_session_id", DataType::Utf8, true),
3181        Field::new("parent_message_id", DataType::Utf8, true),
3182        Field::new("source_agent", DataType::Utf8, false),
3183        Field::new(
3184            "created_at",
3185            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3186            false,
3187        ),
3188        Field::new("project", DataType::Utf8, false),
3189        json_field("options", false),
3190    ]))
3191}
3192
3193pub(crate) fn message_schema() -> Arc<Schema> {
3194    Arc::new(Schema::new(vec![
3195        primary_field("session_id", DataType::Utf8, false),
3196        primary_field("id", DataType::Utf8, false),
3197        Field::new(
3198            "timestamp",
3199            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3200            false,
3201        ),
3202        Field::new("role", DataType::Utf8, false),
3203        Field::new("source_agent", DataType::Utf8, false),
3204        Field::new("project", DataType::Utf8, false),
3205        Field::new("content", DataType::Utf8, true),
3206        Field::new("search_text", DataType::Utf8, true),
3207        // The message's derived embedding (spec.md#session-embed-from-canonical):
3208        // both null until `pond embed` fills them, set together thereafter.
3209        Field::new("vector", embedding_vector_type(), true),
3210        Field::new("embedding_model", DataType::Utf8, true),
3211        json_field("options", false),
3212    ]))
3213}
3214
3215pub(crate) fn part_schema() -> Arc<Schema> {
3216    Arc::new(Schema::new(vec![
3217        primary_field("session_id", DataType::Utf8, false),
3218        primary_field("message_id", DataType::Utf8, false),
3219        primary_field("id", DataType::Utf8, false),
3220        Field::new("ordinal", DataType::Int32, false),
3221        Field::new("type", DataType::Utf8, false),
3222        // spec.md#model-part-provenance: conversation vs harness-injected; search
3223        // reads this column to exclude injected scaffolding.
3224        Field::new("provenance", DataType::Utf8, false),
3225        json_field("variant_data", false),
3226        legacy_blob_field("data", true),
3227        json_field("options", false),
3228    ]))
3229}
3230
3231pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3232    let arrays = schema
3233        .fields()
3234        .iter()
3235        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3236        .collect();
3237    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3238}
3239
3240pub(crate) fn empty_reader(
3241    schema: Arc<Schema>,
3242) -> Result<
3243    RecordBatchIterator<
3244        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3245    >,
3246> {
3247    let batch = empty_batch(schema.clone())?;
3248    Ok(RecordBatchIterator::new(
3249        vec![Ok(batch)].into_iter(),
3250        schema,
3251    ))
3252}
3253
/// One `messages` row to encode with `messages_batches`: the message itself
/// plus per-row column values that are not read from `message`.
pub(crate) struct MessageBatchRow<'a> {
    /// Supplies `session_id`, `id`, `timestamp`, `role`, `content` (via
    /// `system_content()`), and `options`.
    pub message: &'a Message,
    /// Written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Written to the `project` column.
    pub project: &'a str,
    /// Written to the `search_text` column (the FTS-indexed column); `None`
    /// stores null.
    pub search_text: Option<&'a str>,
}
3260
3261// Lance v7.0.0-beta.16's IVF_PQ build path (`rust/lance/src/index/vector/utils.rs`
3262// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
3263// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
3264// BFloat16 as a future-supported embedding type; until the Rust IVF_PQ build
3265// path catches up, store as Float16 (half-precision, also 2 bytes/element).
3266fn embedding_vector_type() -> DataType {
3267    DataType::FixedSizeList(
3268        Arc::new(Field::new("item", DataType::Float16, true)),
3269        embedding_dim() as i32,
3270    )
3271}
3272
3273/// The partial-schema source for the embedding column update: the `messages`
3274/// primary key plus the two columns `pond embed` fills. The field definitions
3275/// match `message_schema` exactly so Lance accepts it as a subset upsert.
3276fn embedding_update_schema() -> Arc<Schema> {
3277    Arc::new(Schema::new(vec![
3278        primary_field("session_id", DataType::Utf8, false),
3279        primary_field("id", DataType::Utf8, false),
3280        Field::new("vector", embedding_vector_type(), true),
3281        Field::new("embedding_model", DataType::Utf8, true),
3282    ]))
3283}
3284
3285/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
3286/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
3287pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3288    let dim = embedding_dim();
3289    let mut flat = Vec::with_capacity(rows.len() * dim);
3290    for row in rows {
3291        if row.vector.len() != dim {
3292            anyhow::bail!(
3293                "embedding for message {} has dim {}, expected {dim}",
3294                row.id,
3295                row.vector.len(),
3296            );
3297        }
3298        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3299    }
3300    let values = Float16Array::from(flat);
3301    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3302    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3303        .context("failed to build embedding vector column")?;
3304
3305    RecordBatch::try_new(
3306        embedding_update_schema(),
3307        vec![
3308            Arc::new(StringArray::from(
3309                rows.iter()
3310                    .map(|row| row.session_id.as_str())
3311                    .collect::<Vec<_>>(),
3312            )),
3313            Arc::new(StringArray::from(
3314                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3315            )),
3316            Arc::new(vectors),
3317            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3318        ],
3319    )
3320    .context("failed to build embedding update batch")
3321}
3322
/// The runtime backstop against Arrow's 2 GiB `i32` offset wall: 1 GiB, half
/// the wall.
///
/// A flush batch is split before the running byte total of its budgeted
/// columns exceeds this value. A single cell at or above it is rejected by
/// `guard_cell` rather than left to panic inside `StringArray::from`
/// (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;
3328
3329/// Contiguous row ranges whose summed text-column byte cost each stays within
3330/// `COLUMN_BYTE_BUDGET`. Budgeting the all-column total bounds every individual
3331/// column too, since no single column's total can exceed it. `cells[i]` is row
3332/// `i`'s byte cost summed across every text column.
3333fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
3334    let mut chunks = Vec::new();
3335    let mut start = 0usize;
3336    let mut running = 0usize;
3337    for (index, &row) in cells.iter().enumerate() {
3338        if running + row > COLUMN_BYTE_BUDGET && index > start {
3339            chunks.push(start..index);
3340            start = index;
3341            running = 0;
3342        }
3343        running += row;
3344    }
3345    if start < cells.len() {
3346        chunks.push(start..cells.len());
3347    }
3348    chunks
3349}
3350
3351fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3352    if bytes >= COLUMN_BYTE_BUDGET {
3353        anyhow::bail!(
3354            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3355             overflow Arrow's i32 offset buffer"
3356        );
3357    }
3358    Ok(())
3359}
3360
3361async fn merge_insert_chunks(
3362    handle: &Handle,
3363    table: Table,
3364    batches: Vec<RecordBatch>,
3365) -> Result<u64> {
3366    let mut inserted = 0u64;
3367    for batch in batches {
3368        let rows = batch.num_rows();
3369        inserted += handle.merge_insert(table, batch, rows).await?;
3370    }
3371    Ok(inserted)
3372}
3373
3374pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3375    let options = sessions
3376        .iter()
3377        .map(|session| json_bytes(&session.options))
3378        .collect::<Result<Vec<_>>>()?;
3379    let mut cells = Vec::with_capacity(sessions.len());
3380    for (session, encoded) in sessions.iter().zip(&options) {
3381        let columns = [
3382            session.id.len(),
3383            session.parent_session_id.as_deref().map_or(0, str::len),
3384            session.parent_message_id.as_deref().map_or(0, str::len),
3385            session.source_agent.len(),
3386            session.project.as_str().len(),
3387            encoded.len(),
3388        ];
3389        for bytes in columns {
3390            guard_cell("sessions", &session.id, bytes)?;
3391        }
3392        cells.push(columns.iter().sum());
3393    }
3394    chunk_ranges(&cells)
3395        .into_iter()
3396        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3397        .collect()
3398}
3399
3400fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3401    let schema = session_schema();
3402    RecordBatch::try_new(
3403        schema.clone(),
3404        vec![
3405            Arc::new(StringArray::from(
3406                sessions
3407                    .iter()
3408                    .map(|session| session.id.as_str())
3409                    .collect::<Vec<_>>(),
3410            )),
3411            Arc::new(StringArray::from(
3412                sessions
3413                    .iter()
3414                    .map(|session| session.parent_session_id.as_deref())
3415                    .collect::<Vec<_>>(),
3416            )),
3417            Arc::new(StringArray::from(
3418                sessions
3419                    .iter()
3420                    .map(|session| session.parent_message_id.as_deref())
3421                    .collect::<Vec<_>>(),
3422            )),
3423            Arc::new(StringArray::from(
3424                sessions
3425                    .iter()
3426                    .map(|session| session.source_agent.as_str())
3427                    .collect::<Vec<_>>(),
3428            )),
3429            Arc::new(
3430                TimestampMicrosecondArray::from(
3431                    sessions
3432                        .iter()
3433                        .map(|session| micros(session.created_at))
3434                        .collect::<Vec<_>>(),
3435                )
3436                .with_timezone("UTC"),
3437            ),
3438            Arc::new(StringArray::from(
3439                sessions
3440                    .iter()
3441                    .map(|session| session.project.as_str())
3442                    .collect::<Vec<_>>(),
3443            )),
3444            Arc::new(LargeBinaryArray::from_iter_values(
3445                options.iter().map(Vec::as_slice),
3446            )),
3447        ],
3448    )
3449    .context("failed to build session batch")
3450}
3451
3452pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3453    let options = rows
3454        .iter()
3455        .map(|row| json_bytes(row.message.options()))
3456        .collect::<Result<Vec<_>>>()?;
3457    let mut cells = Vec::with_capacity(rows.len());
3458    for (row, encoded) in rows.iter().zip(&options) {
3459        let columns = [
3460            row.message.session_id().len(),
3461            row.message.id().len(),
3462            row.message.role().as_str().len(),
3463            row.source_agent.len(),
3464            row.project.len(),
3465            row.message.system_content().map_or(0, str::len),
3466            row.search_text.map_or(0, str::len),
3467            encoded.len(),
3468        ];
3469        for bytes in columns {
3470            guard_cell("messages", row.message.id(), bytes)?;
3471        }
3472        cells.push(columns.iter().sum());
3473    }
3474    chunk_ranges(&cells)
3475        .into_iter()
3476        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3477        .collect()
3478}
3479
3480fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3481    let schema = message_schema();
3482    RecordBatch::try_new(
3483        schema.clone(),
3484        vec![
3485            Arc::new(StringArray::from(
3486                rows.iter()
3487                    .map(|row| row.message.session_id())
3488                    .collect::<Vec<_>>(),
3489            )),
3490            Arc::new(StringArray::from(
3491                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3492            )),
3493            Arc::new(
3494                TimestampMicrosecondArray::from(
3495                    rows.iter()
3496                        .map(|row| micros(row.message.timestamp()))
3497                        .collect::<Vec<_>>(),
3498                )
3499                .with_timezone("UTC"),
3500            ),
3501            Arc::new(StringArray::from(
3502                rows.iter()
3503                    .map(|row| row.message.role().as_str())
3504                    .collect::<Vec<_>>(),
3505            )),
3506            Arc::new(StringArray::from(
3507                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3508            )),
3509            Arc::new(StringArray::from(
3510                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3511            )),
3512            Arc::new(StringArray::from(
3513                rows.iter()
3514                    .map(|row| row.message.system_content())
3515                    .collect::<Vec<_>>(),
3516            )),
3517            Arc::new(StringArray::from(
3518                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3519            )),
3520            // `vector` / `embedding_model` are written null at ingest; every
3521            // message starts un-embedded and `pond embed` fills them later
3522            // (spec.md#session-embed-from-canonical).
3523            new_null_array(&embedding_vector_type(), rows.len()),
3524            new_null_array(&DataType::Utf8, rows.len()),
3525            Arc::new(LargeBinaryArray::from_iter_values(
3526                options.iter().map(Vec::as_slice),
3527            )),
3528        ],
3529    )
3530    .context("failed to build message batch")
3531}
3532
3533pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3534    let variant_data = parts
3535        .iter()
3536        .map(|part| part_variant_json(&part.kind))
3537        .collect::<Result<Vec<_>>>()?;
3538    let options = parts
3539        .iter()
3540        .map(|part| json_bytes(&part.options))
3541        .collect::<Result<Vec<_>>>()?;
3542    let mut cells = Vec::with_capacity(parts.len());
3543    // The blob column is a BinaryArray, exempt from the text-column bound
3544    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
3545    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3546        let columns = [
3547            part.session_id.len(),
3548            part.message_id.len(),
3549            part.id.len(),
3550            part.kind.type_name().len(),
3551            part.provenance.as_str().len(),
3552            variant.len(),
3553            encoded.len(),
3554        ];
3555        for bytes in columns {
3556            guard_cell("parts", &part.id, bytes)?;
3557        }
3558        cells.push(columns.iter().sum());
3559    }
3560    chunk_ranges(&cells)
3561        .into_iter()
3562        .map(|range| {
3563            parts_chunk(
3564                &parts[range.clone()],
3565                &variant_data[range.clone()],
3566                &options[range],
3567            )
3568        })
3569        .collect()
3570}
3571
3572fn parts_chunk(
3573    parts: &[Part],
3574    variant_data: &[Vec<u8>],
3575    options: &[Vec<u8>],
3576) -> Result<RecordBatch> {
3577    let schema = part_schema();
3578    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
3579    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
3580    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
3581    let blob_payloads: Vec<Option<&[u8]>> = parts
3582        .iter()
3583        .map(|part| match &part.kind {
3584            PartKind::File { data, .. } => Some(match data {
3585                FileData::String(value) => value.as_bytes(),
3586                FileData::Bytes(value) => value.as_slice(),
3587                FileData::Url(value) => value.as_bytes(),
3588            }),
3589            PartKind::Text { .. }
3590            | PartKind::Reasoning { .. }
3591            | PartKind::ToolCall { .. }
3592            | PartKind::ToolResult { .. }
3593            | PartKind::ToolApprovalRequest { .. }
3594            | PartKind::ToolApprovalResponse { .. } => None,
3595        })
3596        .collect();
3597    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3598
3599    RecordBatch::try_new(
3600        schema.clone(),
3601        vec![
3602            Arc::new(StringArray::from(
3603                parts
3604                    .iter()
3605                    .map(|part| part.session_id.as_str())
3606                    .collect::<Vec<_>>(),
3607            )),
3608            Arc::new(StringArray::from(
3609                parts
3610                    .iter()
3611                    .map(|part| part.message_id.as_str())
3612                    .collect::<Vec<_>>(),
3613            )),
3614            Arc::new(StringArray::from(
3615                parts
3616                    .iter()
3617                    .map(|part| part.id.as_str())
3618                    .collect::<Vec<_>>(),
3619            )),
3620            Arc::new(Int32Array::from(
3621                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3622            )),
3623            Arc::new(StringArray::from(
3624                parts
3625                    .iter()
3626                    .map(|part| part.kind.type_name())
3627                    .collect::<Vec<_>>(),
3628            )),
3629            Arc::new(StringArray::from(
3630                parts
3631                    .iter()
3632                    .map(|part| part.provenance.as_str())
3633                    .collect::<Vec<_>>(),
3634            )),
3635            Arc::new(LargeBinaryArray::from_iter_values(
3636                variant_data.iter().map(Vec::as_slice),
3637            )),
3638            Arc::new(blob_array),
3639            Arc::new(LargeBinaryArray::from_iter_values(
3640                options.iter().map(Vec::as_slice),
3641            )),
3642        ],
3643    )
3644    .context("failed to build parts batch")
3645}
3646
3647pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3648    Ok(Session {
3649        id: string(batch, "id", row)?.context("session id is null")?,
3650        parent_session_id: string(batch, "parent_session_id", row)?,
3651        parent_message_id: string(batch, "parent_message_id", row)?,
3652        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3653        created_at: datetime(batch, "created_at", row)?,
3654        project: crate::adapter::Extracted::from_stored(
3655            string(batch, "project", row)?.context("project is null")?,
3656        ),
3657        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3658    })
3659}
3660
3661pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3662    let id = string(batch, "id", row)?.context("message id is null")?;
3663    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3664    let timestamp = datetime(batch, "timestamp", row)?;
3665    let options =
3666        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3667
3668    match string(batch, "role", row)?
3669        .context("message role is null")?
3670        .as_str()
3671    {
3672        "system" => Ok(Message::System {
3673            id,
3674            session_id,
3675            timestamp,
3676            // `content` is nullable in the schema; preserve the distinction
3677            // between "no content row stored" (`None`) and "empty string
3678            // stored" (`Some(extracted_empty)`). The value originally
3679            // came from a `Source` extraction at ingest time; rewrap via
3680            // the storage-internal `from_stored` so the type-system seal
3681            // for adapters stays intact.
3682            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3683            options,
3684        }),
3685        "user" => Ok(Message::User {
3686            id,
3687            session_id,
3688            timestamp,
3689            options,
3690        }),
3691        "assistant" => Ok(Message::Assistant {
3692            id,
3693            session_id,
3694            timestamp,
3695            options,
3696        }),
3697        "tool" => Ok(Message::Tool {
3698            id,
3699            session_id,
3700            timestamp,
3701            options,
3702        }),
3703        other => anyhow::bail!("unknown message role {other}"),
3704    }
3705}
3706
3707pub(crate) fn part_from_batch(
3708    batch: &RecordBatch,
3709    row: usize,
3710    file_data: Option<FileData>,
3711) -> Result<Part> {
3712    let type_name = string(batch, "type", row)?.context("part type is null")?;
3713    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3714    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3715    Ok(Part {
3716        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3717        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3718        id: string(batch, "id", row)?.context("part id is null")?,
3719        ordinal: int32(batch, "ordinal", row)?,
3720        provenance: provenance_from_str(&provenance)?,
3721        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3722        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3723    })
3724}
3725
3726fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3727    match value {
3728        "conversational" => Ok(crate::wire::Provenance::Conversational),
3729        "injected" => Ok(crate::wire::Provenance::Injected),
3730        other => anyhow::bail!("unknown part provenance {other}"),
3731    }
3732}
3733
3734fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3735    let kind = file_data_kind(variant_data)?;
3736    match kind.as_str() {
3737        "string" => {
3738            let text = std::str::from_utf8(bytes)
3739                .context("file string payload is not UTF-8")?
3740                .to_owned();
3741            Ok(FileData::String(text))
3742        }
3743        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3744        "url" => Ok(FileData::Url(
3745            std::str::from_utf8(bytes)
3746                .context("file URL payload is not UTF-8")?
3747                .to_owned(),
3748        )),
3749        other => anyhow::bail!("unknown file data_kind {other}"),
3750    }
3751}
3752
3753fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3754    let value = json_parse::<Value>(variant_data)?;
3755    value
3756        .get("data_kind")
3757        .and_then(Value::as_str)
3758        .map(str::to_owned)
3759        .context("file part variant_data missing data_kind")
3760}
3761
3762fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3763    batch
3764        .column_by_name(name)
3765        .with_context(|| format!("missing column {name}"))?
3766        .as_any()
3767        .downcast_ref::<UInt64Array>()
3768        .with_context(|| format!("column {name} is not UInt64"))
3769}
3770
3771pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3772    let array = batch
3773        .column_by_name(name)
3774        .with_context(|| format!("missing column {name}"))?
3775        .as_any()
3776        .downcast_ref::<StringArray>()
3777        .with_context(|| format!("column {name} is not Utf8"))?;
3778    if array.is_null(row) {
3779        Ok(None)
3780    } else {
3781        Ok(Some(array.value(row).to_owned()))
3782    }
3783}
3784
3785fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3786    // Lance can return a `lance.json` column either as raw JSONB bytes
3787    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
3788    // LargeUtf8), depending on the read path. Handle both.
3789    let column = batch
3790        .column_by_name(name)
3791        .with_context(|| format!("missing column {name}"))?;
3792    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3793        return if array.is_null(row) {
3794            Ok(None)
3795        } else {
3796            Ok(Some(
3797                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3798            ))
3799        };
3800    }
3801    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3802        return if array.is_null(row) {
3803            Ok(None)
3804        } else {
3805            Ok(Some(array.value(row).as_bytes().to_vec()))
3806        };
3807    }
3808    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3809        return if array.is_null(row) {
3810            Ok(None)
3811        } else {
3812            Ok(Some(array.value(row).as_bytes().to_vec()))
3813        };
3814    }
3815    anyhow::bail!("column {name} is not a JSON-compatible array")
3816}
3817
3818fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3819    let array = batch
3820        .column_by_name(name)
3821        .with_context(|| format!("missing column {name}"))?
3822        .as_any()
3823        .downcast_ref::<Int32Array>()
3824        .with_context(|| format!("column {name} is not Int32"))?;
3825    Ok(array.value(row))
3826}
3827
3828pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3829    let array = batch
3830        .column_by_name(name)
3831        .with_context(|| format!("missing column {name}"))?
3832        .as_any()
3833        .downcast_ref::<Float32Array>()
3834        .with_context(|| format!("column {name} is not Float32"))?;
3835    Ok(array.value(row))
3836}
3837
3838pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3839    let array = batch
3840        .column_by_name(name)
3841        .with_context(|| format!("missing column {name}"))?
3842        .as_any()
3843        .downcast_ref::<TimestampMicrosecondArray>()
3844        .with_context(|| format!("column {name} is not timestamp_micros"))?;
3845    Utc.timestamp_micros(array.value(row))
3846        .single()
3847        .context("timestamp is out of range")
3848}
3849
3850fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3851    Field::new(name, data_type, nullable).with_metadata(
3852        [(
3853            "lance-schema:unenforced-primary-key".to_owned(),
3854            "true".to_owned(),
3855        )]
3856        .into(),
3857    )
3858}
3859
3860// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
3861// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
3862// is marked unstable in Lance docs (`format/file/versioning.md`) and at
3863// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
3864// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
3865// allocated only one column_info, so the decoder's second `expect_next()`
3866// fires `"there were more fields in the schema than provided column
3867// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
3868// handles correctly (covered by Lance's own `test_compact_blob_columns`).
3869fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3870    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3871        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3872            .into_iter()
3873            .collect(),
3874    )
3875}
3876
3877fn json_field(name: &str, nullable: bool) -> Field {
3878    lance_arrow::json::json_field(name, nullable)
3879}
3880
3881fn micros(timestamp: DateTime<Utc>) -> i64 {
3882    timestamp.timestamp_micros()
3883}
3884
3885fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3886    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
3887    // matches the `lance.json` extension contract. Lance's compact path
3888    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
3889    // applies `decode_json -> encode_json` on this column; with proper JSONB
3890    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
3891    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
3892    // `try_into_dfstream`; compact still goes through the adapter).
3893    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3894    lance_arrow::json::encode_json(&text)
3895        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3896}
3897
3898fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3899    serde_json::from_slice(value).context("failed to parse JSON field")
3900}
3901
3902fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3903    if let PartKind::File {
3904        media_type,
3905        file_name,
3906        data,
3907    } = kind
3908    {
3909        let data_kind = match data {
3910            FileData::String(_) => "string",
3911            FileData::Bytes(_) => "bytes",
3912            FileData::Url(_) => "url",
3913        };
3914        return json_bytes(&serde_json::json!({
3915            "media_type": media_type,
3916            "file_name": file_name,
3917            "data_kind": data_kind,
3918        }));
3919    }
3920    let value = serde_json::to_value(kind)?;
3921    let mut object = value
3922        .as_object()
3923        .cloned()
3924        .context("part variant did not serialize to an object")?;
3925    object.remove("type");
3926    json_bytes(&object)
3927}
3928
3929fn part_kind_from_json(
3930    type_name: &str,
3931    variant_data: &[u8],
3932    file_data: Option<FileData>,
3933) -> Result<PartKind> {
3934    let mut value = json_parse::<Value>(variant_data)?;
3935    let object = value
3936        .as_object_mut()
3937        .context("part variant data is not an object")?;
3938    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3939    if let Some(data) = file_data {
3940        object.remove("data_kind");
3941        object.insert("data".to_owned(), serde_json::to_value(data)?);
3942    }
3943    serde_json::from_value(value).context("failed to parse part kind")
3944}
3945
3946#[cfg(test)]
3947mod tests {
3948    #![allow(clippy::expect_used, clippy::unwrap_used)]
3949
3950    use super::*;
3951    use crate::{
3952        adapter::Extracted,
3953        handlers::ingest_events,
3954        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3955    };
3956    use chrono::Utc;
3957    use serde_json::json;
3958    use tempfile::TempDir;
3959
3960    fn synthetic_session(id: &str) -> Session {
3961        Session {
3962            id: id.to_owned(),
3963            parent_session_id: None,
3964            parent_message_id: None,
3965            source_agent: "claude-code".to_owned(),
3966            created_at: Utc::now(),
3967            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
3968            options: ProviderOptions::new(),
3969        }
3970    }
3971
3972    #[test]
3973    fn search_text_excludes_injected_parts() {
3974        use crate::wire::Provenance;
3975        let message = Message::User {
3976            id: "m1".to_owned(),
3977            session_id: "s1".to_owned(),
3978            timestamp: Utc::now(),
3979            options: ProviderOptions::new(),
3980        };
3981        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
3982            session_id: "s1".to_owned(),
3983            id: id.to_owned(),
3984            message_id: "m1".to_owned(),
3985            ordinal: 0,
3986            provenance,
3987            options: ProviderOptions::new(),
3988            kind: PartKind::Text {
3989                text: Some(Extracted::from_test_value(text.to_owned())),
3990            },
3991        };
3992
3993        // A conversational part contributes; an injected one is excluded
3994        // (spec.md#search).
3995        let conversational = search_text(
3996            &message,
3997            &[text_part(
3998                "p1",
3999                "real human prompt",
4000                Provenance::Conversational,
4001            )],
4002        );
4003        assert_eq!(conversational.as_deref(), Some("real human prompt"));
4004
4005        let injected = search_text(
4006            &message,
4007            &[text_part(
4008                "p2",
4009                "<task-notification>...</task-notification>",
4010                Provenance::Injected,
4011            )],
4012        );
4013        assert!(
4014            injected.is_none(),
4015            "a message whose only part is injected has null search_text"
4016        );
4017    }
4018
4019    #[test]
4020    fn chunk_ranges_splits_on_byte_budget() {
4021        assert!(chunk_ranges(&[]).is_empty());
4022        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4023
4024        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4025        assert_eq!(
4026            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4027            vec![0..1, 1..2, 2..3],
4028        );
4029
4030        // An oversized single row gets its own chunk, never an infinite loop.
4031        assert_eq!(
4032            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4033            vec![0..1, 1..2, 2..3],
4034        );
4035    }
4036
4037    #[tokio::test]
4038    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4039        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
4040        // Message is dropped on the spot, with one Error outcome surfaced. The
4041        // rest of the substream continues normally - subsequent valid messages
4042        // and parts get written.
4043        let temp = TempDir::new()?;
4044        let store = Store::open_local(temp.path()).await?;
4045        let session = synthetic_session("ordering");
4046        let orphan_part = Part {
4047            session_id: session.id.clone(),
4048            id: "orphan-part".to_owned(),
4049            message_id: "missing-message".to_owned(),
4050            ordinal: 0,
4051            provenance: crate::wire::Provenance::Conversational,
4052            options: ProviderOptions::new(),
4053            kind: PartKind::Text {
4054                text: Some(Extracted::from_test_value("orphan".to_owned())),
4055            },
4056        };
4057        let valid_message = Message::User {
4058            id: "valid-message".to_owned(),
4059            session_id: session.id.clone(),
4060            timestamp: Utc::now(),
4061            options: ProviderOptions::new(),
4062        };
4063        let valid_part = Part {
4064            session_id: session.id.clone(),
4065            id: "valid-part".to_owned(),
4066            message_id: valid_message.id().to_owned(),
4067            ordinal: 0,
4068            provenance: crate::wire::Provenance::Conversational,
4069            options: ProviderOptions::new(),
4070            kind: PartKind::Text {
4071                text: Some(Extracted::from_test_value("kept".to_owned())),
4072            },
4073        };
4074
4075        let mut validator = IngestValidator::default();
4076        validator
4077            .push(&store, 0, IngestEvent::Session(session.clone()))
4078            .await?;
4079        let part_outcomes = validator
4080            .push(&store, 1, IngestEvent::Part(orphan_part))
4081            .await?;
4082        assert_eq!(part_outcomes.len(), 1);
4083        assert_eq!(part_outcomes[0].kind, "part");
4084        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4085        assert!(
4086            part_outcomes[0]
4087                .error
4088                .as_ref()
4089                .map(|e| e.message.contains("part event appeared before a message"))
4090                .unwrap_or(false),
4091            "error message must explain the ordering violation: {part_outcomes:?}"
4092        );
4093        validator
4094            .push(&store, 2, IngestEvent::Message(valid_message))
4095            .await?;
4096        validator
4097            .push(&store, 3, IngestEvent::Part(valid_part))
4098            .await?;
4099        validator.finish(&store).await?;
4100
4101        let (sessions, messages, parts) = store.row_counts().await?;
4102        assert_eq!(sessions, 1, "session committed despite the orphan part");
4103        assert_eq!(messages, 1, "valid message committed");
4104        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4105
4106        Ok(())
4107    }
4108
4109    #[tokio::test]
4110    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4111        // Per-event drop: a duplicate message id within a substream drops the
4112        // *duplicate* and surfaces an Error outcome for it. The first wins; the
4113        // session still commits.
4114        let temp = TempDir::new()?;
4115        let store = Store::open_local(temp.path()).await?;
4116        let session = synthetic_session("duplicate-message");
4117        let first = Message::User {
4118            id: "message-1".to_owned(),
4119            session_id: session.id.clone(),
4120            timestamp: Utc::now(),
4121            options: ProviderOptions::new(),
4122        };
4123        let second = Message::Assistant {
4124            id: "message-1".to_owned(),
4125            session_id: session.id.clone(),
4126            timestamp: Utc::now(),
4127            options: ProviderOptions::new(),
4128        };
4129
4130        let mut validator = IngestValidator::default();
4131        validator
4132            .push(&store, 0, IngestEvent::Session(session.clone()))
4133            .await?;
4134        validator
4135            .push(&store, 1, IngestEvent::Message(first))
4136            .await?;
4137        let dup_outcomes = validator
4138            .push(&store, 2, IngestEvent::Message(second))
4139            .await?;
4140        assert_eq!(dup_outcomes.len(), 1);
4141        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4142        assert!(
4143            dup_outcomes[0]
4144                .error
4145                .as_ref()
4146                .map(|e| e.message.contains("duplicate message id message-1"))
4147                .unwrap_or(false),
4148            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4149        );
4150
4151        validator.finish(&store).await?;
4152        let (sessions, messages, _) = store.row_counts().await?;
4153        assert_eq!(sessions, 1, "session committed");
4154        assert_eq!(messages, 1, "only the first message committed");
4155
4156        Ok(())
4157    }
4158
4159    /// Regression: compact_files on `parts` with the blob column tripped a
4160    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
4161    /// batches give compact fragments to merge; every `FileData` variant
4162    /// exercises the blob round-trip. All-File batches sidestep a debug-only
4163    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
4164    /// write batch mixes null + valid rows in the blob column - benign in
4165    /// release, irrelevant to this regression's scope.
4166    #[tokio::test(flavor = "multi_thread")]
4167    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4168        use crate::wire::{FileData, PartKind, Provenance};
4169        let temp = TempDir::new()?;
4170        let store = Store::open_local(temp.path()).await?;
4171
4172        let session = synthetic_session("compact-blob");
4173        store
4174            .upsert_sessions(std::slice::from_ref(&session))
4175            .await?;
4176
4177        let make_part = |idx: usize, kind: PartKind| Part {
4178            session_id: session.id.clone(),
4179            message_id: format!("msg-{idx}"),
4180            id: format!("part-{idx}"),
4181            ordinal: 0,
4182            provenance: Provenance::Conversational,
4183            options: ProviderOptions::new(),
4184            kind,
4185        };
4186
4187        let batch_a = vec![
4188            make_part(
4189                0,
4190                PartKind::File {
4191                    media_type: Some("text/plain".to_owned()),
4192                    file_name: Some("a.txt".to_owned()),
4193                    data: FileData::Bytes(b"alpha".to_vec()),
4194                },
4195            ),
4196            make_part(
4197                1,
4198                PartKind::File {
4199                    media_type: Some("text/plain".to_owned()),
4200                    file_name: Some("b.txt".to_owned()),
4201                    data: FileData::String("beta".to_owned()),
4202                },
4203            ),
4204        ];
4205        store.upsert_parts(&batch_a).await?;
4206
4207        let batch_b = vec![
4208            make_part(
4209                2,
4210                PartKind::File {
4211                    media_type: Some("application/octet-stream".to_owned()),
4212                    file_name: None,
4213                    data: FileData::Url("https://example.com/file".to_owned()),
4214                },
4215            ),
4216            make_part(
4217                3,
4218                PartKind::File {
4219                    media_type: Some("image/png".to_owned()),
4220                    file_name: Some("c.png".to_owned()),
4221                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4222                },
4223            ),
4224        ];
4225        store.upsert_parts(&batch_b).await?;
4226
4227        store.optimize_indices(None, None).await?.into_result()?;
4228
4229        Ok(())
4230    }
4231
4232    #[tokio::test]
4233    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4234        let temp = TempDir::new()?;
4235        let store = Store::open_local(temp.path()).await?;
4236        let session = synthetic_session("blob");
4237        let message = Message::User {
4238            id: "message-1".to_owned(),
4239            session_id: session.id.clone(),
4240            timestamp: Utc::now(),
4241            options: ProviderOptions::new(),
4242        };
4243        let part = Part {
4244            session_id: session.id.clone(),
4245            id: "part-1".to_owned(),
4246            message_id: message.id().to_owned(),
4247            ordinal: 0,
4248            provenance: crate::wire::Provenance::Conversational,
4249            options: ProviderOptions::new(),
4250            kind: PartKind::File {
4251                media_type: Some("text/plain".to_owned()),
4252                file_name: Some("payload.txt".to_owned()),
4253                data: FileData::Bytes(b"pond".to_vec()),
4254            },
4255        };
4256
4257        let mut validator = IngestValidator::default();
4258        validator
4259            .push(&store, 0, IngestEvent::Session(session.clone()))
4260            .await?;
4261        validator
4262            .push(&store, 1, IngestEvent::Message(message.clone()))
4263            .await?;
4264        validator
4265            .push(&store, 2, IngestEvent::Part(part.clone()))
4266            .await?;
4267        validator.finish(&store).await?;
4268
4269        let stored = store
4270            .get_session(&session.id)
4271            .await?
4272            .expect("session should exist");
4273        let stored_part = &stored.messages[0].parts[0];
4274        assert_eq!(stored_part, &part);
4275
4276        Ok(())
4277    }
4278
4279    //
4280    // `Session.source_agent` and `Session.project` are immutable
4281    // post-first-write because `messages` denormalizes them at first
4282    // ingest; a silent overwrite would desync the denormalized
4283    // copies. pond core's `IngestValidator` probes the existing session
4284    // before the merge_insert and emits a per-row `validation_failed`
4285    // outcome with the typed field name when either changes. Other Session
4286    // fields (options, parent_session_id, created_at, parent_message_id)
4287    // re-write idempotently via merge_insert.
4288
4289    fn base_session() -> Session {
4290        Session {
4291            id: "01HXY00000000001".to_owned(),
4292            parent_session_id: None,
4293            parent_message_id: None,
4294            source_agent: "claude-code".to_owned(),
4295            created_at: Utc::now(),
4296            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4297            options: ProviderOptions::new(),
4298        }
4299    }
4300
4301    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4302        outcomes
4303            .iter()
4304            .filter(|outcome| outcome.status == target)
4305            .count()
4306    }
4307
4308    #[tokio::test(flavor = "multi_thread")]
4309    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4310    -> anyhow::Result<()> {
4311        let temp = TempDir::new()?;
4312        let store = Store::open_local(temp.path()).await?;
4313
4314        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4315        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4316
4317        let mut again = base_session();
4318        again.options.insert("title".to_owned(), json!("renamed"));
4319        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4320        assert_eq!(
4321            count_status(&second, OutcomeStatus::Error),
4322            0,
4323            "options is mutable; the re-ingest must not surface an error: {second:?}",
4324        );
4325        assert_eq!(
4326            count_status(&second, OutcomeStatus::Matched),
4327            1,
4328            "unchanged immutable fields must match-insert via merge_insert",
4329        );
4330
4331        Ok(())
4332    }
4333
4334    #[tokio::test(flavor = "multi_thread")]
4335    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4336        let temp = TempDir::new()?;
4337        let store = Store::open_local(temp.path()).await?;
4338
4339        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4340        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4341
4342        let mut tampered = base_session();
4343        tampered.source_agent = "codex-cli".to_owned();
4344        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4345        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4346        let err_row = second
4347            .iter()
4348            .find(|outcome| outcome.status == OutcomeStatus::Error)
4349            .expect("error outcome present");
4350        let err = err_row.error.as_ref().expect("error body present");
4351        assert_eq!(err.field, Some("source_agent"));
4352        assert_eq!(err.reason, Some("immutable"));
4353
4354        // The stored row stayed on the original adapter - no silent rewrite.
4355        let stored = store
4356            .get_session(&base_session().id)
4357            .await?
4358            .expect("session row survives the rejected re-ingest");
4359        assert_eq!(stored.session.source_agent, "claude-code");
4360
4361        Ok(())
4362    }
4363
4364    #[tokio::test(flavor = "multi_thread")]
4365    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4366        let temp = TempDir::new()?;
4367        let store = Store::open_local(temp.path()).await?;
4368
4369        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4370        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4371
4372        let mut tampered = base_session();
4373        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4374        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4375        let err_row = second
4376            .iter()
4377            .find(|outcome| outcome.status == OutcomeStatus::Error)
4378            .expect("project change must surface an error outcome");
4379        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4380
4381        let stored = store
4382            .get_session(&base_session().id)
4383            .await?
4384            .expect("session row survives");
4385        assert_eq!(
4386            stored.session.project.as_str(),
4387            "/home/me/proj",
4388            "stored project must remain the original",
4389        );
4390
4391        Ok(())
4392    }
4393
4394    #[tokio::test(flavor = "multi_thread")]
4395    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
4396        // Regression guard: re-ingesting an existing session with NEW
4397        // messages must surface as sessions_inserted=0, messages_inserted_*>0
4398        // on `BatchCounts`, and per-row outcomes must mark the new message
4399        // rows `Inserted` while the session row is `Matched`. The prior
4400        // implementation derived all per-row statuses from the batch-level
4401        // session inserted count, which silently flipped the new messages
4402        // into `Matched` (visible as "up to date" in the CLI bar tail).
4403        use crate::wire::Provenance;
4404        let temp = TempDir::new()?;
4405        let store = Store::open_local(temp.path()).await?;
4406        let session = base_session();
4407
4408        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
4409            session_id: session.id.clone(),
4410            id: part_id.to_owned(),
4411            message_id: message_id.to_owned(),
4412            ordinal: 0,
4413            provenance: Provenance::Conversational,
4414            options: ProviderOptions::new(),
4415            kind: PartKind::Text {
4416                text: Some(Extracted::from_test_value(body.to_owned())),
4417            },
4418        };
4419        let user_message = |id: &str| Message::User {
4420            id: id.to_owned(),
4421            session_id: session.id.clone(),
4422            timestamp: Utc::now(),
4423            options: ProviderOptions::new(),
4424        };
4425
4426        // First pass: 2 messages land fresh.
4427        let mut validator = IngestValidator::default();
4428        validator
4429            .push(&store, 0, IngestEvent::Session(session.clone()))
4430            .await?;
4431        validator
4432            .push(&store, 1, IngestEvent::Message(user_message("m1")))
4433            .await?;
4434        validator
4435            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
4436            .await?;
4437        validator
4438            .push(&store, 3, IngestEvent::Message(user_message("m2")))
4439            .await?;
4440        validator
4441            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
4442            .await?;
4443        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
4444        assert_eq!(first_counts.sessions_inserted, 1);
4445        assert_eq!(first_counts.messages_inserted_total, 2);
4446        assert_eq!(first_counts.messages_inserted_searchable, 2);
4447
4448        // Second pass: same session id, 3 NEW messages.
4449        let mut validator = IngestValidator::default();
4450        validator
4451            .push(&store, 0, IngestEvent::Session(session.clone()))
4452            .await?;
4453        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
4454            let pid = format!("p{}", idx + 3);
4455            validator
4456                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
4457                .await?;
4458            validator
4459                .push(
4460                    &store,
4461                    idx * 2 + 2,
4462                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
4463                )
4464                .await?;
4465        }
4466        let (second_outcomes, second_counts) = validator.finish(&store).await?;
4467
4468        assert_eq!(
4469            second_counts.sessions_inserted, 0,
4470            "existing session row must report as Matched, not Inserted",
4471        );
4472        assert_eq!(second_counts.sessions_matched, 1);
4473        assert_eq!(
4474            second_counts.messages_inserted_total, 3,
4475            "the three NEW messages must register as Inserted in BatchCounts",
4476        );
4477        assert_eq!(
4478            second_counts.messages_inserted_searchable, 3,
4479            "all three new messages carry conversational text -> searchable",
4480        );
4481        assert_eq!(second_counts.messages_matched_total, 0);
4482        assert_eq!(second_counts.parts_inserted, 3);
4483        assert_eq!(second_counts.parts_matched, 0);
4484
4485        // Per-row outcomes mirror the BatchCounts shape: the session row is
4486        // Matched, every new message + part row is Inserted.
4487        let session_outcome = second_outcomes
4488            .iter()
4489            .find(|outcome| outcome.kind == "session")
4490            .expect("session-row outcome present");
4491        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
4492        for outcome in &second_outcomes {
4493            if outcome.kind == "message" || outcome.kind == "part" {
4494                assert_eq!(
4495                    outcome.status,
4496                    OutcomeStatus::Inserted,
4497                    "new row must be Inserted, got: {outcome:?}",
4498                );
4499            }
4500        }
4501        Ok(())
4502    }
4503
4504    /// Ingest `count` synthetic messages spread across a handful of sessions
4505    /// and projects, each with conversational `search_text`. Returns the store
4506    /// and the message keys in `msg-{i}` order; every `vector` starts null.
4507    async fn store_with_messages(
4508        temp: &TempDir,
4509        count: usize,
4510    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4511        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4512    }
4513
4514    /// Same as [`store_with_messages`] but tests optimize with a custom
4515    /// IVF_PQ activation threshold.
4516    async fn store_with_messages_at_threshold(
4517        temp: &TempDir,
4518        count: usize,
4519        _vector_threshold: usize,
4520    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4521        let store = Store::open_local(temp.path()).await?;
4522        let sessions = 8.min(count.max(1));
4523        let mut events = Vec::new();
4524        for s in 0..sessions {
4525            events.push(IngestEvent::Session(Session {
4526                id: format!("session-{s}"),
4527                parent_session_id: None,
4528                parent_message_id: None,
4529                source_agent: "claude-code".to_owned(),
4530                created_at: Utc::now(),
4531                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4532                options: ProviderOptions::new(),
4533            }));
4534            for i in (s..count).step_by(sessions) {
4535                let message_id = format!("msg-{i}");
4536                events.push(IngestEvent::Message(Message::User {
4537                    id: message_id.clone(),
4538                    session_id: format!("session-{s}"),
4539                    timestamp: Utc::now(),
4540                    options: ProviderOptions::new(),
4541                }));
4542                events.push(IngestEvent::Part(Part {
4543                    session_id: format!("session-{s}"),
4544                    id: format!("{message_id}-part"),
4545                    message_id,
4546                    ordinal: 0,
4547                    provenance: crate::wire::Provenance::Conversational,
4548                    options: ProviderOptions::new(),
4549                    kind: PartKind::Text {
4550                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4551                    },
4552                }));
4553            }
4554        }
4555        ingest_events(&store, events).await?;
4556        let keys = (0..count)
4557            .map(|i| MessageKey {
4558                session_id: format!("session-{}", i % sessions),
4559                message_id: format!("msg-{i}"),
4560            })
4561            .collect();
4562        Ok((store, keys))
4563    }
4564
4565    /// A deterministic pseudo-random vector of the production dimension.
4566    fn synthetic_vector(seed: usize) -> Vec<f32> {
4567        let mut state = (seed as u64)
4568            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4569            .wrapping_add(1);
4570        (0..embedding_dim())
4571            .map(|_| {
4572                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4573                #[allow(clippy::cast_precision_loss)]
4574                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4575                unit - 1.0
4576            })
4577            .collect()
4578    }
4579
4580    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
4581    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4582        keys.iter()
4583            .enumerate()
4584            .map(|(seed, key)| EmbeddedMessage {
4585                session_id: key.session_id.clone(),
4586                id: key.message_id.clone(),
4587                vector: synthetic_vector(seed),
4588            })
4589            .collect()
4590    }
4591
4592    fn embedding_update_batch_with_model(
4593        rows: &[EmbeddedMessage],
4594        model: &str,
4595    ) -> Result<RecordBatch> {
4596        let mut batch = embedding_update_batch(rows)?;
4597        let columns = batch
4598            .columns()
4599            .iter()
4600            .take(3)
4601            .cloned()
4602            .chain(std::iter::once(
4603                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4604            ))
4605            .collect::<Vec<_>>();
4606        batch = RecordBatch::try_new(batch.schema(), columns)?;
4607        Ok(batch)
4608    }
4609
4610    #[tokio::test]
4611    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4612        let temp = TempDir::new()?;
4613        // 4 messages cycle session-0..session-3, so `session-3` is a real
4614        // partition. Scalar-index pushdown is volume-independent: the planner
4615        // emits `ScalarIndexQuery` whenever the index exists.
4616        let (store, keys) = store_with_messages(&temp, 4).await?;
4617        store.write_embeddings(&embedded(&keys)).await?;
4618        store.optimize_indices(None, None).await?.into_result()?;
4619
4620        let query = vec![0.01_f32; embedding_dim()];
4621        let plan = store
4622            .explain_vector_plan(
4623                &query,
4624                10,
4625                &Predicate::Eq("session_id", "session-3".into()),
4626                None,
4627            )
4628            .await?;
4629
4630        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
4631        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
4632        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
4633        // and unrelated.)
4634        assert!(
4635            plan.contains("ScalarIndexQuery"),
4636            "expected a ScalarIndexQuery node in the plan:\n{plan}",
4637        );
4638        let predicate_postfiltered = plan
4639            .lines()
4640            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4641        assert!(
4642            !predicate_postfiltered,
4643            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4644        );
4645        Ok(())
4646    }
4647
4648    #[tokio::test]
4649    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4650        let temp = TempDir::new()?;
4651        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4652
4653        // First batch: 255 vectors, one below threshold. Optimize does not
4654        // create the IVF_PQ because the trigger is not met.
4655        store.write_embeddings(&embedded(&keys[..255])).await?;
4656        store
4657            .optimize_indices_with_vector_threshold(256)
4658            .await?
4659            .into_result()?;
4660        assert!(
4661            !store
4662                .handle
4663                .messages_index_names()
4664                .await?
4665                .iter()
4666                .any(|name| name == MESSAGES_VECTOR_INDEX),
4667            "IVF_PQ must not exist below the activation threshold",
4668        );
4669
4670        // Next batch: one more vector. Total reaches 256; optimize creates
4671        // the IVF_PQ.
4672        store.write_embeddings(&embedded(&keys[255..256])).await?;
4673        store
4674            .optimize_indices_with_vector_threshold(256)
4675            .await?
4676            .into_result()?;
4677        assert!(
4678            store
4679                .handle
4680                .messages_index_names()
4681                .await?
4682                .iter()
4683                .any(|name| name == MESSAGES_VECTOR_INDEX),
4684            "optimize must create the IVF_PQ once the threshold is crossed",
4685        );
4686
4687        // The remaining 44 rows stay un-embedded; the IVF_PQ trains over the
4688        // non-null subset and a planted vector is retrievable.
4689        let hits = store
4690            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4691            .await?;
4692        assert!(
4693            hits.iter().any(|(key, _)| key == &keys[0]),
4694            "an embedded row is retrievable via the index",
4695        );
4696        Ok(())
4697    }
4698
4699    #[tokio::test]
4700    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4701    {
4702        let temp = TempDir::new()?;
4703        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4704        let old_rows = embedded(&keys);
4705        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4706        store
4707            .handle
4708            .merge_update(Table::Messages, old_batch, old_rows.len())
4709            .await?;
4710        store
4711            .optimize_indices_with_vector_threshold(256)
4712            .await?
4713            .into_result()?;
4714        assert!(
4715            store
4716                .handle
4717                .messages_index_names()
4718                .await?
4719                .iter()
4720                .any(|name| name == MESSAGES_VECTOR_INDEX),
4721            "IVF_PQ must exist before a model swap",
4722        );
4723        assert_eq!(store.stale_embedding_count().await?, keys.len());
4724
4725        store.drop_vector_index().await?;
4726        let mut pending = Vec::new();
4727        let stream = store.pending_or_stale_messages();
4728        tokio::pin!(stream);
4729        while let Some(row) = stream.next().await {
4730            pending.push(row?);
4731        }
4732        assert_eq!(
4733            pending.len(),
4734            keys.len(),
4735            "force stream should see stale rows"
4736        );
4737        store.write_embeddings(&embedded(&keys)).await?;
4738        assert_eq!(store.stale_embedding_count().await?, 0);
4739        store
4740            .optimize_indices_with_vector_threshold(256)
4741            .await?
4742            .into_result()?;
4743        assert!(
4744            store
4745                .handle
4746                .messages_index_names()
4747                .await?
4748                .iter()
4749                .any(|name| name == MESSAGES_VECTOR_INDEX),
4750            "optimize must rebuild IVF_PQ after force re-embed",
4751        );
4752
4753        let stream = store.pending_or_stale_messages();
4754        tokio::pin!(stream);
4755        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4756        Ok(())
4757    }
4758
4759    #[tokio::test]
4760    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4761        // Regression: `_row_last_updated_at_version` can point at a Lance
4762        // manifest version that `cleanup_old_versions` or the auto_cleanup
4763        // hook has since dropped from `Dataset::versions()`. The old code
4764        // silently dropped any session whose row-version was not in the
4765        // visible list, collapsing the staleness-skip map down to recent
4766        // commits and forcing `pond sync` to re-touch every file. The fix
4767        // falls back to the oldest still-visible commit timestamp - a
4768        // sound upper bound on the row's true ingest time.
4769        let temp = TempDir::new()?;
4770        let (store, _keys) = store_with_messages(&temp, 4).await?;
4771
4772        // Produce several distinct manifest versions on `sessions` so the
4773        // older ones become eligible for cleanup.
4774        for tag in 0..3 {
4775            let extra = synthetic_session(&format!("extra-{tag}"));
4776            store.upsert_sessions(&[extra]).await?;
4777        }
4778
4779        // Prune everything older than ~now, leaving only the latest manifest.
4780        // `delete_unverified=None` and `error_if_tagged=Some(false)` mirror
4781        // Lance's auto-cleanup hook semantics. The chrono 0-duration is fine:
4782        // Lance's `delete_unverified` floor still protects in-flight files.
4783        let dataset = store.handle.dataset(Table::Sessions).await?;
4784        dataset
4785            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4786            .await
4787            .context("cleanup_old_versions failed")?;
4788
4789        let map = store.session_last_ingested_at().await?;
4790        let session_count = store.row_counts().await?.0;
4791        assert!(
4792            map.len() >= session_count,
4793            "watermark map ({}) must still cover every session ({}) after \
4794             version cleanup; an empty fallback regresses pond sync to a \
4795             full re-scan",
4796            map.len(),
4797            session_count,
4798        );
4799        Ok(())
4800    }
4801
4802    #[tokio::test]
4803    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4804        let temp = TempDir::new()?;
4805        let (store, keys) = store_with_messages(&temp, 10).await?;
4806
4807        let before = store.embedding_progress().await?;
4808        assert_eq!(before.embedded, 0);
4809        assert_eq!(before.total, 10);
4810        assert_eq!(before.model, crate::embed::model_id());
4811
4812        store.write_embeddings(&embedded(&keys[..4])).await?;
4813        let partial = store.embedding_progress().await?;
4814        assert_eq!(partial.embedded, 4);
4815        assert_eq!(partial.total, 10);
4816
4817        store.write_embeddings(&embedded(&keys[4..])).await?;
4818        let full = store.embedding_progress().await?;
4819        assert_eq!(full.embedded, 10);
4820        assert_eq!(full.total, 10);
4821        Ok(())
4822    }
4823}