Skip to main content

pond/
sessions.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    path::Path,
4    sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15    UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25    config, embed,
26    substrate::{
27        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
28        OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
29        TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
30    },
31    wire::{
32        FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33        SessionFrom,
34    },
35};
36use url::Url;
37
38#[derive(Debug)]
39pub struct Store {
40    handle: Handle,
41}
42
43#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
44pub struct LanceArchiveCounts {
45    pub sessions: usize,
46    pub messages: usize,
47    pub parts: usize,
48}
49
50#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
51pub struct LanceArchiveVersions {
52    pub sessions: u64,
53    pub messages: u64,
54    pub parts: u64,
55}
56
57#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
58pub struct LanceArchiveExport {
59    pub rows: LanceArchiveCounts,
60    pub source_versions: LanceArchiveVersions,
61}
62
63#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
64pub struct LanceArchiveImport {
65    pub rows: LanceArchiveCounts,
66    pub inserted: LanceArchiveCounts,
67}
68
69#[derive(Debug, Clone, Default)]
70pub struct IndexIntents {
71    pub sessions: Vec<IndexIntent>,
72    pub messages: Vec<IndexIntent>,
73    pub parts: Vec<IndexIntent>,
74}
75
76impl IndexIntents {
77    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78        [
79            (Table::Sessions, &self.sessions),
80            (Table::Messages, &self.messages),
81            (Table::Parts, &self.parts),
82        ]
83    }
84}
85
/// A `messages` row that still needs a vector: its primary key
/// `(session_id, id)` and the `search_text` to embed.
///
/// Because the vector is stored on the same `messages` row, no denormalized
/// filter columns have to travel with it (spec.md#session-embed-from-canonical).
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    pub session_id: String,
    pub id: String,
    pub search_text: String,
}

/// A freshly embedded message: the `(session_id, id)` primary key plus the
/// vector that `pond embed` writes into `messages.vector` for that row.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    pub session_id: String,
    pub id: String,
    pub vector: Vec<f32>,
}
104
105/// Message metadata used to hydrate search hits after retriever ranking.
106#[derive(Debug, Clone, PartialEq)]
107pub struct MessageMeta {
108    pub message_id: String,
109    pub session_id: String,
110    pub role: String,
111    pub project: String,
112    pub source_agent: String,
113    pub timestamp: DateTime<Utc>,
114    pub search_text: String,
115}
116
/// Composite key for a message. The derived ordering compares `session_id`
/// first and falls back to `message_id`.
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct MessageKey {
    pub session_id: String,
    pub message_id: String,
}

/// Per-row result of an upsert: `Inserted` for a row that was written new,
/// `Matched` for a row whose key was already present.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    Inserted,
    Matched,
}
128
129/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
130/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
131/// results so the CLI can render compaction-skipped (under writer contention)
132/// distinctly from index-build failure (real problem).
133#[derive(Debug, Default)]
134pub struct OptimizeOutcome {
135    pub tables: Vec<TableOptimizeOutcome>,
136}
137
138impl OptimizeOutcome {
139    /// True if any table's indices phase reported a non-conflict failure.
140    /// `SkippedConflict` is expected under contention and does not count.
141    pub fn any_indices_failed(&self) -> bool {
142        self.tables.iter().any(|t| t.indices.is_failed())
143    }
144
145    /// Treat any `Failed` phase as an error. Tests that don't run under
146    /// contention use this to keep their existing `.await?` style: a real
147    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
148    pub fn into_result(self) -> Result<Self> {
149        for table in &self.tables {
150            if let PhaseOutcome::Failed(error) = &table.indices {
151                anyhow::bail!(
152                    "indices phase failed on {}: {error:#}",
153                    table.table.as_str()
154                );
155            }
156            if let PhaseOutcome::Failed(error) = &table.compaction {
157                anyhow::bail!(
158                    "compaction phase failed on {}: {error:#}",
159                    table.table.as_str()
160                );
161            }
162        }
163        Ok(self)
164    }
165}
166
167/// What `pond status` reports: where the data lives, total rows per table,
168/// and a per-(adapter, project) breakdown built from one `messages` scan.
169#[derive(Debug, Clone)]
170pub struct CorpusStats {
171    pub data_url: Url,
172    pub totals: RowTotals,
173    /// One entry per adapter present in the corpus. When `include_subagents`
174    /// is false (the CLI default), sub-agent rows (`source_agent` containing
175    /// `/`) are filtered out so only the main-agent sessions appear. When
176    /// true, each distinct `source_agent` (e.g. `claude-code/general-purpose`)
177    /// gets its own entry. Always in alphabetical order; the CLI re-sorts
178    /// this into registry order at render time so the tree matches the
179    /// discovery picker.
180    pub adapters: Vec<AdapterStats>,
181    /// Whether the rollup includes sub-agent sessions. The renderer prints a
182    /// hint about `--include-subagents` when this is false so users know the
183    /// `totals` row above counts sessions that aren't broken down below.
184    pub include_subagents: bool,
185}
186
/// Total row count of each table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}
193
/// Embedding coverage reported by `pond status` and `pond embed`.
///
/// - `total`: `messages` rows that have `search_text` and are therefore
///   eligible for embedding. Rows without it never get a vector.
/// - `embedded`: the subset of `total` that already has a vector produced by
///   the current [`embed::model_id()`].
///
/// The remaining backlog is `total - embedded`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    pub embedded: usize,
    pub total: usize,
    pub model: &'static str,
}
205
/// Session and message counts for one adapter, broken down by project.
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// The main-agent name (`claude-code`) when sub-agents are filtered out,
    /// or the full `source_agent` (`claude-code/general-purpose`) when
    /// `include_subagents` is enabled.
    pub adapter: String,
    pub sessions: u64,
    pub messages: u64,
    /// This adapter's projects, ordered by message count descending, with
    /// ties broken by project name ascending.
    pub projects: Vec<ProjectStats>,
}

/// Session and message counts for a single project under one adapter.
#[derive(Debug, Clone)]
pub struct ProjectStats {
    pub project: String,
    pub sessions: u64,
    pub messages: u64,
}
225
/// Running tally for one stats group: a message counter and the set of
/// distinct session ids seen so far. The set's size gives the group's
/// session count.
#[derive(Default)]
struct GroupAccumulator {
    messages: u64,
    session_ids: HashSet<String>,
}
231
232#[derive(Debug, Clone, Copy)]
233pub struct MessageWrite<'a> {
234    pub message: &'a Message,
235    pub parts: &'a [Part],
236    pub search_text: Option<&'a str>,
237}
238
239impl Store {
240    /// Open against a local filesystem URL or a remote one for which the
241    /// caller has no extra options to pass (env vars suffice). CLI verbs
242    /// that load `[storage]` from config should call
243    /// [`Store::open_with_options`] instead so the same options flow into
244    /// every dataset open and write.
245    pub async fn open(location: &Url) -> Result<Self> {
246        Ok(Self {
247            handle: Handle::open(location).await?,
248        })
249    }
250
251    /// Open with object-store options (S3 creds, region, endpoint, ...)
252    /// threaded through Lance verbatim. Keys are the standard `object_store`
253    /// config names; pond does not parse them. Empty options + default caps
254    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
255    /// config block via [`crate::substrate::RuntimeCaps`].
256    pub async fn open_with_options(
257        location: &Url,
258        storage_options: std::collections::HashMap<String, String>,
259        caps: crate::substrate::RuntimeCaps,
260    ) -> Result<Self> {
261        Ok(Self {
262            handle: Handle::open_with_options(location, storage_options, caps).await?,
263        })
264    }
265
266    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
267    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
268    /// through [`Store::open_with_options`] so the production policy is
269    /// applied; tests get the backend-aware local-FS defaults.
270    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
271        let url = config::url_for_path(path)?;
272        Self::open_with_options(
273            &url,
274            std::collections::HashMap::new(),
275            crate::substrate::RuntimeCaps::default(),
276        )
277        .await
278    }
279
280    /// Export clean, index-free Lance datasets into `dest`.
281    ///
282    /// This rewrites the visible rows of each table instead of copying the
283    /// dataset roots. The resulting manifests therefore contain no references
284    /// to the source store's `_indices`, while `messages.vector` and
285    /// `messages.embedding_model` remain ordinary data columns and are
286    /// preserved.
287    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
288        std::fs::create_dir_all(dest)
289            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
290        let (sessions, sessions_version) = self
291            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
292            .await?;
293        let (messages, messages_version) = self
294            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
295            .await?;
296        let (parts, parts_version) = self
297            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
298            .await?;
299        Ok(LanceArchiveExport {
300            rows: LanceArchiveCounts {
301                sessions,
302                messages,
303                parts,
304            },
305            source_versions: LanceArchiveVersions {
306                sessions: sessions_version,
307                messages: messages_version,
308                parts: parts_version,
309            },
310        })
311    }
312
313    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
314        let sessions_dataset =
315            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
316        let messages_dataset =
317            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
318        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
319        let (sessions, sessions_inserted) = self
320            .import_clean_table(Table::Sessions, sessions_dataset)
321            .await?;
322        let (messages, messages_inserted) = self
323            .import_clean_table(Table::Messages, messages_dataset)
324            .await?;
325        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
326        Ok(LanceArchiveImport {
327            rows: LanceArchiveCounts {
328                sessions,
329                messages,
330                parts,
331            },
332            inserted: LanceArchiveCounts {
333                sessions: sessions_inserted,
334                messages: messages_inserted,
335                parts: parts_inserted,
336            },
337        })
338    }
339
340    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
341        let dataset = self.handle.dataset(table).await?;
342        let source_version = dataset.version_id();
343        let schema = export_schema(table);
344        let mut stream = dataset
345            .scan()
346            .try_into_stream()
347            .await
348            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
349        let dest_uri = dest
350            .to_str()
351            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
352
353        let mut rows = 0usize;
354        let mut wrote = false;
355        while let Some(batch) = stream.next().await {
356            let batch = batch
357                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
358            rows += batch.num_rows();
359            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
360            let mut params = write_params_for_create();
361            if wrote {
362                params.mode = WriteMode::Append;
363            }
364            Dataset::write(reader, dest_uri, Some(params))
365                .await
366                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
367            wrote = true;
368        }
369
370        if !wrote {
371            let batch = RecordBatch::new_empty(schema.clone());
372            let reader = RecordBatchIterator::new([Ok(batch)], schema);
373            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
374                .await
375                .with_context(|| {
376                    format!("failed to write empty {} archive table", table.as_str())
377                })?;
378        }
379        Ok((rows, source_version))
380    }
381
382    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
383        let mut stream = dataset
384            .scan()
385            .try_into_stream()
386            .await
387            .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
388        let mut rows = 0usize;
389        let mut inserted = 0usize;
390        while let Some(batch) = stream.next().await {
391            let batch = batch
392                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
393            let row_count = batch.num_rows();
394            rows += row_count;
395            inserted += self
396                .handle
397                .merge_insert(table, batch, row_count)
398                .await
399                .with_context(|| format!("failed to import {} archive table", table.as_str()))?
400                as usize;
401        }
402        Ok((rows, inserted))
403    }
404
405    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
406        if sessions.is_empty() {
407            return Ok(Vec::new());
408        }
409        let batches = sessions_batches(sessions)?;
410        let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
411        // Per-row outcomes; no fold here (see `pond index optimize`).
412        Ok(statuses_from_inserted(sessions.len(), inserted))
413    }
414
    /// Batched write path used by the adapter ingest loop and by the wire
    /// handler's final flush. Receives N completed substreams from the
    /// validator and, in this order:
    ///
    ///   1. Deduplicates in-batch at the substream level: when two substreams
    ///      in the same batch share a `session_id` (Claude Code's subagent
    ///      files reuse their parent's id), the first occurrence wins. The
    ///      later one is either *merged* (same `source_agent` AND `project`:
    ///      its messages, together with their parts, are appended to the first
    ///      occurrence, skipping any message id already present) or
    ///      *rejected* with error outcomes (different `source_agent` or
    ///      `project` - the subagent-vs-parent case). Row-level duplicates that
    ///      slip past here are caught downstream by Lance's
    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
    ///      (invariant 17): this layer's job is preserving substream merge
    ///      semantics, not policing the PK uniqueness Lance handles itself.
    ///   2. Runs a pre-existence sweep: one filtered scan each of `sessions`,
    ///      `messages`, and `parts`, keyed on the batch's session ids, to learn
    ///      which rows are already stored.
    ///   3. Runs the immutable-fields check (spec.md#protocol) of every
    ///      surviving substream against its stored session row from that
    ///      sweep. Sessions that fail produce error outcomes (via
    ///      `error_outcomes_for_substream`) and are excluded from the write.
    ///   4. Builds the record batches for each table (sessions, messages,
    ///      parts) across every writeable substream.
    ///   5. Fires the three `merge_insert_chunks` calls in parallel via
    ///      `tokio::try_join!`. Cross-table mutex on `CachedDataset` is
    ///      independent, so these proceed concurrently.
    ///   6. Composes per-session [`RowOutcome`]s, attributing Inserted vs
    ///      Matched from the step-2 sets, and sorts all outcomes back into
    ///      original substream order by `index`.
    ///
    /// NOTE(review): a substream merged in step 1 never gets outcomes under
    /// its own `session_index`; its appended messages are reported under the
    /// first occurrence's index, and messages skipped as duplicate ids get no
    /// outcome at all. Confirm this is what callers expect.
    async fn upsert_session_batch(
        &self,
        substreams: Vec<CompletedSubstream>,
    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        if substreams.is_empty() {
            return Ok((Vec::new(), BatchCounts::default()));
        }

        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
        let mut counts = BatchCounts::default();

        // In-batch dedup. First occurrence of each session_id wins; later
        // occurrences either merge or get rejected. Iteration order preserves
        // original substream order so outcomes index correctly.
        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
        let mut by_session_id: std::collections::HashMap<String, usize> =
            std::collections::HashMap::with_capacity(substreams.len());
        for substream in substreams {
            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
                let existing = &merged[existing_idx];
                if existing.session.source_agent != substream.session.source_agent
                    || existing.session.project != substream.session.project
                {
                    // Subagent-vs-parent class. The first occurrence's
                    // metadata stays authoritative; this substream is
                    // rejected on the same immutable-field axis as the
                    // storage-side check. `source_agent` is reported in
                    // preference to `project` when both differ.
                    let reason = if existing.session.source_agent != substream.session.source_agent
                    {
                        IngestError::ImmutableField {
                            field: "source_agent",
                            session_id: substream.session.id.clone(),
                            stored: existing.session.source_agent.clone(),
                            attempted: substream.session.source_agent.clone(),
                        }
                    } else {
                        IngestError::ImmutableField {
                            field: "project",
                            session_id: substream.session.id.clone(),
                            stored: (*existing.session.project).clone(),
                            attempted: (*substream.session.project).clone(),
                        }
                    };
                    let field = match &reason {
                        IngestError::ImmutableField { field, .. } => Some(*field),
                    };
                    // Map the offending field onto its drop-reason bucket.
                    let reason_key = match field {
                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                        _ => DROP_REASON_UNCATEGORIZED,
                    };
                    outcomes.extend(error_outcomes_for_substream(
                        substream.session_index,
                        &substream.session,
                        &substream.messages,
                        reason.to_string(),
                        field,
                        reason_key,
                    ));
                    continue;
                }
                // Same session, same metadata: merge messages. Dedup message
                // ids defensively (within one batch, the validator's seen
                // sets are per-substream so cross-substream dups can happen
                // legally if both files re-emit the same row). A duplicate
                // message is dropped together with its parts.
                let existing = &mut merged[existing_idx];
                let mut seen: std::collections::HashSet<String> = existing
                    .messages
                    .iter()
                    .map(|m| m.message.id().to_owned())
                    .collect();
                for msg in substream.messages {
                    if seen.insert(msg.message.id().to_owned()) {
                        existing.messages.push(msg);
                    }
                }
                continue;
            }
            by_session_id.insert(substream.session.id.clone(), merged.len());
            merged.push(substream);
        }

        // Pre-existence sweep: one scan per table keyed on the batch's
        // session_ids, capped at the substream count. Replaces the prior
        // N-sequential `find_session` calls and gives us honest per-row
        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
        // `merged` always holds at least the first substream (the empty case
        // returned early above), so the `is_empty()` guards below are
        // defensive only.
        let session_id_values: Vec<ScalarValue> = merged
            .iter()
            .map(|substream| ScalarValue::String(substream.session.id.clone()))
            .collect();
        // Full session rows (empty projection list), keyed by id, for the
        // immutable-fields check below.
        let existing_sessions: std::collections::HashMap<String, Session> =
            if session_id_values.is_empty() {
                std::collections::HashMap::new()
            } else {
                let batch = self
                    .handle
                    .scan_batch(
                        Table::Sessions,
                        Some(&Predicate::In("id", session_id_values.clone())),
                        &[],
                    )
                    .await?;
                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
                for row in 0..batch.num_rows() {
                    let session = session_from_batch(&batch, row)?;
                    map.insert(session.id.clone(), session);
                }
                map
            };
        // Stored message primary keys: (session_id, message id).
        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Messages,
                    Some(&Predicate::In("session_id", session_id_values.clone())),
                    &["session_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "id", row)?.context("message id is null")?;
                set.insert((sid, mid));
            }
            set
        };
        // Stored part primary keys: (session_id, message_id, part id).
        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Parts,
                    Some(&Predicate::In("session_id", session_id_values)),
                    &["session_id", "message_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
                let pid = string(&batch, "id", row)?.context("part id is null")?;
                set.insert((sid, mid, pid));
            }
            set
        };

        // Storage-side immutable-fields check: substreams whose session
        // already exists with different immutable fields are turned into
        // error outcomes; everything else is writeable.
        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
        for substream in merged {
            if let Some(existing) = existing_sessions.get(&substream.session.id)
                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
            {
                let field = match &failure {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    failure.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            writeable.push(substream);
        }

        // Nothing left to write: return just the error outcomes, in order.
        if writeable.is_empty() {
            outcomes.sort_by_key(|outcome| outcome.index);
            return Ok((outcomes, counts));
        }

        let sessions_owned: Vec<Session> = writeable
            .iter()
            .map(|substream| substream.session.clone())
            .collect();
        // Message rows carry the owning session's source_agent and project.
        let message_rows: Vec<MessageBatchRow<'_>> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().map(|buffered| MessageBatchRow {
                    message: &buffered.message,
                    source_agent: &substream.session.source_agent,
                    project: &substream.session.project,
                    search_text: buffered.search_text.as_deref(),
                })
            })
            .collect();
        let part_rows: Vec<Part> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().flat_map(|buffered| {
                    buffered
                        .parts
                        .iter()
                        .map(|buffered_part| buffered_part.part.clone())
                })
            })
            .collect();

        let session_batches = sessions_batches(&sessions_owned)?;
        let message_batches = messages_batches(&message_rows)?;
        let part_batches = parts_batches(&part_rows)?;

        // `merge_insert_chunks` returns a batch-level inserted count per
        // table. Those counts are currently discarded (the `_`-prefixed
        // bindings); per-row Inserted/Matched truth comes from the
        // pre-existence sets instead (next loop). Under single-writer the two
        // agree exactly; under a hostile concurrent writer the sets are
        // authoritative for THIS request's wire shape - matched-no-op
        // (spec.md#adapter-integrity-additive-sync) makes the distinction
        // informational, not behavioral.
        let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
        )?;

        for substream in &writeable {
            outcomes.extend(success_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                &existing_sessions,
                &existing_message_pks,
                &existing_part_pks,
                &mut counts,
            ));
        }

        // Restore original substream order across error and success outcomes.
        outcomes.sort_by_key(|outcome| outcome.index);
        Ok((outcomes, counts))
    }
677
678    pub async fn upsert_messages(
679        &self,
680        session: &Session,
681        messages: &[MessageWrite<'_>],
682    ) -> Result<Vec<UpsertStatus>> {
683        if messages.is_empty() {
684            return Ok(Vec::new());
685        }
686
687        let rows = messages
688            .iter()
689            .map(|write| MessageBatchRow {
690                message: write.message,
691                source_agent: &session.source_agent,
692                project: &session.project,
693                search_text: write.search_text,
694            })
695            .collect::<Vec<_>>();
696        let batches = messages_batches(&rows)?;
697        let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
698        Ok(statuses_from_inserted(messages.len(), inserted))
699    }
700
701    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
702        if parts.is_empty() {
703            return Ok(Vec::new());
704        }
705        let batches = parts_batches(parts)?;
706        let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
707        Ok(statuses_from_inserted(parts.len(), inserted))
708    }
709
710    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
711        let Some(session) = self.find_session(session_id).await? else {
712            return Ok(None);
713        };
714        let messages = self.messages_for_session(session_id).await?;
715        Ok(Some(SessionWithMessages { session, messages }))
716    }
717
718    /// Every session id currently in the store, unsorted.
719    pub async fn session_ids(&self) -> Result<Vec<String>> {
720        let batch = self
721            .handle
722            .scan_batch(Table::Sessions, None, &["id"])
723            .await?;
724        let mut ids = Vec::with_capacity(batch.num_rows());
725        for row in 0..batch.num_rows() {
726            if let Some(id) = string(&batch, "id", row)? {
727                ids.push(id);
728            }
729        }
730        Ok(ids)
731    }
732
733    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
734        let batch = self
735            .handle
736            .scan_batch(
737                Table::Sessions,
738                Some(&Predicate::Eq(
739                    "parent_session_id",
740                    parent_session_id.into(),
741                )),
742                &[
743                    "id",
744                    "parent_session_id",
745                    "parent_message_id",
746                    "source_agent",
747                    "created_at",
748                    "project",
749                    "options",
750                ],
751            )
752            .await?;
753        let mut sessions = Vec::with_capacity(batch.num_rows());
754        for row in 0..batch.num_rows() {
755            sessions.push(session_from_batch(&batch, row)?);
756        }
757        sessions.sort_by(|left, right| left.id.cmp(&right.id));
758        Ok(sessions)
759    }
760
761    /// `session_id -> wall-clock time of the Lance manifest version that
762    /// last wrote the row` for the per-session staleness skip
763    /// (spec.md#adapter-integrity-event-ordering). Reads Lance's `_row_last_updated_at_version` system
764    /// column (available because pond enables stable row ids per spec.md#lance-table-creation-stable-row-ids)
765    /// and joins it against `Dataset::versions()` for commit timestamps.
766    pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
767        use lance::deps::arrow_array::UInt64Array;
768
769        let dataset = self.handle.dataset(Table::Sessions).await?;
770        let version_list = dataset.versions().await?;
771        let versions: HashMap<u64, DateTime<Utc>> = version_list
772            .iter()
773            .map(|v| (v.version, v.timestamp))
774            .collect();
775        // `Dataset::cleanup_old_versions` (and the auto_cleanup hook) drops
776        // pruned versions from the manifest list, leaving rows whose
777        // `_row_last_updated_at_version` points at a version that no longer
778        // resolves. Those rows are still real and were ingested at some time
779        // <= the oldest still-visible version's commit timestamp - so falling
780        // back to that bound preserves a sound `mtime <= ingested` upper edge
781        // and keeps the staleness skip working after cleanup.
782        let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
783
784        let scanner = self
785            .handle
786            .scan(
787                Table::Sessions,
788                ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
789            )
790            .await?;
791        let mut stream = scanner.try_into_stream().await?;
792        let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
793        while let Some(batch) = stream.next().await {
794            let batch = batch?;
795            let version_array = batch
796                .column_by_name("_row_last_updated_at_version")
797                .context("missing _row_last_updated_at_version column")?
798                .as_any()
799                .downcast_ref::<UInt64Array>()
800                .context("_row_last_updated_at_version is not UInt64")?;
801            for row in 0..batch.num_rows() {
802                let Some(id) = string(&batch, "id", row)? else {
803                    continue;
804                };
805                if version_array.is_null(row) {
806                    continue;
807                }
808                let version = version_array.value(row);
809                let ts = versions.get(&version).copied().or(oldest_visible_ts);
810                if let Some(ts) = ts {
811                    out.insert(id, ts);
812                }
813            }
814        }
815        Ok(out)
816    }
817
    /// Whole-session view for `pond_get` session mode (spec.md#protocol).
    /// Conversational filters to `search_text IS NOT NULL`; Complete and
    /// Verbatim scan every message. Every mode attaches compact part summaries;
    /// Verbatim additionally inlines full parts. `after_id` is an exclusive
    /// lower bound (a message id); the page is bounded by `limit` and a byte
    /// budget and never cuts mid-message.
    ///
    /// Returns `GetLookup::NotFound` when the session does not exist and
    /// `GetLookup::UnknownAfterId` when `after_id` names no message among the
    /// scanned rows. Rows are ordered by `(timestamp, id)` before paging, and
    /// the byte budget is measured over each message's `text` only (0 when it
    /// is absent). `messages_remaining` counts the rows of the cursor range
    /// that were not emitted: the rows after the page for
    /// `SessionFrom::Start`, the older rows before the page for
    /// `SessionFrom::End`.
    pub async fn session_view(
        &self,
        session_id: &str,
        params: SessionViewParams<'_>,
    ) -> Result<GetLookup<SessionPage>> {
        let Some(session) = self.find_session(session_id).await? else {
            return Ok(GetLookup::NotFound);
        };

        // Normalize both scan flavours into `ScanRow`; the conversational
        // scan carries no raw `content`.
        let mut rows = match params.mode {
            ResponseMode::Conversational => self
                .scan_conversational_messages(session_id)
                .await?
                .into_iter()
                .map(|row| ScanRow {
                    id: row.message_id,
                    role: row.role,
                    timestamp: row.timestamp,
                    text: Some(row.text.into_inner()),
                    content: None,
                })
                .collect(),
            ResponseMode::Complete | ResponseMode::Verbatim => {
                self.scan_all_messages(session_id).await?
            }
        };
        // `scan_all_messages` returns rows in scan order, so both modes are
        // put into the same `(timestamp, id)` order the cursor relies on.
        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));

        let start_at = match params.after_id {
            // Append-only stream: a real anchor never vanishes, so an unknown
            // `after_id` is a stale/mistyped client cursor, not "start over".
            Some(after) => match rows.iter().position(|row| row.id == after) {
                Some(idx) => idx + 1,
                None => return Ok(GetLookup::UnknownAfterId),
            },
            None => 0,
        };
        let remaining = rows.get(start_at..).unwrap_or(&[]);
        let (emitted, messages_remaining) = match params.session_from {
            SessionFrom::Start => {
                let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
                    row.text.as_deref().map_or(0, str::len)
                });
                (&remaining[..n], remaining.len() - n)
            }
            // Tail: the newest messages that fit `limit` and the byte budget,
            // dropping oldest first; the newest is always kept and the page
            // stays chronological so the agent reads the flow forward.
            SessionFrom::End => {
                let mut bytes = 0usize;
                // The page grows downward from the end: `remaining[start..]`
                // is what has been taken so far.
                let mut start = remaining.len();
                for row in remaining.iter().rev() {
                    // `limit` is checked first, so with `limit == 0` nothing
                    // (not even the newest row) is taken.
                    if remaining.len() - start >= params.limit {
                        break;
                    }
                    let size = row.text.as_deref().map_or(0, str::len);
                    // The budget only stops the walk once at least one row is
                    // in the page, so an oversized newest row is still emitted.
                    if start < remaining.len() && bytes + size > params.budget_bytes {
                        break;
                    }
                    bytes += size;
                    start -= 1;
                }
                // The `start` older rows before the page stay unemitted.
                (&remaining[start..], start)
            }
        };
        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();

        // Conversational/Complete only summarize parts; Verbatim inlines every
        // part (blobs included).
        let mut parts_by_message = match params.mode {
            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
            ResponseMode::Conversational | ResponseMode::Complete => {
                self.summary_parts_for_messages(session_id, &ids).await?
            }
        };
        // Parts are keyed by `(session_id, message_id)`; each emitted message
        // takes its own entry out of the map (empty when it has none).
        let messages = emitted
            .iter()
            .map(|row| RetrievedMessage {
                id: row.id.clone(),
                role: row.role,
                timestamp: row.timestamp,
                text: row.text.clone(),
                content: row.content.clone(),
                parts: parts_by_message
                    .remove(&(session_id.to_owned(), row.id.clone()))
                    .unwrap_or_default(),
            })
            .collect();

        Ok(GetLookup::Found(SessionPage {
            session,
            messages,
            messages_remaining,
        }))
    }
919
920    /// Message-scope retrieval for `pond_get` message mode (spec.md#protocol):
921    /// the target with its full parts (paginated by `after_id` over part
922    /// ordinals, then budget) plus up to `2*context_depth` siblings around it.
923    /// `None` when no stored message carries `message_id`. Sibling parts are
924    /// carried for summarizing; the target's parts ride `target_parts`.
925    pub async fn message_view(
926        &self,
927        message_id: &str,
928        params: MessageViewParams<'_>,
929    ) -> Result<GetLookup<MessagePage>> {
930        let Some(session_id) = self.session_id_for_message(message_id).await? else {
931            return Ok(GetLookup::NotFound);
932        };
933        let Some(session) = self.find_session(&session_id).await? else {
934            return Ok(GetLookup::NotFound);
935        };
936        let mut rows = self.scan_all_messages(&session_id).await?;
937        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
938        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
939            return Ok(GetLookup::NotFound);
940        };
941
942        let start = target_pos.saturating_sub(params.context_depth);
943        let end = (target_pos + params.context_depth + 1).min(rows.len());
944        let window = &rows[start..end];
945        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
946        // The target's full parts (blobs included) ride the response; siblings
947        // are only summarized, but they share this one window scan.
948        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
949
950        let all_parts = parts_by_message
951            .remove(&(session_id.clone(), message_id.to_owned()))
952            .unwrap_or_default();
953        let start_part = match params.after_id {
954            // Exclusive over ordinals: parts are ordinal-sorted, so the first
955            // part past the anchor's ordinal is the page start. An anchor absent
956            // from the target's parts is a stale/mistyped client cursor.
957            Some(after) => match all_parts.iter().find(|part| part.id == after) {
958                Some(anchor) => all_parts
959                    .iter()
960                    .position(|part| part.ordinal > anchor.ordinal)
961                    .unwrap_or(all_parts.len()),
962                None => return Ok(GetLookup::UnknownAfterId),
963            },
964            None => 0,
965        };
966        let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
967        let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
968            serde_json::to_string(part).map_or(0, |json| json.len())
969        });
970        let target_parts = remaining_parts[..part_count].to_vec();
971        let target_parts_remaining = remaining_parts.len() - part_count;
972
973        let target_row = &rows[target_pos];
974        let target = RetrievedMessage {
975            id: target_row.id.clone(),
976            role: target_row.role,
977            timestamp: target_row.timestamp,
978            text: target_row.text.clone(),
979            content: target_row.content.clone(),
980            // Target structure is carried in full by `target_parts`.
981            parts: Vec::new(),
982        };
983        let siblings = window
984            .iter()
985            .enumerate()
986            .filter(|(idx, _)| start + idx != target_pos)
987            .map(|(_, row)| RetrievedMessage {
988                id: row.id.clone(),
989                role: row.role,
990                timestamp: row.timestamp,
991                text: row.text.clone(),
992                content: row.content.clone(),
993                parts: parts_by_message
994                    .get(&(session_id.clone(), row.id.clone()))
995                    .cloned()
996                    .unwrap_or_default(),
997            })
998            .collect();
999
1000        Ok(GetLookup::Found(MessagePage {
1001            session,
1002            target,
1003            target_parts,
1004            target_parts_remaining,
1005            siblings,
1006        }))
1007    }
1008
1009    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1010        let batch = self
1011            .handle
1012            .scan_batch(
1013                Table::Messages,
1014                Some(&Predicate::Eq("session_id", session_id.into())),
1015                &["id", "timestamp", "role", "search_text", "content"],
1016            )
1017            .await?;
1018        let mut rows = Vec::with_capacity(batch.num_rows());
1019        for row in 0..batch.num_rows() {
1020            let id = string(&batch, "id", row)?.context("message id is null")?;
1021            let role =
1022                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1023            let timestamp = datetime(&batch, "timestamp", row)?;
1024            rows.push(ScanRow {
1025                id,
1026                role,
1027                timestamp,
1028                text: string(&batch, "search_text", row)?,
1029                content: string(&batch, "content", row)?,
1030            });
1031        }
1032        Ok(rows)
1033    }
1034
1035    /// Conversational scan over one session: rows ordered by
1036    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1037    /// read seam (spec.md#search-prefilter-pushdown).
1038    pub async fn scan_conversational_messages(
1039        &self,
1040        session_id: &str,
1041    ) -> Result<Vec<ConversationalRow>> {
1042        let filter = Predicate::And(vec![
1043            Predicate::Eq("session_id", session_id.into()),
1044            Predicate::IsNotNull("search_text"),
1045        ]);
1046        let batch = self
1047            .handle
1048            .scan_batch(
1049                Table::Messages,
1050                Some(&filter),
1051                &["id", "timestamp", "role", "search_text"],
1052            )
1053            .await?;
1054
1055        let mut rows = Vec::with_capacity(batch.num_rows());
1056        for row in 0..batch.num_rows() {
1057            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1058            let role =
1059                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1060            let timestamp = datetime(&batch, "timestamp", row)?;
1061            let text_str = string(&batch, "search_text", row)?.context(
1062                "search_text null after IsNotNull pushdown - storage invariant violated",
1063            )?;
1064            rows.push(ConversationalRow {
1065                session_id: session_id.to_owned(),
1066                message_id,
1067                role,
1068                timestamp,
1069                text: SearchText(text_str),
1070            });
1071        }
1072        rows.sort_by(|a, b| {
1073            a.timestamp
1074                .cmp(&b.timestamp)
1075                .then_with(|| a.message_id.cmp(&b.message_id))
1076        });
1077        Ok(rows)
1078    }
1079
1080    /// Locate the session id for a stored message. Cheap when only the routing
1081    /// hint is needed - callers that need the messages use `scan_all_messages`.
1082    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1083        let batch = self
1084            .handle
1085            .scan_batch(
1086                Table::Messages,
1087                Some(&Predicate::Eq("id", message_id.into())),
1088                &["session_id"],
1089            )
1090            .await?;
1091        if batch.num_rows() == 0 {
1092            return Ok(None);
1093        }
1094        string(&batch, "session_id", 0)
1095    }
1096
1097    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1098        self.handle.row_counts().await
1099    }
1100
1101    /// Compute the per-adapter / per-project rollup that drives
1102    /// `pond status`. One scan over `messages` projecting the three
1103    /// columns the rollup keys on (`source_agent`, `project`, `session_id`),
1104    /// aggregated in-memory. Bounded by the cross product of adapters and
1105    /// projects, which stays small on real corpora.
1106    pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1107        let scanner = self
1108            .handle
1109            .scan(
1110                Table::Messages,
1111                ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1112            )
1113            .await?;
1114        let mut stream = scanner.try_into_stream().await?;
1115        let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1116        while let Some(batch) = stream.next().await {
1117            let batch = batch?;
1118            for row in 0..batch.num_rows() {
1119                let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1120                let project = string(&batch, "project", row)?.unwrap_or_default();
1121                let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1122                let is_subagent = source_agent.contains('/');
1123                if is_subagent && !include_subagents {
1124                    continue;
1125                }
1126                let entry = groups.entry((source_agent, project)).or_default();
1127                entry.messages += 1;
1128                entry.session_ids.insert(session_id);
1129            }
1130        }
1131
1132        let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1133        let totals = RowTotals {
1134            sessions: totals_sessions as u64,
1135            messages: totals_messages as u64,
1136            parts: totals_parts as u64,
1137        };
1138
1139        let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1140        for ((adapter, project), acc) in groups {
1141            by_adapter.entry(adapter).or_default().push(ProjectStats {
1142                project,
1143                sessions: acc.session_ids.len() as u64,
1144                messages: acc.messages,
1145            });
1146        }
1147
1148        let mut adapters = Vec::with_capacity(by_adapter.len());
1149        for (adapter, mut projects) in by_adapter {
1150            projects.sort_by(|a, b| {
1151                b.messages
1152                    .cmp(&a.messages)
1153                    .then_with(|| a.project.cmp(&b.project))
1154            });
1155            let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1156            let messages: u64 = projects.iter().map(|p| p.messages).sum();
1157            adapters.push(AdapterStats {
1158                adapter,
1159                sessions,
1160                messages,
1161                projects,
1162            });
1163        }
1164
1165        Ok(CorpusStats {
1166            data_url: self.handle.location().clone(),
1167            totals,
1168            adapters,
1169            include_subagents,
1170        })
1171    }
1172
1173    /// Write a batch of embeddings into `messages`: set `vector` and
1174    /// `embedding_model` on each row by `(session_id, id)`
1175    /// (spec.md#session-embed-from-canonical). The column update goes through the
1176    /// write seam and lands as a new manifest version (`append-only`).
1177    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1178        if rows.is_empty() {
1179            return Ok(());
1180        }
1181        let batch = embedding_update_batch(rows)?;
1182        self.handle
1183            .merge_update(Table::Messages, batch, rows.len())
1184            .await?;
1185        Ok(())
1186    }
1187
1188    /// Stream the backlog of messages needing embedding: rows with `search_text`
1189    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1190    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1191        try_stream! {
1192            let filter = Predicate::And(vec![
1193                Predicate::IsNull("vector"),
1194                Predicate::IsNotNull("search_text"),
1195            ]);
1196            let projection: &[&str] = &["session_id", "id", "search_text"];
1197            let scanner = self
1198                .handle
1199                .scan(
1200                    Table::Messages,
1201                    ScanOpts::with_predicate_and_projection(&filter, projection),
1202                )
1203                .await?;
1204            let mut batches = scanner
1205                .try_into_stream()
1206                .await
1207                .context("failed to open messages stream")?;
1208            while let Some(batch) = batches.next().await {
1209                let batch = batch?;
1210                for row in 0..batch.num_rows() {
1211                    yield PendingMessage {
1212                        session_id: string(&batch, "session_id", row)?
1213                            .context("session_id is null")?,
1214                        id: string(&batch, "id", row)?.context("message id is null")?,
1215                        search_text: string(&batch, "search_text", row)?
1216                            .context("search_text is null")?,
1217                    };
1218                }
1219            }
1220        }
1221    }
1222
1223    /// Stream messages that are either never embedded or stale under the
1224    /// current model. `pond embed --force` feeds this to the same unconditional
1225    /// merge_update as the normal backlog; the filter makes that semantically
1226    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1227    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1228        try_stream! {
1229            let filter = Predicate::And(vec![
1230                Predicate::IsNotNull("search_text"),
1231                Predicate::Or(vec![
1232                    Predicate::IsNull("vector"),
1233                    Predicate::Ne("embedding_model", embed::model_id().into()),
1234                ]),
1235            ]);
1236            let projection: &[&str] = &["session_id", "id", "search_text"];
1237            let scanner = self
1238                .handle
1239                .scan(
1240                    Table::Messages,
1241                    ScanOpts::with_predicate_and_projection(&filter, projection),
1242                )
1243                .await?;
1244            let mut batches = scanner
1245                .try_into_stream()
1246                .await
1247                .context("failed to open pending-or-stale messages stream")?;
1248            while let Some(batch) = batches.next().await {
1249                let batch = batch?;
1250                for row in 0..batch.num_rows() {
1251                    yield PendingMessage {
1252                        session_id: string(&batch, "session_id", row)?
1253                            .context("session_id is null")?,
1254                        id: string(&batch, "id", row)?.context("message id is null")?,
1255                        search_text: string(&batch, "search_text", row)?
1256                            .context("search_text is null")?,
1257                    };
1258                }
1259            }
1260        }
1261    }
1262
1263    /// BM25 full-text retriever over `messages.search_text`.
1264    pub async fn fts_search(
1265        &self,
1266        query: &str,
1267        limit: usize,
1268        filter: &Predicate,
1269    ) -> Result<Vec<(MessageKey, f32)>> {
1270        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1271        scanner.full_text_search(
1272            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1273        )?;
1274        // Lance ships an autoprojection that silently appends `_score` to FTS
1275        // output when the projection omits it. That behavior is going away;
1276        // we opt into the future explicit-projection contract here so the
1277        // scanner stops emitting a per-call deprecation warning, and we list
1278        // `_score` ourselves since the loop below reads it.
1279        scanner.disable_scoring_autoprojection();
1280        scanner.project(&["session_id", "id", "_score"])?;
1281        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1282        let batch = scanner.try_into_batch().await?;
1283        let mut hits = Vec::with_capacity(batch.num_rows());
1284        for row in 0..batch.num_rows() {
1285            let key = MessageKey {
1286                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1287                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1288            };
1289            hits.push((key, float32(&batch, "_score", row)?));
1290        }
1291        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1292        // order, which varies between runs and across calls with different pool
1293        // sizes (the hybrid arm's `pool=100` and FTS-only's `limit=20` produce
1294        // different orderings at the same tied score). Without an explicit
1295        // tiebreak the downstream RRF dedup-rank for a tied target session can
1296        // flip session-to-session, making fusion outcomes nondeterministic.
1297        // Sort by `score desc`, then `(session_id, message_id)` asc.
1298        hits.sort_by(|left, right| {
1299            right
1300                .1
1301                .partial_cmp(&left.1)
1302                .unwrap_or(std::cmp::Ordering::Equal)
1303                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1304                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1305        });
1306        Ok(hits)
1307    }
1308
1309    /// Whether any `messages` row carries a vector (spec.md#search) - the
1310    /// signal that flips search from FTS-only to hybrid. The single-active-
1311    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
1312    /// vector belongs to the current model.
1313    pub async fn has_embeddings(&self) -> Result<bool> {
1314        let scope = Predicate::IsNotNull("vector");
1315        let mut scanner = self
1316            .handle
1317            .scan(
1318                Table::Messages,
1319                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1320            )
1321            .await?;
1322        scanner.limit(Some(1), None)?;
1323        let batch = scanner.try_into_batch().await?;
1324        Ok(batch.num_rows() > 0)
1325    }
1326
1327    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
1328    /// scalar predicate (spec.md#search-prefilter-pushdown). Combines the caller's
1329    /// filter with `vector IS NOT NULL` to exclude un-embedded rows from the
1330    /// scan; the brute-force kNN path requires this (the IVF_PQ path would
1331    /// skip them anyway). The single-active-model invariant lets pond drop
1332    /// the per-row model filter: every non-null vector belongs to the current
1333    /// model.
1334    pub async fn vector_search(
1335        &self,
1336        query: &[f32],
1337        limit: usize,
1338        filter: &Predicate,
1339        search: Option<&config::SearchConfig>,
1340    ) -> Result<Vec<(MessageKey, f32)>> {
1341        let scope = embedded_scope(filter);
1342        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1343        let key = Float32Array::from(query.to_vec());
1344        scanner.nearest("vector", &key, limit)?;
1345        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1346            scanner.nprobes(nprobes);
1347        }
1348        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1349            scanner.refine(refine_factor);
1350        }
1351        // Mirror the explicit-projection contract from `fts_search`: opt out
1352        // of `_distance` autoprojection and list it ourselves since the loop
1353        // below reads it.
1354        scanner.disable_scoring_autoprojection();
1355        scanner.project(&["session_id", "id", "_distance"])?;
1356        let batch = scanner.try_into_batch().await?;
1357        let mut hits = Vec::with_capacity(batch.num_rows());
1358        for row in 0..batch.num_rows() {
1359            let key = MessageKey {
1360                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1361                message_id: string(&batch, "id", row)?.context("message id is null")?,
1362            };
1363            hits.push((key, float32(&batch, "_distance", row)?));
1364        }
1365        // Stable secondary sort: same reasoning as `fts_search` - IVF_PQ can
1366        // emit hits with effectively identical `_distance` in fragment-dependent
1367        // order, which makes RRF dedup-ranks nondeterministic for tied
1368        // neighbors. Sort by distance asc (smaller = more similar), then by
1369        // `(session_id, message_id)` asc.
1370        hits.sort_by(|left, right| {
1371            left.1
1372                .partial_cmp(&right.1)
1373                .unwrap_or(std::cmp::Ordering::Equal)
1374                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1375                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1376        });
1377        Ok(hits)
1378    }
1379
1380    /// The DataFusion plan string for a filtered vector scan - the
1381    /// `search-prefilter-pushdown` regression guard reads it.
1382    pub async fn explain_vector_plan(
1383        &self,
1384        query: &[f32],
1385        limit: usize,
1386        filter: &Predicate,
1387        search: Option<&config::SearchConfig>,
1388    ) -> Result<String> {
1389        let scope = embedded_scope(filter);
1390        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1391        let key = Float32Array::from(query.to_vec());
1392        scanner.nearest("vector", &key, limit)?;
1393        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1394            scanner.nprobes(nprobes);
1395        }
1396        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1397            scanner.refine(refine_factor);
1398        }
1399        scanner
1400            .explain_plan(true)
1401            .await
1402            .context("explain_plan failed")
1403    }
1404
1405    pub async fn explain_fts_plan(
1406        &self,
1407        query: &str,
1408        limit: usize,
1409        filter: &Predicate,
1410    ) -> Result<String> {
1411        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1412        scanner.full_text_search(
1413            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1414        )?;
1415        scanner.project(&["session_id", "id"])?;
1416        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1417        scanner
1418            .explain_plan(true)
1419            .await
1420            .context("explain_plan failed")
1421    }
1422
1423    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
1424    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1425        if keys.is_empty() {
1426            return Ok(Vec::new());
1427        }
1428        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1429        let session_ids = keys
1430            .iter()
1431            .map(|key| key.session_id.clone())
1432            .collect::<Vec<_>>();
1433        let message_ids = keys
1434            .iter()
1435            .map(|key| key.message_id.clone())
1436            .collect::<Vec<_>>();
1437        let predicate = Predicate::And(vec![
1438            in_predicate("session_id", &session_ids),
1439            in_predicate("id", &message_ids),
1440        ]);
1441        let batch = self
1442            .handle
1443            .scan_batch(
1444                Table::Messages,
1445                Some(&predicate),
1446                &[
1447                    "id",
1448                    "session_id",
1449                    "role",
1450                    "project",
1451                    "source_agent",
1452                    "timestamp",
1453                    "search_text",
1454                ],
1455            )
1456            .await?;
1457        let mut metas = Vec::with_capacity(batch.num_rows());
1458        for row in 0..batch.num_rows() {
1459            let message_id = string(&batch, "id", row)?.context("id is null")?;
1460            let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1461            if !wanted.contains(&MessageKey {
1462                session_id: session_id.clone(),
1463                message_id: message_id.clone(),
1464            }) {
1465                continue;
1466            }
1467            metas.push(MessageMeta {
1468                message_id,
1469                session_id,
1470                role: string(&batch, "role", row)?.context("role is null")?,
1471                project: string(&batch, "project", row)?.context("project is null")?,
1472                source_agent: string(&batch, "source_agent", row)?
1473                    .context("source_agent is null")?,
1474                timestamp: datetime(&batch, "timestamp", row)?,
1475                search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1476            });
1477        }
1478        Ok(metas)
1479    }
1480
1481    /// Total message count per session, for search session summaries.
1482    pub async fn session_message_counts(
1483        &self,
1484        session_ids: &[String],
1485    ) -> Result<BTreeMap<String, usize>> {
1486        if session_ids.is_empty() {
1487            return Ok(BTreeMap::new());
1488        }
1489        let dataset = self.handle.dataset(Table::Messages).await?;
1490        let mut tasks = tokio::task::JoinSet::new();
1491        for session_id in session_ids {
1492            let dataset = dataset.clone();
1493            let session_id = session_id.clone();
1494            tasks.spawn(async move {
1495                let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1496                let count = dataset.count_rows(Some(filter)).await?;
1497                anyhow::Ok((session_id, count))
1498            });
1499        }
1500        let mut counts = BTreeMap::new();
1501        while let Some(joined) = tasks.join_next().await {
1502            let (session_id, count) = joined.context("session count task panicked")??;
1503            counts.insert(session_id, count);
1504        }
1505        Ok(counts)
1506    }
1507
1508    /// Rows appended to `messages` since the FTS index was last optimized.
1509    /// A missing index reports the whole table; the query is manifest-only.
1510    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1511        self.handle
1512            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1513            .await
1514    }
1515
1516    /// Rows added or rewritten in `messages` since the IVF_PQ vector index
1517    /// was last optimized. Below
1518    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
1519    /// must read [`embedding_progress`](Self::embedding_progress) too and
1520    /// distinguish "index not built yet" from "index trails data".
1521    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1522        self.handle
1523            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1524            .await
1525    }
1526
1527    /// Embedding coverage: how many `messages` rows carry a vector and how
1528    /// many are still eligible. Drives the `pond status` embeddings line and
1529    /// the `pond embed` progress bar's known total. `embedded` reads the
1530    /// `vector IS NOT NULL` count directly - the single-active-model invariant
1531    /// (see `MESSAGE_SCALAR_INDICES`) means there is no need to scope by the
1532    /// `embedding_model` column.
1533    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1534        let dataset = self.handle.dataset(Table::Messages).await?;
1535        let embedded = dataset
1536            .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1537            .await?;
1538        let total = dataset
1539            .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1540            .await?;
1541        Ok(EmbeddingProgress {
1542            embedded,
1543            total,
1544            model: embed::model_id(),
1545        })
1546    }
1547
1548    /// Count rows whose `embedding_model` is not the currently configured
1549    /// model AND whose `vector` is still populated - the signal `pond embed`
1550    /// uses to detect a model swap and require `--force`.
1551    pub async fn stale_embedding_count(&self) -> Result<usize> {
1552        let dataset = self.handle.dataset(Table::Messages).await?;
1553        dataset
1554            .count_rows(Some(
1555                Predicate::And(vec![
1556                    Predicate::IsNotNull("vector"),
1557                    Predicate::Ne("embedding_model", embed::model_id().into()),
1558                ])
1559                .to_lance(),
1560            ))
1561            .await
1562            .map_err(Into::into)
1563    }
1564
1565    /// Run the per-table maintenance cycle (compact + indices) across every
1566    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
1567    /// and compaction commit independently, so a hot writer that starves
1568    /// compaction on one table does not abort the index work the operator
1569    /// asked for on other tables (or even on the same table).
1570    pub async fn optimize_indices(
1571        &self,
1572        progress: Option<OptimizeProgressFn>,
1573        maintenance: &MaintenancePolicy,
1574    ) -> Result<OptimizeOutcome> {
1575        let intents = pond_index_intents();
1576        let mut tables = Vec::with_capacity(3);
1577        for (table, intents) in intents.all() {
1578            let outcome = self
1579                .handle
1580                .optimize_table(table, intents, progress.as_ref(), maintenance)
1581                .await;
1582            tables.push(outcome);
1583        }
1584        Ok(OptimizeOutcome { tables })
1585    }
1586
1587    /// Fold trailing fragments into existing indices across every table,
1588    /// without running compaction. Used by `pond embed`'s tail so newly
1589    /// written vectors land in the FTS / IVF_PQ / btree / bitmap indices
1590    /// without paying the compaction retry budget while embed itself may
1591    /// still be writing in a sibling process.
1592    pub async fn build_indices_only(
1593        &self,
1594        progress: Option<OptimizeProgressFn>,
1595    ) -> Result<OptimizeOutcome> {
1596        let policy = pond_index_intents();
1597        let mut tables = Vec::with_capacity(3);
1598        for (table, intents) in policy.all() {
1599            let indices = self
1600                .handle
1601                .optimize_table_indices_only(table, intents, progress.as_ref())
1602                .await;
1603            tables.push(TableOptimizeOutcome {
1604                table,
1605                indices,
1606                compaction: PhaseOutcome::NotAttempted,
1607            });
1608        }
1609        Ok(OptimizeOutcome { tables })
1610    }
1611
1612    #[cfg(test)]
1613    async fn optimize_indices_with_vector_threshold(
1614        &self,
1615        vector_threshold: usize,
1616    ) -> Result<OptimizeOutcome> {
1617        let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1618        let policy = MaintenancePolicy::always_compact();
1619        let mut tables = Vec::with_capacity(3);
1620        for (table, intents) in intents.all() {
1621            let outcome = self
1622                .handle
1623                .optimize_table(table, intents, None, &policy)
1624                .await;
1625            tables.push(outcome);
1626        }
1627        Ok(OptimizeOutcome { tables })
1628    }
1629
1630    pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1631        let policy = pond_index_intents();
1632        let mut matched = false;
1633        for (table, intents) in policy.all() {
1634            for intent in intents {
1635                if intent_name.is_none_or(|name| name == intent.name) {
1636                    matched = true;
1637                    self.handle.rebuild_index(table, intent).await?;
1638                }
1639            }
1640        }
1641        if let Some(name) = intent_name
1642            && !matched
1643        {
1644            anyhow::bail!("unknown index intent {name:?}");
1645        }
1646        Ok(())
1647    }
1648
1649    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1650        let policy = pond_index_intents();
1651        let mut statuses = Vec::new();
1652        for (table, intents) in policy.all() {
1653            statuses.extend(self.handle.index_status(table, intents).await?);
1654        }
1655        Ok(statuses)
1656    }
1657
1658    /// Drop the IVF_PQ index on `messages.vector`. Used by `pond embed
1659    /// --force` before re-bootstrapping under a different model. Silent
1660    /// when the index does not exist.
1661    pub async fn drop_vector_index(&self) -> Result<()> {
1662        match self
1663            .handle
1664            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1665            .await
1666        {
1667            Ok(()) => Ok(()),
1668            Err(error) => {
1669                let msg = error.to_string();
1670                if msg.contains("not found") || msg.contains("does not exist") {
1671                    Ok(())
1672                } else {
1673                    Err(error)
1674                }
1675            }
1676        }
1677    }
1678
1679    /// On-disk byte totals per dataset, sized through Lance's object store
1680    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
1681    pub async fn table_sizes(&self) -> Result<TableSizes> {
1682        self.handle.table_sizes().await
1683    }
1684
1685    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1686        let batch = self
1687            .handle
1688            .scan_batch(
1689                Table::Sessions,
1690                Some(&Predicate::Eq("id", session_id.into())),
1691                &[],
1692            )
1693            .await?;
1694        if batch.num_rows() == 0 {
1695            Ok(None)
1696        } else {
1697            Ok(Some(session_from_batch(&batch, 0)?))
1698        }
1699    }
1700
1701    /// Fetch the stored vector for one message, for `similar_to`-style
1702    /// retrieval. Returns `Ok(None)` when the message exists but is not yet
1703    /// embedded, or when no message has that id. Vectors are stored Float16
1704    /// (`embedding_vector_type`); the search path takes `&[f32]`, so we
1705    /// widen here. Cheap rare op - one predicate scan, no index.
1706    pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1707        let batch = self
1708            .handle
1709            .scan_batch(
1710                Table::Messages,
1711                Some(&Predicate::Eq("id", message_id.into())),
1712                &["vector"],
1713            )
1714            .await?;
1715        if batch.num_rows() == 0 {
1716            return Ok(None);
1717        }
1718        let column = batch
1719            .column(0)
1720            .as_any()
1721            .downcast_ref::<FixedSizeListArray>();
1722        let Some(list) = column else {
1723            return Ok(None);
1724        };
1725        if list.is_null(0) {
1726            return Ok(None);
1727        }
1728        let values = list.value(0);
1729        let halves = values
1730            .as_any()
1731            .downcast_ref::<Float16Array>()
1732            .context("messages.vector inner array is not Float16")?;
1733        let widened = (0..halves.len())
1734            .map(|i| halves.value(i).to_f32())
1735            .collect();
1736        Ok(Some(widened))
1737    }
1738
1739    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1740        let batch = self
1741            .handle
1742            .scan_batch(
1743                Table::Messages,
1744                Some(&Predicate::Eq("session_id", session_id.into())),
1745                &[
1746                    "session_id",
1747                    "id",
1748                    "timestamp",
1749                    "role",
1750                    "content",
1751                    "options",
1752                ],
1753            )
1754            .await?;
1755        let mut messages = Vec::with_capacity(batch.num_rows());
1756        for row in 0..batch.num_rows() {
1757            messages.push(message_from_batch(&batch, row)?);
1758        }
1759        messages.sort_by(|left, right| {
1760            left.timestamp()
1761                .cmp(&right.timestamp())
1762                .then_with(|| left.id().cmp(right.id()))
1763        });
1764
1765        let message_ids = messages
1766            .iter()
1767            .map(|message| message.id().to_owned())
1768            .collect::<Vec<_>>();
1769        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1770
1771        Ok(messages
1772            .into_iter()
1773            .map(|message| {
1774                let key = (message.session_id().to_owned(), message.id().to_owned());
1775                let parts = parts_by_message.remove(&key).unwrap_or_default();
1776                MessageWithParts { message, parts }
1777            })
1778            .collect())
1779    }
1780
1781    /// Every part of these messages, full fidelity (file blobs included). The
1782    /// canonical read primitive - restore/export, verbatim mode, and the
1783    /// message-mode target all need the complete set.
1784    pub async fn parts_for_messages(
1785        &self,
1786        session_id: &str,
1787        message_ids: &[String],
1788    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1789        self.scan_parts(session_id, message_ids, None).await
1790    }
1791
1792    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
1793    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
1794    /// nothing. For the summary-only reads (conversational/complete session
1795    /// views, search hits) - it never feeds restore/export.
1796    pub async fn summary_parts_for_messages(
1797        &self,
1798        session_id: &str,
1799        message_ids: &[String],
1800    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1801        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1802            .await
1803    }
1804
1805    async fn scan_parts(
1806        &self,
1807        session_id: &str,
1808        message_ids: &[String],
1809        part_types: Option<&[&str]>,
1810    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1811        if message_ids.is_empty() {
1812            return Ok(BTreeMap::new());
1813        }
1814        let mut clauses = vec![
1815            Predicate::Eq("session_id", session_id.into()),
1816            in_predicate("message_id", message_ids),
1817        ];
1818        if let Some(types) = part_types {
1819            clauses.push(Predicate::In(
1820                "type",
1821                types.iter().map(|&t| t.into()).collect(),
1822            ));
1823        }
1824        let predicate = Predicate::And(clauses);
1825        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1826        let mut scanner = self
1827            .handle
1828            .scan(
1829                Table::Parts,
1830                ScanOpts::with_predicate_and_projection(
1831                    &predicate,
1832                    &[
1833                        "session_id",
1834                        "message_id",
1835                        "id",
1836                        "ordinal",
1837                        "type",
1838                        "provenance",
1839                        "variant_data",
1840                        "options",
1841                    ],
1842                ),
1843            )
1844            .await?;
1845        scanner.with_row_address();
1846        let batch = scanner.try_into_batch().await.context("scan failed")?;
1847        let row_addresses = uint64(&batch, "_rowaddr")?;
1848        let mut file_payloads = BTreeMap::<usize, FileData>::new();
1849        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1850        for row in 0..batch.num_rows() {
1851            if string(&batch, "type", row)?.as_deref() == Some("file") {
1852                let variant_data =
1853                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1854                file_rows.push((row, row_addresses.value(row), variant_data));
1855            }
1856        }
1857        if !file_rows.is_empty() {
1858            let addresses = file_rows
1859                .iter()
1860                .map(|(_, address, _)| *address)
1861                .collect::<Vec<_>>();
1862            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1863            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1864                // Legacy blob (lance-encoding:blob): payload is bytes; the
1865                // url variant stored its URL as UTF-8 bytes, recovered via
1866                // `file_data_from_blob`'s `data_kind = "url"` branch.
1867                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1868                file_payloads.insert(row, payload);
1869            }
1870        }
1871        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1872        for row in 0..batch.num_rows() {
1873            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1874            parts_by_message
1875                .entry((part.session_id.clone(), part.message_id.clone()))
1876                .or_default()
1877                .push(part);
1878        }
1879        for parts in parts_by_message.values_mut() {
1880            parts.sort_by_key(|part| part.ordinal);
1881        }
1882        Ok(parts_by_message)
1883    }
1884}
1885
1886#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1887#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
1888pub enum IngestEvent {
1889    Session(Session),
1890    Message(Message),
1891    Part(Part),
1892}
1893
/// Aggregate counters for one ingest pass (CLI sync, adapter-driven).
///
/// The wire layer (`pond_ingest`) reports per-row results instead; there the
/// aggregate is derived from those results at the wire boundary.
///
/// Counters are bucketed by population so that "100 validator-rejected rows
/// in 1 bad session" never reads as "100 separate failures."
/// spec.md#adapter-integrity-event-ordering fixes the shape.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct IngestSummary {
    /// Rows written to Lance, summed over all three tables. User-facing
    /// output should use the per-table counters below; this total remains
    /// for `accepted()` and existing wire callers.
    pub inserted: usize,
    /// Rows that were already stored (a no-op `merge_insert` match).
    pub matched: usize,
    /// Session rows inserted in this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted in this pass, searchable or not. Tool calls,
    /// tool results and other non-searchable messages are included.
    pub messages_inserted_total: usize,
    /// The subset of `messages_inserted_total` with a non-null
    /// `search_text`, i.e. rows eligible for FTS and semantic indexing.
    /// `pond sync` / `pond status` show this as the user-facing "messages"
    /// count.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted in this pass.
    pub parts_inserted: usize,
    /// Session rows that were already stored (`merge_insert` match).
    pub sessions_matched: usize,
    /// Message rows that were already stored, searchable or not.
    pub messages_matched_total: usize,
    /// The subset of `messages_matched_total` that carries `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows that were already stored.
    pub parts_matched: usize,
    /// Events the validator dropped under the per-event-drop policy
    /// (ordering violation, orphan part, mismatched parent, adapter parse
    /// failure, duplicate-id collision, ...).
    ///
    /// Counted per event, not per session: a session with one bad part adds
    /// 1 here, not "the whole substream." Per spec.md#adapter-integrity-dedup,
    /// adapters SHOULD dedupe their own emissions upstream when source replay
    /// is expected. The validator's in-batch HashSet is a safety net, not a
    /// feature adapters may rely on. If this grows on a clean adapter, look
    /// at `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions rejected wholesale at flush time because a Session-level
    /// invariant failed: `source_agent` / `project` must not change against
    /// the stored row. This should stay small relative to `inserted`; if it
    /// doesn't, something is genuinely wrong.
    pub dropped_sessions: usize,
    /// Files the adapter could not decode at all (no Session header could be
    /// extracted: empty `.jsonl`, missing required field).
    ///
    /// NOTE(review): `skipped_empty` also lists empty `.jsonl` and
    /// unextractable headers. Confirm which bucket each case lands in.
    pub skipped_files: usize,
    /// Files that produced no importable session and were skipped benignly:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title` / `agent-name`
    /// metadata file), or an unextractable header. These are never an error
    /// or a drop; the underlying cause is logged at `-vv` (debug).
    pub skipped_empty: usize,
    /// Sessions short-circuited by the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering). The file's `mtime` was at
    /// or before the wall-clock time pond last wrote that session's row, so
    /// re-decoding was skipped.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries ran out (commit conflicts,
    /// transient IO that never recovered). Hard zero on a healthy run.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each affected record is
    /// intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys covering both `dropped_events` and
    /// `dropped_sessions`.
    ///
    /// Keys are `&'static str` (the `DROP_REASON_*` constants) so consumers
    /// can match by identity. Empty on a clean run. `pond sync` prints the
    /// top reasons from it, and `benches/ingest_bench.rs` uses it to bucket
    /// Partial drops by cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1969
// Stable reason keys shared by the `IngestSummary::drop_reasons` histogram
// and the per-row `RowError::reason_key`. They are `&'static str` so
// consumers match by identity, not by prose. To add one: choose a short
// snake_case key, route it from the validator/adapter, and update the
// per-row outcome docs in `docs/spec.md#adapter-integrity-event-ordering`.

/// Reason key: a message id that collided with one already seen.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// Reason key: a part key that collided with one already seen.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// Reason key: a message arrived before its session.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// Reason key: a message that does not belong to the current session.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// Reason key: a part arrived before its message.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// Reason key: a part that does not belong to the current message.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// Reason key: an empty `source_agent`.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// Reason key: a parent message with no session.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Reason key: a `project` differing from the stored session row.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Reason key: a `source_agent` differing from the stored session row.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key when an error outcome carries no reason key.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
1986
/// Per-table result of one batched flush, as reported to callers.
///
/// Assembled from the counts `merge_insert` returns plus the pre-existence
/// sets captured by `upsert_session_batch`, then folded into a per-sync
/// summary through [`IngestSummary::add_batch`].
/// Per spec.md#adapter-integrity-additive-sync a matched row is a no-op
/// write, so the inserted/matched split is informational only. It is still
/// surfaced because `pond sync` and `pond_ingest` clients both reconcile
/// against "which rows landed this call."
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct BatchCounts {
    /// Session rows newly inserted.
    pub sessions_inserted: usize,
    /// Session rows already present.
    pub sessions_matched: usize,
    /// Message rows newly inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Newly inserted message rows that carry `search_text`.
    pub messages_inserted_searchable: usize,
    /// Message rows already present, searchable or not.
    pub messages_matched_total: usize,
    /// Already-present message rows that carry `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows newly inserted.
    pub parts_inserted: usize,
    /// Part rows already present.
    pub parts_matched: usize,
}
2005
2006impl IngestSummary {
2007    pub fn accepted(&self) -> usize {
2008        self.inserted + self.matched
2009    }
2010
2011    /// Sole writer of the per-table counters on the CLI batched flush path.
2012    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
2013    /// both for the same rows would double-count.
2014    pub fn add_batch(&mut self, counts: &BatchCounts) {
2015        self.sessions_inserted += counts.sessions_inserted;
2016        self.sessions_matched += counts.sessions_matched;
2017        self.messages_inserted_total += counts.messages_inserted_total;
2018        self.messages_inserted_searchable += counts.messages_inserted_searchable;
2019        self.messages_matched_total += counts.messages_matched_total;
2020        self.messages_matched_searchable += counts.messages_matched_searchable;
2021        self.parts_inserted += counts.parts_inserted;
2022        self.parts_matched += counts.parts_matched;
2023        self.inserted +=
2024            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2025        self.matched +=
2026            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2027    }
2028
2029    /// Sum every counter from `other` into `self`. Used by the multi-source
2030    /// `pond sync` loop so adding a new field to this struct doesn't silently
2031    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
2032    pub fn merge(&mut self, other: &Self) {
2033        self.inserted += other.inserted;
2034        self.matched += other.matched;
2035        self.sessions_inserted += other.sessions_inserted;
2036        self.messages_inserted_total += other.messages_inserted_total;
2037        self.messages_inserted_searchable += other.messages_inserted_searchable;
2038        self.parts_inserted += other.parts_inserted;
2039        self.sessions_matched += other.sessions_matched;
2040        self.messages_matched_total += other.messages_matched_total;
2041        self.messages_matched_searchable += other.messages_matched_searchable;
2042        self.parts_matched += other.parts_matched;
2043        self.dropped_events += other.dropped_events;
2044        self.dropped_sessions += other.dropped_sessions;
2045        self.skipped_files += other.skipped_files;
2046        self.skipped_empty += other.skipped_empty;
2047        self.skipped_fresh += other.skipped_fresh;
2048        self.storage_errors += other.storage_errors;
2049        self.truncated_values += other.truncated_values;
2050        for (key, value) in &other.drop_reasons {
2051            *self.drop_reasons.entry(key).or_insert(0) += value;
2052        }
2053    }
2054
2055    /// Same dispatch as [`Self::add_outcomes`] but ignores
2056    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
2057    /// via [`Self::add_batch`] and uses this method to attribute per-row
2058    /// `Error` outcomes from the same flush.
2059    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2060        for outcome in outcomes {
2061            if !matches!(outcome.status, OutcomeStatus::Error) {
2062                continue;
2063            }
2064            if outcome.kind == "session" {
2065                self.dropped_sessions += 1;
2066            } else {
2067                self.dropped_events += 1;
2068            }
2069            let reason = outcome
2070                .error
2071                .as_ref()
2072                .and_then(|error| error.reason_key)
2073                .unwrap_or(DROP_REASON_UNCATEGORIZED);
2074            *self.drop_reasons.entry(reason).or_insert(0) += 1;
2075        }
2076    }
2077
2078    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2079        for outcome in outcomes {
2080            match outcome.status {
2081                OutcomeStatus::Inserted => {
2082                    self.inserted += 1;
2083                    match outcome.kind {
2084                        "session" => self.sessions_inserted += 1,
2085                        "message" => {
2086                            self.messages_inserted_total += 1;
2087                            if outcome.searchable {
2088                                self.messages_inserted_searchable += 1;
2089                            }
2090                        }
2091                        "part" => self.parts_inserted += 1,
2092                        _ => {}
2093                    }
2094                }
2095                OutcomeStatus::Matched => {
2096                    self.matched += 1;
2097                    match outcome.kind {
2098                        "session" => self.sessions_matched += 1,
2099                        "message" => {
2100                            self.messages_matched_total += 1;
2101                            if outcome.searchable {
2102                                self.messages_matched_searchable += 1;
2103                            }
2104                        }
2105                        "part" => self.parts_matched += 1,
2106                        _ => {}
2107                    }
2108                }
2109                OutcomeStatus::Error => {
2110                    // Session-level rejection: exactly one session-kind Error
2111                    // outcome (see `error_outcomes_for_substream`). Per-event
2112                    // drop: one Error per message/part. The two populations
2113                    // are counted separately so the operator can tell a
2114                    // structural reject from a row-level skip.
2115                    if outcome.kind == "session" {
2116                        self.dropped_sessions += 1;
2117                    } else {
2118                        self.dropped_events += 1;
2119                    }
2120                    let reason = outcome
2121                        .error
2122                        .as_ref()
2123                        .and_then(|e| e.reason_key)
2124                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
2125                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
2126                }
2127            }
2128        }
2129    }
2130}
2131
/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol),
/// tagged with the originating event's position (`index`) in the request's
/// `events` array. The wire layer packs these into
/// [`crate::wire::IngestResult`] entries.
///
/// Emission order is NOT array order across calls. Events rejected while
/// buffering come back immediately from `IngestValidator::push`, while
/// Inserted/Matched outcomes for earlier events only arrive later from
/// `flush` / `finish`. Callers that need array order should order by `index`.
/// Not every event is guaranteed its own outcome: a session-level rejection
/// yields one Error on the Session row, and the substream's buffered
/// messages/parts are not surfaced individually (see
/// `error_outcomes_for_substream`).
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Position of the originating event in the request's `events` array.
    pub index: usize,
    /// Row kind; the constructors in this section use `"session"`,
    /// `"message"`, or `"part"`.
    pub kind: &'static str,
    /// Primary key as JSON: the session id string for sessions,
    /// `[session_id, message_id]` for messages, and
    /// `[session_id, message_id, part_id]` for parts.
    pub pk: Value,
    pub status: OutcomeStatus,
    /// Error details. The constructors in this section set it on `Error`
    /// outcomes and leave it `None` on Inserted/Matched outcomes.
    pub error: Option<RowError>,
    /// True iff `kind == "message"` AND the underlying row carries
    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
    /// so the CLI can show "searchable" message deltas distinct from raw
    /// inserts. Always false for session/part rows.
    pub searchable: bool,
}
2149
/// Disposition of one ingested row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row's primary key was absent from the pre-write existence sets.
    Inserted,
    /// The row's primary key was already present before the write.
    Matched,
    /// The row was dropped; `RowOutcome::error` carries the reason.
    Error,
}
2156
/// Structured per-row error body. Mirrors the wire shape so the handler
/// can pass it straight through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of why the row was dropped.
    pub message: String,
    /// Name of the offending input field, when the error is tied to one
    /// (e.g. `"source_agent"`, `"session_id"`, `"message_id"`).
    pub field: Option<&'static str>,
    /// Short qualifier for `field`. The constructors in this section only
    /// ever set `Some("immutable")` (immutable-field rejections) or `None`.
    pub reason: Option<&'static str>,
    /// Stable key for histogramming - see `DROP_REASON_*` constants. The
    /// `reason` field above is a human-facing qualifier; `reason_key` is the
    /// machine bucket. `None` means uncategorized; consumers attribute to
    /// `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
2170
/// Buffered Session event tagged with its input array index, so the
/// per-row outcome can be attributed back to the right event once the
/// batched write decides Inserted vs Matched for the row. In this section
/// that decision is read from pre-existence sets captured before
/// `merge_insert` (see `success_outcomes_for_substream`).
#[derive(Debug)]
struct BufferedSession {
    /// Position of the Session event in the request's `events` array.
    index: usize,
    /// The validated Session (with `source_agent` already trimmed).
    session: Session,
}
2179
/// A validated Message event plus the parts that followed it, awaiting the
/// batched write.
#[derive(Debug)]
struct BufferedMessage {
    /// Position of the Message event in the request's `events` array.
    index: usize,
    message: Message,
    /// Parts accepted for this message, in arrival order.
    parts: Vec<BufferedPart>,
    /// Derived by `search_text` when the message is closed out; `None`
    /// while the message is still accepting parts or when it has no
    /// searchable conversational text.
    search_text: Option<String>,
}
2187
/// A validated Part event tagged with its input array index.
#[derive(Debug)]
struct BufferedPart {
    /// Position of the Part event in the request's `events` array.
    index: usize,
    part: Part,
}
2193
/// State machine that turns the `events: Vec<IngestEvent>` array into a
/// flat `Vec<RowOutcome>` matching the array's index space. Buffers a whole
/// session substream so `merge_insert` runs once per substream (three
/// batches: sessions, messages, parts). A validation error on a single event
/// drops *that event* (one [`OutcomeStatus::Error`] outcome) and the substream
/// continues; only Session-level invariants (immutable source_agent / project
/// on re-write) drop the whole substream (spec.md#adapter-integrity-event-ordering).
/// A Session event rejected up front (empty `source_agent`, or
/// `parent_message_id` without `parent_session_id`) leaves no open substream,
/// so the events that follow it are dropped one by one.
///
/// Writes are batched at flush time. As complete substreams arrive (a new
/// `Session` event closes out the current one), they accumulate in
/// `completed` rather than each one calling `merge_insert` immediately.
/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
/// at which point one batched 3-parallel-merge-insert covers all pending
/// substreams. This is the load-bearing perf change: per-substream commit
/// overhead dominated the ingest profile (see `benches/ingest_bench.rs`),
/// and amortizing it across N sessions cuts wall time materially.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// Session that opened the in-flight substream, with its input index.
    /// `None` before the first accepted Session and right after a substream
    /// is moved into `completed`.
    session: Option<BufferedSession>,
    /// Message currently accepting Part events. Moved into `messages`
    /// (together with `current_parts`) when the next Message arrives or the
    /// substream closes.
    current_message: Option<BufferedMessage>,
    /// Parts accepted for `current_message`, in arrival order.
    current_parts: Vec<BufferedPart>,
    /// Closed-out messages of the in-flight substream, in arrival order.
    messages: Vec<BufferedMessage>,
    /// Message ids already buffered in the current substream. Duplicate ids
    /// drop the offending event in-line rather than failing the whole batch
    /// downstream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` keys already buffered in the current
    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
    seen_part_keys: HashSet<(String, String)>,
    /// Substreams whose end-of-stream boundary has been observed but whose
    /// rows haven't been written yet. Flushed in batched mode by
    /// [`Self::flush`].
    completed: Vec<CompletedSubstream>,
}
2228
/// One closed substream ready for the batched flush path.
#[derive(Debug)]
struct CompletedSubstream {
    /// Input array index of the Session event that opened the substream.
    session_index: usize,
    session: Session,
    /// Every closed-out message (each carrying its parts), in arrival order.
    messages: Vec<BufferedMessage>,
}
2236
2237impl IngestValidator {
2238    /// Drive one input event through the validator. Returns the per-row
2239    /// outcomes the event triggered: empty when the event is just buffered,
2240    /// or N entries when a session substream just flushed (success or
2241    /// failure). `Err` is reserved for catastrophic storage failures that
2242    /// should fail the whole `pond_ingest` request.
2243    pub async fn push(
2244        &mut self,
2245        store: &Store,
2246        index: usize,
2247        event: IngestEvent,
2248    ) -> Result<Vec<RowOutcome>> {
2249        match event {
2250            IngestEvent::Session(session) => self.push_session(store, index, session).await,
2251            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2252            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2253        }
2254    }
2255
2256    /// Final flush at end-of-batch. Closes the in-flight substream and
2257    /// drains the pending-flush buffer. Returns the per-row outcomes (for
2258    /// the wire layer) alongside the honest per-table counts (for
2259    /// `IngestSummary::add_batch`).
2260    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2261        self.close_current_substream();
2262        self.flush(store).await
2263    }
2264
2265    /// Drain every completed substream into batched 3-parallel-merge_insert
2266    /// writes. Caller invokes this periodically (every N completed
2267    /// substreams) to keep memory bounded; in adapter-driven sync that
2268    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
2269    /// in-flight substream stays buffered - close it explicitly via
2270    /// [`Self::finish`] or by feeding the next Session event.
2271    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2272        if self.completed.is_empty() {
2273            return Ok((Vec::new(), BatchCounts::default()));
2274        }
2275        let completed = std::mem::take(&mut self.completed);
2276        store.upsert_session_batch(completed).await
2277    }
2278
2279    /// Number of fully-buffered substreams awaiting batched write. Used by
2280    /// the adapter caller to decide when to call [`Self::flush`].
2281    pub fn pending_substreams(&self) -> usize {
2282        self.completed.len()
2283    }
2284
2285    async fn push_session(
2286        &mut self,
2287        _store: &Store,
2288        index: usize,
2289        mut session: Session,
2290    ) -> Result<Vec<RowOutcome>> {
2291        // Close out the current substream (if any) - move it to the pending
2292        // buffer instead of writing immediately. The actual write happens
2293        // when the caller invokes `flush` / `finish`.
2294        self.close_current_substream();
2295
2296        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
2297        // if empty after trim. A Session event with empty source_agent is
2298        // dropped on the spot - the substream that would follow has nothing
2299        // to anchor on, so subsequent message/part events will also drop.
2300        let trimmed = session.source_agent.trim();
2301        if trimmed.is_empty() {
2302            return Ok(vec![RowOutcome {
2303                index,
2304                kind: "session",
2305                pk: Value::String(session.id.clone()),
2306                status: OutcomeStatus::Error,
2307                error: Some(RowError {
2308                    message: format!("session {} has empty source_agent after trim", session.id),
2309                    field: Some("source_agent"),
2310                    reason: None,
2311                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2312                }),
2313                searchable: false,
2314            }]);
2315        }
2316        if trimmed.len() != session.source_agent.len() {
2317            session.source_agent = trimmed.to_owned();
2318        }
2319
2320        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2321            return Ok(vec![RowOutcome {
2322                index,
2323                kind: "session",
2324                pk: Value::String(session.id.clone()),
2325                status: OutcomeStatus::Error,
2326                error: Some(RowError {
2327                    message: format!(
2328                        "session {} has parent_message_id without parent_session_id",
2329                        session.id,
2330                    ),
2331                    field: Some("parent_message_id"),
2332                    reason: None,
2333                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2334                }),
2335                searchable: false,
2336            }]);
2337        }
2338
2339        self.seen_message_ids.clear();
2340        self.seen_part_keys.clear();
2341        self.session = Some(BufferedSession { index, session });
2342        Ok(Vec::new())
2343    }
2344
2345    fn close_current_substream(&mut self) {
2346        self.flush_current_message();
2347        let Some(BufferedSession {
2348            index: session_index,
2349            session,
2350        }) = self.session.take()
2351        else {
2352            return;
2353        };
2354        let messages = std::mem::take(&mut self.messages);
2355        self.seen_message_ids.clear();
2356        self.seen_part_keys.clear();
2357        self.completed.push(CompletedSubstream {
2358            session_index,
2359            session,
2360            messages,
2361        });
2362    }
2363
2364    fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
2365        let pk = Value::Array(vec![
2366            Value::String(message.session_id().to_owned()),
2367            Value::String(message.id().to_owned()),
2368        ]);
2369        let Some(session) = &self.session else {
2370            return vec![error_outcome(
2371                index,
2372                "message",
2373                pk,
2374                "first event in a session stream must be Session",
2375                None,
2376                DROP_REASON_MESSAGE_BEFORE_SESSION,
2377            )];
2378        };
2379        if message.session_id() != session.session.id {
2380            let msg = format!(
2381                "message {} references session {}, expected {}",
2382                message.id(),
2383                message.session_id(),
2384                session.session.id
2385            );
2386            return vec![error_outcome(
2387                index,
2388                "message",
2389                pk,
2390                &msg,
2391                Some("session_id"),
2392                DROP_REASON_MESSAGE_SESSION_MISMATCH,
2393            )];
2394        }
2395        if !self.seen_message_ids.insert(message.id().to_owned()) {
2396            // Keep same-substream duplicate ids visible in `dropped_events`;
2397            // adapters are expected to dedupe upstream (see claude-code's
2398            // per-file `seen_uuids`), so a hit here is worth investigating.
2399            let msg = format!("duplicate message id {} in session substream", message.id());
2400            return vec![error_outcome(
2401                index,
2402                "message",
2403                pk,
2404                &msg,
2405                None,
2406                DROP_REASON_DUPLICATE_MESSAGE_ID,
2407            )];
2408        }
2409        self.flush_current_message();
2410        self.current_message = Some(BufferedMessage {
2411            index,
2412            message,
2413            parts: Vec::new(),
2414            search_text: None,
2415        });
2416        Vec::new()
2417    }
2418
2419    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2420        let pk = Value::Array(vec![
2421            Value::String(part.session_id.clone()),
2422            Value::String(part.message_id.clone()),
2423            Value::String(part.id.clone()),
2424        ]);
2425        let Some(current) = &self.current_message else {
2426            return vec![error_outcome(
2427                index,
2428                "part",
2429                pk,
2430                "part event appeared before a message",
2431                None,
2432                DROP_REASON_PART_BEFORE_MESSAGE,
2433            )];
2434        };
2435        if part.session_id != current.message.session_id() {
2436            let msg = format!(
2437                "part {} references session {}, expected {}",
2438                part.id,
2439                part.session_id,
2440                current.message.session_id()
2441            );
2442            return vec![error_outcome(
2443                index,
2444                "part",
2445                pk,
2446                &msg,
2447                Some("session_id"),
2448                DROP_REASON_PART_MESSAGE_MISMATCH,
2449            )];
2450        }
2451        if part.message_id != current.message.id() {
2452            let msg = format!(
2453                "part {} references message {}, expected {}",
2454                part.id,
2455                part.message_id,
2456                current.message.id()
2457            );
2458            return vec![error_outcome(
2459                index,
2460                "part",
2461                pk,
2462                &msg,
2463                Some("message_id"),
2464                DROP_REASON_PART_MESSAGE_MISMATCH,
2465            )];
2466        }
2467        let part_key = (part.message_id.clone(), part.id.clone());
2468        if !self.seen_part_keys.insert(part_key) {
2469            let msg = format!(
2470                "duplicate part id {} for message {} in session substream",
2471                part.id, part.message_id
2472            );
2473            return vec![error_outcome(
2474                index,
2475                "part",
2476                pk,
2477                &msg,
2478                None,
2479                DROP_REASON_DUPLICATE_PART_KEY,
2480            )];
2481        }
2482        self.current_parts.push(BufferedPart { index, part });
2483        Vec::new()
2484    }
2485
2486    fn flush_current_message(&mut self) {
2487        let Some(mut buffered) = self.current_message.take() else {
2488            return;
2489        };
2490        let parts = std::mem::take(&mut self.current_parts);
2491        let mut canonical_parts = Vec::with_capacity(parts.len());
2492        for part in &parts {
2493            canonical_parts.push(part.part.clone());
2494        }
2495        buffered.search_text = search_text(&buffered.message, &canonical_parts);
2496        buffered.parts = parts;
2497        self.messages.push(buffered);
2498    }
2499}
2500
2501fn error_outcome(
2502    index: usize,
2503    kind: &'static str,
2504    pk: Value,
2505    message: &str,
2506    field: Option<&'static str>,
2507    reason_key: &'static str,
2508) -> RowOutcome {
2509    RowOutcome {
2510        index,
2511        kind,
2512        pk,
2513        status: OutcomeStatus::Error,
2514        error: Some(RowError {
2515            message: message.to_owned(),
2516            field,
2517            reason: None,
2518            reason_key: Some(reason_key),
2519        }),
2520        searchable: false,
2521    }
2522}
2523
2524/// Session-level rejection (immutable `source_agent` / `project` violation):
2525/// emit exactly one Error outcome on the Session row. The buffered messages
2526/// and parts of this substream are *not* surfaced as per-row errors - their
2527/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
2528fn error_outcomes_for_substream(
2529    session_index: usize,
2530    session: &Session,
2531    _messages: &[BufferedMessage],
2532    message: impl Into<String>,
2533    field: Option<&'static str>,
2534    reason_key: &'static str,
2535) -> Vec<RowOutcome> {
2536    let reason = field.map(|_| "immutable");
2537    vec![RowOutcome {
2538        index: session_index,
2539        kind: "session",
2540        pk: Value::String(session.id.clone()),
2541        status: OutcomeStatus::Error,
2542        error: Some(RowError {
2543            message: message.into(),
2544            field,
2545            reason,
2546            reason_key: Some(reason_key),
2547        }),
2548        searchable: false,
2549    }]
2550}
2551
2552/// Batched-path success helper. Each row's Inserted/Matched status is read
2553/// from the pre-existence sets captured by `upsert_session_batch` before its
2554/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
2555/// Also accumulates the per-table totals into `counts` so the CLI summary
2556/// gets the same truth without re-walking the outcomes.
2557fn success_outcomes_for_substream(
2558    session_index: usize,
2559    session: &Session,
2560    messages: &[BufferedMessage],
2561    existing_sessions: &std::collections::HashMap<String, Session>,
2562    existing_message_pks: &HashSet<(String, String)>,
2563    existing_part_pks: &HashSet<(String, String, String)>,
2564    counts: &mut BatchCounts,
2565) -> Vec<RowOutcome> {
2566    let session_was_present = existing_sessions.contains_key(&session.id);
2567    let session_status = if session_was_present {
2568        counts.sessions_matched += 1;
2569        UpsertStatus::Matched
2570    } else {
2571        counts.sessions_inserted += 1;
2572        UpsertStatus::Inserted
2573    };
2574
2575    let mut outcomes = Vec::with_capacity(1 + messages.len());
2576    outcomes.push(success_outcome(
2577        session_index,
2578        "session",
2579        Value::String(session.id.clone()),
2580        session_status,
2581        false,
2582    ));
2583    for buffered in messages {
2584        let key = (
2585            buffered.message.session_id().to_owned(),
2586            buffered.message.id().to_owned(),
2587        );
2588        let searchable = buffered.search_text.is_some();
2589        let message_status = if existing_message_pks.contains(&key) {
2590            counts.messages_matched_total += 1;
2591            if searchable {
2592                counts.messages_matched_searchable += 1;
2593            }
2594            UpsertStatus::Matched
2595        } else {
2596            counts.messages_inserted_total += 1;
2597            if searchable {
2598                counts.messages_inserted_searchable += 1;
2599            }
2600            UpsertStatus::Inserted
2601        };
2602        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2603        outcomes.push(success_outcome(
2604            buffered.index,
2605            "message",
2606            pk,
2607            message_status,
2608            searchable,
2609        ));
2610        for part in &buffered.parts {
2611            let part_key = (
2612                part.part.session_id.clone(),
2613                part.part.message_id.clone(),
2614                part.part.id.clone(),
2615            );
2616            let part_status = if existing_part_pks.contains(&part_key) {
2617                counts.parts_matched += 1;
2618                UpsertStatus::Matched
2619            } else {
2620                counts.parts_inserted += 1;
2621                UpsertStatus::Inserted
2622            };
2623            let part_pk = Value::Array(vec![
2624                Value::String(part_key.0),
2625                Value::String(part_key.1),
2626                Value::String(part_key.2),
2627            ]);
2628            outcomes.push(success_outcome(
2629                part.index,
2630                "part",
2631                part_pk,
2632                part_status,
2633                false,
2634            ));
2635        }
2636    }
2637    outcomes
2638}
2639
2640fn success_outcome(
2641    index: usize,
2642    kind: &'static str,
2643    pk: Value,
2644    status: UpsertStatus,
2645    searchable: bool,
2646) -> RowOutcome {
2647    let status = match status {
2648        UpsertStatus::Inserted => OutcomeStatus::Inserted,
2649        UpsertStatus::Matched => OutcomeStatus::Matched,
2650    };
2651    RowOutcome {
2652        index,
2653        kind,
2654        pk,
2655        status,
2656        error: None,
2657        searchable,
2658    }
2659}
2660
/// Session-level ingest failure detected by comparing an incoming Session
/// against the stored row.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// spec.md#protocol: `Session.source_agent` and `Session.project` are
    /// immutable post-first-write because the denormalized copies on
    /// `messages` were stamped from the prior Session at first ingest.
    /// A re-write that changes either would silently desync.
    ImmutableField {
        /// Which immutable field changed; `ensure_immutable_match` uses
        /// `"source_agent"` or `"project"`.
        field: &'static str,
        /// Id of the session whose re-write was rejected.
        session_id: String,
        /// Value already stored for `field`.
        stored: String,
        /// Value the incoming Session tried to write.
        attempted: String,
    },
}
2674
2675impl std::fmt::Display for IngestError {
2676    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2677        match self {
2678            Self::ImmutableField {
2679                field,
2680                session_id,
2681                stored,
2682                attempted,
2683            } => write!(
2684                formatter,
2685                "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
2686            ),
2687        }
2688    }
2689}
2690
2691impl std::error::Error for IngestError {}
2692
2693/// Compare an incoming Session row against the stored row on the two
2694/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
2695/// counts a NULL-vs-non-NULL change as a mismatch.
2696fn ensure_immutable_match(
2697    existing: &Session,
2698    incoming: &Session,
2699) -> std::result::Result<(), IngestError> {
2700    if existing.source_agent != incoming.source_agent {
2701        return Err(IngestError::ImmutableField {
2702            field: "source_agent",
2703            session_id: incoming.id.clone(),
2704            stored: existing.source_agent.clone(),
2705            attempted: incoming.source_agent.clone(),
2706        });
2707    }
2708    if existing.project != incoming.project {
2709        return Err(IngestError::ImmutableField {
2710            field: "project",
2711            session_id: incoming.id.clone(),
2712            stored: (*existing.project).clone(),
2713            attempted: (*incoming.project).clone(),
2714        });
2715    }
2716    Ok(())
2717}
2718
2719pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2720    use crate::wire::Provenance;
2721    let mut chunks: Vec<String> = Vec::new();
2722    for part in parts {
2723        // spec.md#search: only conversational parts contribute to the indexed
2724        // text; harness-injected scaffolding is excluded from search.
2725        if part.provenance != Provenance::Conversational {
2726            continue;
2727        }
2728        match (message.role(), &part.kind) {
2729            (Role::User | Role::Assistant, PartKind::Text { text }) => {
2730                if let Some(text) = text {
2731                    chunks.push(text.to_string());
2732                }
2733            }
2734            (
2735                Role::User | Role::Assistant,
2736                PartKind::File {
2737                    media_type,
2738                    file_name,
2739                    data,
2740                },
2741            ) => {
2742                if let Some(file_name) = file_name {
2743                    chunks.push(file_name.clone());
2744                }
2745                if let Some(media_type) = media_type {
2746                    chunks.push(media_type.clone());
2747                }
2748                if let FileData::Url(uri) = data {
2749                    chunks.push(uri.clone());
2750                }
2751            }
2752            (
2753                Role::System | Role::Tool,
2754                PartKind::Text { .. }
2755                | PartKind::Reasoning { .. }
2756                | PartKind::File { .. }
2757                | PartKind::ToolCall { .. }
2758                | PartKind::ToolResult { .. }
2759                | PartKind::ToolApprovalRequest { .. }
2760                | PartKind::ToolApprovalResponse { .. },
2761            )
2762            | (
2763                Role::User | Role::Assistant,
2764                PartKind::Reasoning { .. }
2765                | PartKind::ToolCall { .. }
2766                | PartKind::ToolResult { .. }
2767                | PartKind::ToolApprovalRequest { .. }
2768                | PartKind::ToolApprovalResponse { .. },
2769            ) => {}
2770        }
2771    }
2772
2773    let text = chunks
2774        .into_iter()
2775        .filter(|chunk| !chunk.trim().is_empty())
2776        .collect::<Vec<_>>()
2777        .join("\n");
2778    if text.is_empty() { None } else { Some(text) }
2779}
2780
/// Non-empty conversational text (spec.md#search).
///
/// The inner `String` is private, so values can only be built inside this
/// module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrow the text as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consume the wrapper and return the owned `String`.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2800
/// A message together with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    pub message: Message,
    pub parts: Vec<Part>,
}
2806
/// A session together with its messages, each carrying its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    pub session: Session,
    pub messages: Vec<MessageWithParts>,
}
2812
/// Request parameters for a session-mode view (presumably feeding a
/// [`SessionPage`] for `pond_get` session mode - confirm with callers).
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    pub mode: ResponseMode,
    /// Pagination anchor: a message id to resume after, or `None` to start
    /// from the beginning (assumed; see [`GetLookup::UnknownAfterId`]).
    pub after_id: Option<&'a str>,
    /// Maximum messages per page. NOTE(review): likely paired with
    /// `budget_bytes` via `page_by`, which clamps it to 1..=1000 - confirm.
    pub limit: usize,
    /// Soft byte budget for the page.
    pub budget_bytes: usize,
    pub session_from: SessionFrom,
}
2821
/// Request parameters for a message-mode view (presumably feeding a
/// [`MessagePage`] for `pond_get` message mode - confirm with callers).
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// How many sibling messages of context to include around the target
    /// (assumed from the name - verify against the handler).
    pub context_depth: usize,
    /// Pagination anchor for the target's parts (assumed).
    pub after_id: Option<&'a str>,
    /// Maximum items per page.
    pub limit: usize,
    /// Soft byte budget for the page.
    pub budget_bytes: usize,
}
2829
/// Outcome of a `pond_get` lookup. Separates a missing target (the handler
/// maps it to `not_found`) from a stale/unknown `after_id` (mapped to
/// `validation_failed`): the message/part stream is append-only, so an anchor
/// that was ever valid never disappears - an unknown one is always a client
/// error, never a reason to silently restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested session/message does not exist.
    NotFound,
    /// The target exists but the supplied `after_id` anchor does not.
    UnknownAfterId,
    /// The target was found; carries the page.
    Found(T),
}
2841
/// Canonical retrieval result for `pond_get` session mode: the stored session
/// plus the page of messages (each with its `Part`s) and a remaining count.
/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    pub session: Session,
    /// This page of messages, each carrying its parts.
    pub messages: Vec<RetrievedMessage>,
    /// Number of messages left after this page.
    pub messages_remaining: usize,
}
2851
/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
/// their parts so the handler can summarize them.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    pub session: Session,
    /// The requested message, with an empty `parts` vec.
    pub target: RetrievedMessage,
    /// This page of the target's parts.
    pub target_parts: Vec<Part>,
    /// Number of the target's parts left after this page.
    pub target_parts_remaining: usize,
    /// Neighbouring messages, each with its parts.
    pub siblings: Vec<RetrievedMessage>,
}
2863
/// A message as returned by retrieval, with its parts attached.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    pub id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    /// Presumably the message's search text, when it has any (assumed from
    /// the name - verify against the scan that fills it).
    pub text: Option<String>,
    /// Additional stored content column (semantics not visible here).
    pub content: Option<String>,
    /// The message's parts; empty for `MessagePage::target`.
    pub parts: Vec<Part>,
}
2873
/// Internal message row with the same fields as [`RetrievedMessage`] minus
/// `parts`. Presumably read from a messages scan before parts are attached
/// - confirm with callers.
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}
2882
/// One row of the conversational scan. `text` is non-empty by
/// `IsNotNull("search_text")` pushdown (spec.md#search).
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Owning session id.
    pub session_id: String,
    /// Message id within that session.
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    /// The message's search text, wrapped as [`SearchText`].
    pub text: SearchText,
}
2893
/// Count how many leading `items` fit on one page.
///
/// The page holds at most `limit` items (clamped to `1..=1000`) and stops
/// before the first item whose cumulative `size` would exceed
/// `budget_bytes`. The first item is always admitted, so a single oversize
/// item never blocks its own page. Sizes are summed with saturation.
/// Returns 0 only for an empty slice.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let max_items = limit.clamp(1, 1000);
    let mut used_bytes = 0usize;
    for (position, item) in items.iter().take(max_items).enumerate() {
        used_bytes = used_bytes.saturating_add(size(item));
        // Item 0 always fits; later items must stay within the budget.
        if position > 0 && used_bytes > budget_bytes {
            return position;
        }
    }
    items.len().min(max_items)
}
2912
2913fn role_from_str(value: &str) -> Result<Role> {
2914    match value {
2915        "system" => Ok(Role::System),
2916        "user" => Ok(Role::User),
2917        "assistant" => Ok(Role::Assistant),
2918        "tool" => Ok(Role::Tool),
2919        other => anyhow::bail!("unknown message role {other}"),
2920    }
2921}
2922
/// Scalar indexes on `messages` (spec.md#datasets): BTREE for high-cardinality
/// and range columns, BITMAP for low-cardinality columns. There is no index
/// on `embedding_model`: pond's invariant is one active model at a time
/// (a model swap goes through `pond embed --force` which drops the IVF_PQ,
/// clears stale rows, and re-bootstraps), so `embedding_model` is never a
/// query-time predicate - the only embedding-state filter is `vector IS NOT
/// NULL`. `id` lookups are rare and full-scan.
///
/// Each entry is `(column, index type, index name)`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];
2949
/// Scalar indexes on `parts`: `(session_id, message_id)` is the hot-path lookup key for
/// `parts_for_messages` (hydration on every `get` and grouped search). The
/// key is covered by two single-column BTREEs, not one composite index.
///
/// Each entry is `(column, index type, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
2964
/// Scalar index on `sessions`: `id` is filtered by `find_session` on every
/// `get` and every grouped search, so it gets a BTREE. The entry shape is
/// `(column, index kind, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2969
2970fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
2971    Predicate::In(
2972        column,
2973        values.iter().cloned().map(ScalarValue::String).collect(),
2974    )
2975}
2976
2977/// Combine the caller's filter with `vector IS NOT NULL` so the kNN scanner
2978/// never sees a null-vector row. Under the single-active-model invariant,
2979/// `vector IS NOT NULL` is equivalent to "row is currently embedded under
2980/// the configured model" - no per-row `embedding_model` filter needed.
2981fn embedded_scope(filter: &Predicate) -> Predicate {
2982    Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
2983}
2984
2985fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
2986    let inserted = usize::try_from(inserted_rows)
2987        .unwrap_or(usize::MAX)
2988        .min(total);
2989    let mut statuses = Vec::with_capacity(total);
2990    statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
2991    statuses.extend(std::iter::repeat_n(
2992        UpsertStatus::Matched,
2993        total.saturating_sub(inserted),
2994    ));
2995    statuses
2996}
2997
// Bare logical table names: the lance-namespace Directory impl owns the
// `.lance` directory suffix (spec.md#lance-chokepoints-catalog). No consumer
// reconstructs a `.lance` path.

/// Logical name of the sessions table.
pub(crate) const SESSIONS: &str = "sessions";
/// Logical name of the messages table.
pub(crate) const MESSAGES: &str = "messages";
/// Logical name of the parts table.
pub(crate) const PARTS: &str = "parts";
3004
/// FTS index name on `messages.search_text` (an inverted character-ngram
/// index). The name is stable so status reporting and index creation refer to
/// the same index.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// IVF_PQ index name on `messages.vector` (spec.md#search). The name is stable
/// so the activation check and index creation refer to the same index.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
3012
// IVF_PQ tuning constants (spec.md#search):
// - num_bits = 8 (256 centroids per PQ subspace; needs >= 256 vectors)
// - sub_vectors = embedding_dim / 8 (8-float PQ subspaces)
// - max_iters = 15 (kmeans cap)
// - cosine metric (e5 vectors are L2-normalized)

/// Bits per PQ code (2^8 = 256 centroids per subspace).
const IVF_PQ_NUM_BITS: u8 = 8;
/// Vector elements per PQ subspace. The sub-vector count is
/// `embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE` using integer division.
/// NOTE(review): a dimension that is not a multiple of 8 truncates, and one
/// below 8 yields zero sub-vectors; confirm Lance rejects that cleanly.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// Cap on k-means iterations for the IVF_PQ build.
const IVF_PQ_MAX_ITERS: usize = 15;

// FTS tokenizer constants (spec.md#search-language-neutral-index): character
// ngrams in `[3, 5]`. 4- and 5-grams discriminate; min=3 keeps 3-char tokens
// (`FTS`, `OCC`) searchable.

/// Shortest character ngram indexed from `messages.search_text`.
const FTS_NGRAM_MIN: u32 = 3;
/// Longest character ngram indexed from `messages.search_text`.
const FTS_NGRAM_MAX: u32 = 5;
3027
3028/// Pond's production IndexIntents: the per-table intent set
3029/// `Store::open_with_options` registers with the substrate.
3030pub fn pond_index_intents() -> IndexIntents {
3031    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3032}
3033
3034/// Same as [`pond_index_intents`] but with an overridable IVF_PQ activation
3035/// threshold. Used by tests that need to exercise the activation boundary
3036/// without writing 100k vectors.
3037pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3038    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3039    messages.push(IndexIntent {
3040        name: MESSAGES_FTS_INDEX,
3041        column: "search_text",
3042        trigger: IndexTrigger::OnAnyRows,
3043        params: IndexParamsKind::InvertedFtsNgram {
3044            min: FTS_NGRAM_MIN,
3045            max: FTS_NGRAM_MAX,
3046        },
3047    });
3048    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3049        messages.push(IndexIntent {
3050            name,
3051            column,
3052            trigger: IndexTrigger::OnAnyRows,
3053            params: IndexParamsKind::Scalar(kind.clone()),
3054        });
3055    }
3056    messages.push(IndexIntent {
3057        name: MESSAGES_VECTOR_INDEX,
3058        column: "vector",
3059        trigger: IndexTrigger::OnNonNullCount {
3060            column: "vector",
3061            threshold: vector_threshold,
3062        },
3063        params: IndexParamsKind::IvfPqCosine {
3064            sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3065            num_bits: IVF_PQ_NUM_BITS,
3066            max_iters: IVF_PQ_MAX_ITERS,
3067        },
3068    });
3069    let parts = PARTS_SCALAR_INDICES
3070        .iter()
3071        .map(|(column, kind, name)| IndexIntent {
3072            name,
3073            column,
3074            trigger: IndexTrigger::OnAnyRows,
3075            params: IndexParamsKind::Scalar(kind.clone()),
3076        })
3077        .collect();
3078    let sessions = SESSIONS_SCALAR_INDICES
3079        .iter()
3080        .map(|(column, kind, name)| IndexIntent {
3081            name,
3082            column,
3083            trigger: IndexTrigger::OnAnyRows,
3084            params: IndexParamsKind::Scalar(kind.clone()),
3085        })
3086        .collect();
3087    IndexIntents {
3088        sessions,
3089        messages,
3090        parts,
3091    }
3092}
3093
/// Default width of the `messages.vector` embedding column (spec.md#search).
/// It matches [`embed::DEFAULT_MODEL_ID`] (`intfloat/multilingual-e5-small`,
/// 384) and is used when `[embeddings].dim` is absent.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector dimension, seeded once at startup from
/// `[embeddings].dim` via [`init_embedding_dim`].
///
/// This is a `OnceLock` rather than a `const` so a config file can select a
/// model whose width differs from [`DEFAULT_EMBEDDING_DIM`] without touching
/// every site that sizes the column. While unset, readers fall back to
/// [`DEFAULT_EMBEDDING_DIM`], which keeps unit tests config-free.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// The active embedding dimension: the value installed by
/// [`init_embedding_dim`], or [`DEFAULT_EMBEDDING_DIM`] when none was installed.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&installed) => installed,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Seed [`embedding_dim`] from config. Only the first call takes effect;
/// later calls, even with a different `dim`, are ignored.
pub fn init_embedding_dim(dim: usize) {
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3119
3120/// Initial-`CREATE` write params for the namespace-mediated path. The
3121/// substrate seam stamps in `session`, `mode`, and `store_params`.
3122/// `auto_cleanup` is short; long-term recovery is `pond export` snapshots
3123/// plus deferred Lance tags (spec.md#session-durable-copy). `skip_auto_cleanup`
3124/// suppresses the per-commit hook so cleanup stays operator-driven via
3125/// `pond index optimize` (one LIST per command instead of per write).
3126pub(crate) fn write_params_for_create() -> WriteParams {
3127    WriteParams {
3128        data_storage_version: Some(LanceFileVersion::V2_1),
3129        enable_v2_manifest_paths: true,
3130        enable_stable_row_ids: true,
3131        auto_cleanup: Some(AutoCleanupParams {
3132            interval: 20,
3133            older_than: chrono::TimeDelta::days(1),
3134        }),
3135        skip_auto_cleanup: true,
3136        ..WriteParams::default()
3137    }
3138}
3139
3140fn export_schema(table: Table) -> Arc<Schema> {
3141    match table {
3142        Table::Sessions => session_schema(),
3143        Table::Messages => message_schema(),
3144        Table::Parts => part_schema(),
3145    }
3146}
3147
3148fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3149    let expected = export_schema(table);
3150    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3151    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3152    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3153    if actual_names != expected_names {
3154        anyhow::bail!(
3155            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3156            table.as_str(),
3157        );
3158    }
3159    Ok(())
3160}
3161
3162async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3163    let source_uri = source
3164        .to_str()
3165        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3166    let dataset = Dataset::open(source_uri)
3167        .await
3168        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3169    ensure_schema_matches_archive(&dataset, table)?;
3170    Ok(dataset)
3171}
3172
3173pub(crate) fn session_schema() -> Arc<Schema> {
3174    Arc::new(Schema::new(vec![
3175        primary_field("id", DataType::Utf8, false),
3176        Field::new("parent_session_id", DataType::Utf8, true),
3177        Field::new("parent_message_id", DataType::Utf8, true),
3178        Field::new("source_agent", DataType::Utf8, false),
3179        Field::new(
3180            "created_at",
3181            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3182            false,
3183        ),
3184        Field::new("project", DataType::Utf8, false),
3185        json_field("options", false),
3186    ]))
3187}
3188
3189pub(crate) fn message_schema() -> Arc<Schema> {
3190    Arc::new(Schema::new(vec![
3191        primary_field("session_id", DataType::Utf8, false),
3192        primary_field("id", DataType::Utf8, false),
3193        Field::new(
3194            "timestamp",
3195            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3196            false,
3197        ),
3198        Field::new("role", DataType::Utf8, false),
3199        Field::new("source_agent", DataType::Utf8, false),
3200        Field::new("project", DataType::Utf8, false),
3201        Field::new("content", DataType::Utf8, true),
3202        Field::new("search_text", DataType::Utf8, true),
3203        // The message's derived embedding (spec.md#session-embed-from-canonical):
3204        // both null until `pond embed` fills them, set together thereafter.
3205        Field::new("vector", embedding_vector_type(), true),
3206        Field::new("embedding_model", DataType::Utf8, true),
3207        json_field("options", false),
3208    ]))
3209}
3210
3211pub(crate) fn part_schema() -> Arc<Schema> {
3212    Arc::new(Schema::new(vec![
3213        primary_field("session_id", DataType::Utf8, false),
3214        primary_field("message_id", DataType::Utf8, false),
3215        primary_field("id", DataType::Utf8, false),
3216        Field::new("ordinal", DataType::Int32, false),
3217        Field::new("type", DataType::Utf8, false),
3218        // spec.md#model-part-provenance: conversation vs harness-injected; search
3219        // reads this column to exclude injected scaffolding.
3220        Field::new("provenance", DataType::Utf8, false),
3221        json_field("variant_data", false),
3222        legacy_blob_field("data", true),
3223        json_field("options", false),
3224    ]))
3225}
3226
3227pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3228    let arrays = schema
3229        .fields()
3230        .iter()
3231        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3232        .collect();
3233    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3234}
3235
3236pub(crate) fn empty_reader(
3237    schema: Arc<Schema>,
3238) -> Result<
3239    RecordBatchIterator<
3240        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3241    >,
3242> {
3243    let batch = empty_batch(schema.clone())?;
3244    Ok(RecordBatchIterator::new(
3245        vec![Ok(batch)].into_iter(),
3246        schema,
3247    ))
3248}
3249
/// One `messages` row to encode: the message plus the per-row column values
/// that are not carried by [`Message`] itself. Consumed by `messages_batches`.
pub(crate) struct MessageBatchRow<'a> {
    /// The message; supplies `session_id`, `id`, `timestamp`, `role`, the
    /// nullable `content` (system messages), and `options`.
    pub message: &'a Message,
    /// Written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Written to the `project` column.
    pub project: &'a str,
    /// Written to the nullable `search_text` column (the FTS index input).
    pub search_text: Option<&'a str>,
}
3256
3257// Lance v7.0.0-beta.16's IVF_PQ build path (`rust/lance/src/index/vector/utils.rs`
3258// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
3259// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
3260// BFloat16 as a future-supported embedding type; until the Rust IVF_PQ build
3261// path catches up, store as Float16 (half-precision, also 2 bytes/element).
3262fn embedding_vector_type() -> DataType {
3263    DataType::FixedSizeList(
3264        Arc::new(Field::new("item", DataType::Float16, true)),
3265        embedding_dim() as i32,
3266    )
3267}
3268
3269/// The partial-schema source for the embedding column update: the `messages`
3270/// primary key plus the two columns `pond embed` fills. The field definitions
3271/// match `message_schema` exactly so Lance accepts it as a subset upsert.
3272fn embedding_update_schema() -> Arc<Schema> {
3273    Arc::new(Schema::new(vec![
3274        primary_field("session_id", DataType::Utf8, false),
3275        primary_field("id", DataType::Utf8, false),
3276        Field::new("vector", embedding_vector_type(), true),
3277        Field::new("embedding_model", DataType::Utf8, true),
3278    ]))
3279}
3280
3281/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
3282/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
3283pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3284    let dim = embedding_dim();
3285    let mut flat = Vec::with_capacity(rows.len() * dim);
3286    for row in rows {
3287        if row.vector.len() != dim {
3288            anyhow::bail!(
3289                "embedding for message {} has dim {}, expected {dim}",
3290                row.id,
3291                row.vector.len(),
3292            );
3293        }
3294        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3295    }
3296    let values = Float16Array::from(flat);
3297    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3298    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3299        .context("failed to build embedding vector column")?;
3300
3301    RecordBatch::try_new(
3302        embedding_update_schema(),
3303        vec![
3304            Arc::new(StringArray::from(
3305                rows.iter()
3306                    .map(|row| row.session_id.as_str())
3307                    .collect::<Vec<_>>(),
3308            )),
3309            Arc::new(StringArray::from(
3310                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3311            )),
3312            Arc::new(vectors),
3313            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3314        ],
3315    )
3316    .context("failed to build embedding update batch")
3317}
3318
/// The runtime backstop against Arrow's 2 GiB `i32` offset wall.
///
/// A flush batch is split before the running total of its text columns would
/// exceed this. A single cell at or above it is rejected, rather than left to
/// panic inside `StringArray::from` (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Contiguous row ranges whose summed text-column byte cost each stays within
/// `COLUMN_BYTE_BUDGET`.
///
/// `cells[i]` is row `i`'s byte cost summed across every text column.
/// Budgeting the all-column total bounds every individual column too, since no
/// single column's total can exceed it. A row whose own cost already exceeds
/// the budget is placed alone in its own range rather than dropped; callers
/// reject any single cell at or above the budget (`guard_cell`) before
/// chunking. Empty input yields no ranges.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut ranges = Vec::new();
    let mut chunk_start = 0usize;
    let mut chunk_bytes = 0usize;
    for (row, cost) in cells.iter().copied().enumerate() {
        // Close the open chunk when this row would push it over budget, but
        // never emit an empty chunk.
        let overflows = chunk_bytes + cost > COLUMN_BYTE_BUDGET;
        if overflows && row != chunk_start {
            ranges.push(chunk_start..row);
            chunk_start = row;
            chunk_bytes = 0;
        }
        chunk_bytes += cost;
    }
    if chunk_start != cells.len() {
        ranges.push(chunk_start..cells.len());
    }
    ranges
}
3346
3347fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3348    if bytes >= COLUMN_BYTE_BUDGET {
3349        anyhow::bail!(
3350            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3351             overflow Arrow's i32 offset buffer"
3352        );
3353    }
3354    Ok(())
3355}
3356
3357async fn merge_insert_chunks(
3358    handle: &Handle,
3359    table: Table,
3360    batches: Vec<RecordBatch>,
3361) -> Result<u64> {
3362    let mut inserted = 0u64;
3363    for batch in batches {
3364        let rows = batch.num_rows();
3365        inserted += handle.merge_insert(table, batch, rows).await?;
3366    }
3367    Ok(inserted)
3368}
3369
3370pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3371    let options = sessions
3372        .iter()
3373        .map(|session| json_bytes(&session.options))
3374        .collect::<Result<Vec<_>>>()?;
3375    let mut cells = Vec::with_capacity(sessions.len());
3376    for (session, encoded) in sessions.iter().zip(&options) {
3377        let columns = [
3378            session.id.len(),
3379            session.parent_session_id.as_deref().map_or(0, str::len),
3380            session.parent_message_id.as_deref().map_or(0, str::len),
3381            session.source_agent.len(),
3382            session.project.as_str().len(),
3383            encoded.len(),
3384        ];
3385        for bytes in columns {
3386            guard_cell("sessions", &session.id, bytes)?;
3387        }
3388        cells.push(columns.iter().sum());
3389    }
3390    chunk_ranges(&cells)
3391        .into_iter()
3392        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3393        .collect()
3394}
3395
3396fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3397    let schema = session_schema();
3398    RecordBatch::try_new(
3399        schema.clone(),
3400        vec![
3401            Arc::new(StringArray::from(
3402                sessions
3403                    .iter()
3404                    .map(|session| session.id.as_str())
3405                    .collect::<Vec<_>>(),
3406            )),
3407            Arc::new(StringArray::from(
3408                sessions
3409                    .iter()
3410                    .map(|session| session.parent_session_id.as_deref())
3411                    .collect::<Vec<_>>(),
3412            )),
3413            Arc::new(StringArray::from(
3414                sessions
3415                    .iter()
3416                    .map(|session| session.parent_message_id.as_deref())
3417                    .collect::<Vec<_>>(),
3418            )),
3419            Arc::new(StringArray::from(
3420                sessions
3421                    .iter()
3422                    .map(|session| session.source_agent.as_str())
3423                    .collect::<Vec<_>>(),
3424            )),
3425            Arc::new(
3426                TimestampMicrosecondArray::from(
3427                    sessions
3428                        .iter()
3429                        .map(|session| micros(session.created_at))
3430                        .collect::<Vec<_>>(),
3431                )
3432                .with_timezone("UTC"),
3433            ),
3434            Arc::new(StringArray::from(
3435                sessions
3436                    .iter()
3437                    .map(|session| session.project.as_str())
3438                    .collect::<Vec<_>>(),
3439            )),
3440            Arc::new(LargeBinaryArray::from_iter_values(
3441                options.iter().map(Vec::as_slice),
3442            )),
3443        ],
3444    )
3445    .context("failed to build session batch")
3446}
3447
3448pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3449    let options = rows
3450        .iter()
3451        .map(|row| json_bytes(row.message.options()))
3452        .collect::<Result<Vec<_>>>()?;
3453    let mut cells = Vec::with_capacity(rows.len());
3454    for (row, encoded) in rows.iter().zip(&options) {
3455        let columns = [
3456            row.message.session_id().len(),
3457            row.message.id().len(),
3458            row.message.role().as_str().len(),
3459            row.source_agent.len(),
3460            row.project.len(),
3461            row.message.system_content().map_or(0, str::len),
3462            row.search_text.map_or(0, str::len),
3463            encoded.len(),
3464        ];
3465        for bytes in columns {
3466            guard_cell("messages", row.message.id(), bytes)?;
3467        }
3468        cells.push(columns.iter().sum());
3469    }
3470    chunk_ranges(&cells)
3471        .into_iter()
3472        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3473        .collect()
3474}
3475
3476fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3477    let schema = message_schema();
3478    RecordBatch::try_new(
3479        schema.clone(),
3480        vec![
3481            Arc::new(StringArray::from(
3482                rows.iter()
3483                    .map(|row| row.message.session_id())
3484                    .collect::<Vec<_>>(),
3485            )),
3486            Arc::new(StringArray::from(
3487                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3488            )),
3489            Arc::new(
3490                TimestampMicrosecondArray::from(
3491                    rows.iter()
3492                        .map(|row| micros(row.message.timestamp()))
3493                        .collect::<Vec<_>>(),
3494                )
3495                .with_timezone("UTC"),
3496            ),
3497            Arc::new(StringArray::from(
3498                rows.iter()
3499                    .map(|row| row.message.role().as_str())
3500                    .collect::<Vec<_>>(),
3501            )),
3502            Arc::new(StringArray::from(
3503                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3504            )),
3505            Arc::new(StringArray::from(
3506                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3507            )),
3508            Arc::new(StringArray::from(
3509                rows.iter()
3510                    .map(|row| row.message.system_content())
3511                    .collect::<Vec<_>>(),
3512            )),
3513            Arc::new(StringArray::from(
3514                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3515            )),
3516            // `vector` / `embedding_model` are written null at ingest; every
3517            // message starts un-embedded and `pond embed` fills them later
3518            // (spec.md#session-embed-from-canonical).
3519            new_null_array(&embedding_vector_type(), rows.len()),
3520            new_null_array(&DataType::Utf8, rows.len()),
3521            Arc::new(LargeBinaryArray::from_iter_values(
3522                options.iter().map(Vec::as_slice),
3523            )),
3524        ],
3525    )
3526    .context("failed to build message batch")
3527}
3528
3529pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3530    let variant_data = parts
3531        .iter()
3532        .map(|part| part_variant_json(&part.kind))
3533        .collect::<Result<Vec<_>>>()?;
3534    let options = parts
3535        .iter()
3536        .map(|part| json_bytes(&part.options))
3537        .collect::<Result<Vec<_>>>()?;
3538    let mut cells = Vec::with_capacity(parts.len());
3539    // The blob column is a BinaryArray, exempt from the text-column bound
3540    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
3541    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3542        let columns = [
3543            part.session_id.len(),
3544            part.message_id.len(),
3545            part.id.len(),
3546            part.kind.type_name().len(),
3547            part.provenance.as_str().len(),
3548            variant.len(),
3549            encoded.len(),
3550        ];
3551        for bytes in columns {
3552            guard_cell("parts", &part.id, bytes)?;
3553        }
3554        cells.push(columns.iter().sum());
3555    }
3556    chunk_ranges(&cells)
3557        .into_iter()
3558        .map(|range| {
3559            parts_chunk(
3560                &parts[range.clone()],
3561                &variant_data[range.clone()],
3562                &options[range],
3563            )
3564        })
3565        .collect()
3566}
3567
3568fn parts_chunk(
3569    parts: &[Part],
3570    variant_data: &[Vec<u8>],
3571    options: &[Vec<u8>],
3572) -> Result<RecordBatch> {
3573    let schema = part_schema();
3574    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
3575    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
3576    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
3577    let blob_payloads: Vec<Option<&[u8]>> = parts
3578        .iter()
3579        .map(|part| match &part.kind {
3580            PartKind::File { data, .. } => Some(match data {
3581                FileData::String(value) => value.as_bytes(),
3582                FileData::Bytes(value) => value.as_slice(),
3583                FileData::Url(value) => value.as_bytes(),
3584            }),
3585            PartKind::Text { .. }
3586            | PartKind::Reasoning { .. }
3587            | PartKind::ToolCall { .. }
3588            | PartKind::ToolResult { .. }
3589            | PartKind::ToolApprovalRequest { .. }
3590            | PartKind::ToolApprovalResponse { .. } => None,
3591        })
3592        .collect();
3593    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3594
3595    RecordBatch::try_new(
3596        schema.clone(),
3597        vec![
3598            Arc::new(StringArray::from(
3599                parts
3600                    .iter()
3601                    .map(|part| part.session_id.as_str())
3602                    .collect::<Vec<_>>(),
3603            )),
3604            Arc::new(StringArray::from(
3605                parts
3606                    .iter()
3607                    .map(|part| part.message_id.as_str())
3608                    .collect::<Vec<_>>(),
3609            )),
3610            Arc::new(StringArray::from(
3611                parts
3612                    .iter()
3613                    .map(|part| part.id.as_str())
3614                    .collect::<Vec<_>>(),
3615            )),
3616            Arc::new(Int32Array::from(
3617                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3618            )),
3619            Arc::new(StringArray::from(
3620                parts
3621                    .iter()
3622                    .map(|part| part.kind.type_name())
3623                    .collect::<Vec<_>>(),
3624            )),
3625            Arc::new(StringArray::from(
3626                parts
3627                    .iter()
3628                    .map(|part| part.provenance.as_str())
3629                    .collect::<Vec<_>>(),
3630            )),
3631            Arc::new(LargeBinaryArray::from_iter_values(
3632                variant_data.iter().map(Vec::as_slice),
3633            )),
3634            Arc::new(blob_array),
3635            Arc::new(LargeBinaryArray::from_iter_values(
3636                options.iter().map(Vec::as_slice),
3637            )),
3638        ],
3639    )
3640    .context("failed to build parts batch")
3641}
3642
3643pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3644    Ok(Session {
3645        id: string(batch, "id", row)?.context("session id is null")?,
3646        parent_session_id: string(batch, "parent_session_id", row)?,
3647        parent_message_id: string(batch, "parent_message_id", row)?,
3648        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3649        created_at: datetime(batch, "created_at", row)?,
3650        project: crate::adapter::Extracted::from_stored(
3651            string(batch, "project", row)?.context("project is null")?,
3652        ),
3653        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3654    })
3655}
3656
3657pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3658    let id = string(batch, "id", row)?.context("message id is null")?;
3659    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3660    let timestamp = datetime(batch, "timestamp", row)?;
3661    let options =
3662        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3663
3664    match string(batch, "role", row)?
3665        .context("message role is null")?
3666        .as_str()
3667    {
3668        "system" => Ok(Message::System {
3669            id,
3670            session_id,
3671            timestamp,
3672            // `content` is nullable in the schema; preserve the distinction
3673            // between "no content row stored" (`None`) and "empty string
3674            // stored" (`Some(extracted_empty)`). The value originally
3675            // came from a `Source` extraction at ingest time; rewrap via
3676            // the storage-internal `from_stored` so the type-system seal
3677            // for adapters stays intact.
3678            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3679            options,
3680        }),
3681        "user" => Ok(Message::User {
3682            id,
3683            session_id,
3684            timestamp,
3685            options,
3686        }),
3687        "assistant" => Ok(Message::Assistant {
3688            id,
3689            session_id,
3690            timestamp,
3691            options,
3692        }),
3693        "tool" => Ok(Message::Tool {
3694            id,
3695            session_id,
3696            timestamp,
3697            options,
3698        }),
3699        other => anyhow::bail!("unknown message role {other}"),
3700    }
3701}
3702
3703pub(crate) fn part_from_batch(
3704    batch: &RecordBatch,
3705    row: usize,
3706    file_data: Option<FileData>,
3707) -> Result<Part> {
3708    let type_name = string(batch, "type", row)?.context("part type is null")?;
3709    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3710    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3711    Ok(Part {
3712        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3713        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3714        id: string(batch, "id", row)?.context("part id is null")?,
3715        ordinal: int32(batch, "ordinal", row)?,
3716        provenance: provenance_from_str(&provenance)?,
3717        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3718        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3719    })
3720}
3721
3722fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3723    match value {
3724        "conversational" => Ok(crate::wire::Provenance::Conversational),
3725        "injected" => Ok(crate::wire::Provenance::Injected),
3726        other => anyhow::bail!("unknown part provenance {other}"),
3727    }
3728}
3729
3730fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3731    let kind = file_data_kind(variant_data)?;
3732    match kind.as_str() {
3733        "string" => {
3734            let text = std::str::from_utf8(bytes)
3735                .context("file string payload is not UTF-8")?
3736                .to_owned();
3737            Ok(FileData::String(text))
3738        }
3739        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3740        "url" => Ok(FileData::Url(
3741            std::str::from_utf8(bytes)
3742                .context("file URL payload is not UTF-8")?
3743                .to_owned(),
3744        )),
3745        other => anyhow::bail!("unknown file data_kind {other}"),
3746    }
3747}
3748
3749fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3750    let value = json_parse::<Value>(variant_data)?;
3751    value
3752        .get("data_kind")
3753        .and_then(Value::as_str)
3754        .map(str::to_owned)
3755        .context("file part variant_data missing data_kind")
3756}
3757
3758fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3759    batch
3760        .column_by_name(name)
3761        .with_context(|| format!("missing column {name}"))?
3762        .as_any()
3763        .downcast_ref::<UInt64Array>()
3764        .with_context(|| format!("column {name} is not UInt64"))
3765}
3766
3767pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3768    let array = batch
3769        .column_by_name(name)
3770        .with_context(|| format!("missing column {name}"))?
3771        .as_any()
3772        .downcast_ref::<StringArray>()
3773        .with_context(|| format!("column {name} is not Utf8"))?;
3774    if array.is_null(row) {
3775        Ok(None)
3776    } else {
3777        Ok(Some(array.value(row).to_owned()))
3778    }
3779}
3780
3781fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3782    // Lance can return a `lance.json` column either as raw JSONB bytes
3783    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
3784    // LargeUtf8), depending on the read path. Handle both.
3785    let column = batch
3786        .column_by_name(name)
3787        .with_context(|| format!("missing column {name}"))?;
3788    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3789        return if array.is_null(row) {
3790            Ok(None)
3791        } else {
3792            Ok(Some(
3793                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3794            ))
3795        };
3796    }
3797    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3798        return if array.is_null(row) {
3799            Ok(None)
3800        } else {
3801            Ok(Some(array.value(row).as_bytes().to_vec()))
3802        };
3803    }
3804    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3805        return if array.is_null(row) {
3806            Ok(None)
3807        } else {
3808            Ok(Some(array.value(row).as_bytes().to_vec()))
3809        };
3810    }
3811    anyhow::bail!("column {name} is not a JSON-compatible array")
3812}
3813
3814fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3815    let array = batch
3816        .column_by_name(name)
3817        .with_context(|| format!("missing column {name}"))?
3818        .as_any()
3819        .downcast_ref::<Int32Array>()
3820        .with_context(|| format!("column {name} is not Int32"))?;
3821    Ok(array.value(row))
3822}
3823
3824pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3825    let array = batch
3826        .column_by_name(name)
3827        .with_context(|| format!("missing column {name}"))?
3828        .as_any()
3829        .downcast_ref::<Float32Array>()
3830        .with_context(|| format!("column {name} is not Float32"))?;
3831    Ok(array.value(row))
3832}
3833
3834pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3835    let array = batch
3836        .column_by_name(name)
3837        .with_context(|| format!("missing column {name}"))?
3838        .as_any()
3839        .downcast_ref::<TimestampMicrosecondArray>()
3840        .with_context(|| format!("column {name} is not timestamp_micros"))?;
3841    Utc.timestamp_micros(array.value(row))
3842        .single()
3843        .context("timestamp is out of range")
3844}
3845
3846fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3847    Field::new(name, data_type, nullable).with_metadata(
3848        [(
3849            "lance-schema:unenforced-primary-key".to_owned(),
3850            "true".to_owned(),
3851        )]
3852        .into(),
3853    )
3854}
3855
3856// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
3857// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
3858// is marked unstable in Lance docs (`format/file/versioning.md`) and at
3859// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
3860// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
3861// allocated only one column_info, so the decoder's second `expect_next()`
3862// fires `"there were more fields in the schema than provided column
3863// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
3864// handles correctly (covered by Lance's own `test_compact_blob_columns`).
3865fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3866    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3867        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3868            .into_iter()
3869            .collect(),
3870    )
3871}
3872
3873fn json_field(name: &str, nullable: bool) -> Field {
3874    lance_arrow::json::json_field(name, nullable)
3875}
3876
3877fn micros(timestamp: DateTime<Utc>) -> i64 {
3878    timestamp.timestamp_micros()
3879}
3880
3881fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3882    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
3883    // matches the `lance.json` extension contract. Lance's compact path
3884    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
3885    // applies `decode_json -> encode_json` on this column; with proper JSONB
3886    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
3887    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
3888    // `try_into_dfstream`; compact still goes through the adapter).
3889    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3890    lance_arrow::json::encode_json(&text)
3891        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3892}
3893
3894fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3895    serde_json::from_slice(value).context("failed to parse JSON field")
3896}
3897
3898fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3899    if let PartKind::File {
3900        media_type,
3901        file_name,
3902        data,
3903    } = kind
3904    {
3905        let data_kind = match data {
3906            FileData::String(_) => "string",
3907            FileData::Bytes(_) => "bytes",
3908            FileData::Url(_) => "url",
3909        };
3910        return json_bytes(&serde_json::json!({
3911            "media_type": media_type,
3912            "file_name": file_name,
3913            "data_kind": data_kind,
3914        }));
3915    }
3916    let value = serde_json::to_value(kind)?;
3917    let mut object = value
3918        .as_object()
3919        .cloned()
3920        .context("part variant did not serialize to an object")?;
3921    object.remove("type");
3922    json_bytes(&object)
3923}
3924
3925fn part_kind_from_json(
3926    type_name: &str,
3927    variant_data: &[u8],
3928    file_data: Option<FileData>,
3929) -> Result<PartKind> {
3930    let mut value = json_parse::<Value>(variant_data)?;
3931    let object = value
3932        .as_object_mut()
3933        .context("part variant data is not an object")?;
3934    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3935    if let Some(data) = file_data {
3936        object.remove("data_kind");
3937        object.insert("data".to_owned(), serde_json::to_value(data)?);
3938    }
3939    serde_json::from_value(value).context("failed to parse part kind")
3940}
3941
3942#[cfg(test)]
3943mod tests {
3944    #![allow(clippy::expect_used, clippy::unwrap_used)]
3945
3946    use super::*;
3947    use crate::{
3948        adapter::Extracted,
3949        handlers::ingest_events,
3950        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3951    };
3952    use chrono::Utc;
3953    use serde_json::json;
3954    use tempfile::TempDir;
3955
3956    fn synthetic_session(id: &str) -> Session {
3957        Session {
3958            id: id.to_owned(),
3959            parent_session_id: None,
3960            parent_message_id: None,
3961            source_agent: "claude-code".to_owned(),
3962            created_at: Utc::now(),
3963            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
3964            options: ProviderOptions::new(),
3965        }
3966    }
3967
3968    #[test]
3969    fn search_text_excludes_injected_parts() {
3970        use crate::wire::Provenance;
3971        let message = Message::User {
3972            id: "m1".to_owned(),
3973            session_id: "s1".to_owned(),
3974            timestamp: Utc::now(),
3975            options: ProviderOptions::new(),
3976        };
3977        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
3978            session_id: "s1".to_owned(),
3979            id: id.to_owned(),
3980            message_id: "m1".to_owned(),
3981            ordinal: 0,
3982            provenance,
3983            options: ProviderOptions::new(),
3984            kind: PartKind::Text {
3985                text: Some(Extracted::from_test_value(text.to_owned())),
3986            },
3987        };
3988
3989        // A conversational part contributes; an injected one is excluded
3990        // (spec.md#search).
3991        let conversational = search_text(
3992            &message,
3993            &[text_part(
3994                "p1",
3995                "real human prompt",
3996                Provenance::Conversational,
3997            )],
3998        );
3999        assert_eq!(conversational.as_deref(), Some("real human prompt"));
4000
4001        let injected = search_text(
4002            &message,
4003            &[text_part(
4004                "p2",
4005                "<task-notification>...</task-notification>",
4006                Provenance::Injected,
4007            )],
4008        );
4009        assert!(
4010            injected.is_none(),
4011            "a message whose only part is injected has null search_text"
4012        );
4013    }
4014
4015    #[test]
4016    fn chunk_ranges_splits_on_byte_budget() {
4017        assert!(chunk_ranges(&[]).is_empty());
4018        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4019
4020        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4021        assert_eq!(
4022            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4023            vec![0..1, 1..2, 2..3],
4024        );
4025
4026        // An oversized single row gets its own chunk, never an infinite loop.
4027        assert_eq!(
4028            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4029            vec![0..1, 1..2, 2..3],
4030        );
4031    }
4032
4033    #[tokio::test]
4034    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4035        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
4036        // Message is dropped on the spot, with one Error outcome surfaced. The
4037        // rest of the substream continues normally - subsequent valid messages
4038        // and parts get written.
4039        let temp = TempDir::new()?;
4040        let store = Store::open_local(temp.path()).await?;
4041        let session = synthetic_session("ordering");
4042        let orphan_part = Part {
4043            session_id: session.id.clone(),
4044            id: "orphan-part".to_owned(),
4045            message_id: "missing-message".to_owned(),
4046            ordinal: 0,
4047            provenance: crate::wire::Provenance::Conversational,
4048            options: ProviderOptions::new(),
4049            kind: PartKind::Text {
4050                text: Some(Extracted::from_test_value("orphan".to_owned())),
4051            },
4052        };
4053        let valid_message = Message::User {
4054            id: "valid-message".to_owned(),
4055            session_id: session.id.clone(),
4056            timestamp: Utc::now(),
4057            options: ProviderOptions::new(),
4058        };
4059        let valid_part = Part {
4060            session_id: session.id.clone(),
4061            id: "valid-part".to_owned(),
4062            message_id: valid_message.id().to_owned(),
4063            ordinal: 0,
4064            provenance: crate::wire::Provenance::Conversational,
4065            options: ProviderOptions::new(),
4066            kind: PartKind::Text {
4067                text: Some(Extracted::from_test_value("kept".to_owned())),
4068            },
4069        };
4070
4071        let mut validator = IngestValidator::default();
4072        validator
4073            .push(&store, 0, IngestEvent::Session(session.clone()))
4074            .await?;
4075        let part_outcomes = validator
4076            .push(&store, 1, IngestEvent::Part(orphan_part))
4077            .await?;
4078        assert_eq!(part_outcomes.len(), 1);
4079        assert_eq!(part_outcomes[0].kind, "part");
4080        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4081        assert!(
4082            part_outcomes[0]
4083                .error
4084                .as_ref()
4085                .map(|e| e.message.contains("part event appeared before a message"))
4086                .unwrap_or(false),
4087            "error message must explain the ordering violation: {part_outcomes:?}"
4088        );
4089        validator
4090            .push(&store, 2, IngestEvent::Message(valid_message))
4091            .await?;
4092        validator
4093            .push(&store, 3, IngestEvent::Part(valid_part))
4094            .await?;
4095        validator.finish(&store).await?;
4096
4097        let (sessions, messages, parts) = store.row_counts().await?;
4098        assert_eq!(sessions, 1, "session committed despite the orphan part");
4099        assert_eq!(messages, 1, "valid message committed");
4100        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4101
4102        Ok(())
4103    }
4104
4105    #[tokio::test]
4106    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4107        // Per-event drop: a duplicate message id within a substream drops the
4108        // *duplicate* and surfaces an Error outcome for it. The first wins; the
4109        // session still commits.
4110        let temp = TempDir::new()?;
4111        let store = Store::open_local(temp.path()).await?;
4112        let session = synthetic_session("duplicate-message");
4113        let first = Message::User {
4114            id: "message-1".to_owned(),
4115            session_id: session.id.clone(),
4116            timestamp: Utc::now(),
4117            options: ProviderOptions::new(),
4118        };
4119        let second = Message::Assistant {
4120            id: "message-1".to_owned(),
4121            session_id: session.id.clone(),
4122            timestamp: Utc::now(),
4123            options: ProviderOptions::new(),
4124        };
4125
4126        let mut validator = IngestValidator::default();
4127        validator
4128            .push(&store, 0, IngestEvent::Session(session.clone()))
4129            .await?;
4130        validator
4131            .push(&store, 1, IngestEvent::Message(first))
4132            .await?;
4133        let dup_outcomes = validator
4134            .push(&store, 2, IngestEvent::Message(second))
4135            .await?;
4136        assert_eq!(dup_outcomes.len(), 1);
4137        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4138        assert!(
4139            dup_outcomes[0]
4140                .error
4141                .as_ref()
4142                .map(|e| e.message.contains("duplicate message id message-1"))
4143                .unwrap_or(false),
4144            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4145        );
4146
4147        validator.finish(&store).await?;
4148        let (sessions, messages, _) = store.row_counts().await?;
4149        assert_eq!(sessions, 1, "session committed");
4150        assert_eq!(messages, 1, "only the first message committed");
4151
4152        Ok(())
4153    }
4154
4155    /// Regression: compact_files on `parts` with the blob column tripped a
4156    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
4157    /// batches give compact fragments to merge; every `FileData` variant
4158    /// exercises the blob round-trip. All-File batches sidestep a debug-only
4159    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
4160    /// write batch mixes null + valid rows in the blob column - benign in
4161    /// release, irrelevant to this regression's scope.
4162    #[tokio::test(flavor = "multi_thread")]
4163    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4164        use crate::wire::{FileData, PartKind, Provenance};
4165        let temp = TempDir::new()?;
4166        let store = Store::open_local(temp.path()).await?;
4167
4168        let session = synthetic_session("compact-blob");
4169        store
4170            .upsert_sessions(std::slice::from_ref(&session))
4171            .await?;
4172
4173        let make_part = |idx: usize, kind: PartKind| Part {
4174            session_id: session.id.clone(),
4175            message_id: format!("msg-{idx}"),
4176            id: format!("part-{idx}"),
4177            ordinal: 0,
4178            provenance: Provenance::Conversational,
4179            options: ProviderOptions::new(),
4180            kind,
4181        };
4182
4183        let batch_a = vec![
4184            make_part(
4185                0,
4186                PartKind::File {
4187                    media_type: Some("text/plain".to_owned()),
4188                    file_name: Some("a.txt".to_owned()),
4189                    data: FileData::Bytes(b"alpha".to_vec()),
4190                },
4191            ),
4192            make_part(
4193                1,
4194                PartKind::File {
4195                    media_type: Some("text/plain".to_owned()),
4196                    file_name: Some("b.txt".to_owned()),
4197                    data: FileData::String("beta".to_owned()),
4198                },
4199            ),
4200        ];
4201        store.upsert_parts(&batch_a).await?;
4202
4203        let batch_b = vec![
4204            make_part(
4205                2,
4206                PartKind::File {
4207                    media_type: Some("application/octet-stream".to_owned()),
4208                    file_name: None,
4209                    data: FileData::Url("https://example.com/file".to_owned()),
4210                },
4211            ),
4212            make_part(
4213                3,
4214                PartKind::File {
4215                    media_type: Some("image/png".to_owned()),
4216                    file_name: Some("c.png".to_owned()),
4217                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4218                },
4219            ),
4220        ];
4221        store.upsert_parts(&batch_b).await?;
4222
4223        store
4224            .optimize_indices(None, &MaintenancePolicy::always_compact())
4225            .await?
4226            .into_result()?;
4227
4228        Ok(())
4229    }
4230
4231    #[tokio::test]
4232    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4233        let temp = TempDir::new()?;
4234        let store = Store::open_local(temp.path()).await?;
4235        let session = synthetic_session("blob");
4236        let message = Message::User {
4237            id: "message-1".to_owned(),
4238            session_id: session.id.clone(),
4239            timestamp: Utc::now(),
4240            options: ProviderOptions::new(),
4241        };
4242        let part = Part {
4243            session_id: session.id.clone(),
4244            id: "part-1".to_owned(),
4245            message_id: message.id().to_owned(),
4246            ordinal: 0,
4247            provenance: crate::wire::Provenance::Conversational,
4248            options: ProviderOptions::new(),
4249            kind: PartKind::File {
4250                media_type: Some("text/plain".to_owned()),
4251                file_name: Some("payload.txt".to_owned()),
4252                data: FileData::Bytes(b"pond".to_vec()),
4253            },
4254        };
4255
4256        let mut validator = IngestValidator::default();
4257        validator
4258            .push(&store, 0, IngestEvent::Session(session.clone()))
4259            .await?;
4260        validator
4261            .push(&store, 1, IngestEvent::Message(message.clone()))
4262            .await?;
4263        validator
4264            .push(&store, 2, IngestEvent::Part(part.clone()))
4265            .await?;
4266        validator.finish(&store).await?;
4267
4268        let stored = store
4269            .get_session(&session.id)
4270            .await?
4271            .expect("session should exist");
4272        let stored_part = &stored.messages[0].parts[0];
4273        assert_eq!(stored_part, &part);
4274
4275        Ok(())
4276    }
4277
4278    //
4279    // `Session.source_agent` and `Session.project` are immutable
4280    // post-first-write because `messages` denormalizes them at first
4281    // ingest; a silent overwrite would desync the denormalized
4282    // copies. pond core's `IngestValidator` probes the existing session
4283    // before the merge_insert and emits a per-row `validation_failed`
4284    // outcome with the typed field name when either changes. Other Session
4285    // fields (options, parent_session_id, created_at, parent_message_id)
4286    // re-write idempotently via merge_insert.
4287
4288    fn base_session() -> Session {
4289        Session {
4290            id: "01HXY00000000001".to_owned(),
4291            parent_session_id: None,
4292            parent_message_id: None,
4293            source_agent: "claude-code".to_owned(),
4294            created_at: Utc::now(),
4295            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4296            options: ProviderOptions::new(),
4297        }
4298    }
4299
4300    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4301        outcomes
4302            .iter()
4303            .filter(|outcome| outcome.status == target)
4304            .count()
4305    }
4306
4307    #[tokio::test(flavor = "multi_thread")]
4308    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4309    -> anyhow::Result<()> {
4310        let temp = TempDir::new()?;
4311        let store = Store::open_local(temp.path()).await?;
4312
4313        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4314        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4315
4316        let mut again = base_session();
4317        again.options.insert("title".to_owned(), json!("renamed"));
4318        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4319        assert_eq!(
4320            count_status(&second, OutcomeStatus::Error),
4321            0,
4322            "options is mutable; the re-ingest must not surface an error: {second:?}",
4323        );
4324        assert_eq!(
4325            count_status(&second, OutcomeStatus::Matched),
4326            1,
4327            "unchanged immutable fields must match-insert via merge_insert",
4328        );
4329
4330        Ok(())
4331    }
4332
4333    #[tokio::test(flavor = "multi_thread")]
4334    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4335        let temp = TempDir::new()?;
4336        let store = Store::open_local(temp.path()).await?;
4337
4338        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4339        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4340
4341        let mut tampered = base_session();
4342        tampered.source_agent = "codex-cli".to_owned();
4343        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4344        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4345        let err_row = second
4346            .iter()
4347            .find(|outcome| outcome.status == OutcomeStatus::Error)
4348            .expect("error outcome present");
4349        let err = err_row.error.as_ref().expect("error body present");
4350        assert_eq!(err.field, Some("source_agent"));
4351        assert_eq!(err.reason, Some("immutable"));
4352
4353        // The stored row stayed on the original adapter - no silent rewrite.
4354        let stored = store
4355            .get_session(&base_session().id)
4356            .await?
4357            .expect("session row survives the rejected re-ingest");
4358        assert_eq!(stored.session.source_agent, "claude-code");
4359
4360        Ok(())
4361    }
4362
4363    #[tokio::test(flavor = "multi_thread")]
4364    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4365        let temp = TempDir::new()?;
4366        let store = Store::open_local(temp.path()).await?;
4367
4368        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4369        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4370
4371        let mut tampered = base_session();
4372        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4373        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4374        let err_row = second
4375            .iter()
4376            .find(|outcome| outcome.status == OutcomeStatus::Error)
4377            .expect("project change must surface an error outcome");
4378        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4379
4380        let stored = store
4381            .get_session(&base_session().id)
4382            .await?
4383            .expect("session row survives");
4384        assert_eq!(
4385            stored.session.project.as_str(),
4386            "/home/me/proj",
4387            "stored project must remain the original",
4388        );
4389
4390        Ok(())
4391    }
4392
4393    #[tokio::test(flavor = "multi_thread")]
4394    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
4395        // Regression guard: re-ingesting an existing session with NEW
4396        // messages must surface as sessions_inserted=0, messages_inserted_*>0
4397        // on `BatchCounts`, and per-row outcomes must mark the new message
4398        // rows `Inserted` while the session row is `Matched`. The prior
4399        // implementation derived all per-row statuses from the batch-level
4400        // session inserted count, which silently flipped the new messages
4401        // into `Matched` (visible as "up to date" in the CLI bar tail).
4402        use crate::wire::Provenance;
4403        let temp = TempDir::new()?;
4404        let store = Store::open_local(temp.path()).await?;
4405        let session = base_session();
4406
4407        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
4408            session_id: session.id.clone(),
4409            id: part_id.to_owned(),
4410            message_id: message_id.to_owned(),
4411            ordinal: 0,
4412            provenance: Provenance::Conversational,
4413            options: ProviderOptions::new(),
4414            kind: PartKind::Text {
4415                text: Some(Extracted::from_test_value(body.to_owned())),
4416            },
4417        };
4418        let user_message = |id: &str| Message::User {
4419            id: id.to_owned(),
4420            session_id: session.id.clone(),
4421            timestamp: Utc::now(),
4422            options: ProviderOptions::new(),
4423        };
4424
4425        // First pass: 2 messages land fresh.
4426        let mut validator = IngestValidator::default();
4427        validator
4428            .push(&store, 0, IngestEvent::Session(session.clone()))
4429            .await?;
4430        validator
4431            .push(&store, 1, IngestEvent::Message(user_message("m1")))
4432            .await?;
4433        validator
4434            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
4435            .await?;
4436        validator
4437            .push(&store, 3, IngestEvent::Message(user_message("m2")))
4438            .await?;
4439        validator
4440            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
4441            .await?;
4442        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
4443        assert_eq!(first_counts.sessions_inserted, 1);
4444        assert_eq!(first_counts.messages_inserted_total, 2);
4445        assert_eq!(first_counts.messages_inserted_searchable, 2);
4446
4447        // Second pass: same session id, 3 NEW messages.
4448        let mut validator = IngestValidator::default();
4449        validator
4450            .push(&store, 0, IngestEvent::Session(session.clone()))
4451            .await?;
4452        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
4453            let pid = format!("p{}", idx + 3);
4454            validator
4455                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
4456                .await?;
4457            validator
4458                .push(
4459                    &store,
4460                    idx * 2 + 2,
4461                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
4462                )
4463                .await?;
4464        }
4465        let (second_outcomes, second_counts) = validator.finish(&store).await?;
4466
4467        assert_eq!(
4468            second_counts.sessions_inserted, 0,
4469            "existing session row must report as Matched, not Inserted",
4470        );
4471        assert_eq!(second_counts.sessions_matched, 1);
4472        assert_eq!(
4473            second_counts.messages_inserted_total, 3,
4474            "the three NEW messages must register as Inserted in BatchCounts",
4475        );
4476        assert_eq!(
4477            second_counts.messages_inserted_searchable, 3,
4478            "all three new messages carry conversational text -> searchable",
4479        );
4480        assert_eq!(second_counts.messages_matched_total, 0);
4481        assert_eq!(second_counts.parts_inserted, 3);
4482        assert_eq!(second_counts.parts_matched, 0);
4483
4484        // Per-row outcomes mirror the BatchCounts shape: the session row is
4485        // Matched, every new message + part row is Inserted.
4486        let session_outcome = second_outcomes
4487            .iter()
4488            .find(|outcome| outcome.kind == "session")
4489            .expect("session-row outcome present");
4490        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
4491        for outcome in &second_outcomes {
4492            if outcome.kind == "message" || outcome.kind == "part" {
4493                assert_eq!(
4494                    outcome.status,
4495                    OutcomeStatus::Inserted,
4496                    "new row must be Inserted, got: {outcome:?}",
4497                );
4498            }
4499        }
4500        Ok(())
4501    }
4502
4503    /// Ingest `count` synthetic messages spread across a handful of sessions
4504    /// and projects, each with conversational `search_text`. Returns the store
4505    /// and the message keys in `msg-{i}` order; every `vector` starts null.
4506    async fn store_with_messages(
4507        temp: &TempDir,
4508        count: usize,
4509    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4510        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4511    }
4512
4513    /// Same as [`store_with_messages`] but tests optimize with a custom
4514    /// IVF_PQ activation threshold.
4515    async fn store_with_messages_at_threshold(
4516        temp: &TempDir,
4517        count: usize,
4518        _vector_threshold: usize,
4519    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4520        let store = Store::open_local(temp.path()).await?;
4521        let sessions = 8.min(count.max(1));
4522        let mut events = Vec::new();
4523        for s in 0..sessions {
4524            events.push(IngestEvent::Session(Session {
4525                id: format!("session-{s}"),
4526                parent_session_id: None,
4527                parent_message_id: None,
4528                source_agent: "claude-code".to_owned(),
4529                created_at: Utc::now(),
4530                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4531                options: ProviderOptions::new(),
4532            }));
4533            for i in (s..count).step_by(sessions) {
4534                let message_id = format!("msg-{i}");
4535                events.push(IngestEvent::Message(Message::User {
4536                    id: message_id.clone(),
4537                    session_id: format!("session-{s}"),
4538                    timestamp: Utc::now(),
4539                    options: ProviderOptions::new(),
4540                }));
4541                events.push(IngestEvent::Part(Part {
4542                    session_id: format!("session-{s}"),
4543                    id: format!("{message_id}-part"),
4544                    message_id,
4545                    ordinal: 0,
4546                    provenance: crate::wire::Provenance::Conversational,
4547                    options: ProviderOptions::new(),
4548                    kind: PartKind::Text {
4549                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4550                    },
4551                }));
4552            }
4553        }
4554        ingest_events(&store, events).await?;
4555        let keys = (0..count)
4556            .map(|i| MessageKey {
4557                session_id: format!("session-{}", i % sessions),
4558                message_id: format!("msg-{i}"),
4559            })
4560            .collect();
4561        Ok((store, keys))
4562    }
4563
4564    /// A deterministic pseudo-random vector of the production dimension.
4565    fn synthetic_vector(seed: usize) -> Vec<f32> {
4566        let mut state = (seed as u64)
4567            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4568            .wrapping_add(1);
4569        (0..embedding_dim())
4570            .map(|_| {
4571                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4572                #[allow(clippy::cast_precision_loss)]
4573                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4574                unit - 1.0
4575            })
4576            .collect()
4577    }
4578
4579    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
4580    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4581        keys.iter()
4582            .enumerate()
4583            .map(|(seed, key)| EmbeddedMessage {
4584                session_id: key.session_id.clone(),
4585                id: key.message_id.clone(),
4586                vector: synthetic_vector(seed),
4587            })
4588            .collect()
4589    }
4590
4591    fn embedding_update_batch_with_model(
4592        rows: &[EmbeddedMessage],
4593        model: &str,
4594    ) -> Result<RecordBatch> {
4595        let mut batch = embedding_update_batch(rows)?;
4596        let columns = batch
4597            .columns()
4598            .iter()
4599            .take(3)
4600            .cloned()
4601            .chain(std::iter::once(
4602                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4603            ))
4604            .collect::<Vec<_>>();
4605        batch = RecordBatch::try_new(batch.schema(), columns)?;
4606        Ok(batch)
4607    }
4608
4609    #[tokio::test]
4610    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4611        let temp = TempDir::new()?;
4612        // 4 messages cycle session-0..session-3, so `session-3` is a real
4613        // partition. Scalar-index pushdown is volume-independent: the planner
4614        // emits `ScalarIndexQuery` whenever the index exists.
4615        let (store, keys) = store_with_messages(&temp, 4).await?;
4616        store.write_embeddings(&embedded(&keys)).await?;
4617        store
4618            .optimize_indices(None, &MaintenancePolicy::always_compact())
4619            .await?
4620            .into_result()?;
4621
4622        let query = vec![0.01_f32; embedding_dim()];
4623        let plan = store
4624            .explain_vector_plan(
4625                &query,
4626                10,
4627                &Predicate::Eq("session_id", "session-3".into()),
4628                None,
4629            )
4630            .await?;
4631
4632        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
4633        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
4634        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
4635        // and unrelated.)
4636        assert!(
4637            plan.contains("ScalarIndexQuery"),
4638            "expected a ScalarIndexQuery node in the plan:\n{plan}",
4639        );
4640        let predicate_postfiltered = plan
4641            .lines()
4642            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4643        assert!(
4644            !predicate_postfiltered,
4645            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4646        );
4647        Ok(())
4648    }
4649
4650    #[tokio::test]
4651    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4652        let temp = TempDir::new()?;
4653        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4654
4655        // First batch: 255 vectors, one below threshold. Optimize does not
4656        // create the IVF_PQ because the trigger is not met.
4657        store.write_embeddings(&embedded(&keys[..255])).await?;
4658        store
4659            .optimize_indices_with_vector_threshold(256)
4660            .await?
4661            .into_result()?;
4662        assert!(
4663            !store
4664                .handle
4665                .messages_index_names()
4666                .await?
4667                .iter()
4668                .any(|name| name == MESSAGES_VECTOR_INDEX),
4669            "IVF_PQ must not exist below the activation threshold",
4670        );
4671
4672        // Next batch: one more vector. Total reaches 256; optimize creates
4673        // the IVF_PQ.
4674        store.write_embeddings(&embedded(&keys[255..256])).await?;
4675        store
4676            .optimize_indices_with_vector_threshold(256)
4677            .await?
4678            .into_result()?;
4679        assert!(
4680            store
4681                .handle
4682                .messages_index_names()
4683                .await?
4684                .iter()
4685                .any(|name| name == MESSAGES_VECTOR_INDEX),
4686            "optimize must create the IVF_PQ once the threshold is crossed",
4687        );
4688
4689        // The remaining 44 rows stay un-embedded; the IVF_PQ trains over the
4690        // non-null subset and a planted vector is retrievable.
4691        let hits = store
4692            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4693            .await?;
4694        assert!(
4695            hits.iter().any(|(key, _)| key == &keys[0]),
4696            "an embedded row is retrievable via the index",
4697        );
4698        Ok(())
4699    }
4700
4701    #[tokio::test]
4702    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4703    {
4704        let temp = TempDir::new()?;
4705        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4706        let old_rows = embedded(&keys);
4707        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4708        store
4709            .handle
4710            .merge_update(Table::Messages, old_batch, old_rows.len())
4711            .await?;
4712        store
4713            .optimize_indices_with_vector_threshold(256)
4714            .await?
4715            .into_result()?;
4716        assert!(
4717            store
4718                .handle
4719                .messages_index_names()
4720                .await?
4721                .iter()
4722                .any(|name| name == MESSAGES_VECTOR_INDEX),
4723            "IVF_PQ must exist before a model swap",
4724        );
4725        assert_eq!(store.stale_embedding_count().await?, keys.len());
4726
4727        store.drop_vector_index().await?;
4728        let mut pending = Vec::new();
4729        let stream = store.pending_or_stale_messages();
4730        tokio::pin!(stream);
4731        while let Some(row) = stream.next().await {
4732            pending.push(row?);
4733        }
4734        assert_eq!(
4735            pending.len(),
4736            keys.len(),
4737            "force stream should see stale rows"
4738        );
4739        store.write_embeddings(&embedded(&keys)).await?;
4740        assert_eq!(store.stale_embedding_count().await?, 0);
4741        store
4742            .optimize_indices_with_vector_threshold(256)
4743            .await?
4744            .into_result()?;
4745        assert!(
4746            store
4747                .handle
4748                .messages_index_names()
4749                .await?
4750                .iter()
4751                .any(|name| name == MESSAGES_VECTOR_INDEX),
4752            "optimize must rebuild IVF_PQ after force re-embed",
4753        );
4754
4755        let stream = store.pending_or_stale_messages();
4756        tokio::pin!(stream);
4757        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4758        Ok(())
4759    }
4760
4761    #[tokio::test]
4762    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4763        // Regression: `_row_last_updated_at_version` can point at a Lance
4764        // manifest version that `cleanup_old_versions` or the auto_cleanup
4765        // hook has since dropped from `Dataset::versions()`. The old code
4766        // silently dropped any session whose row-version was not in the
4767        // visible list, collapsing the staleness-skip map down to recent
4768        // commits and forcing `pond sync` to re-touch every file. The fix
4769        // falls back to the oldest still-visible commit timestamp - a
4770        // sound upper bound on the row's true ingest time.
4771        let temp = TempDir::new()?;
4772        let (store, _keys) = store_with_messages(&temp, 4).await?;
4773
4774        // Produce several distinct manifest versions on `sessions` so the
4775        // older ones become eligible for cleanup.
4776        for tag in 0..3 {
4777            let extra = synthetic_session(&format!("extra-{tag}"));
4778            store.upsert_sessions(&[extra]).await?;
4779        }
4780
4781        // Prune everything older than ~now, leaving only the latest manifest.
4782        // `delete_unverified=None` and `error_if_tagged=Some(false)` mirror
4783        // Lance's auto-cleanup hook semantics. The chrono 0-duration is fine:
4784        // Lance's `delete_unverified` floor still protects in-flight files.
4785        let dataset = store.handle.dataset(Table::Sessions).await?;
4786        dataset
4787            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4788            .await
4789            .context("cleanup_old_versions failed")?;
4790
4791        let map = store.session_last_ingested_at().await?;
4792        let session_count = store.row_counts().await?.0;
4793        assert!(
4794            map.len() >= session_count,
4795            "watermark map ({}) must still cover every session ({}) after \
4796             version cleanup; an empty fallback regresses pond sync to a \
4797             full re-scan",
4798            map.len(),
4799            session_count,
4800        );
4801        Ok(())
4802    }
4803
4804    #[tokio::test]
4805    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4806        let temp = TempDir::new()?;
4807        let (store, keys) = store_with_messages(&temp, 10).await?;
4808
4809        let before = store.embedding_progress().await?;
4810        assert_eq!(before.embedded, 0);
4811        assert_eq!(before.total, 10);
4812        assert_eq!(before.model, crate::embed::model_id());
4813
4814        store.write_embeddings(&embedded(&keys[..4])).await?;
4815        let partial = store.embedding_progress().await?;
4816        assert_eq!(partial.embedded, 4);
4817        assert_eq!(partial.total, 10);
4818
4819        store.write_embeddings(&embedded(&keys[4..])).await?;
4820        let full = store.embedding_progress().await?;
4821        assert_eq!(full.embedded, 10);
4822        assert_eq!(full.total, 10);
4823        Ok(())
4824    }
4825}