Skip to main content

pond/
sessions.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    path::Path,
4    sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15    UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25    config, embed,
26    substrate::{
27        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
28        OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
29        TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
30    },
31    wire::{
32        FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33        SessionFrom,
34    },
35};
36use url::Url;
37
/// Entry point for reading and writing the session store.
///
/// Wraps a single substrate [`Handle`]. The constructors on this type
/// (`open`, `open_with_options`, `open_local`) differ only in how that handle
/// is opened.
#[derive(Debug)]
pub struct Store {
    /// Substrate handle that the dataset opens, scans and merge-inserts in
    /// this file go through.
    handle: Handle,
}
42
/// Per-table row counts for a Lance archive operation, one field per table.
///
/// Used both for "rows seen" and "rows inserted" tallies in
/// [`LanceArchiveExport`] and [`LanceArchiveImport`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Row count for the `sessions` table.
    pub sessions: usize,
    /// Row count for the `messages` table.
    pub messages: usize,
    /// Row count for the `parts` table.
    pub parts: usize,
}
49
/// Per-table dataset version ids.
///
/// `Store::export_clean_lance_datasets` fills this with the `version_id()` of
/// each source dataset as read at export time.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the `sessions` dataset.
    pub sessions: u64,
    /// Version id of the `messages` dataset.
    pub messages: u64,
    /// Version id of the `parts` dataset.
    pub parts: u64,
}
56
/// Summary returned by `Store::export_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Rows read from each source table during the export scan.
    pub rows: LanceArchiveCounts,
    /// Version id of each source dataset the export was read from.
    pub source_versions: LanceArchiveVersions,
}
62
/// Summary returned by `Store::import_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Rows scanned from each archive table.
    pub rows: LanceArchiveCounts,
    /// Per-table sum of the counts returned by the merge-insert calls made
    /// during the import.
    pub inserted: LanceArchiveCounts,
}
68
/// Index intents grouped by the table they apply to.
#[derive(Debug, Clone, Default)]
pub struct IndexIntents {
    /// Intents for the `sessions` table.
    pub sessions: Vec<IndexIntent>,
    /// Intents for the `messages` table.
    pub messages: Vec<IndexIntent>,
    /// Intents for the `parts` table.
    pub parts: Vec<IndexIntent>,
}
75
76impl IndexIntents {
77    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78        [
79            (Table::Sessions, &self.sessions),
80            (Table::Messages, &self.messages),
81            (Table::Parts, &self.parts),
82        ]
83    }
84}
85
/// A message awaiting embedding: its primary key plus the `search_text` to
/// embed. The vector lives on the same `messages` row, so no denormalized
/// filter columns are needed (spec.md#session-embed-from-canonical).
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Session half of the message's primary key.
    pub session_id: String,
    /// Message id; together with `session_id` it forms the primary key.
    pub id: String,
    /// Text to feed to the embedder for this message.
    pub search_text: String,
}
95
/// One embedded message: a primary key and the vector to store. `pond embed`
/// writes a batch of these into `messages.vector` keyed on `(session_id, id)`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Session half of the `(session_id, id)` key.
    pub session_id: String,
    /// Message half of the `(session_id, id)` key.
    pub id: String,
    /// Embedding to store for this message.
    /// NOTE(review): the expected dimensionality is not visible in this part
    /// of the file - confirm against the embed module.
    pub vector: Vec<f32>,
}
104
/// Message metadata used to hydrate search hits after retriever ranking.
///
/// NOTE(review): the code that populates this struct is outside the visible
/// part of the file; the field notes below restate the field names and types
/// only.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Id of the message.
    pub message_id: String,
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Message role, carried as a plain string.
    pub role: String,
    /// Project, carried as a plain string.
    pub project: String,
    /// Source agent, carried as a plain string.
    pub source_agent: String,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Search text of the message.
    pub search_text: String,
}
116
/// Composite key naming one message: the owning session id plus the message
/// id.
///
/// Derives `Ord` and `Hash`, so it can be used as a key in both ordered and
/// hashed collections. The derived ordering compares `session_id` first,
/// then `message_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
}
122
/// Per-row result reported by the `Store::upsert_*` methods.
///
/// NOTE(review): the mapping from merge-insert counts to these variants lives
/// in `statuses_from_inserted`, which is not visible here. The variant notes
/// below are the presumed meaning - confirm against that helper.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// Presumably: the row was newly written.
    Inserted,
    /// Presumably: a row with the same key already existed.
    Matched,
}
128
/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
/// results so the CLI can render compaction-skipped (under writer contention)
/// distinctly from index-build failure (real problem).
#[derive(Debug, Default)]
pub struct OptimizeOutcome {
    /// Per-table results; each entry carries the table plus its `indices`
    /// and `compaction` phase outcomes.
    pub tables: Vec<TableOptimizeOutcome>,
}
137
138impl OptimizeOutcome {
139    /// True if any table's indices phase reported a non-conflict failure.
140    /// `SkippedConflict` is expected under contention and does not count.
141    pub fn any_indices_failed(&self) -> bool {
142        self.tables.iter().any(|t| t.indices.is_failed())
143    }
144
145    /// Treat any `Failed` phase as an error. Tests that don't run under
146    /// contention use this to keep their existing `.await?` style: a real
147    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
148    pub fn into_result(self) -> Result<Self> {
149        for table in &self.tables {
150            if let PhaseOutcome::Failed(error) = &table.indices {
151                anyhow::bail!(
152                    "indices phase failed on {}: {error:#}",
153                    table.table.as_str()
154                );
155            }
156            if let PhaseOutcome::Failed(error) = &table.compaction {
157                anyhow::bail!(
158                    "compaction phase failed on {}: {error:#}",
159                    table.table.as_str()
160                );
161            }
162        }
163        Ok(self)
164    }
165}
166
/// What `pond status` reports: where the data lives, total rows per table,
/// and a per-(adapter, project) breakdown built from one `messages` scan.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location of the data the stats were computed from.
    pub data_url: Url,
    /// Total rows per table.
    pub totals: RowTotals,
    /// One entry per adapter present in the corpus. When `include_subagents`
    /// is false (the CLI default), sub-agent rows (`source_agent` containing
    /// `/`) are filtered out so only the main-agent sessions appear. When
    /// true, each distinct `source_agent` (e.g. `claude-code/general-purpose`)
    /// gets its own entry. Always in alphabetical order; the CLI re-sorts
    /// this into registry order at render time so the tree matches the
    /// discovery picker.
    pub adapters: Vec<AdapterStats>,
    /// Whether the rollup includes sub-agent sessions. The renderer prints a
    /// hint about `--include-subagents` when this is false so users know the
    /// `totals` row above counts sessions that aren't broken down below.
    pub include_subagents: bool,
}
186
/// Total row counts per table, as reported in [`CorpusStats::totals`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Total rows in the `sessions` table.
    pub sessions: u64,
    /// Total rows in the `messages` table.
    pub messages: u64,
    /// Total rows in the `parts` table.
    pub parts: u64,
}
193
/// Embedding coverage for `pond status` / `pond embed`. `total` is the count of
/// `messages` rows that carry `search_text` (i.e. are eligible to embed); rows
/// without `search_text` produce no vector. `embedded` is the subset of those
/// already carrying a vector under the current [`embed::model_id()`]. The pending
/// backlog is `total - embedded`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Eligible rows that already carry a vector for the current model.
    pub embedded: usize,
    /// Rows eligible to embed (those carrying `search_text`).
    pub total: usize,
    /// Model identifier the coverage is measured against.
    /// NOTE(review): presumably the value of `embed::model_id()` - confirm at
    /// the construction site.
    pub model: &'static str,
}
205
/// One adapter's entry in [`CorpusStats::adapters`].
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// Either the main-agent name (`claude-code`) when sub-agents are filtered
    /// out, or the full `source_agent` (`claude-code/general-purpose`) when
    /// `include_subagents` is on.
    pub adapter: String,
    /// Session count for this adapter.
    /// NOTE(review): presumably distinct session ids - confirm against the
    /// rollup code, which is outside this view.
    pub sessions: u64,
    /// Message count for this adapter.
    pub messages: u64,
    /// Projects under this adapter, sorted by message count desc, then by
    /// project name asc.
    pub projects: Vec<ProjectStats>,
}
218
/// One project's entry in [`AdapterStats::projects`].
#[derive(Debug, Clone)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Session count for this project under the owning adapter.
    pub sessions: u64,
    /// Message count for this project under the owning adapter.
    pub messages: u64,
}
225
/// Running tally for one group: a message counter plus the set of session
/// ids seen so far.
///
/// NOTE(review): presumably one accumulator per (adapter, project) group
/// while building [`CorpusStats`] - its use is outside the visible part of
/// the file, so confirm there.
#[derive(Default)]
struct GroupAccumulator {
    /// Number of messages counted into this group.
    messages: u64,
    /// Session ids seen in this group; a set, so repeats collapse.
    session_ids: HashSet<String>,
}
231
/// One message to write, borrowing everything it refers to.
///
/// `Store::upsert_messages` takes a slice of these.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message row to write.
    pub message: &'a Message,
    /// Parts belonging to the message.
    /// NOTE(review): `Store::upsert_messages` does not read this field;
    /// parts are written through `Store::upsert_parts`. Confirm other
    /// consumers outside this view.
    pub parts: &'a [Part],
    /// Optional search text, forwarded into the message batch row as-is.
    pub search_text: Option<&'a str>,
}
238
239impl Store {
240    /// Open against a local filesystem URL or a remote one for which the
241    /// caller has no extra options to pass (env vars suffice). CLI verbs
242    /// that load `[storage]` from config should call
243    /// [`Store::open_with_options`] instead so the same options flow into
244    /// every dataset open and write.
245    pub async fn open(location: &Url) -> Result<Self> {
246        Ok(Self {
247            handle: Handle::open(location).await?,
248        })
249    }
250
251    /// Open with object-store options (S3 creds, region, endpoint, ...)
252    /// threaded through Lance verbatim. Keys are the standard `object_store`
253    /// config names; pond does not parse them. Empty options + default caps
254    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
255    /// config block via [`crate::substrate::RuntimeCaps`].
256    pub async fn open_with_options(
257        location: &Url,
258        storage_options: std::collections::HashMap<String, String>,
259        caps: crate::substrate::RuntimeCaps,
260    ) -> Result<Self> {
261        Ok(Self {
262            handle: Handle::open_with_options(location, storage_options, caps).await?,
263        })
264    }
265
266    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
267    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
268    /// through [`Store::open_with_options`] so the production policy is
269    /// applied; tests get the backend-aware local-FS defaults.
270    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
271        let url = config::url_for_path(path)?;
272        Self::open_with_options(
273            &url,
274            std::collections::HashMap::new(),
275            crate::substrate::RuntimeCaps::default(),
276        )
277        .await
278    }
279
280    /// Export clean, index-free Lance datasets into `dest`.
281    ///
282    /// This rewrites the visible rows of each table instead of copying the
283    /// dataset roots. The resulting manifests therefore contain no references
284    /// to the source store's `_indices`, while `messages.vector` and
285    /// `messages.embedding_model` remain ordinary data columns and are
286    /// preserved.
287    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
288        std::fs::create_dir_all(dest)
289            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
290        let (sessions, sessions_version) = self
291            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
292            .await?;
293        let (messages, messages_version) = self
294            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
295            .await?;
296        let (parts, parts_version) = self
297            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
298            .await?;
299        Ok(LanceArchiveExport {
300            rows: LanceArchiveCounts {
301                sessions,
302                messages,
303                parts,
304            },
305            source_versions: LanceArchiveVersions {
306                sessions: sessions_version,
307                messages: messages_version,
308                parts: parts_version,
309            },
310        })
311    }
312
313    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
314        let sessions_dataset =
315            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
316        let messages_dataset =
317            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
318        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
319        let (sessions, sessions_inserted) = self
320            .import_clean_table(Table::Sessions, sessions_dataset)
321            .await?;
322        let (messages, messages_inserted) = self
323            .import_clean_table(Table::Messages, messages_dataset)
324            .await?;
325        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
326        Ok(LanceArchiveImport {
327            rows: LanceArchiveCounts {
328                sessions,
329                messages,
330                parts,
331            },
332            inserted: LanceArchiveCounts {
333                sessions: sessions_inserted,
334                messages: messages_inserted,
335                parts: parts_inserted,
336            },
337        })
338    }
339
340    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
341        let dataset = self.handle.dataset(table).await?;
342        let source_version = dataset.version_id();
343        let schema = export_schema(table);
344        let mut stream = dataset
345            .scan()
346            .try_into_stream()
347            .await
348            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
349        let dest_uri = dest
350            .to_str()
351            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
352
353        let mut rows = 0usize;
354        let mut wrote = false;
355        while let Some(batch) = stream.next().await {
356            let batch = batch
357                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
358            rows += batch.num_rows();
359            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
360            let mut params = write_params_for_create();
361            if wrote {
362                params.mode = WriteMode::Append;
363            }
364            Dataset::write(reader, dest_uri, Some(params))
365                .await
366                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
367            wrote = true;
368        }
369
370        if !wrote {
371            let batch = RecordBatch::new_empty(schema.clone());
372            let reader = RecordBatchIterator::new([Ok(batch)], schema);
373            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
374                .await
375                .with_context(|| {
376                    format!("failed to write empty {} archive table", table.as_str())
377                })?;
378        }
379        Ok((rows, source_version))
380    }
381
382    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
383        let mut stream = dataset
384            .scan()
385            .try_into_stream()
386            .await
387            .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
388        let mut rows = 0usize;
389        let mut inserted = 0usize;
390        while let Some(batch) = stream.next().await {
391            let batch = batch
392                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
393            let row_count = batch.num_rows();
394            rows += row_count;
395            inserted += self
396                .handle
397                .merge_insert(table, batch, row_count)
398                .await
399                .with_context(|| format!("failed to import {} archive table", table.as_str()))?
400                as usize;
401        }
402        Ok((rows, inserted))
403    }
404
405    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
406        if sessions.is_empty() {
407            return Ok(Vec::new());
408        }
409        let batches = sessions_batches(sessions)?;
410        let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
411        // Per-row outcomes; no fold here (see `pond index optimize`).
412        Ok(statuses_from_inserted(sessions.len(), inserted))
413    }
414
    /// Batched write path used by the adapter ingest loop and by the wire
    /// handler's final flush. Receives N completed substreams from the
    /// validator and, in this order:
    ///
    ///   1. Deduplicates in-batch at the substream level: when two substreams
    ///      in the same batch share a `session_id` (Claude Code's subagent
    ///      files reuse their parent's id), the first occurrence wins. The
    ///      second is either *merged* (same `source_agent` + `project`:
    ///      messages append, skipping message ids already present) or
    ///      *rejected* (different `source_agent` or `project` - the
    ///      subagent-vs-parent case). Row-level duplicates that slip past
    ///      here are caught downstream by Lance's
    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
    ///      (invariant 17): this layer's job is preserving substream merge
    ///      semantics, not policing the PK uniqueness Lance handles itself.
    ///   2. Runs a pre-existence sweep: one scan each of `sessions`,
    ///      `messages` and `parts`, filtered to the surviving session ids.
    ///   3. Runs the immutable-fields check (spec.md#protocol) for every
    ///      session that already has a stored row. Sessions that fail
    ///      produce Error outcomes and are excluded from the write batch.
    ///   4. Builds the record batches for each table (sessions, messages,
    ///      parts) across every remaining substream.
    ///   5. Fires the three `merge_insert_chunks` calls concurrently via
    ///      `tokio::try_join!`. Cross-table mutex on `CachedDataset` is
    ///      independent, so these proceed concurrently.
    ///   6. Composes [`RowOutcome`]s and sorts them by `index`.
    ///
    /// An empty `substreams` returns no outcomes and default counts. If every
    /// substream is rejected, the (sorted) error outcomes are returned and
    /// nothing is written.
    ///
    /// # Errors
    ///
    /// Any scan, row-decode, batch-build or merge-insert error is propagated
    /// and aborts the whole batch.
    async fn upsert_session_batch(
        &self,
        substreams: Vec<CompletedSubstream>,
    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        if substreams.is_empty() {
            return Ok((Vec::new(), BatchCounts::default()));
        }

        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
        let mut counts = BatchCounts::default();

        // In-batch dedup. First occurrence of each session_id wins; later
        // occurrences either merge or get rejected. Iteration order preserves
        // original substream order so outcomes index correctly.
        // `by_session_id` maps a session id to its slot in `merged`.
        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
        let mut by_session_id: std::collections::HashMap<String, usize> =
            std::collections::HashMap::with_capacity(substreams.len());
        for substream in substreams {
            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
                let existing = &merged[existing_idx];
                if existing.session.source_agent != substream.session.source_agent
                    || existing.session.project != substream.session.project
                {
                    // Subagent-vs-parent class. The first occurrence's
                    // metadata stays authoritative; this substream is
                    // rejected on the same immutable-field axis as the
                    // storage-side check. A `source_agent` mismatch is
                    // reported in preference to a `project` mismatch when
                    // both differ.
                    let reason = if existing.session.source_agent != substream.session.source_agent
                    {
                        IngestError::ImmutableField {
                            field: "source_agent",
                            session_id: substream.session.id.clone(),
                            stored: existing.session.source_agent.clone(),
                            attempted: substream.session.source_agent.clone(),
                        }
                    } else {
                        IngestError::ImmutableField {
                            field: "project",
                            session_id: substream.session.id.clone(),
                            stored: (*existing.session.project).clone(),
                            attempted: (*substream.session.project).clone(),
                        }
                    };
                    let field = match &reason {
                        IngestError::ImmutableField { field, .. } => Some(*field),
                    };
                    // Map the offending field to its drop-reason key.
                    let reason_key = match field {
                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                        _ => DROP_REASON_UNCATEGORIZED,
                    };
                    outcomes.extend(error_outcomes_for_substream(
                        substream.session_index,
                        &substream.session,
                        &substream.messages,
                        reason.to_string(),
                        field,
                        reason_key,
                    ));
                    continue;
                }
                // Same session, same metadata: merge messages. Dedup message
                // ids defensively (within one batch, the validator's seen
                // sets are per-substream so cross-substream dups can happen
                // legally if both files re-emit the same row).
                // NOTE(review): the merged-in substream's own `session_index`
                // is dropped here, so later outcomes are emitted under the
                // first occurrence's index only - confirm callers expect that.
                let existing = &mut merged[existing_idx];
                let mut seen: std::collections::HashSet<String> = existing
                    .messages
                    .iter()
                    .map(|m| m.message.id().to_owned())
                    .collect();
                for msg in substream.messages {
                    if seen.insert(msg.message.id().to_owned()) {
                        existing.messages.push(msg);
                    }
                }
                continue;
            }
            by_session_id.insert(substream.session.id.clone(), merged.len());
            merged.push(substream);
        }

        // Pre-existence sweep: one scan per table keyed on the batch's
        // session_ids, capped at the substream count. Replaces the prior
        // N-sequential `find_session` calls and gives us honest per-row
        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
        let session_id_values: Vec<ScalarValue> = merged
            .iter()
            .map(|substream| ScalarValue::String(substream.session.id.clone()))
            .collect();
        // Stored session rows for this batch, keyed by session id.
        let existing_sessions: std::collections::HashMap<String, Session> =
            if session_id_values.is_empty() {
                std::collections::HashMap::new()
            } else {
                let batch = self
                    .handle
                    .scan_batch(
                        Table::Sessions,
                        Some(&Predicate::In("id", session_id_values.clone())),
                        &[],
                    )
                    .await?;
                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
                for row in 0..batch.num_rows() {
                    let session = session_from_batch(&batch, row)?;
                    map.insert(session.id.clone(), session);
                }
                map
            };
        // `(session_id, id)` of every stored message under those sessions.
        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Messages,
                    Some(&Predicate::In("session_id", session_id_values.clone())),
                    &["session_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "id", row)?.context("message id is null")?;
                set.insert((sid, mid));
            }
            set
        };
        // `(session_id, message_id, id)` of every stored part under those
        // sessions. This is the last use of `session_id_values`, so it is
        // moved rather than cloned.
        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Parts,
                    Some(&Predicate::In("session_id", session_id_values)),
                    &["session_id", "message_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
                let pid = string(&batch, "id", row)?.context("part id is null")?;
                set.insert((sid, mid, pid));
            }
            set
        };

        // Storage-side immutable-fields check: a substream whose session
        // already has a stored row must agree with it, otherwise it is
        // turned into error outcomes and left out of the write.
        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
        for substream in merged {
            if let Some(existing) = existing_sessions.get(&substream.session.id)
                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
            {
                let field = match &failure {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    failure.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            writeable.push(substream);
        }

        // Everything was rejected: report the errors and skip the writes.
        if writeable.is_empty() {
            outcomes.sort_by_key(|outcome| outcome.index);
            return Ok((outcomes, counts));
        }

        // Flatten the surviving substreams into per-table row lists.
        let sessions_owned: Vec<Session> = writeable
            .iter()
            .map(|substream| substream.session.clone())
            .collect();
        let message_rows: Vec<MessageBatchRow<'_>> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().map(|buffered| MessageBatchRow {
                    message: &buffered.message,
                    source_agent: &substream.session.source_agent,
                    project: &substream.session.project,
                    search_text: buffered.search_text.as_deref(),
                })
            })
            .collect();
        let part_rows: Vec<Part> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().flat_map(|buffered| {
                    buffered
                        .parts
                        .iter()
                        .map(|buffered_part| buffered_part.part.clone())
                })
            })
            .collect();

        let session_batches = sessions_batches(&sessions_owned)?;
        let message_batches = messages_batches(&message_rows)?;
        let part_batches = parts_batches(&part_rows)?;

        // `merge_insert_chunks` hands back a batch-level inserted count per
        // table. Those counts are bound to `_`-prefixed names and are not
        // read again in this function; per-row truth is attributed through
        // the pre-existence sets instead (next loop). Under
        // single-writer the two agree exactly; under a hostile concurrent
        // writer the sets are authoritative for THIS request's wire shape -
        // matched-no-op (spec.md#adapter-integrity-additive-sync) makes the
        // distinction informational, not behavioral.
        let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
        )?;

        for substream in &writeable {
            outcomes.extend(success_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                &existing_sessions,
                &existing_message_pks,
                &existing_part_pks,
                &mut counts,
            ));
        }

        // Error outcomes were pushed before success outcomes; restore index
        // order before returning.
        outcomes.sort_by_key(|outcome| outcome.index);
        Ok((outcomes, counts))
    }
677
678    pub async fn upsert_messages(
679        &self,
680        session: &Session,
681        messages: &[MessageWrite<'_>],
682    ) -> Result<Vec<UpsertStatus>> {
683        if messages.is_empty() {
684            return Ok(Vec::new());
685        }
686
687        let rows = messages
688            .iter()
689            .map(|write| MessageBatchRow {
690                message: write.message,
691                source_agent: &session.source_agent,
692                project: &session.project,
693                search_text: write.search_text,
694            })
695            .collect::<Vec<_>>();
696        let batches = messages_batches(&rows)?;
697        let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
698        Ok(statuses_from_inserted(messages.len(), inserted))
699    }
700
701    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
702        if parts.is_empty() {
703            return Ok(Vec::new());
704        }
705        let batches = parts_batches(parts)?;
706        let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
707        Ok(statuses_from_inserted(parts.len(), inserted))
708    }
709
710    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
711        let Some(session) = self.find_session(session_id).await? else {
712            return Ok(None);
713        };
714        let messages = self.messages_for_session(session_id).await?;
715        Ok(Some(SessionWithMessages { session, messages }))
716    }
717
718    /// Every session id currently in the store, unsorted.
719    pub async fn session_ids(&self) -> Result<Vec<String>> {
720        let batch = self
721            .handle
722            .scan_batch(Table::Sessions, None, &["id"])
723            .await?;
724        let mut ids = Vec::with_capacity(batch.num_rows());
725        for row in 0..batch.num_rows() {
726            if let Some(id) = string(&batch, "id", row)? {
727                ids.push(id);
728            }
729        }
730        Ok(ids)
731    }
732
733    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
734        let batch = self
735            .handle
736            .scan_batch(
737                Table::Sessions,
738                Some(&Predicate::Eq(
739                    "parent_session_id",
740                    parent_session_id.into(),
741                )),
742                &[
743                    "id",
744                    "parent_session_id",
745                    "parent_message_id",
746                    "source_agent",
747                    "created_at",
748                    "project",
749                    "options",
750                ],
751            )
752            .await?;
753        let mut sessions = Vec::with_capacity(batch.num_rows());
754        for row in 0..batch.num_rows() {
755            sessions.push(session_from_batch(&batch, row)?);
756        }
757        sessions.sort_by(|left, right| left.id.cmp(&right.id));
758        Ok(sessions)
759    }
760
761    /// `session_id -> wall-clock time of the Lance manifest version that
762    /// last wrote the row` for the per-session staleness skip
763    /// (spec.md#adapter-integrity-event-ordering). Reads Lance's `_row_last_updated_at_version` system
764    /// column (available because pond enables stable row ids per spec.md#lance-table-creation-stable-row-ids)
765    /// and joins it against `Dataset::versions()` for commit timestamps.
766    pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
767        use lance::deps::arrow_array::UInt64Array;
768
769        let dataset = self.handle.dataset(Table::Sessions).await?;
770        let version_list = dataset.versions().await?;
771        let versions: HashMap<u64, DateTime<Utc>> = version_list
772            .iter()
773            .map(|v| (v.version, v.timestamp))
774            .collect();
775        // `Dataset::cleanup_old_versions` (and the auto_cleanup hook) drops
776        // pruned versions from the manifest list, leaving rows whose
777        // `_row_last_updated_at_version` points at a version that no longer
778        // resolves. Those rows are still real and were ingested at some time
779        // <= the oldest still-visible version's commit timestamp - so falling
780        // back to that bound preserves a sound `mtime <= ingested` upper edge
781        // and keeps the staleness skip working after cleanup.
782        let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
783
784        let scanner = self
785            .handle
786            .scan(
787                Table::Sessions,
788                ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
789            )
790            .await?;
791        let mut stream = scanner.try_into_stream().await?;
792        let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
793        while let Some(batch) = stream.next().await {
794            let batch = batch?;
795            let version_array = batch
796                .column_by_name("_row_last_updated_at_version")
797                .context("missing _row_last_updated_at_version column")?
798                .as_any()
799                .downcast_ref::<UInt64Array>()
800                .context("_row_last_updated_at_version is not UInt64")?;
801            for row in 0..batch.num_rows() {
802                let Some(id) = string(&batch, "id", row)? else {
803                    continue;
804                };
805                if version_array.is_null(row) {
806                    continue;
807                }
808                let version = version_array.value(row);
809                let ts = versions.get(&version).copied().or(oldest_visible_ts);
810                if let Some(ts) = ts {
811                    out.insert(id, ts);
812                }
813            }
814        }
815        Ok(out)
816    }
817
818    /// Whole-session view for `pond_get` session mode (spec.md#protocol).
819    /// Conversational filters to `search_text IS NOT NULL`; Complete and
820    /// Verbatim scan every message. Every mode attaches compact part summaries;
821    /// Verbatim additionally inlines full parts. `after_id` is an exclusive
822    /// lower bound (a message id); the page is bounded by `limit` and a byte
823    /// budget and never cuts mid-message.
824    pub async fn session_view(
825        &self,
826        session_id: &str,
827        params: SessionViewParams<'_>,
828    ) -> Result<GetLookup<SessionPage>> {
829        let Some(session) = self.find_session(session_id).await? else {
830            return Ok(GetLookup::NotFound);
831        };
832
833        let mut rows = match params.mode {
834            ResponseMode::Conversational => self
835                .scan_conversational_messages(session_id)
836                .await?
837                .into_iter()
838                .map(|row| ScanRow {
839                    id: row.message_id,
840                    role: row.role,
841                    timestamp: row.timestamp,
842                    text: Some(row.text.into_inner()),
843                    content: None,
844                })
845                .collect(),
846            ResponseMode::Complete | ResponseMode::Verbatim => {
847                self.scan_all_messages(session_id).await?
848            }
849        };
850        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
851
852        let start_at = match params.after_id {
853            // Append-only stream: a real anchor never vanishes, so an unknown
854            // `after_id` is a stale/mistyped client cursor, not "start over".
855            Some(after) => match rows.iter().position(|row| row.id == after) {
856                Some(idx) => idx + 1,
857                None => return Ok(GetLookup::UnknownAfterId),
858            },
859            None => 0,
860        };
861        let remaining = rows.get(start_at..).unwrap_or(&[]);
862        let (emitted, messages_remaining) = match params.session_from {
863            SessionFrom::Start => {
864                let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
865                    row.text.as_deref().map_or(0, str::len)
866                });
867                (&remaining[..n], remaining.len() - n)
868            }
869            // Tail: the newest messages that fit `limit` and the byte budget,
870            // dropping oldest first; the newest is always kept and the page
871            // stays chronological so the agent reads the flow forward.
872            SessionFrom::End => {
873                let mut bytes = 0usize;
874                let mut start = remaining.len();
875                for row in remaining.iter().rev() {
876                    if remaining.len() - start >= params.limit {
877                        break;
878                    }
879                    let size = row.text.as_deref().map_or(0, str::len);
880                    if start < remaining.len() && bytes + size > params.budget_bytes {
881                        break;
882                    }
883                    bytes += size;
884                    start -= 1;
885                }
886                (&remaining[start..], start)
887            }
888        };
889        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
890
891        // Conversational/Complete only summarize parts; Verbatim inlines every
892        // part (blobs included).
893        let mut parts_by_message = match params.mode {
894            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
895            ResponseMode::Conversational | ResponseMode::Complete => {
896                self.summary_parts_for_messages(session_id, &ids).await?
897            }
898        };
899        let messages = emitted
900            .iter()
901            .map(|row| RetrievedMessage {
902                id: row.id.clone(),
903                role: row.role,
904                timestamp: row.timestamp,
905                text: row.text.clone(),
906                content: row.content.clone(),
907                parts: parts_by_message
908                    .remove(&(session_id.to_owned(), row.id.clone()))
909                    .unwrap_or_default(),
910            })
911            .collect();
912
913        Ok(GetLookup::Found(SessionPage {
914            session,
915            messages,
916            messages_remaining,
917        }))
918    }
919
920    /// Message-scope retrieval for `pond_get` message mode (spec.md#protocol):
921    /// the target with its full parts (paginated by `after_id` over part
922    /// ordinals, then budget) plus up to `2*context_depth` siblings around it.
923    /// `None` when no stored message carries `message_id`. Sibling parts are
924    /// carried for summarizing; the target's parts ride `target_parts`.
925    pub async fn message_view(
926        &self,
927        message_id: &str,
928        params: MessageViewParams<'_>,
929    ) -> Result<GetLookup<MessagePage>> {
930        let Some(session_id) = self.session_id_for_message(message_id).await? else {
931            return Ok(GetLookup::NotFound);
932        };
933        let Some(session) = self.find_session(&session_id).await? else {
934            return Ok(GetLookup::NotFound);
935        };
936        let mut rows = self.scan_all_messages(&session_id).await?;
937        // spec.md#protocol: context siblings follow the response mode, and the
938        // default is the conversational view - in carrier-heavy sessions the
939        // system/tool rows would otherwise fill the whole +-depth window and
940        // push the actual conversation out of it. The target stays regardless
941        // of its own role: the caller asked for that message.
942        if matches!(params.mode, ResponseMode::Conversational) {
943            rows.retain(|row| row.text.is_some() || row.id == message_id);
944        }
945        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
946        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
947            return Ok(GetLookup::NotFound);
948        };
949
950        let start = target_pos.saturating_sub(params.context_depth);
951        let end = (target_pos + params.context_depth + 1).min(rows.len());
952        let window = &rows[start..end];
953        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
954        // The target's full parts (blobs included) ride the response; siblings
955        // are only summarized, but they share this one window scan.
956        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
957
958        let all_parts = parts_by_message
959            .remove(&(session_id.clone(), message_id.to_owned()))
960            .unwrap_or_default();
961        let start_part = match params.after_id {
962            // Exclusive over ordinals: parts are ordinal-sorted, so the first
963            // part past the anchor's ordinal is the page start. An anchor absent
964            // from the target's parts is a stale/mistyped client cursor.
965            Some(after) => match all_parts.iter().find(|part| part.id == after) {
966                Some(anchor) => all_parts
967                    .iter()
968                    .position(|part| part.ordinal > anchor.ordinal)
969                    .unwrap_or(all_parts.len()),
970                None => return Ok(GetLookup::UnknownAfterId),
971            },
972            None => 0,
973        };
974        let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
975        let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
976            serde_json::to_string(part).map_or(0, |json| json.len())
977        });
978        let target_parts = remaining_parts[..part_count].to_vec();
979        let target_parts_remaining = remaining_parts.len() - part_count;
980
981        let target_row = &rows[target_pos];
982        let target = RetrievedMessage {
983            id: target_row.id.clone(),
984            role: target_row.role,
985            timestamp: target_row.timestamp,
986            text: target_row.text.clone(),
987            content: target_row.content.clone(),
988            // Target structure is carried in full by `target_parts`.
989            parts: Vec::new(),
990        };
991        let siblings = window
992            .iter()
993            .enumerate()
994            .filter(|(idx, _)| start + idx != target_pos)
995            .map(|(_, row)| RetrievedMessage {
996                id: row.id.clone(),
997                role: row.role,
998                timestamp: row.timestamp,
999                text: row.text.clone(),
1000                content: row.content.clone(),
1001                parts: parts_by_message
1002                    .get(&(session_id.clone(), row.id.clone()))
1003                    .cloned()
1004                    .unwrap_or_default(),
1005            })
1006            .collect();
1007
1008        Ok(GetLookup::Found(MessagePage {
1009            session,
1010            target,
1011            target_parts,
1012            target_parts_remaining,
1013            siblings,
1014        }))
1015    }
1016
1017    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1018        let batch = self
1019            .handle
1020            .scan_batch(
1021                Table::Messages,
1022                Some(&Predicate::Eq("session_id", session_id.into())),
1023                &["id", "timestamp", "role", "search_text", "content"],
1024            )
1025            .await?;
1026        let mut rows = Vec::with_capacity(batch.num_rows());
1027        for row in 0..batch.num_rows() {
1028            let id = string(&batch, "id", row)?.context("message id is null")?;
1029            let role =
1030                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1031            let timestamp = datetime(&batch, "timestamp", row)?;
1032            rows.push(ScanRow {
1033                id,
1034                role,
1035                timestamp,
1036                text: string(&batch, "search_text", row)?,
1037                content: string(&batch, "content", row)?,
1038            });
1039        }
1040        Ok(rows)
1041    }
1042
1043    /// Conversational scan over one session: rows ordered by
1044    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1045    /// read seam (spec.md#search-prefilter-pushdown).
1046    pub async fn scan_conversational_messages(
1047        &self,
1048        session_id: &str,
1049    ) -> Result<Vec<ConversationalRow>> {
1050        let filter = Predicate::And(vec![
1051            Predicate::Eq("session_id", session_id.into()),
1052            Predicate::IsNotNull("search_text"),
1053        ]);
1054        let batch = self
1055            .handle
1056            .scan_batch(
1057                Table::Messages,
1058                Some(&filter),
1059                &["id", "timestamp", "role", "search_text"],
1060            )
1061            .await?;
1062
1063        let mut rows = Vec::with_capacity(batch.num_rows());
1064        for row in 0..batch.num_rows() {
1065            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1066            let role =
1067                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1068            let timestamp = datetime(&batch, "timestamp", row)?;
1069            let text_str = string(&batch, "search_text", row)?.context(
1070                "search_text null after IsNotNull pushdown - storage invariant violated",
1071            )?;
1072            rows.push(ConversationalRow {
1073                session_id: session_id.to_owned(),
1074                message_id,
1075                role,
1076                timestamp,
1077                text: SearchText(text_str),
1078            });
1079        }
1080        rows.sort_by(|a, b| {
1081            a.timestamp
1082                .cmp(&b.timestamp)
1083                .then_with(|| a.message_id.cmp(&b.message_id))
1084        });
1085        Ok(rows)
1086    }
1087
1088    /// Locate the session id for a stored message. Cheap when only the routing
1089    /// hint is needed - callers that need the messages use `scan_all_messages`.
1090    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1091        let batch = self
1092            .handle
1093            .scan_batch(
1094                Table::Messages,
1095                Some(&Predicate::Eq("id", message_id.into())),
1096                &["session_id"],
1097            )
1098            .await?;
1099        if batch.num_rows() == 0 {
1100            return Ok(None);
1101        }
1102        string(&batch, "session_id", 0)
1103    }
1104
1105    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1106        self.handle.row_counts().await
1107    }
1108
1109    /// A point-in-time `Arc<Dataset>` for `table`, for registering as a
1110    /// DataFusion `LanceTableProvider` in `pond_sql_query`. Goes through the
1111    /// handle's freshness gate, so each query sees a current snapshot.
1112    pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1113        Ok(Arc::new(self.handle.dataset(table).await?))
1114    }
1115
1116    /// Write a `pond_sql_query` export artifact.
1117    pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1118        self.handle.export_write(name, bytes).await
1119    }
1120
1121    /// Read a `pond_sql_query` export artifact back.
1122    pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1123        self.handle.export_read(name).await
1124    }
1125
1126    /// Local filesystem path of an export artifact on `file://` installs.
1127    pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1128        self.handle.export_local_path(name)
1129    }
1130
1131    /// Compute the per-adapter / per-project rollup that drives
1132    /// `pond status`. One scan over `messages` projecting the three
1133    /// columns the rollup keys on (`source_agent`, `project`, `session_id`),
1134    /// aggregated in-memory. Bounded by the cross product of adapters and
1135    /// projects, which stays small on real corpora.
1136    pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1137        let scanner = self
1138            .handle
1139            .scan(
1140                Table::Messages,
1141                ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1142            )
1143            .await?;
1144        let mut stream = scanner.try_into_stream().await?;
1145        let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1146        while let Some(batch) = stream.next().await {
1147            let batch = batch?;
1148            for row in 0..batch.num_rows() {
1149                let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1150                let project = string(&batch, "project", row)?.unwrap_or_default();
1151                let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1152                let is_subagent = source_agent.contains('/');
1153                if is_subagent && !include_subagents {
1154                    continue;
1155                }
1156                let entry = groups.entry((source_agent, project)).or_default();
1157                entry.messages += 1;
1158                entry.session_ids.insert(session_id);
1159            }
1160        }
1161
1162        let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1163        let totals = RowTotals {
1164            sessions: totals_sessions as u64,
1165            messages: totals_messages as u64,
1166            parts: totals_parts as u64,
1167        };
1168
1169        let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1170        for ((adapter, project), acc) in groups {
1171            by_adapter.entry(adapter).or_default().push(ProjectStats {
1172                project,
1173                sessions: acc.session_ids.len() as u64,
1174                messages: acc.messages,
1175            });
1176        }
1177
1178        let mut adapters = Vec::with_capacity(by_adapter.len());
1179        for (adapter, mut projects) in by_adapter {
1180            projects.sort_by(|a, b| {
1181                b.messages
1182                    .cmp(&a.messages)
1183                    .then_with(|| a.project.cmp(&b.project))
1184            });
1185            let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1186            let messages: u64 = projects.iter().map(|p| p.messages).sum();
1187            adapters.push(AdapterStats {
1188                adapter,
1189                sessions,
1190                messages,
1191                projects,
1192            });
1193        }
1194
1195        Ok(CorpusStats {
1196            data_url: self.handle.location().clone(),
1197            totals,
1198            adapters,
1199            include_subagents,
1200        })
1201    }
1202
1203    /// Write a batch of embeddings into `messages`: set `vector` and
1204    /// `embedding_model` on each row by `(session_id, id)`
1205    /// (spec.md#session-embed-from-canonical). The column update goes through the
1206    /// write seam and lands as a new manifest version (`append-only`).
1207    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1208        if rows.is_empty() {
1209            return Ok(());
1210        }
1211        let batch = embedding_update_batch(rows)?;
1212        self.handle
1213            .merge_update(Table::Messages, batch, rows.len())
1214            .await?;
1215        Ok(())
1216    }
1217
1218    /// Stream the backlog of messages needing embedding: rows with `search_text`
1219    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1220    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1221        try_stream! {
1222            let filter = Predicate::And(vec![
1223                Predicate::IsNull("vector"),
1224                Predicate::IsNotNull("search_text"),
1225            ]);
1226            let projection: &[&str] = &["session_id", "id", "search_text"];
1227            let scanner = self
1228                .handle
1229                .scan(
1230                    Table::Messages,
1231                    ScanOpts::with_predicate_and_projection(&filter, projection),
1232                )
1233                .await?;
1234            let mut batches = scanner
1235                .try_into_stream()
1236                .await
1237                .context("failed to open messages stream")?;
1238            while let Some(batch) = batches.next().await {
1239                let batch = batch?;
1240                for row in 0..batch.num_rows() {
1241                    yield PendingMessage {
1242                        session_id: string(&batch, "session_id", row)?
1243                            .context("session_id is null")?,
1244                        id: string(&batch, "id", row)?.context("message id is null")?,
1245                        search_text: string(&batch, "search_text", row)?
1246                            .context("search_text is null")?,
1247                    };
1248                }
1249            }
1250        }
1251    }
1252
1253    /// Stream messages that are either never embedded or stale under the
1254    /// current model. `pond embed --force` feeds this to the same unconditional
1255    /// merge_update as the normal backlog; the filter makes that semantically
1256    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1257    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1258        try_stream! {
1259            let filter = Predicate::And(vec![
1260                Predicate::IsNotNull("search_text"),
1261                Predicate::Or(vec![
1262                    Predicate::IsNull("vector"),
1263                    Predicate::Ne("embedding_model", embed::model_id().into()),
1264                ]),
1265            ]);
1266            let projection: &[&str] = &["session_id", "id", "search_text"];
1267            let scanner = self
1268                .handle
1269                .scan(
1270                    Table::Messages,
1271                    ScanOpts::with_predicate_and_projection(&filter, projection),
1272                )
1273                .await?;
1274            let mut batches = scanner
1275                .try_into_stream()
1276                .await
1277                .context("failed to open pending-or-stale messages stream")?;
1278            while let Some(batch) = batches.next().await {
1279                let batch = batch?;
1280                for row in 0..batch.num_rows() {
1281                    yield PendingMessage {
1282                        session_id: string(&batch, "session_id", row)?
1283                            .context("session_id is null")?,
1284                        id: string(&batch, "id", row)?.context("message id is null")?,
1285                        search_text: string(&batch, "search_text", row)?
1286                            .context("search_text is null")?,
1287                    };
1288                }
1289            }
1290        }
1291    }
1292
1293    /// BM25 full-text retriever over `messages.search_text`.
1294    pub async fn fts_search(
1295        &self,
1296        query: &str,
1297        limit: usize,
1298        filter: &Predicate,
1299    ) -> Result<Vec<(MessageKey, f32)>> {
1300        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1301        scanner.full_text_search(
1302            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1303        )?;
1304        // Lance ships an autoprojection that silently appends `_score` to FTS
1305        // output when the projection omits it. That behavior is going away;
1306        // we opt into the future explicit-projection contract here so the
1307        // scanner stops emitting a per-call deprecation warning, and we list
1308        // `_score` ourselves since the loop below reads it.
1309        scanner.disable_scoring_autoprojection();
1310        scanner.project(&["session_id", "id", "_score"])?;
1311        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1312        let batch = scanner.try_into_batch().await?;
1313        let mut hits = Vec::with_capacity(batch.num_rows());
1314        for row in 0..batch.num_rows() {
1315            let key = MessageKey {
1316                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1317                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1318            };
1319            hits.push((key, float32(&batch, "_score", row)?));
1320        }
1321        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1322        // order, which varies between runs and across calls with different pool
1323        // sizes (the hybrid arm's `pool=100` and FTS-only's `limit=20` produce
1324        // different orderings at the same tied score). Without an explicit
1325        // tiebreak the downstream RRF dedup-rank for a tied target session can
1326        // flip session-to-session, making fusion outcomes nondeterministic.
1327        // Sort by `score desc`, then `(session_id, message_id)` asc.
1328        hits.sort_by(|left, right| {
1329            right
1330                .1
1331                .partial_cmp(&left.1)
1332                .unwrap_or(std::cmp::Ordering::Equal)
1333                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1334                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1335        });
1336        Ok(hits)
1337    }
1338
1339    /// Count of searchable messages (non-null `search_text`) inside the
1340    /// caller's filter scope - the universe a search actually ran over.
1341    /// Powers the response's absence honesty (spec.md#search): "no relevant
1342    /// hits" only means something relative to how many messages were
1343    /// searchable at all, and 0 tells the caller their filters excluded
1344    /// everything before retrieval even started.
1345    pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
1346        let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
1347        let dataset = self.handle.dataset(Table::Messages).await?;
1348        dataset
1349            .count_rows(Some(scope.to_lance()))
1350            .await
1351            .map_err(Into::into)
1352    }
1353
1354    /// Whether any `messages` row carries a vector (spec.md#search) - the
1355    /// signal that flips search from FTS-only to hybrid. The single-active-
1356    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
1357    /// vector belongs to the current model.
1358    pub async fn has_embeddings(&self) -> Result<bool> {
1359        let scope = Predicate::IsNotNull("vector");
1360        let mut scanner = self
1361            .handle
1362            .scan(
1363                Table::Messages,
1364                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1365            )
1366            .await?;
1367        scanner.limit(Some(1), None)?;
1368        let batch = scanner.try_into_batch().await?;
1369        Ok(batch.num_rows() > 0)
1370    }
1371
1372    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
1373    /// scalar predicate (spec.md#search-prefilter-pushdown). Combines the caller's
1374    /// filter with `vector IS NOT NULL` to exclude un-embedded rows from the
1375    /// scan; the brute-force kNN path requires this (the IVF_PQ path would
1376    /// skip them anyway). The single-active-model invariant lets pond drop
1377    /// the per-row model filter: every non-null vector belongs to the current
1378    /// model.
1379    pub async fn vector_search(
1380        &self,
1381        query: &[f32],
1382        limit: usize,
1383        filter: &Predicate,
1384        search: Option<&config::SearchConfig>,
1385    ) -> Result<Vec<(MessageKey, f32)>> {
1386        let scope = embedded_scope(filter);
1387        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1388        let key = Float32Array::from(query.to_vec());
1389        scanner.nearest("vector", &key, limit)?;
1390        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1391            scanner.nprobes(nprobes);
1392        }
1393        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1394            scanner.refine(refine_factor);
1395        }
1396        // Mirror the explicit-projection contract from `fts_search`: opt out
1397        // of `_distance` autoprojection and list it ourselves since the loop
1398        // below reads it.
1399        scanner.disable_scoring_autoprojection();
1400        scanner.project(&["session_id", "id", "_distance"])?;
1401        let batch = scanner.try_into_batch().await?;
1402        let mut hits = Vec::with_capacity(batch.num_rows());
1403        for row in 0..batch.num_rows() {
1404            let key = MessageKey {
1405                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1406                message_id: string(&batch, "id", row)?.context("message id is null")?,
1407            };
1408            hits.push((key, float32(&batch, "_distance", row)?));
1409        }
1410        // Stable secondary sort: same reasoning as `fts_search` - IVF_PQ can
1411        // emit hits with effectively identical `_distance` in fragment-dependent
1412        // order, which makes RRF dedup-ranks nondeterministic for tied
1413        // neighbors. Sort by distance asc (smaller = more similar), then by
1414        // `(session_id, message_id)` asc.
1415        hits.sort_by(|left, right| {
1416            left.1
1417                .partial_cmp(&right.1)
1418                .unwrap_or(std::cmp::Ordering::Equal)
1419                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1420                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1421        });
1422        Ok(hits)
1423    }
1424
1425    /// The DataFusion plan string for a filtered vector scan - the
1426    /// `search-prefilter-pushdown` regression guard reads it.
1427    pub async fn explain_vector_plan(
1428        &self,
1429        query: &[f32],
1430        limit: usize,
1431        filter: &Predicate,
1432        search: Option<&config::SearchConfig>,
1433    ) -> Result<String> {
1434        let scope = embedded_scope(filter);
1435        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1436        let key = Float32Array::from(query.to_vec());
1437        scanner.nearest("vector", &key, limit)?;
1438        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1439            scanner.nprobes(nprobes);
1440        }
1441        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1442            scanner.refine(refine_factor);
1443        }
1444        scanner
1445            .explain_plan(true)
1446            .await
1447            .context("explain_plan failed")
1448    }
1449
1450    pub async fn explain_fts_plan(
1451        &self,
1452        query: &str,
1453        limit: usize,
1454        filter: &Predicate,
1455    ) -> Result<String> {
1456        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1457        scanner.full_text_search(
1458            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1459        )?;
1460        scanner.project(&["session_id", "id"])?;
1461        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1462        scanner
1463            .explain_plan(true)
1464            .await
1465            .context("explain_plan failed")
1466    }
1467
1468    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
1469    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1470        if keys.is_empty() {
1471            return Ok(Vec::new());
1472        }
1473        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1474        let session_ids = keys
1475            .iter()
1476            .map(|key| key.session_id.clone())
1477            .collect::<Vec<_>>();
1478        let message_ids = keys
1479            .iter()
1480            .map(|key| key.message_id.clone())
1481            .collect::<Vec<_>>();
1482        let predicate = Predicate::And(vec![
1483            in_predicate("session_id", &session_ids),
1484            in_predicate("id", &message_ids),
1485        ]);
1486        let batch = self
1487            .handle
1488            .scan_batch(
1489                Table::Messages,
1490                Some(&predicate),
1491                &[
1492                    "id",
1493                    "session_id",
1494                    "role",
1495                    "project",
1496                    "source_agent",
1497                    "timestamp",
1498                    "search_text",
1499                ],
1500            )
1501            .await?;
1502        let mut metas = Vec::with_capacity(batch.num_rows());
1503        for row in 0..batch.num_rows() {
1504            let message_id = string(&batch, "id", row)?.context("id is null")?;
1505            let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1506            if !wanted.contains(&MessageKey {
1507                session_id: session_id.clone(),
1508                message_id: message_id.clone(),
1509            }) {
1510                continue;
1511            }
1512            metas.push(MessageMeta {
1513                message_id,
1514                session_id,
1515                role: string(&batch, "role", row)?.context("role is null")?,
1516                project: string(&batch, "project", row)?.context("project is null")?,
1517                source_agent: string(&batch, "source_agent", row)?
1518                    .context("source_agent is null")?,
1519                timestamp: datetime(&batch, "timestamp", row)?,
1520                search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1521            });
1522        }
1523        Ok(metas)
1524    }
1525
1526    /// Total message count per session, for search session summaries.
1527    pub async fn session_message_counts(
1528        &self,
1529        session_ids: &[String],
1530    ) -> Result<BTreeMap<String, usize>> {
1531        if session_ids.is_empty() {
1532            return Ok(BTreeMap::new());
1533        }
1534        let dataset = self.handle.dataset(Table::Messages).await?;
1535        let mut tasks = tokio::task::JoinSet::new();
1536        for session_id in session_ids {
1537            let dataset = dataset.clone();
1538            let session_id = session_id.clone();
1539            tasks.spawn(async move {
1540                let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1541                let count = dataset.count_rows(Some(filter)).await?;
1542                anyhow::Ok((session_id, count))
1543            });
1544        }
1545        let mut counts = BTreeMap::new();
1546        while let Some(joined) = tasks.join_next().await {
1547            let (session_id, count) = joined.context("session count task panicked")??;
1548            counts.insert(session_id, count);
1549        }
1550        Ok(counts)
1551    }
1552
1553    /// Rows appended to `messages` since the FTS index was last optimized.
1554    /// A missing index reports the whole table; the query is manifest-only.
1555    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1556        self.handle
1557            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1558            .await
1559    }
1560
1561    /// Rows added or rewritten in `messages` since the IVF_PQ vector index
1562    /// was last optimized. Below
1563    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
1564    /// must read [`embedding_progress`](Self::embedding_progress) too and
1565    /// distinguish "index not built yet" from "index trails data".
1566    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1567        self.handle
1568            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1569            .await
1570    }
1571
1572    /// Embedding coverage: how many `messages` rows carry a vector and how
1573    /// many are still eligible. Drives the `pond status` embeddings line and
1574    /// the `pond embed` progress bar's known total. `embedded` reads the
1575    /// `vector IS NOT NULL` count directly - the single-active-model invariant
1576    /// (see `MESSAGE_SCALAR_INDICES`) means there is no need to scope by the
1577    /// `embedding_model` column.
1578    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1579        let dataset = self.handle.dataset(Table::Messages).await?;
1580        let embedded = dataset
1581            .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1582            .await?;
1583        let total = dataset
1584            .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1585            .await?;
1586        Ok(EmbeddingProgress {
1587            embedded,
1588            total,
1589            model: embed::model_id(),
1590        })
1591    }
1592
1593    /// Count rows whose `embedding_model` is not the currently configured
1594    /// model AND whose `vector` is still populated - the signal `pond embed`
1595    /// uses to detect a model swap and require `--force`.
1596    pub async fn stale_embedding_count(&self) -> Result<usize> {
1597        let dataset = self.handle.dataset(Table::Messages).await?;
1598        dataset
1599            .count_rows(Some(
1600                Predicate::And(vec![
1601                    Predicate::IsNotNull("vector"),
1602                    Predicate::Ne("embedding_model", embed::model_id().into()),
1603                ])
1604                .to_lance(),
1605            ))
1606            .await
1607            .map_err(Into::into)
1608    }
1609
1610    /// Run the per-table maintenance cycle (compact + indices) across every
1611    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
1612    /// and compaction commit independently, so a hot writer that starves
1613    /// compaction on one table does not abort the index work the operator
1614    /// asked for on other tables (or even on the same table).
1615    pub async fn optimize_indices(
1616        &self,
1617        progress: Option<OptimizeProgressFn>,
1618        maintenance: &MaintenancePolicy,
1619    ) -> Result<OptimizeOutcome> {
1620        let intents = pond_index_intents();
1621        let mut tables = Vec::with_capacity(3);
1622        for (table, intents) in intents.all() {
1623            let outcome = self
1624                .handle
1625                .optimize_table(table, intents, progress.as_ref(), maintenance)
1626                .await;
1627            tables.push(outcome);
1628        }
1629        Ok(OptimizeOutcome { tables })
1630    }
1631
1632    /// Fold trailing fragments into existing indices across every table,
1633    /// without running compaction. Used by `pond embed`'s tail so newly
1634    /// written vectors land in the FTS / IVF_PQ / btree / bitmap indices
1635    /// without paying the compaction retry budget while embed itself may
1636    /// still be writing in a sibling process.
1637    pub async fn build_indices_only(
1638        &self,
1639        progress: Option<OptimizeProgressFn>,
1640    ) -> Result<OptimizeOutcome> {
1641        let policy = pond_index_intents();
1642        let mut tables = Vec::with_capacity(3);
1643        for (table, intents) in policy.all() {
1644            let indices = self
1645                .handle
1646                .optimize_table_indices_only(table, intents, progress.as_ref())
1647                .await;
1648            tables.push(TableOptimizeOutcome {
1649                table,
1650                indices,
1651                compaction: PhaseOutcome::NotAttempted,
1652            });
1653        }
1654        Ok(OptimizeOutcome { tables })
1655    }
1656
1657    #[cfg(test)]
1658    async fn optimize_indices_with_vector_threshold(
1659        &self,
1660        vector_threshold: usize,
1661    ) -> Result<OptimizeOutcome> {
1662        let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1663        let policy = MaintenancePolicy::always_compact();
1664        let mut tables = Vec::with_capacity(3);
1665        for (table, intents) in intents.all() {
1666            let outcome = self
1667                .handle
1668                .optimize_table(table, intents, None, &policy)
1669                .await;
1670            tables.push(outcome);
1671        }
1672        Ok(OptimizeOutcome { tables })
1673    }
1674
1675    pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1676        let policy = pond_index_intents();
1677        let mut matched = false;
1678        for (table, intents) in policy.all() {
1679            for intent in intents {
1680                if intent_name.is_none_or(|name| name == intent.name) {
1681                    matched = true;
1682                    self.handle.rebuild_index(table, intent).await?;
1683                }
1684            }
1685        }
1686        if let Some(name) = intent_name
1687            && !matched
1688        {
1689            anyhow::bail!("unknown index intent {name:?}");
1690        }
1691        Ok(())
1692    }
1693
1694    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1695        let policy = pond_index_intents();
1696        let mut statuses = Vec::new();
1697        for (table, intents) in policy.all() {
1698            statuses.extend(self.handle.index_status(table, intents).await?);
1699        }
1700        Ok(statuses)
1701    }
1702
1703    /// Drop the IVF_PQ index on `messages.vector`. Used by `pond embed
1704    /// --force` before re-bootstrapping under a different model. Silent
1705    /// when the index does not exist.
1706    pub async fn drop_vector_index(&self) -> Result<()> {
1707        match self
1708            .handle
1709            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1710            .await
1711        {
1712            Ok(()) => Ok(()),
1713            Err(error) => {
1714                let msg = error.to_string();
1715                if msg.contains("not found") || msg.contains("does not exist") {
1716                    Ok(())
1717                } else {
1718                    Err(error)
1719                }
1720            }
1721        }
1722    }
1723
1724    /// On-disk byte totals per dataset, sized through Lance's object store
1725    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
1726    pub async fn table_sizes(&self) -> Result<TableSizes> {
1727        self.handle.table_sizes().await
1728    }
1729
1730    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1731        let batch = self
1732            .handle
1733            .scan_batch(
1734                Table::Sessions,
1735                Some(&Predicate::Eq("id", session_id.into())),
1736                &[],
1737            )
1738            .await?;
1739        if batch.num_rows() == 0 {
1740            Ok(None)
1741        } else {
1742            Ok(Some(session_from_batch(&batch, 0)?))
1743        }
1744    }
1745
1746    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1747        let batch = self
1748            .handle
1749            .scan_batch(
1750                Table::Messages,
1751                Some(&Predicate::Eq("session_id", session_id.into())),
1752                &[
1753                    "session_id",
1754                    "id",
1755                    "timestamp",
1756                    "role",
1757                    "content",
1758                    "options",
1759                ],
1760            )
1761            .await?;
1762        let mut messages = Vec::with_capacity(batch.num_rows());
1763        for row in 0..batch.num_rows() {
1764            messages.push(message_from_batch(&batch, row)?);
1765        }
1766        messages.sort_by(|left, right| {
1767            left.timestamp()
1768                .cmp(&right.timestamp())
1769                .then_with(|| left.id().cmp(right.id()))
1770        });
1771
1772        let message_ids = messages
1773            .iter()
1774            .map(|message| message.id().to_owned())
1775            .collect::<Vec<_>>();
1776        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1777
1778        Ok(messages
1779            .into_iter()
1780            .map(|message| {
1781                let key = (message.session_id().to_owned(), message.id().to_owned());
1782                let parts = parts_by_message.remove(&key).unwrap_or_default();
1783                MessageWithParts { message, parts }
1784            })
1785            .collect())
1786    }
1787
1788    /// Every part of these messages, full fidelity (file blobs included). The
1789    /// canonical read primitive - restore/export, verbatim mode, and the
1790    /// message-mode target all need the complete set.
1791    pub async fn parts_for_messages(
1792        &self,
1793        session_id: &str,
1794        message_ids: &[String],
1795    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1796        self.scan_parts(session_id, message_ids, None).await
1797    }
1798
1799    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
1800    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
1801    /// nothing. For the summary-only reads (conversational/complete session
1802    /// views, search hits) - it never feeds restore/export.
1803    pub async fn summary_parts_for_messages(
1804        &self,
1805        session_id: &str,
1806        message_ids: &[String],
1807    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1808        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1809            .await
1810    }
1811
1812    async fn scan_parts(
1813        &self,
1814        session_id: &str,
1815        message_ids: &[String],
1816        part_types: Option<&[&str]>,
1817    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1818        if message_ids.is_empty() {
1819            return Ok(BTreeMap::new());
1820        }
1821        let mut clauses = vec![
1822            Predicate::Eq("session_id", session_id.into()),
1823            in_predicate("message_id", message_ids),
1824        ];
1825        if let Some(types) = part_types {
1826            clauses.push(Predicate::In(
1827                "type",
1828                types.iter().map(|&t| t.into()).collect(),
1829            ));
1830        }
1831        let predicate = Predicate::And(clauses);
1832        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1833        let mut scanner = self
1834            .handle
1835            .scan(
1836                Table::Parts,
1837                ScanOpts::with_predicate_and_projection(
1838                    &predicate,
1839                    &[
1840                        "session_id",
1841                        "message_id",
1842                        "id",
1843                        "ordinal",
1844                        "type",
1845                        "provenance",
1846                        "variant_data",
1847                        "options",
1848                    ],
1849                ),
1850            )
1851            .await?;
1852        scanner.with_row_address();
1853        let batch = scanner.try_into_batch().await.context("scan failed")?;
1854        let row_addresses = uint64(&batch, "_rowaddr")?;
1855        let mut file_payloads = BTreeMap::<usize, FileData>::new();
1856        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1857        for row in 0..batch.num_rows() {
1858            if string(&batch, "type", row)?.as_deref() == Some("file") {
1859                let variant_data =
1860                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1861                file_rows.push((row, row_addresses.value(row), variant_data));
1862            }
1863        }
1864        if !file_rows.is_empty() {
1865            let addresses = file_rows
1866                .iter()
1867                .map(|(_, address, _)| *address)
1868                .collect::<Vec<_>>();
1869            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1870            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1871                // Legacy blob (lance-encoding:blob): payload is bytes; the
1872                // url variant stored its URL as UTF-8 bytes, recovered via
1873                // `file_data_from_blob`'s `data_kind = "url"` branch.
1874                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1875                file_payloads.insert(row, payload);
1876            }
1877        }
1878        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1879        for row in 0..batch.num_rows() {
1880            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1881            parts_by_message
1882                .entry((part.session_id.clone(), part.message_id.clone()))
1883                .or_default()
1884                .push(part);
1885        }
1886        for parts in parts_by_message.values_mut() {
1887            parts.sort_by_key(|part| part.ordinal);
1888        }
1889        Ok(parts_by_message)
1890    }
1891}
1892
1893#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1894#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
1895pub enum IngestEvent {
1896    Session(Session),
1897    Message(Message),
1898    Part(Part),
1899}
1900
/// Aggregate accounting for an ingest pass (CLI sync, adapter-driven).
/// The wire layer (`pond_ingest`) instead returns per-row results; the
/// aggregate is derived from those at the wire boundary.
///
/// Fields are bucketed by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape is set by spec.md#adapter-integrity-event-ordering.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows actually written to Lance, summed across all three tables.
    /// Prefer the per-table fields below for user-facing counts; this one
    /// remains for `accepted()` and existing wire callers.
    pub inserted: usize,
    /// Rows that already existed (merge_insert no-op match).
    pub matched: usize,
    /// Session rows inserted this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted this pass (total - includes tool calls,
    /// tool results, and other non-searchable messages).
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` whose `search_text` is non-null
    /// (eligible for FTS + semantic indexing). The user-facing "messages"
    /// count in `pond sync` / `pond status` reads this field.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted this pass.
    pub parts_inserted: usize,
    /// Session rows that were already present (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows that were already present (merge_insert matched), total.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` with `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
    /// Events the validator dropped under per-event-drop policy (ordering
    /// violation, orphan part, mismatched parent, adapter parse failure,
    /// duplicate-id collision, ...). Counted by event, not by session: a
    /// session with one bad part stays in this bucket as 1, not as "the
    /// whole substream." Per spec.md#adapter-integrity-dedup, adapters SHOULD
    /// dedupe their own emissions upstream when source replay is expected;
    /// the validator's in-batch HashSet is a safety net, not a feature
    /// adapters may rely on. If this bucket grows on a clean adapter,
    /// inspect `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions whose Session-level invariants (immutable `source_agent` /
    /// `project` against the stored row) failed at flush time and
    /// whose substream got rejected wholesale. Always small relative to
    /// `inserted`; if not, there's a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter couldn't decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    // NOTE(review): this description overlaps with `skipped_empty` below
    // (both mention empty `.jsonl` / unextractable headers) - confirm which
    // bucket each case lands in.
    pub skipped_files: usize,
    /// Files that produced no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `-vv` (debug) verbosity.
    pub skipped_empty: usize,
    /// Sessions short-circuited via the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): file `mtime` was at or
    /// before the wall-clock time pond last wrote that session's row, so
    /// re-decode was bypassed.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries were exhausted (commit
    /// conflicts, transient IO that didn't recover). Hard zero on healthy
    /// runs.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys for the combined `dropped_events +
    /// dropped_sessions` populations. Keys are `&'static str` (see the
    /// `DROP_REASON_*` constants) so consumers can match by identity.
    /// Empty on a clean run. Used by `pond sync` to print the top reasons
    /// and by `benches/ingest_bench.rs` to bucket Partial drops by cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1976
// Stable reason keys for the `IngestSummary::drop_reasons` histogram and
// the per-row `RowError::reason_key`. They are `&'static str` so consumers
// can match by identity rather than prose. To add a new key: pick a short
// snake_case identifier, route it from the validator/adapter, and update
// the per-row outcome docs in `docs/spec.md#adapter-integrity-event-ordering`.

/// Drop reason key: `duplicate_message_id`.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// Drop reason key: `duplicate_part_key`.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// Drop reason key: `message_before_session`.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// Drop reason key: `message_session_mismatch`.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// Drop reason key: `part_before_message`.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// Drop reason key: `part_message_mismatch`.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// Drop reason key: `empty_source_agent`.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// Drop reason key: `parent_message_without_session`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Drop reason key: `immutable_project`.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Drop reason key: `immutable_source_agent`.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key used when an error outcome carries no reason key.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
1993
/// Honest per-table outcome of one batched flush. Built from `merge_insert`'s
/// returned counts together with the pre-existence sets captured by
/// `upsert_session_batch`, and folded into a per-sync summary via
/// [`IngestSummary::add_batch`].
///
/// spec.md#adapter-integrity-additive-sync: matched is a no-op write, so the
/// inserted/matched split is informational - it is still surfaced because
/// both `pond sync` and `pond_ingest` clients reconcile against "which rows
/// landed this call."
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows newly inserted by this flush.
    pub sessions_inserted: usize,
    /// Session rows that were already present.
    pub sessions_matched: usize,
    /// Message rows newly inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Searchable subset of `messages_inserted_total`.
    pub messages_inserted_searchable: usize,
    /// Message rows that were already present, searchable or not.
    pub messages_matched_total: usize,
    /// Searchable subset of `messages_matched_total`.
    pub messages_matched_searchable: usize,
    /// Part rows newly inserted by this flush.
    pub parts_inserted: usize,
    /// Part rows that were already present.
    pub parts_matched: usize,
}
2012
2013impl IngestSummary {
2014    pub fn accepted(&self) -> usize {
2015        self.inserted + self.matched
2016    }
2017
2018    /// Sole writer of the per-table counters on the CLI batched flush path.
2019    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
2020    /// both for the same rows would double-count.
2021    pub fn add_batch(&mut self, counts: &BatchCounts) {
2022        self.sessions_inserted += counts.sessions_inserted;
2023        self.sessions_matched += counts.sessions_matched;
2024        self.messages_inserted_total += counts.messages_inserted_total;
2025        self.messages_inserted_searchable += counts.messages_inserted_searchable;
2026        self.messages_matched_total += counts.messages_matched_total;
2027        self.messages_matched_searchable += counts.messages_matched_searchable;
2028        self.parts_inserted += counts.parts_inserted;
2029        self.parts_matched += counts.parts_matched;
2030        self.inserted +=
2031            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2032        self.matched +=
2033            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2034    }
2035
2036    /// Sum every counter from `other` into `self`. Used by the multi-source
2037    /// `pond sync` loop so adding a new field to this struct doesn't silently
2038    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
2039    pub fn merge(&mut self, other: &Self) {
2040        self.inserted += other.inserted;
2041        self.matched += other.matched;
2042        self.sessions_inserted += other.sessions_inserted;
2043        self.messages_inserted_total += other.messages_inserted_total;
2044        self.messages_inserted_searchable += other.messages_inserted_searchable;
2045        self.parts_inserted += other.parts_inserted;
2046        self.sessions_matched += other.sessions_matched;
2047        self.messages_matched_total += other.messages_matched_total;
2048        self.messages_matched_searchable += other.messages_matched_searchable;
2049        self.parts_matched += other.parts_matched;
2050        self.dropped_events += other.dropped_events;
2051        self.dropped_sessions += other.dropped_sessions;
2052        self.skipped_files += other.skipped_files;
2053        self.skipped_empty += other.skipped_empty;
2054        self.skipped_fresh += other.skipped_fresh;
2055        self.storage_errors += other.storage_errors;
2056        self.truncated_values += other.truncated_values;
2057        for (key, value) in &other.drop_reasons {
2058            *self.drop_reasons.entry(key).or_insert(0) += value;
2059        }
2060    }
2061
2062    /// Same dispatch as [`Self::add_outcomes`] but ignores
2063    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
2064    /// via [`Self::add_batch`] and uses this method to attribute per-row
2065    /// `Error` outcomes from the same flush.
2066    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2067        for outcome in outcomes {
2068            if !matches!(outcome.status, OutcomeStatus::Error) {
2069                continue;
2070            }
2071            if outcome.kind == "session" {
2072                self.dropped_sessions += 1;
2073            } else {
2074                self.dropped_events += 1;
2075            }
2076            let reason = outcome
2077                .error
2078                .as_ref()
2079                .and_then(|error| error.reason_key)
2080                .unwrap_or(DROP_REASON_UNCATEGORIZED);
2081            *self.drop_reasons.entry(reason).or_insert(0) += 1;
2082        }
2083    }
2084
2085    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2086        for outcome in outcomes {
2087            match outcome.status {
2088                OutcomeStatus::Inserted => {
2089                    self.inserted += 1;
2090                    match outcome.kind {
2091                        "session" => self.sessions_inserted += 1,
2092                        "message" => {
2093                            self.messages_inserted_total += 1;
2094                            if outcome.searchable {
2095                                self.messages_inserted_searchable += 1;
2096                            }
2097                        }
2098                        "part" => self.parts_inserted += 1,
2099                        _ => {}
2100                    }
2101                }
2102                OutcomeStatus::Matched => {
2103                    self.matched += 1;
2104                    match outcome.kind {
2105                        "session" => self.sessions_matched += 1,
2106                        "message" => {
2107                            self.messages_matched_total += 1;
2108                            if outcome.searchable {
2109                                self.messages_matched_searchable += 1;
2110                            }
2111                        }
2112                        "part" => self.parts_matched += 1,
2113                        _ => {}
2114                    }
2115                }
2116                OutcomeStatus::Error => {
2117                    // Session-level rejection: exactly one session-kind Error
2118                    // outcome (see `error_outcomes_for_substream`). Per-event
2119                    // drop: one Error per message/part. The two populations
2120                    // are counted separately so the operator can tell a
2121                    // structural reject from a row-level skip.
2122                    if outcome.kind == "session" {
2123                        self.dropped_sessions += 1;
2124                    } else {
2125                        self.dropped_events += 1;
2126                    }
2127                    let reason = outcome
2128                        .error
2129                        .as_ref()
2130                        .and_then(|e| e.reason_key)
2131                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
2132                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
2133                }
2134            }
2135        }
2136    }
2137}
2138
2139/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol). One
2140/// row per input event from the request's `events` array. The validator
2141/// returns these in array order so the wire layer can pack them directly
2142/// into [`crate::wire::IngestResult`] entries.
2143#[derive(Debug, Clone, PartialEq)]
2144pub struct RowOutcome {
2145    pub index: usize,
2146    pub kind: &'static str,
2147    pub pk: Value,
2148    pub status: OutcomeStatus,
2149    pub error: Option<RowError>,
2150    /// True iff `kind == "message"` AND the underlying row carries
2151    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
2152    /// so the CLI can show "searchable" message deltas distinct from raw
2153    /// inserts. Always false for session/part rows.
2154    pub searchable: bool,
2155}
2156
/// Disposition of a single ingested row, carried in [`RowOutcome::status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row was not present before and was written.
    Inserted,
    /// The row's primary key already existed.
    Matched,
    /// The row was rejected; details ride on the outcome's error body.
    Error,
}
2163
/// Structured error body attached to a failed [`RowOutcome`]. Per the
/// original note it mirrors the wire shape so the handler can forward it
/// unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of what went wrong.
    pub message: String,
    /// Name of the offending field, when the error is tied to one.
    pub field: Option<&'static str>,
    /// Human-prose reason (for example `"immutable"`); not meant for
    /// bucketing.
    pub reason: Option<&'static str>,
    /// Stable machine bucket for histogramming - see the `DROP_REASON_*`
    /// constants. `None` means uncategorized; consumers attribute such rows
    /// to `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
2177
2178/// Buffered session events tagged with their input array index, so the
2179/// per-row outcomes can be re-attributed once `merge_insert` returns its
2180/// per-row Inserted/Matched stats.
2181#[derive(Debug)]
2182struct BufferedSession {
2183    index: usize,
2184    session: Session,
2185}
2186
2187#[derive(Debug)]
2188struct BufferedMessage {
2189    index: usize,
2190    message: Message,
2191    parts: Vec<BufferedPart>,
2192    search_text: Option<String>,
2193}
2194
2195#[derive(Debug)]
2196struct BufferedPart {
2197    index: usize,
2198    part: Part,
2199}
2200
2201/// State machine that turns the `events: Vec<IngestEvent>` array into a
2202/// flat `Vec<RowOutcome>` matching the array's index space. Buffers a whole
2203/// session substream so `merge_insert` runs once per substream (three
2204/// batches: sessions, messages, parts). A validation error on a single event
2205/// drops *that event* (one [`OutcomeStatus::Error`] outcome) and the substream
2206/// continues; only Session-level invariants (immutable source_agent / project
2207/// on re-write) drop the whole substream (spec.md#adapter-integrity-event-ordering).
2208///
2209/// Writes are batched at flush time. As complete substreams arrive (a new
2210/// `Session` event closes out the current one), they accumulate in
2211/// `completed` rather than each one calling `merge_insert` immediately.
2212/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
2213/// at which point one batched 3-parallel-merge-insert covers all pending
2214/// substreams. This is the load-bearing perf change: per-substream commit
2215/// overhead dominated the ingest profile (see `benches/ingest_bench.rs`),
2216/// and amortizing it across N sessions cuts wall time materially.
2217#[derive(Debug, Default)]
2218pub struct IngestValidator {
2219    session: Option<BufferedSession>,
2220    current_message: Option<BufferedMessage>,
2221    current_parts: Vec<BufferedPart>,
2222    messages: Vec<BufferedMessage>,
2223    /// Message ids already buffered in the current substream. Duplicate ids
2224    /// drop the offending event in-line rather than failing the whole batch
2225    /// downstream.
2226    seen_message_ids: HashSet<String>,
2227    /// `(message_id, part_id)` keys already buffered in the current
2228    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
2229    seen_part_keys: HashSet<(String, String)>,
2230    /// Substreams whose end-of-stream boundary has been observed but whose
2231    /// rows haven't been written yet. Flushed in batched mode by
2232    /// [`Self::flush`].
2233    completed: Vec<CompletedSubstream>,
2234}
2235
2236/// One closed substream ready for the batched flush path.
2237#[derive(Debug)]
2238struct CompletedSubstream {
2239    session_index: usize,
2240    session: Session,
2241    messages: Vec<BufferedMessage>,
2242}
2243
2244/// Ingest host provenance (`options.pond`, spec.md#model-pond-options),
2245/// computed once per process. An audit fact - "the process that inserted this
2246/// row" - not identity. Fallible lookups are omitted, never synthesized as
2247/// placeholders.
2248fn ingest_host_stamp() -> Option<&'static Value> {
2249    static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
2250    STAMP
2251        .get_or_init(|| {
2252            let mut host = serde_json::Map::new();
2253            if let Ok(username) = whoami::username() {
2254                host.insert("username".to_owned(), username.into());
2255            }
2256            if let Ok(hostname) = whoami::hostname() {
2257                host.insert("hostname".to_owned(), hostname.into());
2258            }
2259            if let Ok(devicename) = whoami::devicename() {
2260                host.insert("device_name".to_owned(), devicename.into());
2261            }
2262            (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
2263        })
2264        .as_ref()
2265}
2266
2267impl IngestValidator {
2268    /// Drive one input event through the validator. Returns the per-row
2269    /// outcomes the event triggered: empty when the event is just buffered,
2270    /// or N entries when a session substream just flushed (success or
2271    /// failure). `Err` is reserved for catastrophic storage failures that
2272    /// should fail the whole `pond_ingest` request.
2273    pub async fn push(
2274        &mut self,
2275        store: &Store,
2276        index: usize,
2277        event: IngestEvent,
2278    ) -> Result<Vec<RowOutcome>> {
2279        match event {
2280            IngestEvent::Session(session) => self.push_session(store, index, session).await,
2281            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2282            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2283        }
2284    }
2285
2286    /// Final flush at end-of-batch. Closes the in-flight substream and
2287    /// drains the pending-flush buffer. Returns the per-row outcomes (for
2288    /// the wire layer) alongside the honest per-table counts (for
2289    /// `IngestSummary::add_batch`).
2290    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2291        self.close_current_substream();
2292        self.flush(store).await
2293    }
2294
2295    /// Drain every completed substream into batched 3-parallel-merge_insert
2296    /// writes. Caller invokes this periodically (every N completed
2297    /// substreams) to keep memory bounded; in adapter-driven sync that
2298    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
2299    /// in-flight substream stays buffered - close it explicitly via
2300    /// [`Self::finish`] or by feeding the next Session event.
2301    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2302        if self.completed.is_empty() {
2303            return Ok((Vec::new(), BatchCounts::default()));
2304        }
2305        let completed = std::mem::take(&mut self.completed);
2306        store.upsert_session_batch(completed).await
2307    }
2308
2309    /// Number of fully-buffered substreams awaiting batched write. Used by
2310    /// the adapter caller to decide when to call [`Self::flush`].
2311    pub fn pending_substreams(&self) -> usize {
2312        self.completed.len()
2313    }
2314
2315    async fn push_session(
2316        &mut self,
2317        _store: &Store,
2318        index: usize,
2319        mut session: Session,
2320    ) -> Result<Vec<RowOutcome>> {
2321        // Close out the current substream (if any) - move it to the pending
2322        // buffer instead of writing immediately. The actual write happens
2323        // when the caller invokes `flush` / `finish`.
2324        self.close_current_substream();
2325
2326        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
2327        // if empty after trim. A Session event with empty source_agent is
2328        // dropped on the spot - the substream that would follow has nothing
2329        // to anchor on, so subsequent message/part events will also drop.
2330        let trimmed = session.source_agent.trim();
2331        if trimmed.is_empty() {
2332            return Ok(vec![RowOutcome {
2333                index,
2334                kind: "session",
2335                pk: Value::String(session.id.clone()),
2336                status: OutcomeStatus::Error,
2337                error: Some(RowError {
2338                    message: format!("session {} has empty source_agent after trim", session.id),
2339                    field: Some("source_agent"),
2340                    reason: None,
2341                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2342                }),
2343                searchable: false,
2344            }]);
2345        }
2346        if trimmed.len() != session.source_agent.len() {
2347            session.source_agent = trimmed.to_owned();
2348        }
2349
2350        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2351            return Ok(vec![RowOutcome {
2352                index,
2353                kind: "session",
2354                pk: Value::String(session.id.clone()),
2355                status: OutcomeStatus::Error,
2356                error: Some(RowError {
2357                    message: format!(
2358                        "session {} has parent_message_id without parent_session_id",
2359                        session.id,
2360                    ),
2361                    field: Some("parent_message_id"),
2362                    reason: None,
2363                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2364                }),
2365                searchable: false,
2366            }]);
2367        }
2368
2369        self.seen_message_ids.clear();
2370        self.seen_part_keys.clear();
2371        self.session = Some(BufferedSession { index, session });
2372        Ok(Vec::new())
2373    }
2374
2375    fn close_current_substream(&mut self) {
2376        self.flush_current_message();
2377        let Some(BufferedSession {
2378            index: session_index,
2379            session,
2380        }) = self.session.take()
2381        else {
2382            return;
2383        };
2384        let messages = std::mem::take(&mut self.messages);
2385        self.seen_message_ids.clear();
2386        self.seen_part_keys.clear();
2387        self.completed.push(CompletedSubstream {
2388            session_index,
2389            session,
2390            messages,
2391        });
2392    }
2393
2394    fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
2395        let pk = Value::Array(vec![
2396            Value::String(message.session_id().to_owned()),
2397            Value::String(message.id().to_owned()),
2398        ]);
2399        let Some(session) = &self.session else {
2400            return vec![error_outcome(
2401                index,
2402                "message",
2403                pk,
2404                "first event in a session stream must be Session",
2405                None,
2406                DROP_REASON_MESSAGE_BEFORE_SESSION,
2407            )];
2408        };
2409        if message.session_id() != session.session.id {
2410            let msg = format!(
2411                "message {} references session {}, expected {}",
2412                message.id(),
2413                message.session_id(),
2414                session.session.id
2415            );
2416            return vec![error_outcome(
2417                index,
2418                "message",
2419                pk,
2420                &msg,
2421                Some("session_id"),
2422                DROP_REASON_MESSAGE_SESSION_MISMATCH,
2423            )];
2424        }
2425        if !self.seen_message_ids.insert(message.id().to_owned()) {
2426            // Keep same-substream duplicate ids visible in `dropped_events`;
2427            // adapters are expected to dedupe upstream (see claude-code's
2428            // per-file `seen_uuids`), so a hit here is worth investigating.
2429            let msg = format!("duplicate message id {} in session substream", message.id());
2430            return vec![error_outcome(
2431                index,
2432                "message",
2433                pk,
2434                &msg,
2435                None,
2436                DROP_REASON_DUPLICATE_MESSAGE_ID,
2437            )];
2438        }
2439        // `options.pond` is core-owned (spec.md#model-pond-options): stripped
2440        // and restamped at ingest so neither adapters nor wire clients can
2441        // spoof provenance. Matched rows are merge_insert no-ops, so re-ingest
2442        // never restamps stored rows.
2443        match ingest_host_stamp() {
2444            Some(stamp) => {
2445                message
2446                    .options_mut()
2447                    .insert("pond".to_owned(), stamp.clone());
2448            }
2449            None => {
2450                message.options_mut().remove("pond");
2451            }
2452        }
2453        self.flush_current_message();
2454        self.current_message = Some(BufferedMessage {
2455            index,
2456            message,
2457            parts: Vec::new(),
2458            search_text: None,
2459        });
2460        Vec::new()
2461    }
2462
2463    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2464        let pk = Value::Array(vec![
2465            Value::String(part.session_id.clone()),
2466            Value::String(part.message_id.clone()),
2467            Value::String(part.id.clone()),
2468        ]);
2469        let Some(current) = &self.current_message else {
2470            return vec![error_outcome(
2471                index,
2472                "part",
2473                pk,
2474                "part event appeared before a message",
2475                None,
2476                DROP_REASON_PART_BEFORE_MESSAGE,
2477            )];
2478        };
2479        if part.session_id != current.message.session_id() {
2480            let msg = format!(
2481                "part {} references session {}, expected {}",
2482                part.id,
2483                part.session_id,
2484                current.message.session_id()
2485            );
2486            return vec![error_outcome(
2487                index,
2488                "part",
2489                pk,
2490                &msg,
2491                Some("session_id"),
2492                DROP_REASON_PART_MESSAGE_MISMATCH,
2493            )];
2494        }
2495        if part.message_id != current.message.id() {
2496            let msg = format!(
2497                "part {} references message {}, expected {}",
2498                part.id,
2499                part.message_id,
2500                current.message.id()
2501            );
2502            return vec![error_outcome(
2503                index,
2504                "part",
2505                pk,
2506                &msg,
2507                Some("message_id"),
2508                DROP_REASON_PART_MESSAGE_MISMATCH,
2509            )];
2510        }
2511        let part_key = (part.message_id.clone(), part.id.clone());
2512        if !self.seen_part_keys.insert(part_key) {
2513            let msg = format!(
2514                "duplicate part id {} for message {} in session substream",
2515                part.id, part.message_id
2516            );
2517            return vec![error_outcome(
2518                index,
2519                "part",
2520                pk,
2521                &msg,
2522                None,
2523                DROP_REASON_DUPLICATE_PART_KEY,
2524            )];
2525        }
2526        self.current_parts.push(BufferedPart { index, part });
2527        Vec::new()
2528    }
2529
2530    fn flush_current_message(&mut self) {
2531        let Some(mut buffered) = self.current_message.take() else {
2532            return;
2533        };
2534        let parts = std::mem::take(&mut self.current_parts);
2535        let mut canonical_parts = Vec::with_capacity(parts.len());
2536        for part in &parts {
2537            canonical_parts.push(part.part.clone());
2538        }
2539        buffered.search_text = search_text(&buffered.message, &canonical_parts);
2540        buffered.parts = parts;
2541        self.messages.push(buffered);
2542    }
2543}
2544
2545fn error_outcome(
2546    index: usize,
2547    kind: &'static str,
2548    pk: Value,
2549    message: &str,
2550    field: Option<&'static str>,
2551    reason_key: &'static str,
2552) -> RowOutcome {
2553    RowOutcome {
2554        index,
2555        kind,
2556        pk,
2557        status: OutcomeStatus::Error,
2558        error: Some(RowError {
2559            message: message.to_owned(),
2560            field,
2561            reason: None,
2562            reason_key: Some(reason_key),
2563        }),
2564        searchable: false,
2565    }
2566}
2567
2568/// Session-level rejection (immutable `source_agent` / `project` violation):
2569/// emit exactly one Error outcome on the Session row. The buffered messages
2570/// and parts of this substream are *not* surfaced as per-row errors - their
2571/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
2572fn error_outcomes_for_substream(
2573    session_index: usize,
2574    session: &Session,
2575    _messages: &[BufferedMessage],
2576    message: impl Into<String>,
2577    field: Option<&'static str>,
2578    reason_key: &'static str,
2579) -> Vec<RowOutcome> {
2580    let reason = field.map(|_| "immutable");
2581    vec![RowOutcome {
2582        index: session_index,
2583        kind: "session",
2584        pk: Value::String(session.id.clone()),
2585        status: OutcomeStatus::Error,
2586        error: Some(RowError {
2587            message: message.into(),
2588            field,
2589            reason,
2590            reason_key: Some(reason_key),
2591        }),
2592        searchable: false,
2593    }]
2594}
2595
2596/// Batched-path success helper. Each row's Inserted/Matched status is read
2597/// from the pre-existence sets captured by `upsert_session_batch` before its
2598/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
2599/// Also accumulates the per-table totals into `counts` so the CLI summary
2600/// gets the same truth without re-walking the outcomes.
2601fn success_outcomes_for_substream(
2602    session_index: usize,
2603    session: &Session,
2604    messages: &[BufferedMessage],
2605    existing_sessions: &std::collections::HashMap<String, Session>,
2606    existing_message_pks: &HashSet<(String, String)>,
2607    existing_part_pks: &HashSet<(String, String, String)>,
2608    counts: &mut BatchCounts,
2609) -> Vec<RowOutcome> {
2610    let session_was_present = existing_sessions.contains_key(&session.id);
2611    let session_status = if session_was_present {
2612        counts.sessions_matched += 1;
2613        UpsertStatus::Matched
2614    } else {
2615        counts.sessions_inserted += 1;
2616        UpsertStatus::Inserted
2617    };
2618
2619    let mut outcomes = Vec::with_capacity(1 + messages.len());
2620    outcomes.push(success_outcome(
2621        session_index,
2622        "session",
2623        Value::String(session.id.clone()),
2624        session_status,
2625        false,
2626    ));
2627    for buffered in messages {
2628        let key = (
2629            buffered.message.session_id().to_owned(),
2630            buffered.message.id().to_owned(),
2631        );
2632        let searchable = buffered.search_text.is_some();
2633        let message_status = if existing_message_pks.contains(&key) {
2634            counts.messages_matched_total += 1;
2635            if searchable {
2636                counts.messages_matched_searchable += 1;
2637            }
2638            UpsertStatus::Matched
2639        } else {
2640            counts.messages_inserted_total += 1;
2641            if searchable {
2642                counts.messages_inserted_searchable += 1;
2643            }
2644            UpsertStatus::Inserted
2645        };
2646        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2647        outcomes.push(success_outcome(
2648            buffered.index,
2649            "message",
2650            pk,
2651            message_status,
2652            searchable,
2653        ));
2654        for part in &buffered.parts {
2655            let part_key = (
2656                part.part.session_id.clone(),
2657                part.part.message_id.clone(),
2658                part.part.id.clone(),
2659            );
2660            let part_status = if existing_part_pks.contains(&part_key) {
2661                counts.parts_matched += 1;
2662                UpsertStatus::Matched
2663            } else {
2664                counts.parts_inserted += 1;
2665                UpsertStatus::Inserted
2666            };
2667            let part_pk = Value::Array(vec![
2668                Value::String(part_key.0),
2669                Value::String(part_key.1),
2670                Value::String(part_key.2),
2671            ]);
2672            outcomes.push(success_outcome(
2673                part.index,
2674                "part",
2675                part_pk,
2676                part_status,
2677                false,
2678            ));
2679        }
2680    }
2681    outcomes
2682}
2683
2684fn success_outcome(
2685    index: usize,
2686    kind: &'static str,
2687    pk: Value,
2688    status: UpsertStatus,
2689    searchable: bool,
2690) -> RowOutcome {
2691    let status = match status {
2692        UpsertStatus::Inserted => OutcomeStatus::Inserted,
2693        UpsertStatus::Matched => OutcomeStatus::Matched,
2694    };
2695    RowOutcome {
2696        index,
2697        kind,
2698        pk,
2699        status,
2700        error: None,
2701        searchable,
2702    }
2703}
2704
/// Session-level ingest failures detected while comparing an incoming
/// Session against the stored one.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// spec.md#protocol: `Session.source_agent` and `Session.project` may
    /// not change after the first write, because the denormalized copies on
    /// `messages` were stamped from the earlier Session at first ingest. A
    /// re-write that changed either one would silently desync them.
    ImmutableField {
        /// Name of the immutable field that differed.
        field: &'static str,
        /// Id of the session being re-written.
        session_id: String,
        /// Value currently stored.
        stored: String,
        /// Value the incoming Session tried to write.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the violation as a single line; the stored and attempted
    /// values are printed in `Debug` form, i.e. quoted and escaped.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            formatter,
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        )
    }
}

impl std::error::Error for IngestError {}
2736
2737/// Compare an incoming Session row against the stored row on the two
2738/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
2739/// counts a NULL-vs-non-NULL change as a mismatch.
2740fn ensure_immutable_match(
2741    existing: &Session,
2742    incoming: &Session,
2743) -> std::result::Result<(), IngestError> {
2744    if existing.source_agent != incoming.source_agent {
2745        return Err(IngestError::ImmutableField {
2746            field: "source_agent",
2747            session_id: incoming.id.clone(),
2748            stored: existing.source_agent.clone(),
2749            attempted: incoming.source_agent.clone(),
2750        });
2751    }
2752    if existing.project != incoming.project {
2753        return Err(IngestError::ImmutableField {
2754            field: "project",
2755            session_id: incoming.id.clone(),
2756            stored: (*existing.project).clone(),
2757            attempted: (*incoming.project).clone(),
2758        });
2759    }
2760    Ok(())
2761}
2762
2763pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2764    use crate::wire::Provenance;
2765    let mut chunks: Vec<String> = Vec::new();
2766    for part in parts {
2767        // spec.md#search: only conversational parts contribute to the indexed
2768        // text; harness-injected scaffolding is excluded from search.
2769        if part.provenance != Provenance::Conversational {
2770            continue;
2771        }
2772        match (message.role(), &part.kind) {
2773            (Role::User | Role::Assistant, PartKind::Text { text }) => {
2774                if let Some(text) = text {
2775                    chunks.push(text.to_string());
2776                }
2777            }
2778            (
2779                Role::User | Role::Assistant,
2780                PartKind::File {
2781                    media_type,
2782                    file_name,
2783                    data,
2784                },
2785            ) => {
2786                if let Some(file_name) = file_name {
2787                    chunks.push(file_name.clone());
2788                }
2789                if let Some(media_type) = media_type {
2790                    chunks.push(media_type.clone());
2791                }
2792                if let FileData::Url(uri) = data {
2793                    chunks.push(uri.clone());
2794                }
2795            }
2796            (
2797                Role::System | Role::Tool,
2798                PartKind::Text { .. }
2799                | PartKind::Reasoning { .. }
2800                | PartKind::File { .. }
2801                | PartKind::ToolCall { .. }
2802                | PartKind::ToolResult { .. }
2803                | PartKind::ToolApprovalRequest { .. }
2804                | PartKind::ToolApprovalResponse { .. },
2805            )
2806            | (
2807                Role::User | Role::Assistant,
2808                PartKind::Reasoning { .. }
2809                | PartKind::ToolCall { .. }
2810                | PartKind::ToolResult { .. }
2811                | PartKind::ToolApprovalRequest { .. }
2812                | PartKind::ToolApprovalResponse { .. },
2813            ) => {}
2814        }
2815    }
2816
2817    let text = chunks
2818        .into_iter()
2819        .filter(|chunk| !chunk.trim().is_empty())
2820        .collect::<Vec<_>>()
2821        .join("\n");
2822    if text.is_empty() { None } else { Some(text) }
2823}
2824
/// Non-empty conversational text (spec.md#search). The wrapper itself does
/// not check for emptiness; the guarantee comes from wherever the value is
/// constructed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrow the text as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consume the wrapper and return the owned text.
    pub fn into_inner(self) -> String {
        let Self(inner) = self;
        inner
    }
}

impl AsRef<str> for SearchText {
    /// Same view as [`SearchText::as_str`].
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2844
2845#[derive(Debug, Clone, PartialEq)]
2846pub struct MessageWithParts {
2847    pub message: Message,
2848    pub parts: Vec<Part>,
2849}
2850
2851#[derive(Debug, Clone, PartialEq)]
2852pub struct SessionWithMessages {
2853    pub session: Session,
2854    pub messages: Vec<MessageWithParts>,
2855}
2856
2857#[derive(Debug, Clone)]
2858pub struct SessionViewParams<'a> {
2859    pub mode: ResponseMode,
2860    pub after_id: Option<&'a str>,
2861    pub limit: usize,
2862    pub budget_bytes: usize,
2863    pub session_from: SessionFrom,
2864}
2865
2866#[derive(Debug, Clone)]
2867pub struct MessageViewParams<'a> {
2868    pub context_depth: usize,
2869    /// Which siblings fill the context window: conversational (default)
2870    /// keeps the window on the human/model exchange; complete/verbatim
2871    /// include system/tool carriers.
2872    pub mode: ResponseMode,
2873    pub after_id: Option<&'a str>,
2874    pub limit: usize,
2875    pub budget_bytes: usize,
2876}
2877
/// Result of a `pond_get` lookup. It keeps two failure modes apart: a
/// missing target, which the handler maps to `not_found`, and a
/// stale/unknown `after_id`, which the handler maps to `validation_failed`.
/// Because the message/part stream is append-only, an anchor that was ever
/// valid never disappears, so an unknown anchor is always a client error
/// and never a reason to silently restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested target does not exist.
    NotFound,
    /// The supplied `after_id` anchor is not known.
    UnknownAfterId,
    /// The lookup succeeded and carries its payload.
    Found(T),
}
2889
2890/// Canonical retrieval result for `pond_get` session mode: the stored session
2891/// plus the page of messages (each with its `Part`s) and a remaining count.
2892/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
2893#[derive(Debug, Clone, PartialEq)]
2894pub struct SessionPage {
2895    pub session: Session,
2896    pub messages: Vec<RetrievedMessage>,
2897    pub messages_remaining: usize,
2898}
2899
2900/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
2901/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
2902/// their parts so the handler can summarize them.
2903#[derive(Debug, Clone, PartialEq)]
2904pub struct MessagePage {
2905    pub session: Session,
2906    pub target: RetrievedMessage,
2907    pub target_parts: Vec<Part>,
2908    pub target_parts_remaining: usize,
2909    pub siblings: Vec<RetrievedMessage>,
2910}
2911
/// A message as handed back by retrieval, together with its parts. Used as the
/// element type of `SessionPage::messages` and `MessagePage::siblings`, and as
/// `MessagePage::target`.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    /// Message id.
    pub id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Optional text for the message.
    /// NOTE(review): which stored column feeds this is not visible here.
    pub text: Option<String>,
    /// Optional content for the message.
    /// NOTE(review): presumably the nullable `content` column - confirm.
    pub content: Option<String>,
    /// Parts attached to the message; may be empty.
    pub parts: Vec<Part>,
}
2921
/// Module-private row shape: the same fields as `RetrievedMessage` minus
/// `parts`.
/// NOTE(review): presumably the intermediate produced by a message scan before
/// parts are attached - confirm against its users.
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}
2930
/// One row of the conversational scan. `text` is non-empty by
/// `IsNotNull("search_text")` pushdown (spec.md#search).
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message itself.
    pub message_id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// The row's search text.
    pub text: SearchText,
}
2941
/// Counts how many leading `items` make up one page.
///
/// The page never holds more than `limit` items (`limit` itself is clamped to
/// `1..=1000`) and stops before the first item whose `size` would push the
/// running byte total past `budget_bytes`. The first item is exempt from the
/// budget, so a single oversize item still gets a page of its own. Returns `0`
/// only for empty input.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = limit.clamp(1, 1000);
    let mut used = 0usize;
    items
        .iter()
        .take(window)
        .enumerate()
        .take_while(|(index, item)| {
            // Saturating: an absurd size must stop the page, not wrap around.
            used = used.saturating_add(size(*item));
            *index == 0 || used <= budget_bytes
        })
        .count()
}
2960
2961fn role_from_str(value: &str) -> Result<Role> {
2962    match value {
2963        "system" => Ok(Role::System),
2964        "user" => Ok(Role::User),
2965        "assistant" => Ok(Role::Assistant),
2966        "tool" => Ok(Role::Tool),
2967        other => anyhow::bail!("unknown message role {other}"),
2968    }
2969}
2970
/// Scalar indexes on `messages` (spec.md#datasets): BTREE for high-cardinality
/// and range columns, BITMAP for low-cardinality columns. There is no index
/// on `embedding_model`: pond's invariant is one active model at a time
/// (a model swap goes through `pond embed --force` which drops the IVF_PQ,
/// clears stale rows, and re-bootstraps), so `embedding_model` is never a
/// query-time predicate - the only embedding-state filter is `vector IS NOT
/// NULL`. `id` lookups are rare and full-scan.
///
/// Each entry is `(column, index type, index name)`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];
2997
/// Scalar indexes on `parts`: `(session_id, message_id)` is the hot-path lookup key for
/// `parts_for_messages` (hydration on every `get` and grouped search).
/// Each column gets its own single-column BTREE entry; entries are
/// `(column, index type, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
3012
/// Scalar index on `sessions`: `id` is filtered by `find_session` on every
/// `get` and every grouped search. The single entry is
/// `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
3017
3018fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
3019    Predicate::In(
3020        column,
3021        values.iter().cloned().map(ScalarValue::String).collect(),
3022    )
3023}
3024
3025/// Combine the caller's filter with `vector IS NOT NULL` so the kNN scanner
3026/// never sees a null-vector row. Under the single-active-model invariant,
3027/// `vector IS NOT NULL` is equivalent to "row is currently embedded under
3028/// the configured model" - no per-row `embedding_model` filter needed.
3029fn embedded_scope(filter: &Predicate) -> Predicate {
3030    Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3031}
3032
3033fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
3034    let inserted = usize::try_from(inserted_rows)
3035        .unwrap_or(usize::MAX)
3036        .min(total);
3037    let mut statuses = Vec::with_capacity(total);
3038    statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
3039    statuses.extend(std::iter::repeat_n(
3040        UpsertStatus::Matched,
3041        total.saturating_sub(inserted),
3042    ));
3043    statuses
3044}
3045
// Bare logical table names: the lance-namespace Directory impl owns the
// `.lance` directory suffix (spec.md#lance-chokepoints-catalog). No consumer
// reconstructs a `.lance` path; these names are used as-is.
pub(crate) const SESSIONS: &str = "sessions";
pub(crate) const MESSAGES: &str = "messages";
pub(crate) const PARTS: &str = "parts";
3052
/// FTS index name on `messages.search_text`. Stable so status and index
/// creation name the same index. Registered as an intent by
/// `pond_index_intents_with_vector_threshold`.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// IVF_PQ index name on `messages.vector` (spec.md#search). Stable so the
/// activation check and index creation name the same index. Registered as an
/// intent by `pond_index_intents_with_vector_threshold`.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
3060
/// IVF_PQ tuning constants (spec.md#search):
/// - num_bits = 8 (256 centroids per PQ subspace; needs >= 256 vectors)
/// - sub_vectors = embedding_dim / 8 (8-float PQ subspaces)
/// - max_iters = 15 (kmeans cap)
/// - cosine metric (e5 vectors are L2-normalized)
const IVF_PQ_NUM_BITS: u8 = 8;
// Divisor applied to `embedding_dim()` to get the sub-vector count.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
const IVF_PQ_MAX_ITERS: usize = 15;
3069
/// FTS tokenizer constants (spec.md#search-language-neutral-index): character
/// n-grams with lengths in `[3, 5]`. 4- and 5-grams discriminate; `min = 3`
/// keeps 3-character tokens (`FTS`, `OCC`) searchable.
const FTS_NGRAM_MIN: u32 = 3;
const FTS_NGRAM_MAX: u32 = 5;
3075
/// Pond's production IndexIntents: the per-table intent set
/// `Store::open_with_options` registers with the substrate.
///
/// Delegates to [`pond_index_intents_with_vector_threshold`] with
/// [`VECTOR_INDEX_ACTIVATION_ROWS`] as the IVF_PQ activation threshold.
pub fn pond_index_intents() -> IndexIntents {
    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
}
3081
3082/// Same as [`pond_index_intents`] but with an overridable IVF_PQ activation
3083/// threshold. Used by tests that need to exercise the activation boundary
3084/// without writing 100k vectors.
3085pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3086    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3087    messages.push(IndexIntent {
3088        name: MESSAGES_FTS_INDEX,
3089        column: "search_text",
3090        trigger: IndexTrigger::OnAnyRows,
3091        params: IndexParamsKind::InvertedFtsNgram {
3092            min: FTS_NGRAM_MIN,
3093            max: FTS_NGRAM_MAX,
3094        },
3095    });
3096    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3097        messages.push(IndexIntent {
3098            name,
3099            column,
3100            trigger: IndexTrigger::OnAnyRows,
3101            params: IndexParamsKind::Scalar(kind.clone()),
3102        });
3103    }
3104    messages.push(IndexIntent {
3105        name: MESSAGES_VECTOR_INDEX,
3106        column: "vector",
3107        trigger: IndexTrigger::OnNonNullCount {
3108            column: "vector",
3109            threshold: vector_threshold,
3110        },
3111        params: IndexParamsKind::IvfPqCosine {
3112            sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3113            num_bits: IVF_PQ_NUM_BITS,
3114            max_iters: IVF_PQ_MAX_ITERS,
3115        },
3116    });
3117    let parts = PARTS_SCALAR_INDICES
3118        .iter()
3119        .map(|(column, kind, name)| IndexIntent {
3120            name,
3121            column,
3122            trigger: IndexTrigger::OnAnyRows,
3123            params: IndexParamsKind::Scalar(kind.clone()),
3124        })
3125        .collect();
3126    let sessions = SESSIONS_SCALAR_INDICES
3127        .iter()
3128        .map(|(column, kind, name)| IndexIntent {
3129            name,
3130            column,
3131            trigger: IndexTrigger::OnAnyRows,
3132            params: IndexParamsKind::Scalar(kind.clone()),
3133        })
3134        .collect();
3135    IndexIntents {
3136        sessions,
3137        messages,
3138        parts,
3139    }
3140}
3141
/// Fallback width of the `messages.vector` embedding column (spec.md#search).
/// It matches [`embed::DEFAULT_MODEL_ID`] (`intfloat/multilingual-e5-small`,
/// 384) and applies whenever `[embeddings].dim` is absent.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector dimension, written at most once (at startup, from
/// `[embeddings].dim`) through [`init_embedding_dim`]. It is a `OnceLock`
/// rather than a `const` so an experiment can point a temporary config file at
/// a model with a different dimension without touching every use site. While
/// it is unset, readers fall back to [`DEFAULT_EMBEDDING_DIM`], which keeps
/// unit tests config-free.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the active embedding dimension: the value installed by
/// [`init_embedding_dim`], or [`DEFAULT_EMBEDDING_DIM`] if none was installed.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&configured) => configured,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Installs the configured dimension for [`embedding_dim`]. Only the first
/// call takes effect; later calls leave the stored value untouched.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails once the cell is populated; ignoring that failure is what
    // makes the first call win.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3167
/// Initial-`CREATE` write params for the namespace-mediated path. The
/// substrate seam stamps in `session`, `mode`, and `store_params`.
/// `auto_cleanup` is short; long-term recovery is `pond export` snapshots
/// plus deferred Lance tags (spec.md#session-durable-copy). `skip_auto_cleanup`
/// suppresses the per-commit hook so cleanup stays operator-driven via
/// `pond index optimize` (one LIST per command instead of per write).
pub(crate) fn write_params_for_create() -> WriteParams {
    WriteParams {
        data_storage_version: Some(LanceFileVersion::V2_1),
        enable_v2_manifest_paths: true,
        enable_stable_row_ids: true,
        // Cleanup policy recorded on the dataset: interval 20, versions older
        // than one day.
        auto_cleanup: Some(AutoCleanupParams {
            interval: 20,
            older_than: chrono::TimeDelta::days(1),
        }),
        skip_auto_cleanup: true,
        // Every field not named above keeps Lance's default.
        ..WriteParams::default()
    }
}
3187
3188fn export_schema(table: Table) -> Arc<Schema> {
3189    match table {
3190        Table::Sessions => session_schema(),
3191        Table::Messages => message_schema(),
3192        Table::Parts => part_schema(),
3193    }
3194}
3195
3196fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3197    let expected = export_schema(table);
3198    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3199    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3200    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3201    if actual_names != expected_names {
3202        anyhow::bail!(
3203            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3204            table.as_str(),
3205        );
3206    }
3207    Ok(())
3208}
3209
3210async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3211    let source_uri = source
3212        .to_str()
3213        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3214    let dataset = Dataset::open(source_uri)
3215        .await
3216        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3217    ensure_schema_matches_archive(&dataset, table)?;
3218    Ok(dataset)
3219}
3220
3221pub(crate) fn session_schema() -> Arc<Schema> {
3222    Arc::new(Schema::new(vec![
3223        primary_field("id", DataType::Utf8, false),
3224        Field::new("parent_session_id", DataType::Utf8, true),
3225        Field::new("parent_message_id", DataType::Utf8, true),
3226        Field::new("source_agent", DataType::Utf8, false),
3227        Field::new(
3228            "created_at",
3229            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3230            false,
3231        ),
3232        Field::new("project", DataType::Utf8, false),
3233        json_field("options", false),
3234    ]))
3235}
3236
3237pub(crate) fn message_schema() -> Arc<Schema> {
3238    Arc::new(Schema::new(vec![
3239        primary_field("session_id", DataType::Utf8, false),
3240        primary_field("id", DataType::Utf8, false),
3241        Field::new(
3242            "timestamp",
3243            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3244            false,
3245        ),
3246        Field::new("role", DataType::Utf8, false),
3247        Field::new("source_agent", DataType::Utf8, false),
3248        Field::new("project", DataType::Utf8, false),
3249        Field::new("content", DataType::Utf8, true),
3250        Field::new("search_text", DataType::Utf8, true),
3251        // The message's derived embedding (spec.md#session-embed-from-canonical):
3252        // both null until `pond embed` fills them, set together thereafter.
3253        Field::new("vector", embedding_vector_type(), true),
3254        Field::new("embedding_model", DataType::Utf8, true),
3255        json_field("options", false),
3256    ]))
3257}
3258
3259pub(crate) fn part_schema() -> Arc<Schema> {
3260    Arc::new(Schema::new(vec![
3261        primary_field("session_id", DataType::Utf8, false),
3262        primary_field("message_id", DataType::Utf8, false),
3263        primary_field("id", DataType::Utf8, false),
3264        Field::new("ordinal", DataType::Int32, false),
3265        Field::new("type", DataType::Utf8, false),
3266        // spec.md#model-part-provenance: conversation vs harness-injected; search
3267        // reads this column to exclude injected scaffolding.
3268        Field::new("provenance", DataType::Utf8, false),
3269        json_field("variant_data", false),
3270        legacy_blob_field("data", true),
3271        json_field("options", false),
3272    ]))
3273}
3274
3275pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3276    let arrays = schema
3277        .fields()
3278        .iter()
3279        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3280        .collect();
3281    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3282}
3283
3284pub(crate) fn empty_reader(
3285    schema: Arc<Schema>,
3286) -> Result<
3287    RecordBatchIterator<
3288        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3289    >,
3290> {
3291    let batch = empty_batch(schema.clone())?;
3292    Ok(RecordBatchIterator::new(
3293        vec![Ok(batch)].into_iter(),
3294        schema,
3295    ))
3296}
3297
/// Borrowed input for one `messages` row, consumed by `messages_batches`.
/// The message supplies the key, timestamp, role, content and options; the
/// remaining fields supply columns the message itself does not carry.
pub(crate) struct MessageBatchRow<'a> {
    /// The message being written.
    pub message: &'a Message,
    /// Value for the `source_agent` column.
    pub source_agent: &'a str,
    /// Value for the `project` column.
    pub project: &'a str,
    /// Value for the nullable `search_text` column; `None` is written as null.
    pub search_text: Option<&'a str>,
}
3304
3305// Lance v7.0.0-beta.16's IVF_PQ build path (`rust/lance/src/index/vector/utils.rs`
3306// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
3307// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
3308// BFloat16 as a future-supported embedding type; until the Rust IVF_PQ build
3309// path catches up, store as Float16 (half-precision, also 2 bytes/element).
3310fn embedding_vector_type() -> DataType {
3311    DataType::FixedSizeList(
3312        Arc::new(Field::new("item", DataType::Float16, true)),
3313        embedding_dim() as i32,
3314    )
3315}
3316
3317/// The partial-schema source for the embedding column update: the `messages`
3318/// primary key plus the two columns `pond embed` fills. The field definitions
3319/// match `message_schema` exactly so Lance accepts it as a subset upsert.
3320fn embedding_update_schema() -> Arc<Schema> {
3321    Arc::new(Schema::new(vec![
3322        primary_field("session_id", DataType::Utf8, false),
3323        primary_field("id", DataType::Utf8, false),
3324        Field::new("vector", embedding_vector_type(), true),
3325        Field::new("embedding_model", DataType::Utf8, true),
3326    ]))
3327}
3328
3329/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
3330/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
3331pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3332    let dim = embedding_dim();
3333    let mut flat = Vec::with_capacity(rows.len() * dim);
3334    for row in rows {
3335        if row.vector.len() != dim {
3336            anyhow::bail!(
3337                "embedding for message {} has dim {}, expected {dim}",
3338                row.id,
3339                row.vector.len(),
3340            );
3341        }
3342        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3343    }
3344    let values = Float16Array::from(flat);
3345    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3346    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3347        .context("failed to build embedding vector column")?;
3348
3349    RecordBatch::try_new(
3350        embedding_update_schema(),
3351        vec![
3352            Arc::new(StringArray::from(
3353                rows.iter()
3354                    .map(|row| row.session_id.as_str())
3355                    .collect::<Vec<_>>(),
3356            )),
3357            Arc::new(StringArray::from(
3358                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3359            )),
3360            Arc::new(vectors),
3361            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3362        ],
3363    )
3364    .context("failed to build embedding update batch")
3365}
3366
/// The runtime backstop against Arrow's 2 GiB `i32` offset wall: a flush batch
/// is split before the running total of its text columns reaches this, and a
/// single cell at or above it is rejected rather than left to panic inside
/// `StringArray::from` (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Contiguous row ranges whose summed text-column byte cost each stays within
/// `COLUMN_BYTE_BUDGET`. Budgeting the all-column total bounds every individual
/// column too, since no single column's total can exceed it. `cells[i]` is row
/// `i`'s byte cost summed across every text column.
///
/// The ranges cover `0..cells.len()` in order with no gaps, and empty input
/// yields no ranges. A row whose own cost exceeds the budget is emitted as a
/// one-row range rather than dropped. The running total uses saturating
/// arithmetic, so very large costs split the batch instead of overflowing
/// `usize`.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        // `index > start` keeps every chunk non-empty: the first row of a
        // chunk is always admitted, however large it is.
        if index > start && running.saturating_add(row) > COLUMN_BYTE_BUDGET {
            chunks.push(start..index);
            start = index;
            running = 0;
        }
        running = running.saturating_add(row);
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
3394
3395fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3396    if bytes >= COLUMN_BYTE_BUDGET {
3397        anyhow::bail!(
3398            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3399             overflow Arrow's i32 offset buffer"
3400        );
3401    }
3402    Ok(())
3403}
3404
3405async fn merge_insert_chunks(
3406    handle: &Handle,
3407    table: Table,
3408    batches: Vec<RecordBatch>,
3409) -> Result<u64> {
3410    let mut inserted = 0u64;
3411    for batch in batches {
3412        let rows = batch.num_rows();
3413        inserted += handle.merge_insert(table, batch, rows).await?;
3414    }
3415    Ok(inserted)
3416}
3417
3418pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3419    let options = sessions
3420        .iter()
3421        .map(|session| json_bytes(&session.options))
3422        .collect::<Result<Vec<_>>>()?;
3423    let mut cells = Vec::with_capacity(sessions.len());
3424    for (session, encoded) in sessions.iter().zip(&options) {
3425        let columns = [
3426            session.id.len(),
3427            session.parent_session_id.as_deref().map_or(0, str::len),
3428            session.parent_message_id.as_deref().map_or(0, str::len),
3429            session.source_agent.len(),
3430            session.project.as_str().len(),
3431            encoded.len(),
3432        ];
3433        for bytes in columns {
3434            guard_cell("sessions", &session.id, bytes)?;
3435        }
3436        cells.push(columns.iter().sum());
3437    }
3438    chunk_ranges(&cells)
3439        .into_iter()
3440        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3441        .collect()
3442}
3443
3444fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3445    let schema = session_schema();
3446    RecordBatch::try_new(
3447        schema.clone(),
3448        vec![
3449            Arc::new(StringArray::from(
3450                sessions
3451                    .iter()
3452                    .map(|session| session.id.as_str())
3453                    .collect::<Vec<_>>(),
3454            )),
3455            Arc::new(StringArray::from(
3456                sessions
3457                    .iter()
3458                    .map(|session| session.parent_session_id.as_deref())
3459                    .collect::<Vec<_>>(),
3460            )),
3461            Arc::new(StringArray::from(
3462                sessions
3463                    .iter()
3464                    .map(|session| session.parent_message_id.as_deref())
3465                    .collect::<Vec<_>>(),
3466            )),
3467            Arc::new(StringArray::from(
3468                sessions
3469                    .iter()
3470                    .map(|session| session.source_agent.as_str())
3471                    .collect::<Vec<_>>(),
3472            )),
3473            Arc::new(
3474                TimestampMicrosecondArray::from(
3475                    sessions
3476                        .iter()
3477                        .map(|session| micros(session.created_at))
3478                        .collect::<Vec<_>>(),
3479                )
3480                .with_timezone("UTC"),
3481            ),
3482            Arc::new(StringArray::from(
3483                sessions
3484                    .iter()
3485                    .map(|session| session.project.as_str())
3486                    .collect::<Vec<_>>(),
3487            )),
3488            Arc::new(LargeBinaryArray::from_iter_values(
3489                options.iter().map(Vec::as_slice),
3490            )),
3491        ],
3492    )
3493    .context("failed to build session batch")
3494}
3495
3496pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3497    let options = rows
3498        .iter()
3499        .map(|row| json_bytes(row.message.options()))
3500        .collect::<Result<Vec<_>>>()?;
3501    let mut cells = Vec::with_capacity(rows.len());
3502    for (row, encoded) in rows.iter().zip(&options) {
3503        let columns = [
3504            row.message.session_id().len(),
3505            row.message.id().len(),
3506            row.message.role().as_str().len(),
3507            row.source_agent.len(),
3508            row.project.len(),
3509            row.message.system_content().map_or(0, str::len),
3510            row.search_text.map_or(0, str::len),
3511            encoded.len(),
3512        ];
3513        for bytes in columns {
3514            guard_cell("messages", row.message.id(), bytes)?;
3515        }
3516        cells.push(columns.iter().sum());
3517    }
3518    chunk_ranges(&cells)
3519        .into_iter()
3520        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3521        .collect()
3522}
3523
3524fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3525    let schema = message_schema();
3526    RecordBatch::try_new(
3527        schema.clone(),
3528        vec![
3529            Arc::new(StringArray::from(
3530                rows.iter()
3531                    .map(|row| row.message.session_id())
3532                    .collect::<Vec<_>>(),
3533            )),
3534            Arc::new(StringArray::from(
3535                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3536            )),
3537            Arc::new(
3538                TimestampMicrosecondArray::from(
3539                    rows.iter()
3540                        .map(|row| micros(row.message.timestamp()))
3541                        .collect::<Vec<_>>(),
3542                )
3543                .with_timezone("UTC"),
3544            ),
3545            Arc::new(StringArray::from(
3546                rows.iter()
3547                    .map(|row| row.message.role().as_str())
3548                    .collect::<Vec<_>>(),
3549            )),
3550            Arc::new(StringArray::from(
3551                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3552            )),
3553            Arc::new(StringArray::from(
3554                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3555            )),
3556            Arc::new(StringArray::from(
3557                rows.iter()
3558                    .map(|row| row.message.system_content())
3559                    .collect::<Vec<_>>(),
3560            )),
3561            Arc::new(StringArray::from(
3562                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3563            )),
3564            // `vector` / `embedding_model` are written null at ingest; every
3565            // message starts un-embedded and `pond embed` fills them later
3566            // (spec.md#session-embed-from-canonical).
3567            new_null_array(&embedding_vector_type(), rows.len()),
3568            new_null_array(&DataType::Utf8, rows.len()),
3569            Arc::new(LargeBinaryArray::from_iter_values(
3570                options.iter().map(Vec::as_slice),
3571            )),
3572        ],
3573    )
3574    .context("failed to build message batch")
3575}
3576
3577pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3578    let variant_data = parts
3579        .iter()
3580        .map(|part| part_variant_json(&part.kind))
3581        .collect::<Result<Vec<_>>>()?;
3582    let options = parts
3583        .iter()
3584        .map(|part| json_bytes(&part.options))
3585        .collect::<Result<Vec<_>>>()?;
3586    let mut cells = Vec::with_capacity(parts.len());
3587    // The blob column is a BinaryArray, exempt from the text-column bound
3588    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
3589    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3590        let columns = [
3591            part.session_id.len(),
3592            part.message_id.len(),
3593            part.id.len(),
3594            part.kind.type_name().len(),
3595            part.provenance.as_str().len(),
3596            variant.len(),
3597            encoded.len(),
3598        ];
3599        for bytes in columns {
3600            guard_cell("parts", &part.id, bytes)?;
3601        }
3602        cells.push(columns.iter().sum());
3603    }
3604    chunk_ranges(&cells)
3605        .into_iter()
3606        .map(|range| {
3607            parts_chunk(
3608                &parts[range.clone()],
3609                &variant_data[range.clone()],
3610                &options[range],
3611            )
3612        })
3613        .collect()
3614}
3615
3616fn parts_chunk(
3617    parts: &[Part],
3618    variant_data: &[Vec<u8>],
3619    options: &[Vec<u8>],
3620) -> Result<RecordBatch> {
3621    let schema = part_schema();
3622    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
3623    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
3624    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
3625    let blob_payloads: Vec<Option<&[u8]>> = parts
3626        .iter()
3627        .map(|part| match &part.kind {
3628            PartKind::File { data, .. } => Some(match data {
3629                FileData::String(value) => value.as_bytes(),
3630                FileData::Bytes(value) => value.as_slice(),
3631                FileData::Url(value) => value.as_bytes(),
3632            }),
3633            PartKind::Text { .. }
3634            | PartKind::Reasoning { .. }
3635            | PartKind::ToolCall { .. }
3636            | PartKind::ToolResult { .. }
3637            | PartKind::ToolApprovalRequest { .. }
3638            | PartKind::ToolApprovalResponse { .. } => None,
3639        })
3640        .collect();
3641    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3642
3643    RecordBatch::try_new(
3644        schema.clone(),
3645        vec![
3646            Arc::new(StringArray::from(
3647                parts
3648                    .iter()
3649                    .map(|part| part.session_id.as_str())
3650                    .collect::<Vec<_>>(),
3651            )),
3652            Arc::new(StringArray::from(
3653                parts
3654                    .iter()
3655                    .map(|part| part.message_id.as_str())
3656                    .collect::<Vec<_>>(),
3657            )),
3658            Arc::new(StringArray::from(
3659                parts
3660                    .iter()
3661                    .map(|part| part.id.as_str())
3662                    .collect::<Vec<_>>(),
3663            )),
3664            Arc::new(Int32Array::from(
3665                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3666            )),
3667            Arc::new(StringArray::from(
3668                parts
3669                    .iter()
3670                    .map(|part| part.kind.type_name())
3671                    .collect::<Vec<_>>(),
3672            )),
3673            Arc::new(StringArray::from(
3674                parts
3675                    .iter()
3676                    .map(|part| part.provenance.as_str())
3677                    .collect::<Vec<_>>(),
3678            )),
3679            Arc::new(LargeBinaryArray::from_iter_values(
3680                variant_data.iter().map(Vec::as_slice),
3681            )),
3682            Arc::new(blob_array),
3683            Arc::new(LargeBinaryArray::from_iter_values(
3684                options.iter().map(Vec::as_slice),
3685            )),
3686        ],
3687    )
3688    .context("failed to build parts batch")
3689}
3690
3691pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3692    Ok(Session {
3693        id: string(batch, "id", row)?.context("session id is null")?,
3694        parent_session_id: string(batch, "parent_session_id", row)?,
3695        parent_message_id: string(batch, "parent_message_id", row)?,
3696        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3697        created_at: datetime(batch, "created_at", row)?,
3698        project: crate::adapter::Extracted::from_stored(
3699            string(batch, "project", row)?.context("project is null")?,
3700        ),
3701        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3702    })
3703}
3704
3705pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3706    let id = string(batch, "id", row)?.context("message id is null")?;
3707    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3708    let timestamp = datetime(batch, "timestamp", row)?;
3709    let options =
3710        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3711
3712    match string(batch, "role", row)?
3713        .context("message role is null")?
3714        .as_str()
3715    {
3716        "system" => Ok(Message::System {
3717            id,
3718            session_id,
3719            timestamp,
3720            // `content` is nullable in the schema; preserve the distinction
3721            // between "no content row stored" (`None`) and "empty string
3722            // stored" (`Some(extracted_empty)`). The value originally
3723            // came from a `Source` extraction at ingest time; rewrap via
3724            // the storage-internal `from_stored` so the type-system seal
3725            // for adapters stays intact.
3726            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3727            options,
3728        }),
3729        "user" => Ok(Message::User {
3730            id,
3731            session_id,
3732            timestamp,
3733            options,
3734        }),
3735        "assistant" => Ok(Message::Assistant {
3736            id,
3737            session_id,
3738            timestamp,
3739            options,
3740        }),
3741        "tool" => Ok(Message::Tool {
3742            id,
3743            session_id,
3744            timestamp,
3745            options,
3746        }),
3747        other => anyhow::bail!("unknown message role {other}"),
3748    }
3749}
3750
3751pub(crate) fn part_from_batch(
3752    batch: &RecordBatch,
3753    row: usize,
3754    file_data: Option<FileData>,
3755) -> Result<Part> {
3756    let type_name = string(batch, "type", row)?.context("part type is null")?;
3757    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3758    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3759    Ok(Part {
3760        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3761        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3762        id: string(batch, "id", row)?.context("part id is null")?,
3763        ordinal: int32(batch, "ordinal", row)?,
3764        provenance: provenance_from_str(&provenance)?,
3765        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3766        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3767    })
3768}
3769
3770fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3771    match value {
3772        "conversational" => Ok(crate::wire::Provenance::Conversational),
3773        "injected" => Ok(crate::wire::Provenance::Injected),
3774        other => anyhow::bail!("unknown part provenance {other}"),
3775    }
3776}
3777
3778fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3779    let kind = file_data_kind(variant_data)?;
3780    match kind.as_str() {
3781        "string" => {
3782            let text = std::str::from_utf8(bytes)
3783                .context("file string payload is not UTF-8")?
3784                .to_owned();
3785            Ok(FileData::String(text))
3786        }
3787        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3788        "url" => Ok(FileData::Url(
3789            std::str::from_utf8(bytes)
3790                .context("file URL payload is not UTF-8")?
3791                .to_owned(),
3792        )),
3793        other => anyhow::bail!("unknown file data_kind {other}"),
3794    }
3795}
3796
3797fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3798    let value = json_parse::<Value>(variant_data)?;
3799    value
3800        .get("data_kind")
3801        .and_then(Value::as_str)
3802        .map(str::to_owned)
3803        .context("file part variant_data missing data_kind")
3804}
3805
3806fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3807    batch
3808        .column_by_name(name)
3809        .with_context(|| format!("missing column {name}"))?
3810        .as_any()
3811        .downcast_ref::<UInt64Array>()
3812        .with_context(|| format!("column {name} is not UInt64"))
3813}
3814
3815pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3816    let array = batch
3817        .column_by_name(name)
3818        .with_context(|| format!("missing column {name}"))?
3819        .as_any()
3820        .downcast_ref::<StringArray>()
3821        .with_context(|| format!("column {name} is not Utf8"))?;
3822    if array.is_null(row) {
3823        Ok(None)
3824    } else {
3825        Ok(Some(array.value(row).to_owned()))
3826    }
3827}
3828
3829fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3830    // Lance can return a `lance.json` column either as raw JSONB bytes
3831    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
3832    // LargeUtf8), depending on the read path. Handle both.
3833    let column = batch
3834        .column_by_name(name)
3835        .with_context(|| format!("missing column {name}"))?;
3836    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3837        return if array.is_null(row) {
3838            Ok(None)
3839        } else {
3840            Ok(Some(
3841                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3842            ))
3843        };
3844    }
3845    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3846        return if array.is_null(row) {
3847            Ok(None)
3848        } else {
3849            Ok(Some(array.value(row).as_bytes().to_vec()))
3850        };
3851    }
3852    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3853        return if array.is_null(row) {
3854            Ok(None)
3855        } else {
3856            Ok(Some(array.value(row).as_bytes().to_vec()))
3857        };
3858    }
3859    anyhow::bail!("column {name} is not a JSON-compatible array")
3860}
3861
3862fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3863    let array = batch
3864        .column_by_name(name)
3865        .with_context(|| format!("missing column {name}"))?
3866        .as_any()
3867        .downcast_ref::<Int32Array>()
3868        .with_context(|| format!("column {name} is not Int32"))?;
3869    Ok(array.value(row))
3870}
3871
3872pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3873    let array = batch
3874        .column_by_name(name)
3875        .with_context(|| format!("missing column {name}"))?
3876        .as_any()
3877        .downcast_ref::<Float32Array>()
3878        .with_context(|| format!("column {name} is not Float32"))?;
3879    Ok(array.value(row))
3880}
3881
3882pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3883    let array = batch
3884        .column_by_name(name)
3885        .with_context(|| format!("missing column {name}"))?
3886        .as_any()
3887        .downcast_ref::<TimestampMicrosecondArray>()
3888        .with_context(|| format!("column {name} is not timestamp_micros"))?;
3889    Utc.timestamp_micros(array.value(row))
3890        .single()
3891        .context("timestamp is out of range")
3892}
3893
3894fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3895    Field::new(name, data_type, nullable).with_metadata(
3896        [(
3897            "lance-schema:unenforced-primary-key".to_owned(),
3898            "true".to_owned(),
3899        )]
3900        .into(),
3901    )
3902}
3903
3904// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
3905// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
3906// is marked unstable in Lance docs (`format/file/versioning.md`) and at
3907// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
3908// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
3909// allocated only one column_info, so the decoder's second `expect_next()`
3910// fires `"there were more fields in the schema than provided column
3911// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
3912// handles correctly (covered by Lance's own `test_compact_blob_columns`).
3913fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3914    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3915        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3916            .into_iter()
3917            .collect(),
3918    )
3919}
3920
3921fn json_field(name: &str, nullable: bool) -> Field {
3922    lance_arrow::json::json_field(name, nullable)
3923}
3924
3925fn micros(timestamp: DateTime<Utc>) -> i64 {
3926    timestamp.timestamp_micros()
3927}
3928
3929fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3930    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
3931    // matches the `lance.json` extension contract. Lance's compact path
3932    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
3933    // applies `decode_json -> encode_json` on this column; with proper JSONB
3934    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
3935    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
3936    // `try_into_dfstream`; compact still goes through the adapter).
3937    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3938    lance_arrow::json::encode_json(&text)
3939        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3940}
3941
3942fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3943    serde_json::from_slice(value).context("failed to parse JSON field")
3944}
3945
3946fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3947    if let PartKind::File {
3948        media_type,
3949        file_name,
3950        data,
3951    } = kind
3952    {
3953        let data_kind = match data {
3954            FileData::String(_) => "string",
3955            FileData::Bytes(_) => "bytes",
3956            FileData::Url(_) => "url",
3957        };
3958        return json_bytes(&serde_json::json!({
3959            "media_type": media_type,
3960            "file_name": file_name,
3961            "data_kind": data_kind,
3962        }));
3963    }
3964    let value = serde_json::to_value(kind)?;
3965    let mut object = value
3966        .as_object()
3967        .cloned()
3968        .context("part variant did not serialize to an object")?;
3969    object.remove("type");
3970    json_bytes(&object)
3971}
3972
3973fn part_kind_from_json(
3974    type_name: &str,
3975    variant_data: &[u8],
3976    file_data: Option<FileData>,
3977) -> Result<PartKind> {
3978    let mut value = json_parse::<Value>(variant_data)?;
3979    let object = value
3980        .as_object_mut()
3981        .context("part variant data is not an object")?;
3982    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3983    if let Some(data) = file_data {
3984        object.remove("data_kind");
3985        object.insert("data".to_owned(), serde_json::to_value(data)?);
3986    }
3987    serde_json::from_value(value).context("failed to parse part kind")
3988}
3989
3990#[cfg(test)]
3991mod tests {
3992    #![allow(clippy::expect_used, clippy::unwrap_used)]
3993
3994    use super::*;
3995    use crate::{
3996        adapter::Extracted,
3997        handlers::ingest_events,
3998        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3999    };
4000    use chrono::Utc;
4001    use serde_json::json;
4002    use tempfile::TempDir;
4003
4004    fn synthetic_session(id: &str) -> Session {
4005        Session {
4006            id: id.to_owned(),
4007            parent_session_id: None,
4008            parent_message_id: None,
4009            source_agent: "claude-code".to_owned(),
4010            created_at: Utc::now(),
4011            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
4012            options: ProviderOptions::new(),
4013        }
4014    }
4015
4016    #[test]
4017    fn search_text_excludes_injected_parts() {
4018        use crate::wire::Provenance;
4019        let message = Message::User {
4020            id: "m1".to_owned(),
4021            session_id: "s1".to_owned(),
4022            timestamp: Utc::now(),
4023            options: ProviderOptions::new(),
4024        };
4025        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4026            session_id: "s1".to_owned(),
4027            id: id.to_owned(),
4028            message_id: "m1".to_owned(),
4029            ordinal: 0,
4030            provenance,
4031            options: ProviderOptions::new(),
4032            kind: PartKind::Text {
4033                text: Some(Extracted::from_test_value(text.to_owned())),
4034            },
4035        };
4036
4037        // A conversational part contributes; an injected one is excluded
4038        // (spec.md#search).
4039        let conversational = search_text(
4040            &message,
4041            &[text_part(
4042                "p1",
4043                "real human prompt",
4044                Provenance::Conversational,
4045            )],
4046        );
4047        assert_eq!(conversational.as_deref(), Some("real human prompt"));
4048
4049        let injected = search_text(
4050            &message,
4051            &[text_part(
4052                "p2",
4053                "<task-notification>...</task-notification>",
4054                Provenance::Injected,
4055            )],
4056        );
4057        assert!(
4058            injected.is_none(),
4059            "a message whose only part is injected has null search_text"
4060        );
4061    }
4062
4063    #[test]
4064    fn chunk_ranges_splits_on_byte_budget() {
4065        assert!(chunk_ranges(&[]).is_empty());
4066        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4067
4068        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4069        assert_eq!(
4070            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4071            vec![0..1, 1..2, 2..3],
4072        );
4073
4074        // An oversized single row gets its own chunk, never an infinite loop.
4075        assert_eq!(
4076            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4077            vec![0..1, 1..2, 2..3],
4078        );
4079    }
4080
4081    #[tokio::test]
4082    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4083        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
4084        // Message is dropped on the spot, with one Error outcome surfaced. The
4085        // rest of the substream continues normally - subsequent valid messages
4086        // and parts get written.
4087        let temp = TempDir::new()?;
4088        let store = Store::open_local(temp.path()).await?;
4089        let session = synthetic_session("ordering");
4090        let orphan_part = Part {
4091            session_id: session.id.clone(),
4092            id: "orphan-part".to_owned(),
4093            message_id: "missing-message".to_owned(),
4094            ordinal: 0,
4095            provenance: crate::wire::Provenance::Conversational,
4096            options: ProviderOptions::new(),
4097            kind: PartKind::Text {
4098                text: Some(Extracted::from_test_value("orphan".to_owned())),
4099            },
4100        };
4101        let valid_message = Message::User {
4102            id: "valid-message".to_owned(),
4103            session_id: session.id.clone(),
4104            timestamp: Utc::now(),
4105            options: ProviderOptions::new(),
4106        };
4107        let valid_part = Part {
4108            session_id: session.id.clone(),
4109            id: "valid-part".to_owned(),
4110            message_id: valid_message.id().to_owned(),
4111            ordinal: 0,
4112            provenance: crate::wire::Provenance::Conversational,
4113            options: ProviderOptions::new(),
4114            kind: PartKind::Text {
4115                text: Some(Extracted::from_test_value("kept".to_owned())),
4116            },
4117        };
4118
4119        let mut validator = IngestValidator::default();
4120        validator
4121            .push(&store, 0, IngestEvent::Session(session.clone()))
4122            .await?;
4123        let part_outcomes = validator
4124            .push(&store, 1, IngestEvent::Part(orphan_part))
4125            .await?;
4126        assert_eq!(part_outcomes.len(), 1);
4127        assert_eq!(part_outcomes[0].kind, "part");
4128        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4129        assert!(
4130            part_outcomes[0]
4131                .error
4132                .as_ref()
4133                .map(|e| e.message.contains("part event appeared before a message"))
4134                .unwrap_or(false),
4135            "error message must explain the ordering violation: {part_outcomes:?}"
4136        );
4137        validator
4138            .push(&store, 2, IngestEvent::Message(valid_message))
4139            .await?;
4140        validator
4141            .push(&store, 3, IngestEvent::Part(valid_part))
4142            .await?;
4143        validator.finish(&store).await?;
4144
4145        let (sessions, messages, parts) = store.row_counts().await?;
4146        assert_eq!(sessions, 1, "session committed despite the orphan part");
4147        assert_eq!(messages, 1, "valid message committed");
4148        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4149
4150        Ok(())
4151    }
4152
4153    #[tokio::test]
4154    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4155        // Per-event drop: a duplicate message id within a substream drops the
4156        // *duplicate* and surfaces an Error outcome for it. The first wins; the
4157        // session still commits.
4158        let temp = TempDir::new()?;
4159        let store = Store::open_local(temp.path()).await?;
4160        let session = synthetic_session("duplicate-message");
4161        let first = Message::User {
4162            id: "message-1".to_owned(),
4163            session_id: session.id.clone(),
4164            timestamp: Utc::now(),
4165            options: ProviderOptions::new(),
4166        };
4167        let second = Message::Assistant {
4168            id: "message-1".to_owned(),
4169            session_id: session.id.clone(),
4170            timestamp: Utc::now(),
4171            options: ProviderOptions::new(),
4172        };
4173
4174        let mut validator = IngestValidator::default();
4175        validator
4176            .push(&store, 0, IngestEvent::Session(session.clone()))
4177            .await?;
4178        validator
4179            .push(&store, 1, IngestEvent::Message(first))
4180            .await?;
4181        let dup_outcomes = validator
4182            .push(&store, 2, IngestEvent::Message(second))
4183            .await?;
4184        assert_eq!(dup_outcomes.len(), 1);
4185        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4186        assert!(
4187            dup_outcomes[0]
4188                .error
4189                .as_ref()
4190                .map(|e| e.message.contains("duplicate message id message-1"))
4191                .unwrap_or(false),
4192            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4193        );
4194
4195        validator.finish(&store).await?;
4196        let (sessions, messages, _) = store.row_counts().await?;
4197        assert_eq!(sessions, 1, "session committed");
4198        assert_eq!(messages, 1, "only the first message committed");
4199
4200        Ok(())
4201    }
4202
4203    #[tokio::test]
4204    async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
4205    -> anyhow::Result<()> {
4206        // spec.md#model-pond-options: `options.pond` is core-owned. A stored
4207        // message carries the process's host stamp (when resolvable) and never
4208        // a client-supplied value; session and part options stay untouched.
4209        let temp = TempDir::new()?;
4210        let store = Store::open_local(temp.path()).await?;
4211        let session = synthetic_session("host-provenance");
4212        let mut spoofed = ProviderOptions::new();
4213        spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
4214        let message = Message::User {
4215            id: "message-1".to_owned(),
4216            session_id: session.id.clone(),
4217            timestamp: Utc::now(),
4218            options: spoofed,
4219        };
4220        let part = Part {
4221            session_id: session.id.clone(),
4222            id: "part-1".to_owned(),
4223            message_id: "message-1".to_owned(),
4224            ordinal: 0,
4225            provenance: crate::wire::Provenance::Conversational,
4226            options: ProviderOptions::new(),
4227            kind: PartKind::Text {
4228                text: Some(Extracted::from_test_value("hello".to_owned())),
4229            },
4230        };
4231
4232        let mut validator = IngestValidator::default();
4233        validator
4234            .push(&store, 0, IngestEvent::Session(session.clone()))
4235            .await?;
4236        validator
4237            .push(&store, 1, IngestEvent::Message(message))
4238            .await?;
4239        validator.push(&store, 2, IngestEvent::Part(part)).await?;
4240        validator.finish(&store).await?;
4241
4242        let stored = store
4243            .get_session(&session.id)
4244            .await?
4245            .expect("ingested session is readable");
4246        assert!(
4247            !stored.session.options.contains_key("pond"),
4248            "session rows are not stamped (attribution derives from messages)"
4249        );
4250        let stored_message = &stored.messages[0].message;
4251        match ingest_host_stamp() {
4252            Some(stamp) => {
4253                assert_eq!(
4254                    stored_message.options().get("pond"),
4255                    Some(stamp),
4256                    "stored message carries the real stamp, never the spoof"
4257                );
4258                let host = stamp
4259                    .pointer("/ingest/host")
4260                    .and_then(Value::as_object)
4261                    .expect("stamp shape is {ingest: {host: {..}}}");
4262                assert!(!host.is_empty(), "an all-empty stamp must be None instead");
4263                assert!(
4264                    host.values()
4265                        .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
4266                    "stamp fields are omitted when unavailable, never empty: {host:?}"
4267                );
4268            }
4269            None => assert!(
4270                stored_message.options().get("pond").is_none(),
4271                "with no resolvable stamp the spoofed key is still stripped"
4272            ),
4273        }
4274        assert!(
4275            !stored.messages[0].parts[0].options.contains_key("pond"),
4276            "part rows are not stamped (covered by their message's stamp)"
4277        );
4278
4279        Ok(())
4280    }
4281
4282    /// Regression: compact_files on `parts` with the blob column tripped a
4283    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
4284    /// batches give compact fragments to merge; every `FileData` variant
4285    /// exercises the blob round-trip. All-File batches sidestep a debug-only
4286    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
4287    /// write batch mixes null + valid rows in the blob column - benign in
4288    /// release, irrelevant to this regression's scope.
4289    #[tokio::test(flavor = "multi_thread")]
4290    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4291        use crate::wire::{FileData, PartKind, Provenance};
4292        let temp = TempDir::new()?;
4293        let store = Store::open_local(temp.path()).await?;
4294
4295        let session = synthetic_session("compact-blob");
4296        store
4297            .upsert_sessions(std::slice::from_ref(&session))
4298            .await?;
4299
4300        let make_part = |idx: usize, kind: PartKind| Part {
4301            session_id: session.id.clone(),
4302            message_id: format!("msg-{idx}"),
4303            id: format!("part-{idx}"),
4304            ordinal: 0,
4305            provenance: Provenance::Conversational,
4306            options: ProviderOptions::new(),
4307            kind,
4308        };
4309
4310        let batch_a = vec![
4311            make_part(
4312                0,
4313                PartKind::File {
4314                    media_type: Some("text/plain".to_owned()),
4315                    file_name: Some("a.txt".to_owned()),
4316                    data: FileData::Bytes(b"alpha".to_vec()),
4317                },
4318            ),
4319            make_part(
4320                1,
4321                PartKind::File {
4322                    media_type: Some("text/plain".to_owned()),
4323                    file_name: Some("b.txt".to_owned()),
4324                    data: FileData::String("beta".to_owned()),
4325                },
4326            ),
4327        ];
4328        store.upsert_parts(&batch_a).await?;
4329
4330        let batch_b = vec![
4331            make_part(
4332                2,
4333                PartKind::File {
4334                    media_type: Some("application/octet-stream".to_owned()),
4335                    file_name: None,
4336                    data: FileData::Url("https://example.com/file".to_owned()),
4337                },
4338            ),
4339            make_part(
4340                3,
4341                PartKind::File {
4342                    media_type: Some("image/png".to_owned()),
4343                    file_name: Some("c.png".to_owned()),
4344                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4345                },
4346            ),
4347        ];
4348        store.upsert_parts(&batch_b).await?;
4349
4350        store
4351            .optimize_indices(None, &MaintenancePolicy::always_compact())
4352            .await?
4353            .into_result()?;
4354
4355        Ok(())
4356    }
4357
4358    #[tokio::test]
4359    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4360        let temp = TempDir::new()?;
4361        let store = Store::open_local(temp.path()).await?;
4362        let session = synthetic_session("blob");
4363        let message = Message::User {
4364            id: "message-1".to_owned(),
4365            session_id: session.id.clone(),
4366            timestamp: Utc::now(),
4367            options: ProviderOptions::new(),
4368        };
4369        let part = Part {
4370            session_id: session.id.clone(),
4371            id: "part-1".to_owned(),
4372            message_id: message.id().to_owned(),
4373            ordinal: 0,
4374            provenance: crate::wire::Provenance::Conversational,
4375            options: ProviderOptions::new(),
4376            kind: PartKind::File {
4377                media_type: Some("text/plain".to_owned()),
4378                file_name: Some("payload.txt".to_owned()),
4379                data: FileData::Bytes(b"pond".to_vec()),
4380            },
4381        };
4382
4383        let mut validator = IngestValidator::default();
4384        validator
4385            .push(&store, 0, IngestEvent::Session(session.clone()))
4386            .await?;
4387        validator
4388            .push(&store, 1, IngestEvent::Message(message.clone()))
4389            .await?;
4390        validator
4391            .push(&store, 2, IngestEvent::Part(part.clone()))
4392            .await?;
4393        validator.finish(&store).await?;
4394
4395        let stored = store
4396            .get_session(&session.id)
4397            .await?
4398            .expect("session should exist");
4399        let stored_part = &stored.messages[0].parts[0];
4400        assert_eq!(stored_part, &part);
4401
4402        Ok(())
4403    }
4404
    // Session immutability on re-ingest.
    //
    // `Session.source_agent` and `Session.project` are immutable
    // post-first-write because `messages` denormalizes them at first
    // ingest; a silent overwrite would desync the denormalized
    // copies. pond core's `IngestValidator` probes the existing session
    // before the merge_insert and emits a per-row `validation_failed`
    // outcome with the typed field name when either changes. Other Session
    // fields (options, parent_session_id, created_at, parent_message_id)
    // are rewritten idempotently via merge_insert.
4414
4415    fn base_session() -> Session {
4416        Session {
4417            id: "01HXY00000000001".to_owned(),
4418            parent_session_id: None,
4419            parent_message_id: None,
4420            source_agent: "claude-code".to_owned(),
4421            created_at: Utc::now(),
4422            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4423            options: ProviderOptions::new(),
4424        }
4425    }
4426
4427    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4428        outcomes
4429            .iter()
4430            .filter(|outcome| outcome.status == target)
4431            .count()
4432    }
4433
4434    #[tokio::test(flavor = "multi_thread")]
4435    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4436    -> anyhow::Result<()> {
4437        let temp = TempDir::new()?;
4438        let store = Store::open_local(temp.path()).await?;
4439
4440        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4441        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4442
4443        let mut again = base_session();
4444        again.options.insert("title".to_owned(), json!("renamed"));
4445        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4446        assert_eq!(
4447            count_status(&second, OutcomeStatus::Error),
4448            0,
4449            "options is mutable; the re-ingest must not surface an error: {second:?}",
4450        );
4451        assert_eq!(
4452            count_status(&second, OutcomeStatus::Matched),
4453            1,
4454            "unchanged immutable fields must match-insert via merge_insert",
4455        );
4456
4457        Ok(())
4458    }
4459
4460    #[tokio::test(flavor = "multi_thread")]
4461    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4462        let temp = TempDir::new()?;
4463        let store = Store::open_local(temp.path()).await?;
4464
4465        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4466        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4467
4468        let mut tampered = base_session();
4469        tampered.source_agent = "codex-cli".to_owned();
4470        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4471        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4472        let err_row = second
4473            .iter()
4474            .find(|outcome| outcome.status == OutcomeStatus::Error)
4475            .expect("error outcome present");
4476        let err = err_row.error.as_ref().expect("error body present");
4477        assert_eq!(err.field, Some("source_agent"));
4478        assert_eq!(err.reason, Some("immutable"));
4479
4480        // The stored row stayed on the original adapter - no silent rewrite.
4481        let stored = store
4482            .get_session(&base_session().id)
4483            .await?
4484            .expect("session row survives the rejected re-ingest");
4485        assert_eq!(stored.session.source_agent, "claude-code");
4486
4487        Ok(())
4488    }
4489
4490    #[tokio::test(flavor = "multi_thread")]
4491    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4492        let temp = TempDir::new()?;
4493        let store = Store::open_local(temp.path()).await?;
4494
4495        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4496        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4497
4498        let mut tampered = base_session();
4499        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4500        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4501        let err_row = second
4502            .iter()
4503            .find(|outcome| outcome.status == OutcomeStatus::Error)
4504            .expect("project change must surface an error outcome");
4505        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4506
4507        let stored = store
4508            .get_session(&base_session().id)
4509            .await?
4510            .expect("session row survives");
4511        assert_eq!(
4512            stored.session.project.as_str(),
4513            "/home/me/proj",
4514            "stored project must remain the original",
4515        );
4516
4517        Ok(())
4518    }
4519
4520    #[tokio::test(flavor = "multi_thread")]
4521    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
4522        // Regression guard: re-ingesting an existing session with NEW
4523        // messages must surface as sessions_inserted=0, messages_inserted_*>0
4524        // on `BatchCounts`, and per-row outcomes must mark the new message
4525        // rows `Inserted` while the session row is `Matched`. The prior
4526        // implementation derived all per-row statuses from the batch-level
4527        // session inserted count, which silently flipped the new messages
4528        // into `Matched` (visible as "up to date" in the CLI bar tail).
4529        use crate::wire::Provenance;
4530        let temp = TempDir::new()?;
4531        let store = Store::open_local(temp.path()).await?;
4532        let session = base_session();
4533
4534        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
4535            session_id: session.id.clone(),
4536            id: part_id.to_owned(),
4537            message_id: message_id.to_owned(),
4538            ordinal: 0,
4539            provenance: Provenance::Conversational,
4540            options: ProviderOptions::new(),
4541            kind: PartKind::Text {
4542                text: Some(Extracted::from_test_value(body.to_owned())),
4543            },
4544        };
4545        let user_message = |id: &str| Message::User {
4546            id: id.to_owned(),
4547            session_id: session.id.clone(),
4548            timestamp: Utc::now(),
4549            options: ProviderOptions::new(),
4550        };
4551
4552        // First pass: 2 messages land fresh.
4553        let mut validator = IngestValidator::default();
4554        validator
4555            .push(&store, 0, IngestEvent::Session(session.clone()))
4556            .await?;
4557        validator
4558            .push(&store, 1, IngestEvent::Message(user_message("m1")))
4559            .await?;
4560        validator
4561            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
4562            .await?;
4563        validator
4564            .push(&store, 3, IngestEvent::Message(user_message("m2")))
4565            .await?;
4566        validator
4567            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
4568            .await?;
4569        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
4570        assert_eq!(first_counts.sessions_inserted, 1);
4571        assert_eq!(first_counts.messages_inserted_total, 2);
4572        assert_eq!(first_counts.messages_inserted_searchable, 2);
4573
4574        // Second pass: same session id, 3 NEW messages.
4575        let mut validator = IngestValidator::default();
4576        validator
4577            .push(&store, 0, IngestEvent::Session(session.clone()))
4578            .await?;
4579        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
4580            let pid = format!("p{}", idx + 3);
4581            validator
4582                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
4583                .await?;
4584            validator
4585                .push(
4586                    &store,
4587                    idx * 2 + 2,
4588                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
4589                )
4590                .await?;
4591        }
4592        let (second_outcomes, second_counts) = validator.finish(&store).await?;
4593
4594        assert_eq!(
4595            second_counts.sessions_inserted, 0,
4596            "existing session row must report as Matched, not Inserted",
4597        );
4598        assert_eq!(second_counts.sessions_matched, 1);
4599        assert_eq!(
4600            second_counts.messages_inserted_total, 3,
4601            "the three NEW messages must register as Inserted in BatchCounts",
4602        );
4603        assert_eq!(
4604            second_counts.messages_inserted_searchable, 3,
4605            "all three new messages carry conversational text -> searchable",
4606        );
4607        assert_eq!(second_counts.messages_matched_total, 0);
4608        assert_eq!(second_counts.parts_inserted, 3);
4609        assert_eq!(second_counts.parts_matched, 0);
4610
4611        // Per-row outcomes mirror the BatchCounts shape: the session row is
4612        // Matched, every new message + part row is Inserted.
4613        let session_outcome = second_outcomes
4614            .iter()
4615            .find(|outcome| outcome.kind == "session")
4616            .expect("session-row outcome present");
4617        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
4618        for outcome in &second_outcomes {
4619            if outcome.kind == "message" || outcome.kind == "part" {
4620                assert_eq!(
4621                    outcome.status,
4622                    OutcomeStatus::Inserted,
4623                    "new row must be Inserted, got: {outcome:?}",
4624                );
4625            }
4626        }
4627        Ok(())
4628    }
4629
4630    /// Ingest `count` synthetic messages spread across a handful of sessions
4631    /// and projects, each with conversational `search_text`. Returns the store
4632    /// and the message keys in `msg-{i}` order; every `vector` starts null.
4633    async fn store_with_messages(
4634        temp: &TempDir,
4635        count: usize,
4636    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4637        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4638    }
4639
4640    /// Same as [`store_with_messages`] but tests optimize with a custom
4641    /// IVF_PQ activation threshold.
4642    async fn store_with_messages_at_threshold(
4643        temp: &TempDir,
4644        count: usize,
4645        _vector_threshold: usize,
4646    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4647        let store = Store::open_local(temp.path()).await?;
4648        let sessions = 8.min(count.max(1));
4649        let mut events = Vec::new();
4650        for s in 0..sessions {
4651            events.push(IngestEvent::Session(Session {
4652                id: format!("session-{s}"),
4653                parent_session_id: None,
4654                parent_message_id: None,
4655                source_agent: "claude-code".to_owned(),
4656                created_at: Utc::now(),
4657                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4658                options: ProviderOptions::new(),
4659            }));
4660            for i in (s..count).step_by(sessions) {
4661                let message_id = format!("msg-{i}");
4662                events.push(IngestEvent::Message(Message::User {
4663                    id: message_id.clone(),
4664                    session_id: format!("session-{s}"),
4665                    timestamp: Utc::now(),
4666                    options: ProviderOptions::new(),
4667                }));
4668                events.push(IngestEvent::Part(Part {
4669                    session_id: format!("session-{s}"),
4670                    id: format!("{message_id}-part"),
4671                    message_id,
4672                    ordinal: 0,
4673                    provenance: crate::wire::Provenance::Conversational,
4674                    options: ProviderOptions::new(),
4675                    kind: PartKind::Text {
4676                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4677                    },
4678                }));
4679            }
4680        }
4681        ingest_events(&store, events).await?;
4682        let keys = (0..count)
4683            .map(|i| MessageKey {
4684                session_id: format!("session-{}", i % sessions),
4685                message_id: format!("msg-{i}"),
4686            })
4687            .collect();
4688        Ok((store, keys))
4689    }
4690
4691    /// A deterministic pseudo-random vector of the production dimension.
4692    fn synthetic_vector(seed: usize) -> Vec<f32> {
4693        let mut state = (seed as u64)
4694            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4695            .wrapping_add(1);
4696        (0..embedding_dim())
4697            .map(|_| {
4698                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4699                #[allow(clippy::cast_precision_loss)]
4700                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4701                unit - 1.0
4702            })
4703            .collect()
4704    }
4705
4706    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
4707    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4708        keys.iter()
4709            .enumerate()
4710            .map(|(seed, key)| EmbeddedMessage {
4711                session_id: key.session_id.clone(),
4712                id: key.message_id.clone(),
4713                vector: synthetic_vector(seed),
4714            })
4715            .collect()
4716    }
4717
4718    fn embedding_update_batch_with_model(
4719        rows: &[EmbeddedMessage],
4720        model: &str,
4721    ) -> Result<RecordBatch> {
4722        let mut batch = embedding_update_batch(rows)?;
4723        let columns = batch
4724            .columns()
4725            .iter()
4726            .take(3)
4727            .cloned()
4728            .chain(std::iter::once(
4729                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4730            ))
4731            .collect::<Vec<_>>();
4732        batch = RecordBatch::try_new(batch.schema(), columns)?;
4733        Ok(batch)
4734    }
4735
4736    #[tokio::test]
4737    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4738        let temp = TempDir::new()?;
4739        // 4 messages cycle session-0..session-3, so `session-3` is a real
4740        // partition. Scalar-index pushdown is volume-independent: the planner
4741        // emits `ScalarIndexQuery` whenever the index exists.
4742        let (store, keys) = store_with_messages(&temp, 4).await?;
4743        store.write_embeddings(&embedded(&keys)).await?;
4744        store
4745            .optimize_indices(None, &MaintenancePolicy::always_compact())
4746            .await?
4747            .into_result()?;
4748
4749        let query = vec![0.01_f32; embedding_dim()];
4750        let plan = store
4751            .explain_vector_plan(
4752                &query,
4753                10,
4754                &Predicate::Eq("session_id", "session-3".into()),
4755                None,
4756            )
4757            .await?;
4758
4759        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
4760        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
4761        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
4762        // and unrelated.)
4763        assert!(
4764            plan.contains("ScalarIndexQuery"),
4765            "expected a ScalarIndexQuery node in the plan:\n{plan}",
4766        );
4767        let predicate_postfiltered = plan
4768            .lines()
4769            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4770        assert!(
4771            !predicate_postfiltered,
4772            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4773        );
4774        Ok(())
4775    }
4776
4777    #[tokio::test]
4778    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4779        let temp = TempDir::new()?;
4780        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4781
4782        // First batch: 255 vectors, one below threshold. Optimize does not
4783        // create the IVF_PQ because the trigger is not met.
4784        store.write_embeddings(&embedded(&keys[..255])).await?;
4785        store
4786            .optimize_indices_with_vector_threshold(256)
4787            .await?
4788            .into_result()?;
4789        assert!(
4790            !store
4791                .handle
4792                .messages_index_names()
4793                .await?
4794                .iter()
4795                .any(|name| name == MESSAGES_VECTOR_INDEX),
4796            "IVF_PQ must not exist below the activation threshold",
4797        );
4798
4799        // Next batch: one more vector. Total reaches 256; optimize creates
4800        // the IVF_PQ.
4801        store.write_embeddings(&embedded(&keys[255..256])).await?;
4802        store
4803            .optimize_indices_with_vector_threshold(256)
4804            .await?
4805            .into_result()?;
4806        assert!(
4807            store
4808                .handle
4809                .messages_index_names()
4810                .await?
4811                .iter()
4812                .any(|name| name == MESSAGES_VECTOR_INDEX),
4813            "optimize must create the IVF_PQ once the threshold is crossed",
4814        );
4815
4816        // The remaining 44 rows stay un-embedded; the IVF_PQ trains over the
4817        // non-null subset and a planted vector is retrievable.
4818        let hits = store
4819            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4820            .await?;
4821        assert!(
4822            hits.iter().any(|(key, _)| key == &keys[0]),
4823            "an embedded row is retrievable via the index",
4824        );
4825        Ok(())
4826    }
4827
4828    #[tokio::test]
4829    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4830    {
4831        let temp = TempDir::new()?;
4832        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4833        let old_rows = embedded(&keys);
4834        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4835        store
4836            .handle
4837            .merge_update(Table::Messages, old_batch, old_rows.len())
4838            .await?;
4839        store
4840            .optimize_indices_with_vector_threshold(256)
4841            .await?
4842            .into_result()?;
4843        assert!(
4844            store
4845                .handle
4846                .messages_index_names()
4847                .await?
4848                .iter()
4849                .any(|name| name == MESSAGES_VECTOR_INDEX),
4850            "IVF_PQ must exist before a model swap",
4851        );
4852        assert_eq!(store.stale_embedding_count().await?, keys.len());
4853
4854        store.drop_vector_index().await?;
4855        let mut pending = Vec::new();
4856        let stream = store.pending_or_stale_messages();
4857        tokio::pin!(stream);
4858        while let Some(row) = stream.next().await {
4859            pending.push(row?);
4860        }
4861        assert_eq!(
4862            pending.len(),
4863            keys.len(),
4864            "force stream should see stale rows"
4865        );
4866        store.write_embeddings(&embedded(&keys)).await?;
4867        assert_eq!(store.stale_embedding_count().await?, 0);
4868        store
4869            .optimize_indices_with_vector_threshold(256)
4870            .await?
4871            .into_result()?;
4872        assert!(
4873            store
4874                .handle
4875                .messages_index_names()
4876                .await?
4877                .iter()
4878                .any(|name| name == MESSAGES_VECTOR_INDEX),
4879            "optimize must rebuild IVF_PQ after force re-embed",
4880        );
4881
4882        let stream = store.pending_or_stale_messages();
4883        tokio::pin!(stream);
4884        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4885        Ok(())
4886    }
4887
4888    #[tokio::test]
4889    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4890        // Regression: `_row_last_updated_at_version` can point at a Lance
4891        // manifest version that `cleanup_old_versions` or the auto_cleanup
4892        // hook has since dropped from `Dataset::versions()`. The old code
4893        // silently dropped any session whose row-version was not in the
4894        // visible list, collapsing the staleness-skip map down to recent
4895        // commits and forcing `pond sync` to re-touch every file. The fix
4896        // falls back to the oldest still-visible commit timestamp - a
4897        // sound upper bound on the row's true ingest time.
4898        let temp = TempDir::new()?;
4899        let (store, _keys) = store_with_messages(&temp, 4).await?;
4900
4901        // Produce several distinct manifest versions on `sessions` so the
4902        // older ones become eligible for cleanup.
4903        for tag in 0..3 {
4904            let extra = synthetic_session(&format!("extra-{tag}"));
4905            store.upsert_sessions(&[extra]).await?;
4906        }
4907
4908        // Prune everything older than ~now, leaving only the latest manifest.
4909        // `delete_unverified=None` and `error_if_tagged=Some(false)` mirror
4910        // Lance's auto-cleanup hook semantics. The chrono 0-duration is fine:
4911        // Lance's `delete_unverified` floor still protects in-flight files.
4912        let dataset = store.handle.dataset(Table::Sessions).await?;
4913        dataset
4914            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4915            .await
4916            .context("cleanup_old_versions failed")?;
4917
4918        let map = store.session_last_ingested_at().await?;
4919        let session_count = store.row_counts().await?.0;
4920        assert!(
4921            map.len() >= session_count,
4922            "watermark map ({}) must still cover every session ({}) after \
4923             version cleanup; an empty fallback regresses pond sync to a \
4924             full re-scan",
4925            map.len(),
4926            session_count,
4927        );
4928        Ok(())
4929    }
4930
4931    #[tokio::test]
4932    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4933        let temp = TempDir::new()?;
4934        let (store, keys) = store_with_messages(&temp, 10).await?;
4935
4936        let before = store.embedding_progress().await?;
4937        assert_eq!(before.embedded, 0);
4938        assert_eq!(before.total, 10);
4939        assert_eq!(before.model, crate::embed::model_id());
4940
4941        store.write_embeddings(&embedded(&keys[..4])).await?;
4942        let partial = store.embedding_progress().await?;
4943        assert_eq!(partial.embedded, 4);
4944        assert_eq!(partial.total, 10);
4945
4946        store.write_embeddings(&embedded(&keys[4..])).await?;
4947        let full = store.embedding_progress().await?;
4948        assert_eq!(full.embedded, 10);
4949        assert_eq!(full.total, 10);
4950        Ok(())
4951    }
4952}