Skip to main content

pond/
sessions.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    path::Path,
4    sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15    UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25    config, embed,
26    substrate::{
27        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
28        OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
29        TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
30    },
31    wire::{
32        FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33        SessionFrom,
34    },
35};
36use url::Url;
37
38#[derive(Debug)]
39pub struct Store {
40    handle: Handle,
41}
42
43#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
44pub struct LanceArchiveCounts {
45    pub sessions: usize,
46    pub messages: usize,
47    pub parts: usize,
48}
49
50#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
51pub struct LanceArchiveVersions {
52    pub sessions: u64,
53    pub messages: u64,
54    pub parts: u64,
55}
56
57#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
58pub struct LanceArchiveExport {
59    pub rows: LanceArchiveCounts,
60    pub source_versions: LanceArchiveVersions,
61}
62
63#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
64pub struct LanceArchiveImport {
65    pub rows: LanceArchiveCounts,
66    pub inserted: LanceArchiveCounts,
67}
68
69#[derive(Debug, Clone, Default)]
70pub struct IndexIntents {
71    pub sessions: Vec<IndexIntent>,
72    pub messages: Vec<IndexIntent>,
73    pub parts: Vec<IndexIntent>,
74}
75
76impl IndexIntents {
77    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78        [
79            (Table::Sessions, &self.sessions),
80            (Table::Messages, &self.messages),
81            (Table::Parts, &self.parts),
82        ]
83    }
84}
85
/// A `messages` row still waiting for a vector.
///
/// Carries the row's primary key and the `search_text` to embed. Because the
/// vector is stored on that same `messages` row, no denormalized filter
/// columns are required (spec.md#session-embed-from-canonical).
#[derive(Clone, Debug, PartialEq)]
pub struct PendingMessage {
    pub session_id: String,
    pub id: String,
    pub search_text: String,
}

/// A freshly computed embedding for one message.
///
/// `pond embed` writes batches of these into `messages.vector`, matching rows
/// on the `(session_id, id)` primary key.
#[derive(Clone, Debug, PartialEq)]
pub struct EmbeddedMessage {
    pub session_id: String,
    pub id: String,
    pub vector: Vec<f32>,
}
104
105/// Message metadata used to hydrate search hits after retriever ranking.
106#[derive(Debug, Clone, PartialEq)]
107pub struct MessageMeta {
108    pub message_id: String,
109    pub session_id: String,
110    pub role: String,
111    pub project: String,
112    pub source_agent: String,
113    pub timestamp: DateTime<Utc>,
114    pub search_text: String,
115}
116
/// Composite key identifying one message within one session.
///
/// Field order is significant: the derived `Ord` compares `session_id`
/// first, then `message_id`.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MessageKey {
    pub session_id: String,
    pub message_id: String,
}
122
/// Per-row result reported by the `Store::upsert_*` methods.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpsertStatus {
    /// The row was counted as newly inserted by the merge.
    Inserted,
    /// The row matched data that was already stored.
    Matched,
}
128
129/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
130/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
131/// results so the CLI can render compaction-skipped (under writer contention)
132/// distinctly from index-build failure (real problem).
133#[derive(Debug, Default)]
134pub struct OptimizeOutcome {
135    pub tables: Vec<TableOptimizeOutcome>,
136}
137
138impl OptimizeOutcome {
139    /// True if any table's indices phase reported a non-conflict failure.
140    /// `SkippedConflict` is expected under contention and does not count.
141    pub fn any_indices_failed(&self) -> bool {
142        self.tables.iter().any(|t| t.indices.is_failed())
143    }
144
145    /// Treat any `Failed` phase as an error. Tests that don't run under
146    /// contention use this to keep their existing `.await?` style: a real
147    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
148    pub fn into_result(self) -> Result<Self> {
149        for table in &self.tables {
150            if let PhaseOutcome::Failed(error) = &table.indices {
151                anyhow::bail!(
152                    "indices phase failed on {}: {error:#}",
153                    table.table.as_str()
154                );
155            }
156            if let PhaseOutcome::Failed(error) = &table.compaction {
157                anyhow::bail!(
158                    "compaction phase failed on {}: {error:#}",
159                    table.table.as_str()
160                );
161            }
162        }
163        Ok(self)
164    }
165}
166
167/// What `pond status` reports: where the data lives, total rows per table,
168/// and a per-(adapter, project) breakdown built from one `messages` scan.
169#[derive(Debug, Clone)]
170pub struct CorpusStats {
171    pub data_url: Url,
172    pub totals: RowTotals,
173    /// One entry per adapter present in the corpus. When `include_subagents`
174    /// is false (the CLI default), sub-agent rows (`source_agent` containing
175    /// `/`) are filtered out so only the main-agent sessions appear. When
176    /// true, each distinct `source_agent` (e.g. `claude-code/general-purpose`)
177    /// gets its own entry. Always in alphabetical order; the CLI re-sorts
178    /// this into registry order at render time so the tree matches the
179    /// discovery picker.
180    pub adapters: Vec<AdapterStats>,
181    /// Whether the rollup includes sub-agent sessions. The renderer prints a
182    /// hint about `--include-subagents` when this is false so users know the
183    /// `totals` row above counts sessions that aren't broken down below.
184    pub include_subagents: bool,
185}
186
/// Total row count of each table.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RowTotals {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}

/// Embedding coverage, reported by `pond status` and `pond embed`.
///
/// `total` counts the `messages` rows that have `search_text`; only those are
/// eligible for embedding, and rows without it never get a vector.
/// `embedded` is the subset that already carries a vector under the current
/// [`embed::model_id()`]. The outstanding backlog is `total - embedded`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EmbeddingProgress {
    pub embedded: usize,
    pub total: usize,
    pub model: &'static str,
}

/// Status rollup for one adapter.
#[derive(Clone, Debug)]
pub struct AdapterStats {
    /// The main-agent name (`claude-code`) when sub-agents are filtered out,
    /// or the full `source_agent` (`claude-code/general-purpose`) when
    /// `include_subagents` is enabled.
    pub adapter: String,
    pub sessions: u64,
    pub messages: u64,
    /// This adapter's projects, ordered by message count (highest first),
    /// with ties broken by project name (ascending).
    pub projects: Vec<ProjectStats>,
}

/// Status rollup for one project under an adapter.
#[derive(Clone, Debug)]
pub struct ProjectStats {
    pub project: String,
    pub sessions: u64,
    pub messages: u64,
}

/// Running tally of messages and distinct session ids for one group.
// NOTE(review): presumably one per (adapter, project) group while the
// `CorpusStats` rollup is built; the caller is not visible here.
#[derive(Default)]
struct GroupAccumulator {
    messages: u64,
    session_ids: HashSet<String>,
}
231
232#[derive(Debug, Clone, Copy)]
233pub struct MessageWrite<'a> {
234    pub message: &'a Message,
235    pub parts: &'a [Part],
236    pub search_text: Option<&'a str>,
237}
238
239impl Store {
240    /// Open against a local filesystem URL or a remote one for which the
241    /// caller has no extra options to pass (env vars suffice). CLI verbs
242    /// that load `[storage]` from config should call
243    /// [`Store::open_with_options`] instead so the same options flow into
244    /// every dataset open and write.
245    pub async fn open(location: &Url) -> Result<Self> {
246        Ok(Self {
247            handle: Handle::open(location).await?,
248        })
249    }
250
251    /// Open with object-store options (S3 creds, region, endpoint, ...)
252    /// threaded through Lance verbatim. Keys are the standard `object_store`
253    /// config names; pond does not parse them. Empty options + default caps
254    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
255    /// config block via [`crate::substrate::RuntimeCaps`].
256    pub async fn open_with_options(
257        location: &Url,
258        storage_options: std::collections::HashMap<String, String>,
259        caps: crate::substrate::RuntimeCaps,
260    ) -> Result<Self> {
261        Ok(Self {
262            handle: Handle::open_with_options(location, storage_options, caps).await?,
263        })
264    }
265
266    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
267    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
268    /// through [`Store::open_with_options`] so the production policy is
269    /// applied; tests get the backend-aware local-FS defaults.
270    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
271        let url = config::url_for_path(path)?;
272        Self::open_with_options(
273            &url,
274            std::collections::HashMap::new(),
275            crate::substrate::RuntimeCaps::default(),
276        )
277        .await
278    }
279
280    /// Export clean, index-free Lance datasets into `dest`.
281    ///
282    /// This rewrites the visible rows of each table instead of copying the
283    /// dataset roots. The resulting manifests therefore contain no references
284    /// to the source store's `_indices`, while `messages.vector` and
285    /// `messages.embedding_model` remain ordinary data columns and are
286    /// preserved.
287    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
288        std::fs::create_dir_all(dest)
289            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
290        let (sessions, sessions_version) = self
291            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
292            .await?;
293        let (messages, messages_version) = self
294            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
295            .await?;
296        let (parts, parts_version) = self
297            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
298            .await?;
299        Ok(LanceArchiveExport {
300            rows: LanceArchiveCounts {
301                sessions,
302                messages,
303                parts,
304            },
305            source_versions: LanceArchiveVersions {
306                sessions: sessions_version,
307                messages: messages_version,
308                parts: parts_version,
309            },
310        })
311    }
312
313    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
314        let sessions_dataset =
315            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
316        let messages_dataset =
317            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
318        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
319        let (sessions, sessions_inserted) = self
320            .import_clean_table(Table::Sessions, sessions_dataset)
321            .await?;
322        let (messages, messages_inserted) = self
323            .import_clean_table(Table::Messages, messages_dataset)
324            .await?;
325        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
326        Ok(LanceArchiveImport {
327            rows: LanceArchiveCounts {
328                sessions,
329                messages,
330                parts,
331            },
332            inserted: LanceArchiveCounts {
333                sessions: sessions_inserted,
334                messages: messages_inserted,
335                parts: parts_inserted,
336            },
337        })
338    }
339
340    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
341        let dataset = self.handle.dataset(table).await?;
342        let source_version = dataset.version_id();
343        let schema = export_schema(table);
344        let mut stream = dataset
345            .scan()
346            .try_into_stream()
347            .await
348            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
349        let dest_uri = dest
350            .to_str()
351            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
352
353        let mut rows = 0usize;
354        let mut wrote = false;
355        while let Some(batch) = stream.next().await {
356            let batch = batch
357                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
358            rows += batch.num_rows();
359            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
360            let mut params = write_params_for_create();
361            if wrote {
362                params.mode = WriteMode::Append;
363            }
364            Dataset::write(reader, dest_uri, Some(params))
365                .await
366                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
367            wrote = true;
368        }
369
370        if !wrote {
371            let batch = RecordBatch::new_empty(schema.clone());
372            let reader = RecordBatchIterator::new([Ok(batch)], schema);
373            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
374                .await
375                .with_context(|| {
376                    format!("failed to write empty {} archive table", table.as_str())
377                })?;
378        }
379        Ok((rows, source_version))
380    }
381
382    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
383        let mut stream = dataset
384            .scan()
385            .try_into_stream()
386            .await
387            .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
388        let mut rows = 0usize;
389        let mut inserted = 0usize;
390        while let Some(batch) = stream.next().await {
391            let batch = batch
392                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
393            let row_count = batch.num_rows();
394            rows += row_count;
395            inserted += self
396                .handle
397                .merge_insert(table, batch, row_count)
398                .await
399                .with_context(|| format!("failed to import {} archive table", table.as_str()))?
400                as usize;
401        }
402        Ok((rows, inserted))
403    }
404
405    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
406        if sessions.is_empty() {
407            return Ok(Vec::new());
408        }
409        let batches = sessions_batches(sessions)?;
410        let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
411        // Per-row outcomes; no fold here (see `pond index optimize`).
412        Ok(statuses_from_inserted(sessions.len(), inserted))
413    }
414
415    /// Batched write path used by the adapter ingest loop and by the wire
416    /// handler's final flush. Receives N completed substreams from the
417    /// validator and:
418    ///
419    ///   1. Runs the immutable-fields check (spec.md#protocol) against the stored row
420    ///      per session, sequentially. Sessions that fail produce one Error
421    ///      outcome and are excluded from the write batch.
422    ///   2. Deduplicates in-batch at the substream level: when two substreams
423    ///      in the same batch share a `session_id` (Claude Code's subagent
424    ///      files reuse their parent's id), the first occurrence wins. The
425    ///      second is either *merged* (same `source_agent` + `project`:
426    ///      messages/parts append, no duplicate rows) or *rejected*
427    ///      (different `project` - the subagent-vs-parent case). Row-level
428    ///      duplicates that slip past here are caught downstream by Lance's
429    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
430    ///      (invariant 17): this layer's job is preserving substream merge
431    ///      semantics, not policing the PK uniqueness Lance handles itself.
432    ///   3. Builds one combined `RecordBatch` per table (sessions, messages,
433    ///      parts) across every valid substream.
434    ///   4. Fires the three `merge_insert` calls in parallel via
435    ///      `tokio::try_join!`. Cross-table mutex on `CachedDataset` is
436    ///      independent, so these proceed concurrently.
437    ///   5. Composes per-session [`RowOutcome`]s in original substream order.
438    async fn upsert_session_batch(
439        &self,
440        substreams: Vec<CompletedSubstream>,
441    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
442        if substreams.is_empty() {
443            return Ok((Vec::new(), BatchCounts::default()));
444        }
445
446        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
447        let mut counts = BatchCounts::default();
448
449        // In-batch dedup. First occurrence of each session_id wins; later
450        // occurrences either merge or get rejected. Iteration order preserves
451        // original substream order so outcomes index correctly.
452        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
453        let mut by_session_id: std::collections::HashMap<String, usize> =
454            std::collections::HashMap::with_capacity(substreams.len());
455        for substream in substreams {
456            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
457                let existing = &merged[existing_idx];
458                if existing.session.source_agent != substream.session.source_agent
459                    || existing.session.project != substream.session.project
460                {
461                    // Subagent-vs-parent class. The first occurrence's
462                    // metadata stays authoritative; this substream is
463                    // rejected on the same immutable-field axis as the
464                    // storage-side check.
465                    let reason = if existing.session.source_agent != substream.session.source_agent
466                    {
467                        IngestError::ImmutableField {
468                            field: "source_agent",
469                            session_id: substream.session.id.clone(),
470                            stored: existing.session.source_agent.clone(),
471                            attempted: substream.session.source_agent.clone(),
472                        }
473                    } else {
474                        IngestError::ImmutableField {
475                            field: "project",
476                            session_id: substream.session.id.clone(),
477                            stored: (*existing.session.project).clone(),
478                            attempted: (*substream.session.project).clone(),
479                        }
480                    };
481                    let field = match &reason {
482                        IngestError::ImmutableField { field, .. } => Some(*field),
483                    };
484                    let reason_key = match field {
485                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
486                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
487                        _ => DROP_REASON_UNCATEGORIZED,
488                    };
489                    outcomes.extend(error_outcomes_for_substream(
490                        substream.session_index,
491                        &substream.session,
492                        &substream.messages,
493                        reason.to_string(),
494                        field,
495                        reason_key,
496                    ));
497                    continue;
498                }
499                // Same session, same metadata: merge messages. Dedup message
500                // ids defensively (within one batch, the validator's seen
501                // sets are per-substream so cross-substream dups can happen
502                // legally if both files re-emit the same row).
503                let existing = &mut merged[existing_idx];
504                let mut seen: std::collections::HashSet<String> = existing
505                    .messages
506                    .iter()
507                    .map(|m| m.message.id().to_owned())
508                    .collect();
509                for msg in substream.messages {
510                    if seen.insert(msg.message.id().to_owned()) {
511                        existing.messages.push(msg);
512                    }
513                }
514                continue;
515            }
516            by_session_id.insert(substream.session.id.clone(), merged.len());
517            merged.push(substream);
518        }
519
520        // Pre-existence sweep: one scan per table keyed on the batch's
521        // session_ids, capped at the substream count. Replaces the prior
522        // N-sequential `find_session` calls and gives us honest per-row
523        // Inserted/Matched attribution downstream (spec.md#adapter-integrity-additive-sync).
524        let session_id_values: Vec<ScalarValue> = merged
525            .iter()
526            .map(|substream| ScalarValue::String(substream.session.id.clone()))
527            .collect();
528        let existing_sessions: std::collections::HashMap<String, Session> =
529            if session_id_values.is_empty() {
530                std::collections::HashMap::new()
531            } else {
532                let batch = self
533                    .handle
534                    .scan_batch(
535                        Table::Sessions,
536                        Some(&Predicate::In("id", session_id_values.clone())),
537                        &[],
538                    )
539                    .await?;
540                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
541                for row in 0..batch.num_rows() {
542                    let session = session_from_batch(&batch, row)?;
543                    map.insert(session.id.clone(), session);
544                }
545                map
546            };
547        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
548            HashSet::new()
549        } else {
550            let batch = self
551                .handle
552                .scan_batch(
553                    Table::Messages,
554                    Some(&Predicate::In("session_id", session_id_values.clone())),
555                    &["session_id", "id"],
556                )
557                .await?;
558            let mut set = HashSet::with_capacity(batch.num_rows());
559            for row in 0..batch.num_rows() {
560                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
561                let mid = string(&batch, "id", row)?.context("message id is null")?;
562                set.insert((sid, mid));
563            }
564            set
565        };
566        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
567            HashSet::new()
568        } else {
569            let batch = self
570                .handle
571                .scan_batch(
572                    Table::Parts,
573                    Some(&Predicate::In("session_id", session_id_values)),
574                    &["session_id", "message_id", "id"],
575                )
576                .await?;
577            let mut set = HashSet::with_capacity(batch.num_rows());
578            for row in 0..batch.num_rows() {
579                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
580                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
581                let pid = string(&batch, "id", row)?.context("part id is null")?;
582                set.insert((sid, mid, pid));
583            }
584            set
585        };
586
587        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
588        for substream in merged {
589            if let Some(existing) = existing_sessions.get(&substream.session.id)
590                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
591            {
592                let field = match &failure {
593                    IngestError::ImmutableField { field, .. } => Some(*field),
594                };
595                let reason_key = match field {
596                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
597                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
598                    _ => DROP_REASON_UNCATEGORIZED,
599                };
600                outcomes.extend(error_outcomes_for_substream(
601                    substream.session_index,
602                    &substream.session,
603                    &substream.messages,
604                    failure.to_string(),
605                    field,
606                    reason_key,
607                ));
608                continue;
609            }
610            writeable.push(substream);
611        }
612
613        if writeable.is_empty() {
614            outcomes.sort_by_key(|outcome| outcome.index);
615            return Ok((outcomes, counts));
616        }
617
618        let sessions_owned: Vec<Session> = writeable
619            .iter()
620            .map(|substream| substream.session.clone())
621            .collect();
622        let message_rows: Vec<MessageBatchRow<'_>> = writeable
623            .iter()
624            .flat_map(|substream| {
625                substream.messages.iter().map(|buffered| MessageBatchRow {
626                    message: &buffered.message,
627                    source_agent: &substream.session.source_agent,
628                    project: &substream.session.project,
629                    search_text: buffered.search_text.as_deref(),
630                })
631            })
632            .collect();
633        let part_rows: Vec<Part> = writeable
634            .iter()
635            .flat_map(|substream| {
636                substream.messages.iter().flat_map(|buffered| {
637                    buffered
638                        .parts
639                        .iter()
640                        .map(|buffered_part| buffered_part.part.clone())
641                })
642            })
643            .collect();
644
645        let session_batches = sessions_batches(&sessions_owned)?;
646        let message_batches = messages_batches(&message_rows)?;
647        let part_batches = parts_batches(&part_rows)?;
648
649        // Merge_insert returns a batch-level inserted count which we cross-
650        // check against our pre-existence sets, but for per-row truth we
651        // attribute through the sets themselves (next loop). Under
652        // single-writer the two agree exactly; under a hostile concurrent
653        // writer the sets are authoritative for THIS request's wire shape -
654        // matched-no-op (spec.md#adapter-integrity-additive-sync) makes the
655        // distinction informational, not behavioral.
656        let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
657            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
658            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
659            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
660        )?;
661
662        for substream in &writeable {
663            outcomes.extend(success_outcomes_for_substream(
664                substream.session_index,
665                &substream.session,
666                &substream.messages,
667                &existing_sessions,
668                &existing_message_pks,
669                &existing_part_pks,
670                &mut counts,
671            ));
672        }
673
674        outcomes.sort_by_key(|outcome| outcome.index);
675        Ok((outcomes, counts))
676    }
677
678    pub async fn upsert_messages(
679        &self,
680        session: &Session,
681        messages: &[MessageWrite<'_>],
682    ) -> Result<Vec<UpsertStatus>> {
683        if messages.is_empty() {
684            return Ok(Vec::new());
685        }
686
687        let rows = messages
688            .iter()
689            .map(|write| MessageBatchRow {
690                message: write.message,
691                source_agent: &session.source_agent,
692                project: &session.project,
693                search_text: write.search_text,
694            })
695            .collect::<Vec<_>>();
696        let batches = messages_batches(&rows)?;
697        let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
698        Ok(statuses_from_inserted(messages.len(), inserted))
699    }
700
701    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
702        if parts.is_empty() {
703            return Ok(Vec::new());
704        }
705        let batches = parts_batches(parts)?;
706        let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
707        Ok(statuses_from_inserted(parts.len(), inserted))
708    }
709
710    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
711        let Some(session) = self.find_session(session_id).await? else {
712            return Ok(None);
713        };
714        let messages = self.messages_for_session(session_id).await?;
715        Ok(Some(SessionWithMessages { session, messages }))
716    }
717
718    /// Every session id currently in the store, unsorted.
719    pub async fn session_ids(&self) -> Result<Vec<String>> {
720        let batch = self
721            .handle
722            .scan_batch(Table::Sessions, None, &["id"])
723            .await?;
724        let mut ids = Vec::with_capacity(batch.num_rows());
725        for row in 0..batch.num_rows() {
726            if let Some(id) = string(&batch, "id", row)? {
727                ids.push(id);
728            }
729        }
730        Ok(ids)
731    }
732
733    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
734        let batch = self
735            .handle
736            .scan_batch(
737                Table::Sessions,
738                Some(&Predicate::Eq(
739                    "parent_session_id",
740                    parent_session_id.into(),
741                )),
742                &[
743                    "id",
744                    "parent_session_id",
745                    "parent_message_id",
746                    "source_agent",
747                    "created_at",
748                    "project",
749                    "options",
750                ],
751            )
752            .await?;
753        let mut sessions = Vec::with_capacity(batch.num_rows());
754        for row in 0..batch.num_rows() {
755            sessions.push(session_from_batch(&batch, row)?);
756        }
757        sessions.sort_by(|left, right| left.id.cmp(&right.id));
758        Ok(sessions)
759    }
760
761    /// `session_id -> wall-clock time of the Lance manifest version that
762    /// last wrote the row` for the per-session staleness skip
763    /// (spec.md#adapter-integrity-event-ordering). Reads Lance's `_row_last_updated_at_version` system
764    /// column (available because pond enables stable row ids per spec.md#lance-table-creation-stable-row-ids)
765    /// and joins it against `Dataset::versions()` for commit timestamps.
766    pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
767        use lance::deps::arrow_array::UInt64Array;
768
769        let dataset = self.handle.dataset(Table::Sessions).await?;
770        let version_list = dataset.versions().await?;
771        let versions: HashMap<u64, DateTime<Utc>> = version_list
772            .iter()
773            .map(|v| (v.version, v.timestamp))
774            .collect();
775        // `Dataset::cleanup_old_versions` (and the auto_cleanup hook) drops
776        // pruned versions from the manifest list, leaving rows whose
777        // `_row_last_updated_at_version` points at a version that no longer
778        // resolves. Those rows are still real and were ingested at some time
779        // <= the oldest still-visible version's commit timestamp - so falling
780        // back to that bound preserves a sound `mtime <= ingested` upper edge
781        // and keeps the staleness skip working after cleanup.
782        let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
783
784        let scanner = self
785            .handle
786            .scan(
787                Table::Sessions,
788                ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
789            )
790            .await?;
791        let mut stream = scanner.try_into_stream().await?;
792        let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
793        while let Some(batch) = stream.next().await {
794            let batch = batch?;
795            let version_array = batch
796                .column_by_name("_row_last_updated_at_version")
797                .context("missing _row_last_updated_at_version column")?
798                .as_any()
799                .downcast_ref::<UInt64Array>()
800                .context("_row_last_updated_at_version is not UInt64")?;
801            for row in 0..batch.num_rows() {
802                let Some(id) = string(&batch, "id", row)? else {
803                    continue;
804                };
805                if version_array.is_null(row) {
806                    continue;
807                }
808                let version = version_array.value(row);
809                let ts = versions.get(&version).copied().or(oldest_visible_ts);
810                if let Some(ts) = ts {
811                    out.insert(id, ts);
812                }
813            }
814        }
815        Ok(out)
816    }
817
    /// Whole-session view for `pond_get` session mode (spec.md#protocol).
    ///
    /// Which messages are scanned depends on the mode:
    /// - Conversational scans only messages with `search_text IS NOT NULL`.
    /// - Complete and Verbatim scan every message.
    ///
    /// Which parts are attached also depends on the mode:
    /// - Conversational and Complete attach compact part summaries.
    /// - Verbatim attaches every part in full (blobs included) instead.
    ///
    /// Messages are ordered by `(timestamp, id)`. `after_id` is an exclusive
    /// lower bound given as a message id. The page is bounded by
    /// `params.limit` and `params.budget_bytes` and never cuts a message in
    /// half. The byte budget counts only each message's `text` bytes;
    /// `content` is not counted.
    ///
    /// Returns:
    /// - `GetLookup::NotFound` when the session does not exist.
    /// - `GetLookup::UnknownAfterId` when `after_id` matches no scanned message.
    /// - Otherwise `GetLookup::Found` with the page and `messages_remaining`.
    ///   `messages_remaining` counts the newer messages left after the page for
    ///   `SessionFrom::Start`. For `SessionFrom::End` it counts the older
    ///   messages (after `after_id`) left before the page.
    pub async fn session_view(
        &self,
        session_id: &str,
        params: SessionViewParams<'_>,
    ) -> Result<GetLookup<SessionPage>> {
        let Some(session) = self.find_session(session_id).await? else {
            return Ok(GetLookup::NotFound);
        };

        // Normalize both scan shapes into `ScanRow`. Conversational rows have
        // no `content` and always carry `text`.
        let mut rows = match params.mode {
            ResponseMode::Conversational => self
                .scan_conversational_messages(session_id)
                .await?
                .into_iter()
                .map(|row| ScanRow {
                    id: row.message_id,
                    role: row.role,
                    timestamp: row.timestamp,
                    text: Some(row.text.into_inner()),
                    content: None,
                })
                .collect(),
            ResponseMode::Complete | ResponseMode::Verbatim => {
                self.scan_all_messages(session_id).await?
            }
        };
        // `scan_all_messages` returns rows in scan order, so sort here.
        // Conversational rows arrive already sorted, so re-sorting them is harmless.
        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));

        let start_at = match params.after_id {
            // Append-only stream: a real anchor never vanishes, so an unknown
            // `after_id` is a stale/mistyped client cursor, not "start over".
            Some(after) => match rows.iter().position(|row| row.id == after) {
                Some(idx) => idx + 1,
                None => return Ok(GetLookup::UnknownAfterId),
            },
            None => 0,
        };
        // `start_at <= rows.len()` always holds, so the fallback is defensive only.
        let remaining = rows.get(start_at..).unwrap_or(&[]);
        let (emitted, messages_remaining) = match params.session_from {
            SessionFrom::Start => {
                // NOTE(review): `page_by` presumably returns how many leading
                // rows fit both `limit` and the byte budget; confirm at its definition.
                let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
                    row.text.as_deref().map_or(0, str::len)
                });
                (&remaining[..n], remaining.len() - n)
            }
            // Tail: the newest messages that fit `limit` and the byte budget,
            // dropping oldest first. The newest is always kept unless
            // `limit == 0`, and the page stays chronological so the agent
            // reads the flow forward.
            SessionFrom::End => {
                let mut bytes = 0usize;
                // `start` walks backwards; `remaining[start..]` is the page so far.
                let mut start = remaining.len();
                for row in remaining.iter().rev() {
                    if remaining.len() - start >= params.limit {
                        break;
                    }
                    let size = row.text.as_deref().map_or(0, str::len);
                    // The budget applies only once at least one row is kept,
                    // so an oversized newest message is still returned.
                    if start < remaining.len() && bytes + size > params.budget_bytes {
                        break;
                    }
                    bytes += size;
                    start -= 1;
                }
                (&remaining[start..], start)
            }
        };
        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();

        // Conversational/Complete only summarize parts; Verbatim inlines every
        // part (blobs included).
        let mut parts_by_message = match params.mode {
            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
            ResponseMode::Conversational | ResponseMode::Complete => {
                self.summary_parts_for_messages(session_id, &ids).await?
            }
        };
        // Each emitted message takes ownership of its parts. `remove` avoids a
        // clone, and a message with no parts gets an empty list.
        let messages = emitted
            .iter()
            .map(|row| RetrievedMessage {
                id: row.id.clone(),
                role: row.role,
                timestamp: row.timestamp,
                text: row.text.clone(),
                content: row.content.clone(),
                parts: parts_by_message
                    .remove(&(session_id.to_owned(), row.id.clone()))
                    .unwrap_or_default(),
            })
            .collect();

        Ok(GetLookup::Found(SessionPage {
            session,
            messages,
            messages_remaining,
        }))
    }
919
920    /// Message-scope retrieval for `pond_get` message mode (spec.md#protocol):
921    /// the target with its full parts (paginated by `after_id` over part
922    /// ordinals, then budget) plus up to `2*context_depth` siblings around it.
923    /// `None` when no stored message carries `message_id`. Sibling parts are
924    /// carried for summarizing; the target's parts ride `target_parts`.
925    pub async fn message_view(
926        &self,
927        message_id: &str,
928        params: MessageViewParams<'_>,
929    ) -> Result<GetLookup<MessagePage>> {
930        let Some(session_id) = self.session_id_for_message(message_id).await? else {
931            return Ok(GetLookup::NotFound);
932        };
933        let Some(session) = self.find_session(&session_id).await? else {
934            return Ok(GetLookup::NotFound);
935        };
936        let mut rows = self.scan_all_messages(&session_id).await?;
937        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
938        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
939            return Ok(GetLookup::NotFound);
940        };
941
942        let start = target_pos.saturating_sub(params.context_depth);
943        let end = (target_pos + params.context_depth + 1).min(rows.len());
944        let window = &rows[start..end];
945        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
946        // The target's full parts (blobs included) ride the response; siblings
947        // are only summarized, but they share this one window scan.
948        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
949
950        let all_parts = parts_by_message
951            .remove(&(session_id.clone(), message_id.to_owned()))
952            .unwrap_or_default();
953        let start_part = match params.after_id {
954            // Exclusive over ordinals: parts are ordinal-sorted, so the first
955            // part past the anchor's ordinal is the page start. An anchor absent
956            // from the target's parts is a stale/mistyped client cursor.
957            Some(after) => match all_parts.iter().find(|part| part.id == after) {
958                Some(anchor) => all_parts
959                    .iter()
960                    .position(|part| part.ordinal > anchor.ordinal)
961                    .unwrap_or(all_parts.len()),
962                None => return Ok(GetLookup::UnknownAfterId),
963            },
964            None => 0,
965        };
966        let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
967        let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
968            serde_json::to_string(part).map_or(0, |json| json.len())
969        });
970        let target_parts = remaining_parts[..part_count].to_vec();
971        let target_parts_remaining = remaining_parts.len() - part_count;
972
973        let target_row = &rows[target_pos];
974        let target = RetrievedMessage {
975            id: target_row.id.clone(),
976            role: target_row.role,
977            timestamp: target_row.timestamp,
978            text: target_row.text.clone(),
979            content: target_row.content.clone(),
980            // Target structure is carried in full by `target_parts`.
981            parts: Vec::new(),
982        };
983        let siblings = window
984            .iter()
985            .enumerate()
986            .filter(|(idx, _)| start + idx != target_pos)
987            .map(|(_, row)| RetrievedMessage {
988                id: row.id.clone(),
989                role: row.role,
990                timestamp: row.timestamp,
991                text: row.text.clone(),
992                content: row.content.clone(),
993                parts: parts_by_message
994                    .get(&(session_id.clone(), row.id.clone()))
995                    .cloned()
996                    .unwrap_or_default(),
997            })
998            .collect();
999
1000        Ok(GetLookup::Found(MessagePage {
1001            session,
1002            target,
1003            target_parts,
1004            target_parts_remaining,
1005            siblings,
1006        }))
1007    }
1008
1009    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1010        let batch = self
1011            .handle
1012            .scan_batch(
1013                Table::Messages,
1014                Some(&Predicate::Eq("session_id", session_id.into())),
1015                &["id", "timestamp", "role", "search_text", "content"],
1016            )
1017            .await?;
1018        let mut rows = Vec::with_capacity(batch.num_rows());
1019        for row in 0..batch.num_rows() {
1020            let id = string(&batch, "id", row)?.context("message id is null")?;
1021            let role =
1022                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1023            let timestamp = datetime(&batch, "timestamp", row)?;
1024            rows.push(ScanRow {
1025                id,
1026                role,
1027                timestamp,
1028                text: string(&batch, "search_text", row)?,
1029                content: string(&batch, "content", row)?,
1030            });
1031        }
1032        Ok(rows)
1033    }
1034
1035    /// Conversational scan over one session: rows ordered by
1036    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
1037    /// read seam (spec.md#search-prefilter-pushdown).
1038    pub async fn scan_conversational_messages(
1039        &self,
1040        session_id: &str,
1041    ) -> Result<Vec<ConversationalRow>> {
1042        let filter = Predicate::And(vec![
1043            Predicate::Eq("session_id", session_id.into()),
1044            Predicate::IsNotNull("search_text"),
1045        ]);
1046        let batch = self
1047            .handle
1048            .scan_batch(
1049                Table::Messages,
1050                Some(&filter),
1051                &["id", "timestamp", "role", "search_text"],
1052            )
1053            .await?;
1054
1055        let mut rows = Vec::with_capacity(batch.num_rows());
1056        for row in 0..batch.num_rows() {
1057            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1058            let role =
1059                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1060            let timestamp = datetime(&batch, "timestamp", row)?;
1061            let text_str = string(&batch, "search_text", row)?.context(
1062                "search_text null after IsNotNull pushdown - storage invariant violated",
1063            )?;
1064            rows.push(ConversationalRow {
1065                session_id: session_id.to_owned(),
1066                message_id,
1067                role,
1068                timestamp,
1069                text: SearchText(text_str),
1070            });
1071        }
1072        rows.sort_by(|a, b| {
1073            a.timestamp
1074                .cmp(&b.timestamp)
1075                .then_with(|| a.message_id.cmp(&b.message_id))
1076        });
1077        Ok(rows)
1078    }
1079
1080    /// Locate the session id for a stored message. Cheap when only the routing
1081    /// hint is needed - callers that need the messages use `scan_all_messages`.
1082    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1083        let batch = self
1084            .handle
1085            .scan_batch(
1086                Table::Messages,
1087                Some(&Predicate::Eq("id", message_id.into())),
1088                &["session_id"],
1089            )
1090            .await?;
1091        if batch.num_rows() == 0 {
1092            return Ok(None);
1093        }
1094        string(&batch, "session_id", 0)
1095    }
1096
1097    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1098        self.handle.row_counts().await
1099    }
1100
1101    /// A point-in-time `Arc<Dataset>` for `table`, for registering as a
1102    /// DataFusion `LanceTableProvider` in `pond_sql_query`. Goes through the
1103    /// handle's freshness gate, so each query sees a current snapshot.
1104    pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1105        Ok(Arc::new(self.handle.dataset(table).await?))
1106    }
1107
1108    /// Write a `pond_sql_query` export artifact.
1109    pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1110        self.handle.export_write(name, bytes).await
1111    }
1112
1113    /// Read a `pond_sql_query` export artifact back.
1114    pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1115        self.handle.export_read(name).await
1116    }
1117
1118    /// Local filesystem path of an export artifact on `file://` installs.
1119    pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1120        self.handle.export_local_path(name)
1121    }
1122
1123    /// Compute the per-adapter / per-project rollup that drives
1124    /// `pond status`. One scan over `messages` projecting the three
1125    /// columns the rollup keys on (`source_agent`, `project`, `session_id`),
1126    /// aggregated in-memory. Bounded by the cross product of adapters and
1127    /// projects, which stays small on real corpora.
1128    pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1129        let scanner = self
1130            .handle
1131            .scan(
1132                Table::Messages,
1133                ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1134            )
1135            .await?;
1136        let mut stream = scanner.try_into_stream().await?;
1137        let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1138        while let Some(batch) = stream.next().await {
1139            let batch = batch?;
1140            for row in 0..batch.num_rows() {
1141                let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1142                let project = string(&batch, "project", row)?.unwrap_or_default();
1143                let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1144                let is_subagent = source_agent.contains('/');
1145                if is_subagent && !include_subagents {
1146                    continue;
1147                }
1148                let entry = groups.entry((source_agent, project)).or_default();
1149                entry.messages += 1;
1150                entry.session_ids.insert(session_id);
1151            }
1152        }
1153
1154        let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1155        let totals = RowTotals {
1156            sessions: totals_sessions as u64,
1157            messages: totals_messages as u64,
1158            parts: totals_parts as u64,
1159        };
1160
1161        let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1162        for ((adapter, project), acc) in groups {
1163            by_adapter.entry(adapter).or_default().push(ProjectStats {
1164                project,
1165                sessions: acc.session_ids.len() as u64,
1166                messages: acc.messages,
1167            });
1168        }
1169
1170        let mut adapters = Vec::with_capacity(by_adapter.len());
1171        for (adapter, mut projects) in by_adapter {
1172            projects.sort_by(|a, b| {
1173                b.messages
1174                    .cmp(&a.messages)
1175                    .then_with(|| a.project.cmp(&b.project))
1176            });
1177            let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1178            let messages: u64 = projects.iter().map(|p| p.messages).sum();
1179            adapters.push(AdapterStats {
1180                adapter,
1181                sessions,
1182                messages,
1183                projects,
1184            });
1185        }
1186
1187        Ok(CorpusStats {
1188            data_url: self.handle.location().clone(),
1189            totals,
1190            adapters,
1191            include_subagents,
1192        })
1193    }
1194
1195    /// Write a batch of embeddings into `messages`: set `vector` and
1196    /// `embedding_model` on each row by `(session_id, id)`
1197    /// (spec.md#session-embed-from-canonical). The column update goes through the
1198    /// write seam and lands as a new manifest version (`append-only`).
1199    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1200        if rows.is_empty() {
1201            return Ok(());
1202        }
1203        let batch = embedding_update_batch(rows)?;
1204        self.handle
1205            .merge_update(Table::Messages, batch, rows.len())
1206            .await?;
1207        Ok(())
1208    }
1209
1210    /// Stream the backlog of messages needing embedding: rows with `search_text`
1211    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1212    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1213        try_stream! {
1214            let filter = Predicate::And(vec![
1215                Predicate::IsNull("vector"),
1216                Predicate::IsNotNull("search_text"),
1217            ]);
1218            let projection: &[&str] = &["session_id", "id", "search_text"];
1219            let scanner = self
1220                .handle
1221                .scan(
1222                    Table::Messages,
1223                    ScanOpts::with_predicate_and_projection(&filter, projection),
1224                )
1225                .await?;
1226            let mut batches = scanner
1227                .try_into_stream()
1228                .await
1229                .context("failed to open messages stream")?;
1230            while let Some(batch) = batches.next().await {
1231                let batch = batch?;
1232                for row in 0..batch.num_rows() {
1233                    yield PendingMessage {
1234                        session_id: string(&batch, "session_id", row)?
1235                            .context("session_id is null")?,
1236                        id: string(&batch, "id", row)?.context("message id is null")?,
1237                        search_text: string(&batch, "search_text", row)?
1238                            .context("search_text is null")?,
1239                    };
1240                }
1241            }
1242        }
1243    }
1244
1245    /// Stream messages that are either never embedded or stale under the
1246    /// current model. `pond embed --force` feeds this to the same unconditional
1247    /// merge_update as the normal backlog; the filter makes that semantically
1248    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1249    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1250        try_stream! {
1251            let filter = Predicate::And(vec![
1252                Predicate::IsNotNull("search_text"),
1253                Predicate::Or(vec![
1254                    Predicate::IsNull("vector"),
1255                    Predicate::Ne("embedding_model", embed::model_id().into()),
1256                ]),
1257            ]);
1258            let projection: &[&str] = &["session_id", "id", "search_text"];
1259            let scanner = self
1260                .handle
1261                .scan(
1262                    Table::Messages,
1263                    ScanOpts::with_predicate_and_projection(&filter, projection),
1264                )
1265                .await?;
1266            let mut batches = scanner
1267                .try_into_stream()
1268                .await
1269                .context("failed to open pending-or-stale messages stream")?;
1270            while let Some(batch) = batches.next().await {
1271                let batch = batch?;
1272                for row in 0..batch.num_rows() {
1273                    yield PendingMessage {
1274                        session_id: string(&batch, "session_id", row)?
1275                            .context("session_id is null")?,
1276                        id: string(&batch, "id", row)?.context("message id is null")?,
1277                        search_text: string(&batch, "search_text", row)?
1278                            .context("search_text is null")?,
1279                    };
1280                }
1281            }
1282        }
1283    }
1284
1285    /// BM25 full-text retriever over `messages.search_text`.
1286    pub async fn fts_search(
1287        &self,
1288        query: &str,
1289        limit: usize,
1290        filter: &Predicate,
1291    ) -> Result<Vec<(MessageKey, f32)>> {
1292        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1293        scanner.full_text_search(
1294            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1295        )?;
1296        // Lance ships an autoprojection that silently appends `_score` to FTS
1297        // output when the projection omits it. That behavior is going away;
1298        // we opt into the future explicit-projection contract here so the
1299        // scanner stops emitting a per-call deprecation warning, and we list
1300        // `_score` ourselves since the loop below reads it.
1301        scanner.disable_scoring_autoprojection();
1302        scanner.project(&["session_id", "id", "_score"])?;
1303        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1304        let batch = scanner.try_into_batch().await?;
1305        let mut hits = Vec::with_capacity(batch.num_rows());
1306        for row in 0..batch.num_rows() {
1307            let key = MessageKey {
1308                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1309                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1310            };
1311            hits.push((key, float32(&batch, "_score", row)?));
1312        }
1313        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1314        // order, which varies between runs and across calls with different pool
1315        // sizes (the hybrid arm's `pool=100` and FTS-only's `limit=20` produce
1316        // different orderings at the same tied score). Without an explicit
1317        // tiebreak the downstream RRF dedup-rank for a tied target session can
1318        // flip session-to-session, making fusion outcomes nondeterministic.
1319        // Sort by `score desc`, then `(session_id, message_id)` asc.
1320        hits.sort_by(|left, right| {
1321            right
1322                .1
1323                .partial_cmp(&left.1)
1324                .unwrap_or(std::cmp::Ordering::Equal)
1325                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1326                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1327        });
1328        Ok(hits)
1329    }
1330
1331    /// Whether any `messages` row carries a vector (spec.md#search) - the
1332    /// signal that flips search from FTS-only to hybrid. The single-active-
1333    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
1334    /// vector belongs to the current model.
1335    pub async fn has_embeddings(&self) -> Result<bool> {
1336        let scope = Predicate::IsNotNull("vector");
1337        let mut scanner = self
1338            .handle
1339            .scan(
1340                Table::Messages,
1341                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1342            )
1343            .await?;
1344        scanner.limit(Some(1), None)?;
1345        let batch = scanner.try_into_batch().await?;
1346        Ok(batch.num_rows() > 0)
1347    }
1348
1349    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
1350    /// scalar predicate (spec.md#search-prefilter-pushdown). Combines the caller's
1351    /// filter with `vector IS NOT NULL` to exclude un-embedded rows from the
1352    /// scan; the brute-force kNN path requires this (the IVF_PQ path would
1353    /// skip them anyway). The single-active-model invariant lets pond drop
1354    /// the per-row model filter: every non-null vector belongs to the current
1355    /// model.
1356    pub async fn vector_search(
1357        &self,
1358        query: &[f32],
1359        limit: usize,
1360        filter: &Predicate,
1361        search: Option<&config::SearchConfig>,
1362    ) -> Result<Vec<(MessageKey, f32)>> {
1363        let scope = embedded_scope(filter);
1364        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1365        let key = Float32Array::from(query.to_vec());
1366        scanner.nearest("vector", &key, limit)?;
1367        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1368            scanner.nprobes(nprobes);
1369        }
1370        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1371            scanner.refine(refine_factor);
1372        }
1373        // Mirror the explicit-projection contract from `fts_search`: opt out
1374        // of `_distance` autoprojection and list it ourselves since the loop
1375        // below reads it.
1376        scanner.disable_scoring_autoprojection();
1377        scanner.project(&["session_id", "id", "_distance"])?;
1378        let batch = scanner.try_into_batch().await?;
1379        let mut hits = Vec::with_capacity(batch.num_rows());
1380        for row in 0..batch.num_rows() {
1381            let key = MessageKey {
1382                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1383                message_id: string(&batch, "id", row)?.context("message id is null")?,
1384            };
1385            hits.push((key, float32(&batch, "_distance", row)?));
1386        }
1387        // Stable secondary sort: same reasoning as `fts_search` - IVF_PQ can
1388        // emit hits with effectively identical `_distance` in fragment-dependent
1389        // order, which makes RRF dedup-ranks nondeterministic for tied
1390        // neighbors. Sort by distance asc (smaller = more similar), then by
1391        // `(session_id, message_id)` asc.
1392        hits.sort_by(|left, right| {
1393            left.1
1394                .partial_cmp(&right.1)
1395                .unwrap_or(std::cmp::Ordering::Equal)
1396                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1397                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1398        });
1399        Ok(hits)
1400    }
1401
1402    /// The DataFusion plan string for a filtered vector scan - the
1403    /// `search-prefilter-pushdown` regression guard reads it.
1404    pub async fn explain_vector_plan(
1405        &self,
1406        query: &[f32],
1407        limit: usize,
1408        filter: &Predicate,
1409        search: Option<&config::SearchConfig>,
1410    ) -> Result<String> {
1411        let scope = embedded_scope(filter);
1412        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1413        let key = Float32Array::from(query.to_vec());
1414        scanner.nearest("vector", &key, limit)?;
1415        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1416            scanner.nprobes(nprobes);
1417        }
1418        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1419            scanner.refine(refine_factor);
1420        }
1421        scanner
1422            .explain_plan(true)
1423            .await
1424            .context("explain_plan failed")
1425    }
1426
1427    pub async fn explain_fts_plan(
1428        &self,
1429        query: &str,
1430        limit: usize,
1431        filter: &Predicate,
1432    ) -> Result<String> {
1433        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1434        scanner.full_text_search(
1435            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1436        )?;
1437        scanner.project(&["session_id", "id"])?;
1438        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1439        scanner
1440            .explain_plan(true)
1441            .await
1442            .context("explain_plan failed")
1443    }
1444
1445    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
1446    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1447        if keys.is_empty() {
1448            return Ok(Vec::new());
1449        }
1450        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1451        let session_ids = keys
1452            .iter()
1453            .map(|key| key.session_id.clone())
1454            .collect::<Vec<_>>();
1455        let message_ids = keys
1456            .iter()
1457            .map(|key| key.message_id.clone())
1458            .collect::<Vec<_>>();
1459        let predicate = Predicate::And(vec![
1460            in_predicate("session_id", &session_ids),
1461            in_predicate("id", &message_ids),
1462        ]);
1463        let batch = self
1464            .handle
1465            .scan_batch(
1466                Table::Messages,
1467                Some(&predicate),
1468                &[
1469                    "id",
1470                    "session_id",
1471                    "role",
1472                    "project",
1473                    "source_agent",
1474                    "timestamp",
1475                    "search_text",
1476                ],
1477            )
1478            .await?;
1479        let mut metas = Vec::with_capacity(batch.num_rows());
1480        for row in 0..batch.num_rows() {
1481            let message_id = string(&batch, "id", row)?.context("id is null")?;
1482            let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1483            if !wanted.contains(&MessageKey {
1484                session_id: session_id.clone(),
1485                message_id: message_id.clone(),
1486            }) {
1487                continue;
1488            }
1489            metas.push(MessageMeta {
1490                message_id,
1491                session_id,
1492                role: string(&batch, "role", row)?.context("role is null")?,
1493                project: string(&batch, "project", row)?.context("project is null")?,
1494                source_agent: string(&batch, "source_agent", row)?
1495                    .context("source_agent is null")?,
1496                timestamp: datetime(&batch, "timestamp", row)?,
1497                search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1498            });
1499        }
1500        Ok(metas)
1501    }
1502
1503    /// Total message count per session, for search session summaries.
1504    pub async fn session_message_counts(
1505        &self,
1506        session_ids: &[String],
1507    ) -> Result<BTreeMap<String, usize>> {
1508        if session_ids.is_empty() {
1509            return Ok(BTreeMap::new());
1510        }
1511        let dataset = self.handle.dataset(Table::Messages).await?;
1512        let mut tasks = tokio::task::JoinSet::new();
1513        for session_id in session_ids {
1514            let dataset = dataset.clone();
1515            let session_id = session_id.clone();
1516            tasks.spawn(async move {
1517                let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1518                let count = dataset.count_rows(Some(filter)).await?;
1519                anyhow::Ok((session_id, count))
1520            });
1521        }
1522        let mut counts = BTreeMap::new();
1523        while let Some(joined) = tasks.join_next().await {
1524            let (session_id, count) = joined.context("session count task panicked")??;
1525            counts.insert(session_id, count);
1526        }
1527        Ok(counts)
1528    }
1529
1530    /// Rows appended to `messages` since the FTS index was last optimized.
1531    /// A missing index reports the whole table; the query is manifest-only.
1532    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1533        self.handle
1534            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1535            .await
1536    }
1537
1538    /// Rows added or rewritten in `messages` since the IVF_PQ vector index
1539    /// was last optimized. Below
1540    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
1541    /// must read [`embedding_progress`](Self::embedding_progress) too and
1542    /// distinguish "index not built yet" from "index trails data".
1543    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1544        self.handle
1545            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1546            .await
1547    }
1548
1549    /// Embedding coverage: how many `messages` rows carry a vector and how
1550    /// many are still eligible. Drives the `pond status` embeddings line and
1551    /// the `pond embed` progress bar's known total. `embedded` reads the
1552    /// `vector IS NOT NULL` count directly - the single-active-model invariant
1553    /// (see `MESSAGE_SCALAR_INDICES`) means there is no need to scope by the
1554    /// `embedding_model` column.
1555    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1556        let dataset = self.handle.dataset(Table::Messages).await?;
1557        let embedded = dataset
1558            .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1559            .await?;
1560        let total = dataset
1561            .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1562            .await?;
1563        Ok(EmbeddingProgress {
1564            embedded,
1565            total,
1566            model: embed::model_id(),
1567        })
1568    }
1569
1570    /// Count rows whose `embedding_model` is not the currently configured
1571    /// model AND whose `vector` is still populated - the signal `pond embed`
1572    /// uses to detect a model swap and require `--force`.
1573    pub async fn stale_embedding_count(&self) -> Result<usize> {
1574        let dataset = self.handle.dataset(Table::Messages).await?;
1575        dataset
1576            .count_rows(Some(
1577                Predicate::And(vec![
1578                    Predicate::IsNotNull("vector"),
1579                    Predicate::Ne("embedding_model", embed::model_id().into()),
1580                ])
1581                .to_lance(),
1582            ))
1583            .await
1584            .map_err(Into::into)
1585    }
1586
1587    /// Run the per-table maintenance cycle (compact + indices) across every
1588    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
1589    /// and compaction commit independently, so a hot writer that starves
1590    /// compaction on one table does not abort the index work the operator
1591    /// asked for on other tables (or even on the same table).
1592    pub async fn optimize_indices(
1593        &self,
1594        progress: Option<OptimizeProgressFn>,
1595        maintenance: &MaintenancePolicy,
1596    ) -> Result<OptimizeOutcome> {
1597        let intents = pond_index_intents();
1598        let mut tables = Vec::with_capacity(3);
1599        for (table, intents) in intents.all() {
1600            let outcome = self
1601                .handle
1602                .optimize_table(table, intents, progress.as_ref(), maintenance)
1603                .await;
1604            tables.push(outcome);
1605        }
1606        Ok(OptimizeOutcome { tables })
1607    }
1608
1609    /// Fold trailing fragments into existing indices across every table,
1610    /// without running compaction. Used by `pond embed`'s tail so newly
1611    /// written vectors land in the FTS / IVF_PQ / btree / bitmap indices
1612    /// without paying the compaction retry budget while embed itself may
1613    /// still be writing in a sibling process.
1614    pub async fn build_indices_only(
1615        &self,
1616        progress: Option<OptimizeProgressFn>,
1617    ) -> Result<OptimizeOutcome> {
1618        let policy = pond_index_intents();
1619        let mut tables = Vec::with_capacity(3);
1620        for (table, intents) in policy.all() {
1621            let indices = self
1622                .handle
1623                .optimize_table_indices_only(table, intents, progress.as_ref())
1624                .await;
1625            tables.push(TableOptimizeOutcome {
1626                table,
1627                indices,
1628                compaction: PhaseOutcome::NotAttempted,
1629            });
1630        }
1631        Ok(OptimizeOutcome { tables })
1632    }
1633
1634    #[cfg(test)]
1635    async fn optimize_indices_with_vector_threshold(
1636        &self,
1637        vector_threshold: usize,
1638    ) -> Result<OptimizeOutcome> {
1639        let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1640        let policy = MaintenancePolicy::always_compact();
1641        let mut tables = Vec::with_capacity(3);
1642        for (table, intents) in intents.all() {
1643            let outcome = self
1644                .handle
1645                .optimize_table(table, intents, None, &policy)
1646                .await;
1647            tables.push(outcome);
1648        }
1649        Ok(OptimizeOutcome { tables })
1650    }
1651
1652    pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1653        let policy = pond_index_intents();
1654        let mut matched = false;
1655        for (table, intents) in policy.all() {
1656            for intent in intents {
1657                if intent_name.is_none_or(|name| name == intent.name) {
1658                    matched = true;
1659                    self.handle.rebuild_index(table, intent).await?;
1660                }
1661            }
1662        }
1663        if let Some(name) = intent_name
1664            && !matched
1665        {
1666            anyhow::bail!("unknown index intent {name:?}");
1667        }
1668        Ok(())
1669    }
1670
1671    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1672        let policy = pond_index_intents();
1673        let mut statuses = Vec::new();
1674        for (table, intents) in policy.all() {
1675            statuses.extend(self.handle.index_status(table, intents).await?);
1676        }
1677        Ok(statuses)
1678    }
1679
1680    /// Drop the IVF_PQ index on `messages.vector`. Used by `pond embed
1681    /// --force` before re-bootstrapping under a different model. Silent
1682    /// when the index does not exist.
1683    pub async fn drop_vector_index(&self) -> Result<()> {
1684        match self
1685            .handle
1686            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1687            .await
1688        {
1689            Ok(()) => Ok(()),
1690            Err(error) => {
1691                let msg = error.to_string();
1692                if msg.contains("not found") || msg.contains("does not exist") {
1693                    Ok(())
1694                } else {
1695                    Err(error)
1696                }
1697            }
1698        }
1699    }
1700
1701    /// On-disk byte totals per dataset, sized through Lance's object store
1702    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
1703    pub async fn table_sizes(&self) -> Result<TableSizes> {
1704        self.handle.table_sizes().await
1705    }
1706
1707    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1708        let batch = self
1709            .handle
1710            .scan_batch(
1711                Table::Sessions,
1712                Some(&Predicate::Eq("id", session_id.into())),
1713                &[],
1714            )
1715            .await?;
1716        if batch.num_rows() == 0 {
1717            Ok(None)
1718        } else {
1719            Ok(Some(session_from_batch(&batch, 0)?))
1720        }
1721    }
1722
1723    /// Fetch the stored vector for one message, for `similar_to`-style
1724    /// retrieval. Returns `Ok(None)` when the message exists but is not yet
1725    /// embedded, or when no message has that id. Vectors are stored Float16
1726    /// (`embedding_vector_type`); the search path takes `&[f32]`, so we
1727    /// widen here. Cheap rare op - one predicate scan, no index.
1728    pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1729        let batch = self
1730            .handle
1731            .scan_batch(
1732                Table::Messages,
1733                Some(&Predicate::Eq("id", message_id.into())),
1734                &["vector"],
1735            )
1736            .await?;
1737        if batch.num_rows() == 0 {
1738            return Ok(None);
1739        }
1740        let column = batch
1741            .column(0)
1742            .as_any()
1743            .downcast_ref::<FixedSizeListArray>();
1744        let Some(list) = column else {
1745            return Ok(None);
1746        };
1747        if list.is_null(0) {
1748            return Ok(None);
1749        }
1750        let values = list.value(0);
1751        let halves = values
1752            .as_any()
1753            .downcast_ref::<Float16Array>()
1754            .context("messages.vector inner array is not Float16")?;
1755        let widened = (0..halves.len())
1756            .map(|i| halves.value(i).to_f32())
1757            .collect();
1758        Ok(Some(widened))
1759    }
1760
1761    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1762        let batch = self
1763            .handle
1764            .scan_batch(
1765                Table::Messages,
1766                Some(&Predicate::Eq("session_id", session_id.into())),
1767                &[
1768                    "session_id",
1769                    "id",
1770                    "timestamp",
1771                    "role",
1772                    "content",
1773                    "options",
1774                ],
1775            )
1776            .await?;
1777        let mut messages = Vec::with_capacity(batch.num_rows());
1778        for row in 0..batch.num_rows() {
1779            messages.push(message_from_batch(&batch, row)?);
1780        }
1781        messages.sort_by(|left, right| {
1782            left.timestamp()
1783                .cmp(&right.timestamp())
1784                .then_with(|| left.id().cmp(right.id()))
1785        });
1786
1787        let message_ids = messages
1788            .iter()
1789            .map(|message| message.id().to_owned())
1790            .collect::<Vec<_>>();
1791        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1792
1793        Ok(messages
1794            .into_iter()
1795            .map(|message| {
1796                let key = (message.session_id().to_owned(), message.id().to_owned());
1797                let parts = parts_by_message.remove(&key).unwrap_or_default();
1798                MessageWithParts { message, parts }
1799            })
1800            .collect())
1801    }
1802
1803    /// Every part of these messages, full fidelity (file blobs included). The
1804    /// canonical read primitive - restore/export, verbatim mode, and the
1805    /// message-mode target all need the complete set.
1806    pub async fn parts_for_messages(
1807        &self,
1808        session_id: &str,
1809        message_ids: &[String],
1810    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1811        self.scan_parts(session_id, message_ids, None).await
1812    }
1813
1814    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
1815    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
1816    /// nothing. For the summary-only reads (conversational/complete session
1817    /// views, search hits) - it never feeds restore/export.
1818    pub async fn summary_parts_for_messages(
1819        &self,
1820        session_id: &str,
1821        message_ids: &[String],
1822    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1823        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1824            .await
1825    }
1826
1827    async fn scan_parts(
1828        &self,
1829        session_id: &str,
1830        message_ids: &[String],
1831        part_types: Option<&[&str]>,
1832    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1833        if message_ids.is_empty() {
1834            return Ok(BTreeMap::new());
1835        }
1836        let mut clauses = vec![
1837            Predicate::Eq("session_id", session_id.into()),
1838            in_predicate("message_id", message_ids),
1839        ];
1840        if let Some(types) = part_types {
1841            clauses.push(Predicate::In(
1842                "type",
1843                types.iter().map(|&t| t.into()).collect(),
1844            ));
1845        }
1846        let predicate = Predicate::And(clauses);
1847        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1848        let mut scanner = self
1849            .handle
1850            .scan(
1851                Table::Parts,
1852                ScanOpts::with_predicate_and_projection(
1853                    &predicate,
1854                    &[
1855                        "session_id",
1856                        "message_id",
1857                        "id",
1858                        "ordinal",
1859                        "type",
1860                        "provenance",
1861                        "variant_data",
1862                        "options",
1863                    ],
1864                ),
1865            )
1866            .await?;
1867        scanner.with_row_address();
1868        let batch = scanner.try_into_batch().await.context("scan failed")?;
1869        let row_addresses = uint64(&batch, "_rowaddr")?;
1870        let mut file_payloads = BTreeMap::<usize, FileData>::new();
1871        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1872        for row in 0..batch.num_rows() {
1873            if string(&batch, "type", row)?.as_deref() == Some("file") {
1874                let variant_data =
1875                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1876                file_rows.push((row, row_addresses.value(row), variant_data));
1877            }
1878        }
1879        if !file_rows.is_empty() {
1880            let addresses = file_rows
1881                .iter()
1882                .map(|(_, address, _)| *address)
1883                .collect::<Vec<_>>();
1884            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1885            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1886                // Legacy blob (lance-encoding:blob): payload is bytes; the
1887                // url variant stored its URL as UTF-8 bytes, recovered via
1888                // `file_data_from_blob`'s `data_kind = "url"` branch.
1889                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1890                file_payloads.insert(row, payload);
1891            }
1892        }
1893        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1894        for row in 0..batch.num_rows() {
1895            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1896            parts_by_message
1897                .entry((part.session_id.clone(), part.message_id.clone()))
1898                .or_default()
1899                .push(part);
1900        }
1901        for parts in parts_by_message.values_mut() {
1902            parts.sort_by_key(|part| part.ordinal);
1903        }
1904        Ok(parts_by_message)
1905    }
1906}
1907
/// One ingest event: a session, message or part record.
///
/// Serialized as an adjacently tagged enum. `kind` carries the snake_case
/// variant name (`"session"`, `"message"` or `"part"`), and `data` carries
/// the wrapped record.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// A session header record.
    Session(Session),
    /// A message record, which carries its session id.
    Message(Message),
    /// A part record, which carries its session and message ids.
    Part(Part),
}
1915
/// Aggregate accounting for an ingest pass (CLI sync, adapter-driven).
/// The wire layer (`pond_ingest`) instead returns per-row results; the
/// aggregate is derived from those at the wire boundary.
///
/// Fields are bucketed by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape is set by spec.md#adapter-integrity-event-ordering.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows actually written to Lance, summed across all three tables.
    /// Use the per-table fields below for user-facing counts; this stays
    /// for `accepted()` and existing wire callers.
    pub inserted: usize,
    /// Rows that already existed (merge_insert no-op match), summed across
    /// all three tables like `inserted`.
    pub matched: usize,
    /// Session rows inserted this pass.
    pub sessions_inserted: usize,
    /// Message rows inserted this pass (total - includes tool calls,
    /// tool results, and other non-searchable messages).
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` whose `search_text` is non-null
    /// (eligible for FTS + semantic indexing). The user-facing "messages"
    /// count in `pond sync` / `pond status` reads this field.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted this pass.
    pub parts_inserted: usize,
    /// Session rows already present (merge_insert matched).
    pub sessions_matched: usize,
    /// Message rows already present (merge_insert matched), total.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` with `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows already present.
    pub parts_matched: usize,
    /// Events the validator dropped under per-event-drop policy (ordering
    /// violation, orphan part, mismatched parent, adapter parse failure,
    /// duplicate-id collision, ...). Counted by event, not by session: a
    /// session with one bad part stays in this bucket as 1, not as "the
    /// whole substream." Per spec.md#adapter-integrity-dedup, adapters SHOULD dedupe their
    /// own emissions upstream when source replay is expected; the
    /// validator's in-batch HashSet is a safety net, not a feature
    /// adapters may rely on. If this bucket grows on a clean adapter,
    /// inspect `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions whose Session-level invariants (immutable `source_agent` /
    /// `project` against the stored row) failed at flush time and
    /// whose substream got rejected wholesale. Always small relative to
    /// `inserted`; if not, there's a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter couldn't decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    ///
    /// NOTE(review): this overlaps `skipped_empty`, whose doc also claims
    /// empty `.jsonl` files and unextractable headers. Confirm which counter
    /// the adapters actually bump for those cases and fix the losing doc.
    pub skipped_files: usize,
    /// Files that produced no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `-vv` (debug) verbosity.
    pub skipped_empty: usize,
    /// Sessions short-circuited via the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): file `mtime` was at or before the wall-clock time
    /// pond last wrote that session's row, so re-decode was bypassed.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries were exhausted (commit
    /// conflicts, transient IO that didn't recover). Hard zero on healthy
    /// runs.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys for the combined `dropped_events +
    /// dropped_sessions` populations. Keys are `&'static str` (see the
    /// `DROP_REASON_*` constants) so consumers can match by identity.
    /// Empty on a clean run. Used by `pond sync` to print the top reasons
    /// and by `benches/ingest_bench.rs` to bucket Partial drops by cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1991
// Stable reason keys shared by the `IngestSummary::drop_reasons` histogram
// and the per-row `RowError::reason_key`. They are `&'static str` so
// consumers can match on identity instead of prose.
//
// Adding a key: choose a short snake_case identifier, route it from the
// validator/adapter, and update the per-row outcome docs in
// `docs/spec.md#adapter-integrity-event-ordering`.

// Duplicate-id collisions within a batch.
/// Reason key `"duplicate_message_id"`.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &'static str = "duplicate_message_id";
/// Reason key `"duplicate_part_key"`.
pub const DROP_REASON_DUPLICATE_PART_KEY: &'static str = "duplicate_part_key";

// Event-ordering and parent-mismatch violations.
/// Reason key `"message_before_session"`.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &'static str = "message_before_session";
/// Reason key `"message_session_mismatch"`.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &'static str = "message_session_mismatch";
/// Reason key `"part_before_message"`.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &'static str = "part_before_message";
/// Reason key `"part_message_mismatch"`.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &'static str = "part_message_mismatch";
/// Reason key `"empty_source_agent"`.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &'static str = "empty_source_agent";
/// Reason key `"parent_message_without_session"`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &'static str =
    "parent_message_without_session";

// Session-level invariants checked against the stored row.
/// Reason key `"immutable_project"`.
pub const DROP_REASON_IMMUTABLE_PROJECT: &'static str = "immutable_project";
/// Reason key `"immutable_source_agent"`.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &'static str = "immutable_source_agent";

/// Fallback key for an `Error` outcome that carries no reason key.
pub const DROP_REASON_UNCATEGORIZED: &'static str = "uncategorized";
2008
/// Honest per-table outcome of one batched flush. Built from `merge_insert`'s
/// returned counts together with the pre-existence sets captured by
/// `upsert_session_batch`. Folded into a per-sync summary via
/// [`IngestSummary::add_batch`]. spec.md#adapter-integrity-additive-sync: matched
/// is a no-op write, so the inserted/matched split is informational - we still
/// surface it because both `pond sync` and `pond_ingest` clients reconcile
/// against "which rows landed this call."
///
/// Each field is added into the `IngestSummary` field of the same name.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows this flush inserted.
    pub sessions_inserted: usize,
    /// Session rows this flush found already present.
    pub sessions_matched: usize,
    /// Message rows inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Subset of `messages_inserted_total` with non-null `search_text`.
    pub messages_inserted_searchable: usize,
    /// Message rows already present, searchable or not.
    pub messages_matched_total: usize,
    /// Subset of `messages_matched_total` with non-null `search_text`.
    pub messages_matched_searchable: usize,
    /// Part rows inserted.
    pub parts_inserted: usize,
    /// Part rows already present.
    pub parts_matched: usize,
}
2027
2028impl IngestSummary {
2029    pub fn accepted(&self) -> usize {
2030        self.inserted + self.matched
2031    }
2032
2033    /// Sole writer of the per-table counters on the CLI batched flush path.
2034    /// The wire single-row path keeps using [`Self::add_outcomes`]; emitting
2035    /// both for the same rows would double-count.
2036    pub fn add_batch(&mut self, counts: &BatchCounts) {
2037        self.sessions_inserted += counts.sessions_inserted;
2038        self.sessions_matched += counts.sessions_matched;
2039        self.messages_inserted_total += counts.messages_inserted_total;
2040        self.messages_inserted_searchable += counts.messages_inserted_searchable;
2041        self.messages_matched_total += counts.messages_matched_total;
2042        self.messages_matched_searchable += counts.messages_matched_searchable;
2043        self.parts_inserted += counts.parts_inserted;
2044        self.parts_matched += counts.parts_matched;
2045        self.inserted +=
2046            counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2047        self.matched +=
2048            counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2049    }
2050
2051    /// Sum every counter from `other` into `self`. Used by the multi-source
2052    /// `pond sync` loop so adding a new field to this struct doesn't silently
2053    /// drop on aggregation - the prior hand-rolled `+=` block grew bugs.
2054    pub fn merge(&mut self, other: &Self) {
2055        self.inserted += other.inserted;
2056        self.matched += other.matched;
2057        self.sessions_inserted += other.sessions_inserted;
2058        self.messages_inserted_total += other.messages_inserted_total;
2059        self.messages_inserted_searchable += other.messages_inserted_searchable;
2060        self.parts_inserted += other.parts_inserted;
2061        self.sessions_matched += other.sessions_matched;
2062        self.messages_matched_total += other.messages_matched_total;
2063        self.messages_matched_searchable += other.messages_matched_searchable;
2064        self.parts_matched += other.parts_matched;
2065        self.dropped_events += other.dropped_events;
2066        self.dropped_sessions += other.dropped_sessions;
2067        self.skipped_files += other.skipped_files;
2068        self.skipped_empty += other.skipped_empty;
2069        self.skipped_fresh += other.skipped_fresh;
2070        self.storage_errors += other.storage_errors;
2071        self.truncated_values += other.truncated_values;
2072        for (key, value) in &other.drop_reasons {
2073            *self.drop_reasons.entry(key).or_insert(0) += value;
2074        }
2075    }
2076
2077    /// Same dispatch as [`Self::add_outcomes`] but ignores
2078    /// `Inserted`/`Matched` rows. The CLI batched path drives those counters
2079    /// via [`Self::add_batch`] and uses this method to attribute per-row
2080    /// `Error` outcomes from the same flush.
2081    pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2082        for outcome in outcomes {
2083            if !matches!(outcome.status, OutcomeStatus::Error) {
2084                continue;
2085            }
2086            if outcome.kind == "session" {
2087                self.dropped_sessions += 1;
2088            } else {
2089                self.dropped_events += 1;
2090            }
2091            let reason = outcome
2092                .error
2093                .as_ref()
2094                .and_then(|error| error.reason_key)
2095                .unwrap_or(DROP_REASON_UNCATEGORIZED);
2096            *self.drop_reasons.entry(reason).or_insert(0) += 1;
2097        }
2098    }
2099
2100    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2101        for outcome in outcomes {
2102            match outcome.status {
2103                OutcomeStatus::Inserted => {
2104                    self.inserted += 1;
2105                    match outcome.kind {
2106                        "session" => self.sessions_inserted += 1,
2107                        "message" => {
2108                            self.messages_inserted_total += 1;
2109                            if outcome.searchable {
2110                                self.messages_inserted_searchable += 1;
2111                            }
2112                        }
2113                        "part" => self.parts_inserted += 1,
2114                        _ => {}
2115                    }
2116                }
2117                OutcomeStatus::Matched => {
2118                    self.matched += 1;
2119                    match outcome.kind {
2120                        "session" => self.sessions_matched += 1,
2121                        "message" => {
2122                            self.messages_matched_total += 1;
2123                            if outcome.searchable {
2124                                self.messages_matched_searchable += 1;
2125                            }
2126                        }
2127                        "part" => self.parts_matched += 1,
2128                        _ => {}
2129                    }
2130                }
2131                OutcomeStatus::Error => {
2132                    // Session-level rejection: exactly one session-kind Error
2133                    // outcome (see `error_outcomes_for_substream`). Per-event
2134                    // drop: one Error per message/part. The two populations
2135                    // are counted separately so the operator can tell a
2136                    // structural reject from a row-level skip.
2137                    if outcome.kind == "session" {
2138                        self.dropped_sessions += 1;
2139                    } else {
2140                        self.dropped_events += 1;
2141                    }
2142                    let reason = outcome
2143                        .error
2144                        .as_ref()
2145                        .and_then(|e| e.reason_key)
2146                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
2147                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
2148                }
2149            }
2150        }
2151    }
2152}
2153
/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol). One
/// row per input event from the request's `events` array. The validator
/// returns these in array order so the wire layer can pack them directly
/// into [`crate::wire::IngestResult`] entries.
///
/// NOTE(review): in-line rejections are returned from `push` as soon as the
/// offending event arrives, while Inserted/Matched outcomes only come back
/// from `flush`/`finish`; callers presumably rely on `index` (not arrival
/// order) to line outcomes up with the input array - confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Position of the originating event in the request's `events` array.
    pub index: usize,
    /// Row kind: `"session"`, `"message"`, or `"part"`.
    pub kind: &'static str,
    /// Primary key as JSON: the session id string for sessions,
    /// `[session_id, message_id]` for messages, and
    /// `[session_id, message_id, part_id]` for parts.
    pub pk: Value,
    pub status: OutcomeStatus,
    /// Populated for `Error` rows; `None` for `Inserted`/`Matched` rows.
    pub error: Option<RowError>,
    /// True iff `kind == "message"` AND the underlying row carries
    /// `search_text`. Drives `IngestSummary::messages_inserted_searchable`
    /// (and `messages_matched_searchable` for re-written rows) so the CLI
    /// can show "searchable" message deltas distinct from raw inserts.
    /// Always false for session/part rows.
    pub searchable: bool,
}
2171
/// Result of one ingest row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row's primary key was not present before this write.
    Inserted,
    /// The row's primary key already existed before this write.
    Matched,
    /// The row was rejected; see [`RowOutcome::error`].
    Error,
}
2178
/// Structured per-row error body. Mirrors the wire shape so the handler
/// can pass it straight through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the rejection.
    pub message: String,
    /// Name of the offending input field, when the rejection can be pinned
    /// to one (e.g. `"source_agent"`, `"session_id"`).
    pub field: Option<&'static str>,
    /// Short human-prose reason. Within this file it is only set to
    /// `"immutable"` for session-level immutable-field rejections.
    pub reason: Option<&'static str>,
    /// Stable key for histogramming - see `DROP_REASON_*` constants. The
    /// `reason` field above is human-prose; `reason_key` is the machine
    /// bucket. `None` means uncategorized; consumers attribute to
    /// `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
2192
/// Buffered session events tagged with their input array index, so the
/// per-row outcomes can be re-attributed once `merge_insert` returns its
/// per-row Inserted/Matched stats.
#[derive(Debug)]
struct BufferedSession {
    /// Index of the Session event in the request's `events` array.
    index: usize,
    /// The validated Session (with `source_agent` already trimmed).
    session: Session,
}
2201
/// A validated Message event plus the Part events accepted for it, tagged
/// with its input array index.
#[derive(Debug)]
struct BufferedMessage {
    /// Index of the Message event in the request's `events` array.
    index: usize,
    message: Message,
    /// Accepted parts for this message, in arrival order. Filled in when
    /// the message is flushed out of the "current message" slot.
    parts: Vec<BufferedPart>,
    /// Searchable text computed from `parts` at flush time; `None` before
    /// the flush or when the parts yield no searchable text.
    search_text: Option<String>,
}
2209
/// A validated Part event tagged with its index in the request's `events`
/// array.
#[derive(Debug)]
struct BufferedPart {
    index: usize,
    part: Part,
}
2215
/// State machine that turns the `events: Vec<IngestEvent>` array into a
/// flat `Vec<RowOutcome>` matching the array's index space. Each session
/// substream (a Session event followed by its messages and parts) is
/// buffered whole; writes are deferred and batched across substreams (see
/// below). A validation error on a single event drops *that event* (one
/// [`OutcomeStatus::Error`] outcome) and the substream continues; only
/// Session-level invariants (immutable source_agent / project on re-write)
/// drop the whole substream (spec.md#adapter-integrity-event-ordering).
///
/// Writes are batched at flush time. As complete substreams arrive (a new
/// `Session` event closes out the current one), they accumulate in
/// `completed` rather than each one calling `merge_insert` immediately.
/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
/// at which point one batched 3-parallel-merge-insert (sessions, messages,
/// parts) covers all pending substreams. This is the load-bearing perf
/// change: per-substream commit overhead dominated the ingest profile (see
/// `benches/ingest_bench.rs`), and amortizing it across N sessions cuts
/// wall time materially.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// Session event anchoring the in-flight substream, if any.
    session: Option<BufferedSession>,
    /// Message currently accepting parts; moved into `messages` when the
    /// next message arrives or the substream closes.
    current_message: Option<BufferedMessage>,
    /// Parts accepted for `current_message` that have not been attached yet.
    current_parts: Vec<BufferedPart>,
    /// Finished messages (with their parts) of the in-flight substream.
    messages: Vec<BufferedMessage>,
    /// Message ids already buffered in the current substream. Duplicate ids
    /// drop the offending event in-line rather than failing the whole batch
    /// downstream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` keys already buffered in the current
    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
    seen_part_keys: HashSet<(String, String)>,
    /// Substreams whose end-of-stream boundary has been observed but whose
    /// rows haven't been written yet. Flushed in batched mode by
    /// [`Self::flush`].
    completed: Vec<CompletedSubstream>,
}
2250
/// One closed substream ready for the batched flush path.
#[derive(Debug)]
struct CompletedSubstream {
    /// Index of the substream's Session event in the `events` array.
    session_index: usize,
    session: Session,
    /// Accepted messages (each carrying its parts), in arrival order.
    messages: Vec<BufferedMessage>,
}
2258
2259impl IngestValidator {
2260    /// Drive one input event through the validator. Returns the per-row
2261    /// outcomes the event triggered: empty when the event is just buffered,
2262    /// or N entries when a session substream just flushed (success or
2263    /// failure). `Err` is reserved for catastrophic storage failures that
2264    /// should fail the whole `pond_ingest` request.
2265    pub async fn push(
2266        &mut self,
2267        store: &Store,
2268        index: usize,
2269        event: IngestEvent,
2270    ) -> Result<Vec<RowOutcome>> {
2271        match event {
2272            IngestEvent::Session(session) => self.push_session(store, index, session).await,
2273            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2274            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2275        }
2276    }
2277
2278    /// Final flush at end-of-batch. Closes the in-flight substream and
2279    /// drains the pending-flush buffer. Returns the per-row outcomes (for
2280    /// the wire layer) alongside the honest per-table counts (for
2281    /// `IngestSummary::add_batch`).
2282    pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2283        self.close_current_substream();
2284        self.flush(store).await
2285    }
2286
2287    /// Drain every completed substream into batched 3-parallel-merge_insert
2288    /// writes. Caller invokes this periodically (every N completed
2289    /// substreams) to keep memory bounded; in adapter-driven sync that
2290    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
2291    /// in-flight substream stays buffered - close it explicitly via
2292    /// [`Self::finish`] or by feeding the next Session event.
2293    pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2294        if self.completed.is_empty() {
2295            return Ok((Vec::new(), BatchCounts::default()));
2296        }
2297        let completed = std::mem::take(&mut self.completed);
2298        store.upsert_session_batch(completed).await
2299    }
2300
2301    /// Number of fully-buffered substreams awaiting batched write. Used by
2302    /// the adapter caller to decide when to call [`Self::flush`].
2303    pub fn pending_substreams(&self) -> usize {
2304        self.completed.len()
2305    }
2306
2307    async fn push_session(
2308        &mut self,
2309        _store: &Store,
2310        index: usize,
2311        mut session: Session,
2312    ) -> Result<Vec<RowOutcome>> {
2313        // Close out the current substream (if any) - move it to the pending
2314        // buffer instead of writing immediately. The actual write happens
2315        // when the caller invokes `flush` / `finish`.
2316        self.close_current_substream();
2317
2318        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
2319        // if empty after trim. A Session event with empty source_agent is
2320        // dropped on the spot - the substream that would follow has nothing
2321        // to anchor on, so subsequent message/part events will also drop.
2322        let trimmed = session.source_agent.trim();
2323        if trimmed.is_empty() {
2324            return Ok(vec![RowOutcome {
2325                index,
2326                kind: "session",
2327                pk: Value::String(session.id.clone()),
2328                status: OutcomeStatus::Error,
2329                error: Some(RowError {
2330                    message: format!("session {} has empty source_agent after trim", session.id),
2331                    field: Some("source_agent"),
2332                    reason: None,
2333                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2334                }),
2335                searchable: false,
2336            }]);
2337        }
2338        if trimmed.len() != session.source_agent.len() {
2339            session.source_agent = trimmed.to_owned();
2340        }
2341
2342        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2343            return Ok(vec![RowOutcome {
2344                index,
2345                kind: "session",
2346                pk: Value::String(session.id.clone()),
2347                status: OutcomeStatus::Error,
2348                error: Some(RowError {
2349                    message: format!(
2350                        "session {} has parent_message_id without parent_session_id",
2351                        session.id,
2352                    ),
2353                    field: Some("parent_message_id"),
2354                    reason: None,
2355                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2356                }),
2357                searchable: false,
2358            }]);
2359        }
2360
2361        self.seen_message_ids.clear();
2362        self.seen_part_keys.clear();
2363        self.session = Some(BufferedSession { index, session });
2364        Ok(Vec::new())
2365    }
2366
2367    fn close_current_substream(&mut self) {
2368        self.flush_current_message();
2369        let Some(BufferedSession {
2370            index: session_index,
2371            session,
2372        }) = self.session.take()
2373        else {
2374            return;
2375        };
2376        let messages = std::mem::take(&mut self.messages);
2377        self.seen_message_ids.clear();
2378        self.seen_part_keys.clear();
2379        self.completed.push(CompletedSubstream {
2380            session_index,
2381            session,
2382            messages,
2383        });
2384    }
2385
2386    fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
2387        let pk = Value::Array(vec![
2388            Value::String(message.session_id().to_owned()),
2389            Value::String(message.id().to_owned()),
2390        ]);
2391        let Some(session) = &self.session else {
2392            return vec![error_outcome(
2393                index,
2394                "message",
2395                pk,
2396                "first event in a session stream must be Session",
2397                None,
2398                DROP_REASON_MESSAGE_BEFORE_SESSION,
2399            )];
2400        };
2401        if message.session_id() != session.session.id {
2402            let msg = format!(
2403                "message {} references session {}, expected {}",
2404                message.id(),
2405                message.session_id(),
2406                session.session.id
2407            );
2408            return vec![error_outcome(
2409                index,
2410                "message",
2411                pk,
2412                &msg,
2413                Some("session_id"),
2414                DROP_REASON_MESSAGE_SESSION_MISMATCH,
2415            )];
2416        }
2417        if !self.seen_message_ids.insert(message.id().to_owned()) {
2418            // Keep same-substream duplicate ids visible in `dropped_events`;
2419            // adapters are expected to dedupe upstream (see claude-code's
2420            // per-file `seen_uuids`), so a hit here is worth investigating.
2421            let msg = format!("duplicate message id {} in session substream", message.id());
2422            return vec![error_outcome(
2423                index,
2424                "message",
2425                pk,
2426                &msg,
2427                None,
2428                DROP_REASON_DUPLICATE_MESSAGE_ID,
2429            )];
2430        }
2431        self.flush_current_message();
2432        self.current_message = Some(BufferedMessage {
2433            index,
2434            message,
2435            parts: Vec::new(),
2436            search_text: None,
2437        });
2438        Vec::new()
2439    }
2440
2441    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2442        let pk = Value::Array(vec![
2443            Value::String(part.session_id.clone()),
2444            Value::String(part.message_id.clone()),
2445            Value::String(part.id.clone()),
2446        ]);
2447        let Some(current) = &self.current_message else {
2448            return vec![error_outcome(
2449                index,
2450                "part",
2451                pk,
2452                "part event appeared before a message",
2453                None,
2454                DROP_REASON_PART_BEFORE_MESSAGE,
2455            )];
2456        };
2457        if part.session_id != current.message.session_id() {
2458            let msg = format!(
2459                "part {} references session {}, expected {}",
2460                part.id,
2461                part.session_id,
2462                current.message.session_id()
2463            );
2464            return vec![error_outcome(
2465                index,
2466                "part",
2467                pk,
2468                &msg,
2469                Some("session_id"),
2470                DROP_REASON_PART_MESSAGE_MISMATCH,
2471            )];
2472        }
2473        if part.message_id != current.message.id() {
2474            let msg = format!(
2475                "part {} references message {}, expected {}",
2476                part.id,
2477                part.message_id,
2478                current.message.id()
2479            );
2480            return vec![error_outcome(
2481                index,
2482                "part",
2483                pk,
2484                &msg,
2485                Some("message_id"),
2486                DROP_REASON_PART_MESSAGE_MISMATCH,
2487            )];
2488        }
2489        let part_key = (part.message_id.clone(), part.id.clone());
2490        if !self.seen_part_keys.insert(part_key) {
2491            let msg = format!(
2492                "duplicate part id {} for message {} in session substream",
2493                part.id, part.message_id
2494            );
2495            return vec![error_outcome(
2496                index,
2497                "part",
2498                pk,
2499                &msg,
2500                None,
2501                DROP_REASON_DUPLICATE_PART_KEY,
2502            )];
2503        }
2504        self.current_parts.push(BufferedPart { index, part });
2505        Vec::new()
2506    }
2507
2508    fn flush_current_message(&mut self) {
2509        let Some(mut buffered) = self.current_message.take() else {
2510            return;
2511        };
2512        let parts = std::mem::take(&mut self.current_parts);
2513        let mut canonical_parts = Vec::with_capacity(parts.len());
2514        for part in &parts {
2515            canonical_parts.push(part.part.clone());
2516        }
2517        buffered.search_text = search_text(&buffered.message, &canonical_parts);
2518        buffered.parts = parts;
2519        self.messages.push(buffered);
2520    }
2521}
2522
2523fn error_outcome(
2524    index: usize,
2525    kind: &'static str,
2526    pk: Value,
2527    message: &str,
2528    field: Option<&'static str>,
2529    reason_key: &'static str,
2530) -> RowOutcome {
2531    RowOutcome {
2532        index,
2533        kind,
2534        pk,
2535        status: OutcomeStatus::Error,
2536        error: Some(RowError {
2537            message: message.to_owned(),
2538            field,
2539            reason: None,
2540            reason_key: Some(reason_key),
2541        }),
2542        searchable: false,
2543    }
2544}
2545
2546/// Session-level rejection (immutable `source_agent` / `project` violation):
2547/// emit exactly one Error outcome on the Session row. The buffered messages
2548/// and parts of this substream are *not* surfaced as per-row errors - their
2549/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
2550fn error_outcomes_for_substream(
2551    session_index: usize,
2552    session: &Session,
2553    _messages: &[BufferedMessage],
2554    message: impl Into<String>,
2555    field: Option<&'static str>,
2556    reason_key: &'static str,
2557) -> Vec<RowOutcome> {
2558    let reason = field.map(|_| "immutable");
2559    vec![RowOutcome {
2560        index: session_index,
2561        kind: "session",
2562        pk: Value::String(session.id.clone()),
2563        status: OutcomeStatus::Error,
2564        error: Some(RowError {
2565            message: message.into(),
2566            field,
2567            reason,
2568            reason_key: Some(reason_key),
2569        }),
2570        searchable: false,
2571    }]
2572}
2573
2574/// Batched-path success helper. Each row's Inserted/Matched status is read
2575/// from the pre-existence sets captured by `upsert_session_batch` before its
2576/// `merge_insert` calls, so the per-row outcome is honest (spec.md#adapter-integrity-additive-sync).
2577/// Also accumulates the per-table totals into `counts` so the CLI summary
2578/// gets the same truth without re-walking the outcomes.
2579fn success_outcomes_for_substream(
2580    session_index: usize,
2581    session: &Session,
2582    messages: &[BufferedMessage],
2583    existing_sessions: &std::collections::HashMap<String, Session>,
2584    existing_message_pks: &HashSet<(String, String)>,
2585    existing_part_pks: &HashSet<(String, String, String)>,
2586    counts: &mut BatchCounts,
2587) -> Vec<RowOutcome> {
2588    let session_was_present = existing_sessions.contains_key(&session.id);
2589    let session_status = if session_was_present {
2590        counts.sessions_matched += 1;
2591        UpsertStatus::Matched
2592    } else {
2593        counts.sessions_inserted += 1;
2594        UpsertStatus::Inserted
2595    };
2596
2597    let mut outcomes = Vec::with_capacity(1 + messages.len());
2598    outcomes.push(success_outcome(
2599        session_index,
2600        "session",
2601        Value::String(session.id.clone()),
2602        session_status,
2603        false,
2604    ));
2605    for buffered in messages {
2606        let key = (
2607            buffered.message.session_id().to_owned(),
2608            buffered.message.id().to_owned(),
2609        );
2610        let searchable = buffered.search_text.is_some();
2611        let message_status = if existing_message_pks.contains(&key) {
2612            counts.messages_matched_total += 1;
2613            if searchable {
2614                counts.messages_matched_searchable += 1;
2615            }
2616            UpsertStatus::Matched
2617        } else {
2618            counts.messages_inserted_total += 1;
2619            if searchable {
2620                counts.messages_inserted_searchable += 1;
2621            }
2622            UpsertStatus::Inserted
2623        };
2624        let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2625        outcomes.push(success_outcome(
2626            buffered.index,
2627            "message",
2628            pk,
2629            message_status,
2630            searchable,
2631        ));
2632        for part in &buffered.parts {
2633            let part_key = (
2634                part.part.session_id.clone(),
2635                part.part.message_id.clone(),
2636                part.part.id.clone(),
2637            );
2638            let part_status = if existing_part_pks.contains(&part_key) {
2639                counts.parts_matched += 1;
2640                UpsertStatus::Matched
2641            } else {
2642                counts.parts_inserted += 1;
2643                UpsertStatus::Inserted
2644            };
2645            let part_pk = Value::Array(vec![
2646                Value::String(part_key.0),
2647                Value::String(part_key.1),
2648                Value::String(part_key.2),
2649            ]);
2650            outcomes.push(success_outcome(
2651                part.index,
2652                "part",
2653                part_pk,
2654                part_status,
2655                false,
2656            ));
2657        }
2658    }
2659    outcomes
2660}
2661
2662fn success_outcome(
2663    index: usize,
2664    kind: &'static str,
2665    pk: Value,
2666    status: UpsertStatus,
2667    searchable: bool,
2668) -> RowOutcome {
2669    let status = match status {
2670        UpsertStatus::Inserted => OutcomeStatus::Inserted,
2671        UpsertStatus::Matched => OutcomeStatus::Matched,
2672    };
2673    RowOutcome {
2674        index,
2675        kind,
2676        pk,
2677        status,
2678        error: None,
2679        searchable,
2680    }
2681}
2682
/// Rejection raised when an incoming Session conflicts with its stored row.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// spec.md#protocol: `Session.source_agent` and `Session.project` are
    /// immutable post-first-write because the denormalized copies on
    /// `messages` were stamped from the prior Session at first ingest.
    /// A re-write that changes either would silently desync.
    ImmutableField {
        /// Name of the immutable field that differed.
        field: &'static str,
        session_id: String,
        /// Value currently stored.
        stored: String,
        /// Value the incoming Session tried to write.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ImmutableField { field, session_id, stored, attempted } => {
                write!(
                    f,
                    "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
                )
            }
        }
    }
}

impl std::error::Error for IngestError {}
2714
2715/// Compare an incoming Session row against the stored row on the two
2716/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
2717/// counts a NULL-vs-non-NULL change as a mismatch.
2718fn ensure_immutable_match(
2719    existing: &Session,
2720    incoming: &Session,
2721) -> std::result::Result<(), IngestError> {
2722    if existing.source_agent != incoming.source_agent {
2723        return Err(IngestError::ImmutableField {
2724            field: "source_agent",
2725            session_id: incoming.id.clone(),
2726            stored: existing.source_agent.clone(),
2727            attempted: incoming.source_agent.clone(),
2728        });
2729    }
2730    if existing.project != incoming.project {
2731        return Err(IngestError::ImmutableField {
2732            field: "project",
2733            session_id: incoming.id.clone(),
2734            stored: (*existing.project).clone(),
2735            attempted: (*incoming.project).clone(),
2736        });
2737    }
2738    Ok(())
2739}
2740
2741pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2742    use crate::wire::Provenance;
2743    let mut chunks: Vec<String> = Vec::new();
2744    for part in parts {
2745        // spec.md#search: only conversational parts contribute to the indexed
2746        // text; harness-injected scaffolding is excluded from search.
2747        if part.provenance != Provenance::Conversational {
2748            continue;
2749        }
2750        match (message.role(), &part.kind) {
2751            (Role::User | Role::Assistant, PartKind::Text { text }) => {
2752                if let Some(text) = text {
2753                    chunks.push(text.to_string());
2754                }
2755            }
2756            (
2757                Role::User | Role::Assistant,
2758                PartKind::File {
2759                    media_type,
2760                    file_name,
2761                    data,
2762                },
2763            ) => {
2764                if let Some(file_name) = file_name {
2765                    chunks.push(file_name.clone());
2766                }
2767                if let Some(media_type) = media_type {
2768                    chunks.push(media_type.clone());
2769                }
2770                if let FileData::Url(uri) = data {
2771                    chunks.push(uri.clone());
2772                }
2773            }
2774            (
2775                Role::System | Role::Tool,
2776                PartKind::Text { .. }
2777                | PartKind::Reasoning { .. }
2778                | PartKind::File { .. }
2779                | PartKind::ToolCall { .. }
2780                | PartKind::ToolResult { .. }
2781                | PartKind::ToolApprovalRequest { .. }
2782                | PartKind::ToolApprovalResponse { .. },
2783            )
2784            | (
2785                Role::User | Role::Assistant,
2786                PartKind::Reasoning { .. }
2787                | PartKind::ToolCall { .. }
2788                | PartKind::ToolResult { .. }
2789                | PartKind::ToolApprovalRequest { .. }
2790                | PartKind::ToolApprovalResponse { .. },
2791            ) => {}
2792        }
2793    }
2794
2795    let text = chunks
2796        .into_iter()
2797        .filter(|chunk| !chunk.trim().is_empty())
2798        .collect::<Vec<_>>()
2799        .join("\n");
2800    if text.is_empty() { None } else { Some(text) }
2801}
2802
/// Non-empty conversational text (spec.md#search).
///
/// The wrapped `String` is private, so only this module can construct a
/// value and is responsible for upholding the non-empty invariant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrow the text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consume the wrapper and return the owned text.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2822
/// A message together with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    pub message: Message,
    pub parts: Vec<Part>,
}
2828
/// A session together with its messages, each carrying its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    pub session: Session,
    pub messages: Vec<MessageWithParts>,
}
2834
/// Request parameters for a session-mode `pond_get` page.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    pub mode: ResponseMode,
    /// Pagination anchor: presumably the id of the last message on the
    /// previous page (see [`GetLookup::UnknownAfterId`]) - confirm.
    pub after_id: Option<&'a str>,
    /// Maximum items per page (NOTE(review): likely fed to `page_by`,
    /// which clamps it to 1..=1000 - confirm at the call site).
    pub limit: usize,
    /// Byte budget for one page.
    pub budget_bytes: usize,
    pub session_from: SessionFrom,
}
2843
/// Request parameters for a message-mode `pond_get` page.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// NOTE(review): presumably how many neighbouring messages to return
    /// around the target (the `siblings` of [`MessagePage`]) - confirm.
    pub context_depth: usize,
    /// Pagination anchor for the target message's parts - confirm.
    pub after_id: Option<&'a str>,
    /// Maximum items per page.
    pub limit: usize,
    /// Byte budget for one page.
    pub budget_bytes: usize,
}
2851
/// Outcome of a `pond_get` lookup. Separates a missing target (the handler
/// maps it to `not_found`) from a stale/unknown `after_id` (mapped to
/// `validation_failed`): the message/part stream is append-only, so an anchor
/// that was ever valid never disappears - an unknown one is always a client
/// error, never a reason to silently restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested session/message does not exist (`not_found`).
    NotFound,
    /// The target exists but `after_id` is not a known anchor
    /// (`validation_failed`).
    UnknownAfterId,
    /// The requested page.
    Found(T),
}
2863
/// Canonical retrieval result for `pond_get` session mode: the stored session
/// plus the page of messages (each with its `Part`s) and a remaining count.
/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    pub session: Session,
    /// Messages on this page, each with its parts.
    pub messages: Vec<RetrievedMessage>,
    /// Messages after this page that were not returned.
    pub messages_remaining: usize,
}
2873
/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
/// their parts so the handler can summarize them.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    pub session: Session,
    pub target: RetrievedMessage,
    /// This page of the target message's parts.
    pub target_parts: Vec<Part>,
    /// Target parts after this page that were not returned.
    pub target_parts_remaining: usize,
    pub siblings: Vec<RetrievedMessage>,
}
2885
/// A message as returned by `pond_get` retrieval, with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    pub id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    /// NOTE(review): presumably the stored `search_text` - confirm.
    pub text: Option<String>,
    pub content: Option<String>,
    /// Empty for the target of a [`MessagePage`]; see `target_parts` there.
    pub parts: Vec<Part>,
}
2895
/// Internal message-row projection used while scanning; it has the same
/// scalar columns as [`RetrievedMessage`] but no parts.
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}
2904
/// One row of the conversational scan. `text` is non-empty by
/// `IsNotNull("search_text")` pushdown (spec.md#search).
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    pub session_id: String,
    pub message_id: String,
    pub role: Role,
    pub timestamp: DateTime<Utc>,
    /// The message's searchable text.
    pub text: SearchText,
}
2915
/// Number of leading `items` that fit on one page, sizing each by `size`.
///
/// `limit` is clamped to `1..=1000` before use, so a `limit` of 0 still
/// allows one item and no page ever exceeds 1000 items. Within that cap,
/// items are taken in order while the running byte total (saturating on
/// overflow) stays within `budget_bytes`. The first item is always taken
/// regardless of its size, so a single oversize item never blocks its own
/// page; the budget then stops the page at the next item boundary. Returns
/// 0 only when `items` is empty.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let max_items = limit.clamp(1, 1000);
    let mut total_bytes = 0usize;
    let mut taken = 0usize;
    for item in items.iter().take(max_items) {
        let candidate = total_bytes.saturating_add(size(item));
        // Only a non-first item can be rejected by the budget.
        if taken > 0 && candidate > budget_bytes {
            break;
        }
        total_bytes = candidate;
        taken += 1;
    }
    taken
}
2934
2935fn role_from_str(value: &str) -> Result<Role> {
2936    match value {
2937        "system" => Ok(Role::System),
2938        "user" => Ok(Role::User),
2939        "assistant" => Ok(Role::Assistant),
2940        "tool" => Ok(Role::Tool),
2941        other => anyhow::bail!("unknown message role {other}"),
2942    }
2943}
2944
/// Scalar indexes on `messages` (spec.md#datasets): BTREE for high-cardinality
/// and range columns, BITMAP for low-cardinality columns. There is no index
/// on `embedding_model`: pond's invariant is one active model at a time
/// (a model swap goes through `pond embed --force` which drops the IVF_PQ,
/// clears stale rows, and re-bootstraps), so `embedding_model` is never a
/// query-time predicate - the only embedding-state filter is `vector IS NOT
/// NULL`. `id` lookups are rare and full-scan.
///
/// Each entry is `(column, index type, index name)`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];
2971
/// Scalar indexes on `parts`: `(session_id, message_id)` is the hot-path lookup key for
/// `parts_for_messages` (hydration on every `get` and grouped search). Each of
/// the two key columns gets its own BTREE index rather than one composite index.
///
/// Each entry is `(column, index type, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
2986
/// Scalar index on `sessions`: `id` is filtered by `find_session` on every
/// `get` and every grouped search.
///
/// Each entry is `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2991
2992fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
2993    Predicate::In(
2994        column,
2995        values.iter().cloned().map(ScalarValue::String).collect(),
2996    )
2997}
2998
2999/// Combine the caller's filter with `vector IS NOT NULL` so the kNN scanner
3000/// never sees a null-vector row. Under the single-active-model invariant,
3001/// `vector IS NOT NULL` is equivalent to "row is currently embedded under
3002/// the configured model" - no per-row `embedding_model` filter needed.
3003fn embedded_scope(filter: &Predicate) -> Predicate {
3004    Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3005}
3006
3007fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
3008    let inserted = usize::try_from(inserted_rows)
3009        .unwrap_or(usize::MAX)
3010        .min(total);
3011    let mut statuses = Vec::with_capacity(total);
3012    statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
3013    statuses.extend(std::iter::repeat_n(
3014        UpsertStatus::Matched,
3015        total.saturating_sub(inserted),
3016    ));
3017    statuses
3018}
3019
// Bare logical table names: the lance-namespace Directory impl owns the
// `.lance` directory suffix (spec.md#lance-chokepoints-catalog). No consumer reconstructs
// a `.lance` path.
/// Logical name of the `sessions` table.
pub(crate) const SESSIONS: &str = "sessions";
/// Logical name of the `messages` table.
pub(crate) const MESSAGES: &str = "messages";
/// Logical name of the `parts` table.
pub(crate) const PARTS: &str = "parts";
3026
/// FTS index name on `messages.search_text`. Stable so status and index
/// creation name the same index.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// IVF_PQ index name on `messages.vector` (spec.md#search). Stable so the
/// activation check and index creation name the same index.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";

/// IVF_PQ tuning constants (spec.md#search):
/// - num_bits = 8 (256 centroids per PQ subspace; needs >= 256 vectors)
/// - sub_vectors = embedding_dim / 8 (8-float PQ subspaces)
/// - max_iters = 15 (kmeans cap)
/// - cosine metric (e5 vectors are L2-normalized)
///
/// NOTE(review): `sub_vectors` uses integer division, so an embedding
/// dimension that is not a multiple of 8 is truncated. Confirm that every
/// configured `[embeddings].dim` is a multiple of 8.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Floats per PQ subspace; `sub_vectors = embedding_dim() / this`.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// Upper bound on kmeans iterations during IVF_PQ training.
const IVF_PQ_MAX_ITERS: usize = 15;

/// FTS tokenizer constants (spec.md#search-language-neutral-index): character ngrams
/// in `[3, 5]`. 4-5-grams discriminate, min=3 keeps 3-char tokens
/// (`FTS`, `OCC`) searchable.
const FTS_NGRAM_MIN: u32 = 3;
/// Longest character ngram emitted by the FTS tokenizer.
const FTS_NGRAM_MAX: u32 = 5;
3049
3050/// Pond's production IndexIntents: the per-table intent set
3051/// `Store::open_with_options` registers with the substrate.
3052pub fn pond_index_intents() -> IndexIntents {
3053    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3054}
3055
3056/// Same as [`pond_index_intents`] but with an overridable IVF_PQ activation
3057/// threshold. Used by tests that need to exercise the activation boundary
3058/// without writing 100k vectors.
3059pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3060    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3061    messages.push(IndexIntent {
3062        name: MESSAGES_FTS_INDEX,
3063        column: "search_text",
3064        trigger: IndexTrigger::OnAnyRows,
3065        params: IndexParamsKind::InvertedFtsNgram {
3066            min: FTS_NGRAM_MIN,
3067            max: FTS_NGRAM_MAX,
3068        },
3069    });
3070    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3071        messages.push(IndexIntent {
3072            name,
3073            column,
3074            trigger: IndexTrigger::OnAnyRows,
3075            params: IndexParamsKind::Scalar(kind.clone()),
3076        });
3077    }
3078    messages.push(IndexIntent {
3079        name: MESSAGES_VECTOR_INDEX,
3080        column: "vector",
3081        trigger: IndexTrigger::OnNonNullCount {
3082            column: "vector",
3083            threshold: vector_threshold,
3084        },
3085        params: IndexParamsKind::IvfPqCosine {
3086            sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3087            num_bits: IVF_PQ_NUM_BITS,
3088            max_iters: IVF_PQ_MAX_ITERS,
3089        },
3090    });
3091    let parts = PARTS_SCALAR_INDICES
3092        .iter()
3093        .map(|(column, kind, name)| IndexIntent {
3094            name,
3095            column,
3096            trigger: IndexTrigger::OnAnyRows,
3097            params: IndexParamsKind::Scalar(kind.clone()),
3098        })
3099        .collect();
3100    let sessions = SESSIONS_SCALAR_INDICES
3101        .iter()
3102        .map(|(column, kind, name)| IndexIntent {
3103            name,
3104            column,
3105            trigger: IndexTrigger::OnAnyRows,
3106            params: IndexParamsKind::Scalar(kind.clone()),
3107        })
3108        .collect();
3109    IndexIntents {
3110        sessions,
3111        messages,
3112        parts,
3113    }
3114}
3115
/// Default width of the `messages.vector` embedding column (spec.md#search):
/// matches [`embed::DEFAULT_MODEL_ID`] (`intfloat/multilingual-e5-small`,
/// 384). Used when `[embeddings].dim` is absent.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector dimension, seeded once at startup from `[embeddings].dim`
/// via [`init_embedding_dim`]. It is a `OnceLock` rather than a `const` so a
/// temporary config file can select a model whose dimension differs from
/// [`DEFAULT_EMBEDDING_DIM`] for an experiment without touching every site.
/// When uninitialized, readers fall back to [`DEFAULT_EMBEDDING_DIM`], which
/// keeps unit tests config-free.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
3127
3128/// The active embedding dimension. Returns whatever [`init_embedding_dim`]
3129/// installed, or [`DEFAULT_EMBEDDING_DIM`] when nothing has installed one.
3130pub fn embedding_dim() -> usize {
3131    EMBEDDING_DIM_RUNTIME
3132        .get()
3133        .copied()
3134        .unwrap_or(DEFAULT_EMBEDDING_DIM)
3135}
3136
3137/// Seed [`embedding_dim`] from config. First call wins.
3138pub fn init_embedding_dim(dim: usize) {
3139    EMBEDDING_DIM_RUNTIME.get_or_init(|| dim);
3140}
3141
3142/// Initial-`CREATE` write params for the namespace-mediated path. The
3143/// substrate seam stamps in `session`, `mode`, and `store_params`.
3144/// `auto_cleanup` is short; long-term recovery is `pond export` snapshots
3145/// plus deferred Lance tags (spec.md#session-durable-copy). `skip_auto_cleanup`
3146/// suppresses the per-commit hook so cleanup stays operator-driven via
3147/// `pond index optimize` (one LIST per command instead of per write).
3148pub(crate) fn write_params_for_create() -> WriteParams {
3149    WriteParams {
3150        data_storage_version: Some(LanceFileVersion::V2_1),
3151        enable_v2_manifest_paths: true,
3152        enable_stable_row_ids: true,
3153        auto_cleanup: Some(AutoCleanupParams {
3154            interval: 20,
3155            older_than: chrono::TimeDelta::days(1),
3156        }),
3157        skip_auto_cleanup: true,
3158        ..WriteParams::default()
3159    }
3160}
3161
3162fn export_schema(table: Table) -> Arc<Schema> {
3163    match table {
3164        Table::Sessions => session_schema(),
3165        Table::Messages => message_schema(),
3166        Table::Parts => part_schema(),
3167    }
3168}
3169
3170fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3171    let expected = export_schema(table);
3172    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3173    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3174    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3175    if actual_names != expected_names {
3176        anyhow::bail!(
3177            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3178            table.as_str(),
3179        );
3180    }
3181    Ok(())
3182}
3183
3184async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3185    let source_uri = source
3186        .to_str()
3187        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3188    let dataset = Dataset::open(source_uri)
3189        .await
3190        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3191    ensure_schema_matches_archive(&dataset, table)?;
3192    Ok(dataset)
3193}
3194
3195pub(crate) fn session_schema() -> Arc<Schema> {
3196    Arc::new(Schema::new(vec![
3197        primary_field("id", DataType::Utf8, false),
3198        Field::new("parent_session_id", DataType::Utf8, true),
3199        Field::new("parent_message_id", DataType::Utf8, true),
3200        Field::new("source_agent", DataType::Utf8, false),
3201        Field::new(
3202            "created_at",
3203            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3204            false,
3205        ),
3206        Field::new("project", DataType::Utf8, false),
3207        json_field("options", false),
3208    ]))
3209}
3210
3211pub(crate) fn message_schema() -> Arc<Schema> {
3212    Arc::new(Schema::new(vec![
3213        primary_field("session_id", DataType::Utf8, false),
3214        primary_field("id", DataType::Utf8, false),
3215        Field::new(
3216            "timestamp",
3217            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3218            false,
3219        ),
3220        Field::new("role", DataType::Utf8, false),
3221        Field::new("source_agent", DataType::Utf8, false),
3222        Field::new("project", DataType::Utf8, false),
3223        Field::new("content", DataType::Utf8, true),
3224        Field::new("search_text", DataType::Utf8, true),
3225        // The message's derived embedding (spec.md#session-embed-from-canonical):
3226        // both null until `pond embed` fills them, set together thereafter.
3227        Field::new("vector", embedding_vector_type(), true),
3228        Field::new("embedding_model", DataType::Utf8, true),
3229        json_field("options", false),
3230    ]))
3231}
3232
3233pub(crate) fn part_schema() -> Arc<Schema> {
3234    Arc::new(Schema::new(vec![
3235        primary_field("session_id", DataType::Utf8, false),
3236        primary_field("message_id", DataType::Utf8, false),
3237        primary_field("id", DataType::Utf8, false),
3238        Field::new("ordinal", DataType::Int32, false),
3239        Field::new("type", DataType::Utf8, false),
3240        // spec.md#model-part-provenance: conversation vs harness-injected; search
3241        // reads this column to exclude injected scaffolding.
3242        Field::new("provenance", DataType::Utf8, false),
3243        json_field("variant_data", false),
3244        legacy_blob_field("data", true),
3245        json_field("options", false),
3246    ]))
3247}
3248
3249pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3250    let arrays = schema
3251        .fields()
3252        .iter()
3253        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3254        .collect();
3255    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3256}
3257
3258pub(crate) fn empty_reader(
3259    schema: Arc<Schema>,
3260) -> Result<
3261    RecordBatchIterator<
3262        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3263    >,
3264> {
3265    let batch = empty_batch(schema.clone())?;
3266    Ok(RecordBatchIterator::new(
3267        vec![Ok(batch)].into_iter(),
3268        schema,
3269    ))
3270}
3271
/// Borrowed inputs for one row of a `messages` batch (see [`messages_batches`]).
pub(crate) struct MessageBatchRow<'a> {
    /// Supplies `session_id`, `id`, `timestamp`, `role`, system `content`,
    /// and `options`.
    pub message: &'a Message,
    /// Written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Written to the `project` column.
    pub project: &'a str,
    /// Written to `search_text`; `None` stores a null cell.
    pub search_text: Option<&'a str>,
}
3278
3279// Lance v7.0.0-beta.16's IVF_PQ build path (`rust/lance/src/index/vector/utils.rs`
3280// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
3281// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
3282// BFloat16 as a future-supported embedding type; until the Rust IVF_PQ build
3283// path catches up, store as Float16 (half-precision, also 2 bytes/element).
3284fn embedding_vector_type() -> DataType {
3285    DataType::FixedSizeList(
3286        Arc::new(Field::new("item", DataType::Float16, true)),
3287        embedding_dim() as i32,
3288    )
3289}
3290
3291/// The partial-schema source for the embedding column update: the `messages`
3292/// primary key plus the two columns `pond embed` fills. The field definitions
3293/// match `message_schema` exactly so Lance accepts it as a subset upsert.
3294fn embedding_update_schema() -> Arc<Schema> {
3295    Arc::new(Schema::new(vec![
3296        primary_field("session_id", DataType::Utf8, false),
3297        primary_field("id", DataType::Utf8, false),
3298        Field::new("vector", embedding_vector_type(), true),
3299        Field::new("embedding_model", DataType::Utf8, true),
3300    ]))
3301}
3302
3303/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
3304/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
3305pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3306    let dim = embedding_dim();
3307    let mut flat = Vec::with_capacity(rows.len() * dim);
3308    for row in rows {
3309        if row.vector.len() != dim {
3310            anyhow::bail!(
3311                "embedding for message {} has dim {}, expected {dim}",
3312                row.id,
3313                row.vector.len(),
3314            );
3315        }
3316        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3317    }
3318    let values = Float16Array::from(flat);
3319    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3320    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3321        .context("failed to build embedding vector column")?;
3322
3323    RecordBatch::try_new(
3324        embedding_update_schema(),
3325        vec![
3326            Arc::new(StringArray::from(
3327                rows.iter()
3328                    .map(|row| row.session_id.as_str())
3329                    .collect::<Vec<_>>(),
3330            )),
3331            Arc::new(StringArray::from(
3332                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3333            )),
3334            Arc::new(vectors),
3335            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3336        ],
3337    )
3338    .context("failed to build embedding update batch")
3339}
3340
/// The runtime backstop against Arrow's 2 GiB `i32` offset wall. A flush batch
/// is split before the running total of its text columns would exceed this
/// (1 GiB, half the `i32` offset range). A single cell at or above it is
/// rejected rather than left to panic inside `StringArray::from`
/// (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;
3346
3347/// Contiguous row ranges whose summed text-column byte cost each stays within
3348/// `COLUMN_BYTE_BUDGET`. Budgeting the all-column total bounds every individual
3349/// column too, since no single column's total can exceed it. `cells[i]` is row
3350/// `i`'s byte cost summed across every text column.
3351fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
3352    let mut chunks = Vec::new();
3353    let mut start = 0usize;
3354    let mut running = 0usize;
3355    for (index, &row) in cells.iter().enumerate() {
3356        if running + row > COLUMN_BYTE_BUDGET && index > start {
3357            chunks.push(start..index);
3358            start = index;
3359            running = 0;
3360        }
3361        running += row;
3362    }
3363    if start < cells.len() {
3364        chunks.push(start..cells.len());
3365    }
3366    chunks
3367}
3368
3369fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3370    if bytes >= COLUMN_BYTE_BUDGET {
3371        anyhow::bail!(
3372            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3373             overflow Arrow's i32 offset buffer"
3374        );
3375    }
3376    Ok(())
3377}
3378
3379async fn merge_insert_chunks(
3380    handle: &Handle,
3381    table: Table,
3382    batches: Vec<RecordBatch>,
3383) -> Result<u64> {
3384    let mut inserted = 0u64;
3385    for batch in batches {
3386        let rows = batch.num_rows();
3387        inserted += handle.merge_insert(table, batch, rows).await?;
3388    }
3389    Ok(inserted)
3390}
3391
3392pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3393    let options = sessions
3394        .iter()
3395        .map(|session| json_bytes(&session.options))
3396        .collect::<Result<Vec<_>>>()?;
3397    let mut cells = Vec::with_capacity(sessions.len());
3398    for (session, encoded) in sessions.iter().zip(&options) {
3399        let columns = [
3400            session.id.len(),
3401            session.parent_session_id.as_deref().map_or(0, str::len),
3402            session.parent_message_id.as_deref().map_or(0, str::len),
3403            session.source_agent.len(),
3404            session.project.as_str().len(),
3405            encoded.len(),
3406        ];
3407        for bytes in columns {
3408            guard_cell("sessions", &session.id, bytes)?;
3409        }
3410        cells.push(columns.iter().sum());
3411    }
3412    chunk_ranges(&cells)
3413        .into_iter()
3414        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3415        .collect()
3416}
3417
3418fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3419    let schema = session_schema();
3420    RecordBatch::try_new(
3421        schema.clone(),
3422        vec![
3423            Arc::new(StringArray::from(
3424                sessions
3425                    .iter()
3426                    .map(|session| session.id.as_str())
3427                    .collect::<Vec<_>>(),
3428            )),
3429            Arc::new(StringArray::from(
3430                sessions
3431                    .iter()
3432                    .map(|session| session.parent_session_id.as_deref())
3433                    .collect::<Vec<_>>(),
3434            )),
3435            Arc::new(StringArray::from(
3436                sessions
3437                    .iter()
3438                    .map(|session| session.parent_message_id.as_deref())
3439                    .collect::<Vec<_>>(),
3440            )),
3441            Arc::new(StringArray::from(
3442                sessions
3443                    .iter()
3444                    .map(|session| session.source_agent.as_str())
3445                    .collect::<Vec<_>>(),
3446            )),
3447            Arc::new(
3448                TimestampMicrosecondArray::from(
3449                    sessions
3450                        .iter()
3451                        .map(|session| micros(session.created_at))
3452                        .collect::<Vec<_>>(),
3453                )
3454                .with_timezone("UTC"),
3455            ),
3456            Arc::new(StringArray::from(
3457                sessions
3458                    .iter()
3459                    .map(|session| session.project.as_str())
3460                    .collect::<Vec<_>>(),
3461            )),
3462            Arc::new(LargeBinaryArray::from_iter_values(
3463                options.iter().map(Vec::as_slice),
3464            )),
3465        ],
3466    )
3467    .context("failed to build session batch")
3468}
3469
3470pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3471    let options = rows
3472        .iter()
3473        .map(|row| json_bytes(row.message.options()))
3474        .collect::<Result<Vec<_>>>()?;
3475    let mut cells = Vec::with_capacity(rows.len());
3476    for (row, encoded) in rows.iter().zip(&options) {
3477        let columns = [
3478            row.message.session_id().len(),
3479            row.message.id().len(),
3480            row.message.role().as_str().len(),
3481            row.source_agent.len(),
3482            row.project.len(),
3483            row.message.system_content().map_or(0, str::len),
3484            row.search_text.map_or(0, str::len),
3485            encoded.len(),
3486        ];
3487        for bytes in columns {
3488            guard_cell("messages", row.message.id(), bytes)?;
3489        }
3490        cells.push(columns.iter().sum());
3491    }
3492    chunk_ranges(&cells)
3493        .into_iter()
3494        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3495        .collect()
3496}
3497
3498fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3499    let schema = message_schema();
3500    RecordBatch::try_new(
3501        schema.clone(),
3502        vec![
3503            Arc::new(StringArray::from(
3504                rows.iter()
3505                    .map(|row| row.message.session_id())
3506                    .collect::<Vec<_>>(),
3507            )),
3508            Arc::new(StringArray::from(
3509                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3510            )),
3511            Arc::new(
3512                TimestampMicrosecondArray::from(
3513                    rows.iter()
3514                        .map(|row| micros(row.message.timestamp()))
3515                        .collect::<Vec<_>>(),
3516                )
3517                .with_timezone("UTC"),
3518            ),
3519            Arc::new(StringArray::from(
3520                rows.iter()
3521                    .map(|row| row.message.role().as_str())
3522                    .collect::<Vec<_>>(),
3523            )),
3524            Arc::new(StringArray::from(
3525                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3526            )),
3527            Arc::new(StringArray::from(
3528                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3529            )),
3530            Arc::new(StringArray::from(
3531                rows.iter()
3532                    .map(|row| row.message.system_content())
3533                    .collect::<Vec<_>>(),
3534            )),
3535            Arc::new(StringArray::from(
3536                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3537            )),
3538            // `vector` / `embedding_model` are written null at ingest; every
3539            // message starts un-embedded and `pond embed` fills them later
3540            // (spec.md#session-embed-from-canonical).
3541            new_null_array(&embedding_vector_type(), rows.len()),
3542            new_null_array(&DataType::Utf8, rows.len()),
3543            Arc::new(LargeBinaryArray::from_iter_values(
3544                options.iter().map(Vec::as_slice),
3545            )),
3546        ],
3547    )
3548    .context("failed to build message batch")
3549}
3550
3551pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3552    let variant_data = parts
3553        .iter()
3554        .map(|part| part_variant_json(&part.kind))
3555        .collect::<Result<Vec<_>>>()?;
3556    let options = parts
3557        .iter()
3558        .map(|part| json_bytes(&part.options))
3559        .collect::<Result<Vec<_>>>()?;
3560    let mut cells = Vec::with_capacity(parts.len());
3561    // The blob column is a BinaryArray, exempt from the text-column bound
3562    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
3563    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3564        let columns = [
3565            part.session_id.len(),
3566            part.message_id.len(),
3567            part.id.len(),
3568            part.kind.type_name().len(),
3569            part.provenance.as_str().len(),
3570            variant.len(),
3571            encoded.len(),
3572        ];
3573        for bytes in columns {
3574            guard_cell("parts", &part.id, bytes)?;
3575        }
3576        cells.push(columns.iter().sum());
3577    }
3578    chunk_ranges(&cells)
3579        .into_iter()
3580        .map(|range| {
3581            parts_chunk(
3582                &parts[range.clone()],
3583                &variant_data[range.clone()],
3584                &options[range],
3585            )
3586        })
3587        .collect()
3588}
3589
3590fn parts_chunk(
3591    parts: &[Part],
3592    variant_data: &[Vec<u8>],
3593    options: &[Vec<u8>],
3594) -> Result<RecordBatch> {
3595    let schema = part_schema();
3596    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
3597    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
3598    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
3599    let blob_payloads: Vec<Option<&[u8]>> = parts
3600        .iter()
3601        .map(|part| match &part.kind {
3602            PartKind::File { data, .. } => Some(match data {
3603                FileData::String(value) => value.as_bytes(),
3604                FileData::Bytes(value) => value.as_slice(),
3605                FileData::Url(value) => value.as_bytes(),
3606            }),
3607            PartKind::Text { .. }
3608            | PartKind::Reasoning { .. }
3609            | PartKind::ToolCall { .. }
3610            | PartKind::ToolResult { .. }
3611            | PartKind::ToolApprovalRequest { .. }
3612            | PartKind::ToolApprovalResponse { .. } => None,
3613        })
3614        .collect();
3615    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3616
3617    RecordBatch::try_new(
3618        schema.clone(),
3619        vec![
3620            Arc::new(StringArray::from(
3621                parts
3622                    .iter()
3623                    .map(|part| part.session_id.as_str())
3624                    .collect::<Vec<_>>(),
3625            )),
3626            Arc::new(StringArray::from(
3627                parts
3628                    .iter()
3629                    .map(|part| part.message_id.as_str())
3630                    .collect::<Vec<_>>(),
3631            )),
3632            Arc::new(StringArray::from(
3633                parts
3634                    .iter()
3635                    .map(|part| part.id.as_str())
3636                    .collect::<Vec<_>>(),
3637            )),
3638            Arc::new(Int32Array::from(
3639                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3640            )),
3641            Arc::new(StringArray::from(
3642                parts
3643                    .iter()
3644                    .map(|part| part.kind.type_name())
3645                    .collect::<Vec<_>>(),
3646            )),
3647            Arc::new(StringArray::from(
3648                parts
3649                    .iter()
3650                    .map(|part| part.provenance.as_str())
3651                    .collect::<Vec<_>>(),
3652            )),
3653            Arc::new(LargeBinaryArray::from_iter_values(
3654                variant_data.iter().map(Vec::as_slice),
3655            )),
3656            Arc::new(blob_array),
3657            Arc::new(LargeBinaryArray::from_iter_values(
3658                options.iter().map(Vec::as_slice),
3659            )),
3660        ],
3661    )
3662    .context("failed to build parts batch")
3663}
3664
3665pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3666    Ok(Session {
3667        id: string(batch, "id", row)?.context("session id is null")?,
3668        parent_session_id: string(batch, "parent_session_id", row)?,
3669        parent_message_id: string(batch, "parent_message_id", row)?,
3670        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3671        created_at: datetime(batch, "created_at", row)?,
3672        project: crate::adapter::Extracted::from_stored(
3673            string(batch, "project", row)?.context("project is null")?,
3674        ),
3675        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3676    })
3677}
3678
3679pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3680    let id = string(batch, "id", row)?.context("message id is null")?;
3681    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3682    let timestamp = datetime(batch, "timestamp", row)?;
3683    let options =
3684        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3685
3686    match string(batch, "role", row)?
3687        .context("message role is null")?
3688        .as_str()
3689    {
3690        "system" => Ok(Message::System {
3691            id,
3692            session_id,
3693            timestamp,
3694            // `content` is nullable in the schema; preserve the distinction
3695            // between "no content row stored" (`None`) and "empty string
3696            // stored" (`Some(extracted_empty)`). The value originally
3697            // came from a `Source` extraction at ingest time; rewrap via
3698            // the storage-internal `from_stored` so the type-system seal
3699            // for adapters stays intact.
3700            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3701            options,
3702        }),
3703        "user" => Ok(Message::User {
3704            id,
3705            session_id,
3706            timestamp,
3707            options,
3708        }),
3709        "assistant" => Ok(Message::Assistant {
3710            id,
3711            session_id,
3712            timestamp,
3713            options,
3714        }),
3715        "tool" => Ok(Message::Tool {
3716            id,
3717            session_id,
3718            timestamp,
3719            options,
3720        }),
3721        other => anyhow::bail!("unknown message role {other}"),
3722    }
3723}
3724
3725pub(crate) fn part_from_batch(
3726    batch: &RecordBatch,
3727    row: usize,
3728    file_data: Option<FileData>,
3729) -> Result<Part> {
3730    let type_name = string(batch, "type", row)?.context("part type is null")?;
3731    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3732    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3733    Ok(Part {
3734        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3735        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3736        id: string(batch, "id", row)?.context("part id is null")?,
3737        ordinal: int32(batch, "ordinal", row)?,
3738        provenance: provenance_from_str(&provenance)?,
3739        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3740        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3741    })
3742}
3743
3744fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3745    match value {
3746        "conversational" => Ok(crate::wire::Provenance::Conversational),
3747        "injected" => Ok(crate::wire::Provenance::Injected),
3748        other => anyhow::bail!("unknown part provenance {other}"),
3749    }
3750}
3751
3752fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3753    let kind = file_data_kind(variant_data)?;
3754    match kind.as_str() {
3755        "string" => {
3756            let text = std::str::from_utf8(bytes)
3757                .context("file string payload is not UTF-8")?
3758                .to_owned();
3759            Ok(FileData::String(text))
3760        }
3761        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3762        "url" => Ok(FileData::Url(
3763            std::str::from_utf8(bytes)
3764                .context("file URL payload is not UTF-8")?
3765                .to_owned(),
3766        )),
3767        other => anyhow::bail!("unknown file data_kind {other}"),
3768    }
3769}
3770
3771fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3772    let value = json_parse::<Value>(variant_data)?;
3773    value
3774        .get("data_kind")
3775        .and_then(Value::as_str)
3776        .map(str::to_owned)
3777        .context("file part variant_data missing data_kind")
3778}
3779
3780fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3781    batch
3782        .column_by_name(name)
3783        .with_context(|| format!("missing column {name}"))?
3784        .as_any()
3785        .downcast_ref::<UInt64Array>()
3786        .with_context(|| format!("column {name} is not UInt64"))
3787}
3788
3789pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3790    let array = batch
3791        .column_by_name(name)
3792        .with_context(|| format!("missing column {name}"))?
3793        .as_any()
3794        .downcast_ref::<StringArray>()
3795        .with_context(|| format!("column {name} is not Utf8"))?;
3796    if array.is_null(row) {
3797        Ok(None)
3798    } else {
3799        Ok(Some(array.value(row).to_owned()))
3800    }
3801}
3802
3803fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3804    // Lance can return a `lance.json` column either as raw JSONB bytes
3805    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
3806    // LargeUtf8), depending on the read path. Handle both.
3807    let column = batch
3808        .column_by_name(name)
3809        .with_context(|| format!("missing column {name}"))?;
3810    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3811        return if array.is_null(row) {
3812            Ok(None)
3813        } else {
3814            Ok(Some(
3815                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3816            ))
3817        };
3818    }
3819    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3820        return if array.is_null(row) {
3821            Ok(None)
3822        } else {
3823            Ok(Some(array.value(row).as_bytes().to_vec()))
3824        };
3825    }
3826    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3827        return if array.is_null(row) {
3828            Ok(None)
3829        } else {
3830            Ok(Some(array.value(row).as_bytes().to_vec()))
3831        };
3832    }
3833    anyhow::bail!("column {name} is not a JSON-compatible array")
3834}
3835
3836fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3837    let array = batch
3838        .column_by_name(name)
3839        .with_context(|| format!("missing column {name}"))?
3840        .as_any()
3841        .downcast_ref::<Int32Array>()
3842        .with_context(|| format!("column {name} is not Int32"))?;
3843    Ok(array.value(row))
3844}
3845
3846pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3847    let array = batch
3848        .column_by_name(name)
3849        .with_context(|| format!("missing column {name}"))?
3850        .as_any()
3851        .downcast_ref::<Float32Array>()
3852        .with_context(|| format!("column {name} is not Float32"))?;
3853    Ok(array.value(row))
3854}
3855
3856pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3857    let array = batch
3858        .column_by_name(name)
3859        .with_context(|| format!("missing column {name}"))?
3860        .as_any()
3861        .downcast_ref::<TimestampMicrosecondArray>()
3862        .with_context(|| format!("column {name} is not timestamp_micros"))?;
3863    Utc.timestamp_micros(array.value(row))
3864        .single()
3865        .context("timestamp is out of range")
3866}
3867
3868fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3869    Field::new(name, data_type, nullable).with_metadata(
3870        [(
3871            "lance-schema:unenforced-primary-key".to_owned(),
3872            "true".to_owned(),
3873        )]
3874        .into(),
3875    )
3876}
3877
3878// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
3879// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
3880// is marked unstable in Lance docs (`format/file/versioning.md`) and at
3881// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
3882// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
3883// allocated only one column_info, so the decoder's second `expect_next()`
3884// fires `"there were more fields in the schema than provided column
3885// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
3886// handles correctly (covered by Lance's own `test_compact_blob_columns`).
3887fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3888    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3889        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3890            .into_iter()
3891            .collect(),
3892    )
3893}
3894
3895fn json_field(name: &str, nullable: bool) -> Field {
3896    lance_arrow::json::json_field(name, nullable)
3897}
3898
3899fn micros(timestamp: DateTime<Utc>) -> i64 {
3900    timestamp.timestamp_micros()
3901}
3902
3903fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3904    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
3905    // matches the `lance.json` extension contract. Lance's compact path
3906    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
3907    // applies `decode_json -> encode_json` on this column; with proper JSONB
3908    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
3909    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
3910    // `try_into_dfstream`; compact still goes through the adapter).
3911    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3912    lance_arrow::json::encode_json(&text)
3913        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3914}
3915
3916fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3917    serde_json::from_slice(value).context("failed to parse JSON field")
3918}
3919
3920fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3921    if let PartKind::File {
3922        media_type,
3923        file_name,
3924        data,
3925    } = kind
3926    {
3927        let data_kind = match data {
3928            FileData::String(_) => "string",
3929            FileData::Bytes(_) => "bytes",
3930            FileData::Url(_) => "url",
3931        };
3932        return json_bytes(&serde_json::json!({
3933            "media_type": media_type,
3934            "file_name": file_name,
3935            "data_kind": data_kind,
3936        }));
3937    }
3938    let value = serde_json::to_value(kind)?;
3939    let mut object = value
3940        .as_object()
3941        .cloned()
3942        .context("part variant did not serialize to an object")?;
3943    object.remove("type");
3944    json_bytes(&object)
3945}
3946
3947fn part_kind_from_json(
3948    type_name: &str,
3949    variant_data: &[u8],
3950    file_data: Option<FileData>,
3951) -> Result<PartKind> {
3952    let mut value = json_parse::<Value>(variant_data)?;
3953    let object = value
3954        .as_object_mut()
3955        .context("part variant data is not an object")?;
3956    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3957    if let Some(data) = file_data {
3958        object.remove("data_kind");
3959        object.insert("data".to_owned(), serde_json::to_value(data)?);
3960    }
3961    serde_json::from_value(value).context("failed to parse part kind")
3962}
3963
3964#[cfg(test)]
3965mod tests {
3966    #![allow(clippy::expect_used, clippy::unwrap_used)]
3967
3968    use super::*;
3969    use crate::{
3970        adapter::Extracted,
3971        handlers::ingest_events,
3972        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3973    };
3974    use chrono::Utc;
3975    use serde_json::json;
3976    use tempfile::TempDir;
3977
3978    fn synthetic_session(id: &str) -> Session {
3979        Session {
3980            id: id.to_owned(),
3981            parent_session_id: None,
3982            parent_message_id: None,
3983            source_agent: "claude-code".to_owned(),
3984            created_at: Utc::now(),
3985            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
3986            options: ProviderOptions::new(),
3987        }
3988    }
3989
3990    #[test]
3991    fn search_text_excludes_injected_parts() {
3992        use crate::wire::Provenance;
3993        let message = Message::User {
3994            id: "m1".to_owned(),
3995            session_id: "s1".to_owned(),
3996            timestamp: Utc::now(),
3997            options: ProviderOptions::new(),
3998        };
3999        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4000            session_id: "s1".to_owned(),
4001            id: id.to_owned(),
4002            message_id: "m1".to_owned(),
4003            ordinal: 0,
4004            provenance,
4005            options: ProviderOptions::new(),
4006            kind: PartKind::Text {
4007                text: Some(Extracted::from_test_value(text.to_owned())),
4008            },
4009        };
4010
4011        // A conversational part contributes; an injected one is excluded
4012        // (spec.md#search).
4013        let conversational = search_text(
4014            &message,
4015            &[text_part(
4016                "p1",
4017                "real human prompt",
4018                Provenance::Conversational,
4019            )],
4020        );
4021        assert_eq!(conversational.as_deref(), Some("real human prompt"));
4022
4023        let injected = search_text(
4024            &message,
4025            &[text_part(
4026                "p2",
4027                "<task-notification>...</task-notification>",
4028                Provenance::Injected,
4029            )],
4030        );
4031        assert!(
4032            injected.is_none(),
4033            "a message whose only part is injected has null search_text"
4034        );
4035    }
4036
4037    #[test]
4038    fn chunk_ranges_splits_on_byte_budget() {
4039        assert!(chunk_ranges(&[]).is_empty());
4040        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4041
4042        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4043        assert_eq!(
4044            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4045            vec![0..1, 1..2, 2..3],
4046        );
4047
4048        // An oversized single row gets its own chunk, never an infinite loop.
4049        assert_eq!(
4050            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4051            vec![0..1, 1..2, 2..3],
4052        );
4053    }
4054
    #[tokio::test]
    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
        // Message is dropped on the spot, with one Error outcome surfaced. The
        // rest of the substream continues normally - subsequent valid messages
        // and parts get written.
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("ordering");
        // Orphan: it references `missing-message`, which is never pushed, so
        // no message precedes it in the substream.
        let orphan_part = Part {
            session_id: session.id.clone(),
            id: "orphan-part".to_owned(),
            message_id: "missing-message".to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("orphan".to_owned())),
            },
        };
        // A well-formed message/part pair pushed after the orphan. Both are
        // expected to be committed despite the orphan's rejection.
        let valid_message = Message::User {
            id: "valid-message".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let valid_part = Part {
            session_id: session.id.clone(),
            id: "valid-part".to_owned(),
            message_id: valid_message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("kept".to_owned())),
            },
        };

        // Push order: session, orphan part, valid message, valid part. The
        // integer argument increases by one per event. It is presumably the
        // event's position in the substream (TODO confirm against
        // `IngestValidator::push`).
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        // The orphan is rejected as it is pushed: exactly one Error outcome.
        let part_outcomes = validator
            .push(&store, 1, IngestEvent::Part(orphan_part))
            .await?;
        assert_eq!(part_outcomes.len(), 1);
        assert_eq!(part_outcomes[0].kind, "part");
        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            part_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("part event appeared before a message"))
                .unwrap_or(false),
            "error message must explain the ordering violation: {part_outcomes:?}"
        );
        validator
            .push(&store, 2, IngestEvent::Message(valid_message))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Part(valid_part))
            .await?;
        validator.finish(&store).await?;

        // Rows are counted only after `finish`. `row_counts` is destructured
        // as (sessions, messages, parts).
        let (sessions, messages, parts) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed despite the orphan part");
        assert_eq!(messages, 1, "valid message committed");
        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");

        Ok(())
    }
4126
    #[tokio::test]
    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
        // Per-event drop: a duplicate message id within a substream drops the
        // *duplicate* and surfaces an Error outcome for it. The first wins; the
        // session still commits.
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("duplicate-message");
        let first = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        // Reuses `first`'s id but has a different role. The collision is on
        // the id, so the differing role does not let it through.
        let second = Message::Assistant {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(first))
            .await?;
        // The duplicate is rejected as it is pushed: one Error outcome that
        // names the offending id.
        let dup_outcomes = validator
            .push(&store, 2, IngestEvent::Message(second))
            .await?;
        assert_eq!(dup_outcomes.len(), 1);
        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            dup_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("duplicate message id message-1"))
                .unwrap_or(false),
            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
        );

        validator.finish(&store).await?;
        // Only the session and the first message reach storage.
        let (sessions, messages, _) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed");
        assert_eq!(messages, 1, "only the first message committed");

        Ok(())
    }
4176
    /// Regression: compact_files on `parts` with the blob column tripped a
    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
    /// batches give compact fragments to merge; every `FileData` variant
    /// exercises the blob round-trip. All-File batches sidestep a debug-only
    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
    /// write batch mixes null + valid rows in the blob column - benign in
    /// release, irrelevant to this regression's scope.
    ///
    /// The test passes if `optimize_indices` completes without error; it does
    /// not re-read the compacted rows.
    #[tokio::test(flavor = "multi_thread")]
    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
        use crate::wire::{FileData, PartKind, Provenance};
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let session = synthetic_session("compact-blob");
        store
            .upsert_sessions(std::slice::from_ref(&session))
            .await?;

        // Parts are upserted directly: no message rows are written and
        // `IngestValidator` is not involved. Each part gets its own
        // `msg-{idx}` message id; only `kind` varies.
        let make_part = |idx: usize, kind: PartKind| Part {
            session_id: session.id.clone(),
            message_id: format!("msg-{idx}"),
            id: format!("part-{idx}"),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind,
        };

        // First write: Bytes and String payloads.
        let batch_a = vec![
            make_part(
                0,
                PartKind::File {
                    media_type: Some("text/plain".to_owned()),
                    file_name: Some("a.txt".to_owned()),
                    data: FileData::Bytes(b"alpha".to_vec()),
                },
            ),
            make_part(
                1,
                PartKind::File {
                    media_type: Some("text/plain".to_owned()),
                    file_name: Some("b.txt".to_owned()),
                    data: FileData::String("beta".to_owned()),
                },
            ),
        ];
        store.upsert_parts(&batch_a).await?;

        // Second write (a separate batch, so compaction has fragments to
        // merge): Url and Bytes payloads.
        let batch_b = vec![
            make_part(
                2,
                PartKind::File {
                    media_type: Some("application/octet-stream".to_owned()),
                    file_name: None,
                    data: FileData::Url("https://example.com/file".to_owned()),
                },
            ),
            make_part(
                3,
                PartKind::File {
                    media_type: Some("image/png".to_owned()),
                    file_name: Some("c.png".to_owned()),
                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
                },
            ),
        ];
        store.upsert_parts(&batch_b).await?;

        // NOTE(review): `always_compact()` presumably forces compaction
        // regardless of thresholds. `into_result()?` is expected to turn a
        // failed optimize run into a test error - TODO confirm both.
        store
            .optimize_indices(None, &MaintenancePolicy::always_compact())
            .await?
            .into_result()?;

        Ok(())
    }
4252
    /// Ingest a single `File` part with a byte payload through
    /// `IngestValidator`, then read it back via `get_session`. The stored part
    /// must compare equal to the original, blob payload included.
    ///
    /// NOTE(review): the name says "blob_v2", but fields built by
    /// `legacy_blob_field` use the legacy blob encoding. If the parts blob
    /// column is built that way, the name is stale.
    #[tokio::test]
    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("blob");
        let message = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let part = Part {
            session_id: session.id.clone(),
            id: "part-1".to_owned(),
            message_id: message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::File {
                media_type: Some("text/plain".to_owned()),
                file_name: Some("payload.txt".to_owned()),
                data: FileData::Bytes(b"pond".to_vec()),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(message.clone()))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(part.clone()))
            .await?;
        validator.finish(&store).await?;

        // Read back through the public lookup path and compare the whole part.
        let stored = store
            .get_session(&session.id)
            .await?
            .expect("session should exist");
        let stored_part = &stored.messages[0].parts[0];
        assert_eq!(stored_part, &part);

        Ok(())
    }
4299
4300    //
4301    // `Session.source_agent` and `Session.project` are immutable
4302    // post-first-write because `messages` denormalizes them at first
4303    // ingest; a silent overwrite would desync the denormalized
4304    // copies. pond core's `IngestValidator` probes the existing session
4305    // before the merge_insert and emits a per-row `validation_failed`
4306    // outcome with the typed field name when either changes. Other Session
4307    // fields (options, parent_session_id, created_at, parent_message_id)
4308    // re-write idempotently via merge_insert.
4309
4310    fn base_session() -> Session {
4311        Session {
4312            id: "01HXY00000000001".to_owned(),
4313            parent_session_id: None,
4314            parent_message_id: None,
4315            source_agent: "claude-code".to_owned(),
4316            created_at: Utc::now(),
4317            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4318            options: ProviderOptions::new(),
4319        }
4320    }
4321
4322    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4323        outcomes
4324            .iter()
4325            .filter(|outcome| outcome.status == target)
4326            .count()
4327    }
4328
    /// Re-ingesting a session whose `source_agent` and `project` are unchanged
    /// must produce a `Matched` outcome and no errors, even when a mutable
    /// field (`options`) differs.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
    -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);

        // Change only a mutable field before re-ingesting.
        let mut again = base_session();
        again.options.insert("title".to_owned(), json!("renamed"));
        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
        assert_eq!(
            count_status(&second, OutcomeStatus::Error),
            0,
            "options is mutable; the re-ingest must not surface an error: {second:?}",
        );
        assert_eq!(
            count_status(&second, OutcomeStatus::Matched),
            1,
            "unchanged immutable fields must match-insert via merge_insert",
        );

        Ok(())
    }
4354
    /// Changing `source_agent` on re-ingest must yield exactly one `Error`
    /// outcome tagged `field = source_agent`, `reason = immutable`, and the
    /// stored row must keep its original value.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        // Same session id, different adapter.
        let mut tampered = base_session();
        tampered.source_agent = "codex-cli".to_owned();
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("error outcome present");
        let err = err_row.error.as_ref().expect("error body present");
        assert_eq!(err.field, Some("source_agent"));
        assert_eq!(err.reason, Some("immutable"));

        // The stored row stayed on the original adapter - no silent rewrite.
        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives the rejected re-ingest");
        assert_eq!(stored.session.source_agent, "claude-code");

        Ok(())
    }
4384
    /// Changing `project` on re-ingest must surface an `Error` outcome tagged
    /// `field = project`, and the stored row must keep its original project.
    ///
    /// NOTE(review): unlike the `source_agent` variant, this test does not
    /// assert the number of error outcomes or the `reason`. Consider aligning
    /// the two.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        // Same session id, different project path.
        let mut tampered = base_session();
        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("project change must surface an error outcome");
        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));

        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives");
        assert_eq!(
            stored.session.project.as_str(),
            "/home/me/proj",
            "stored project must remain the original",
        );

        Ok(())
    }
4414
    #[tokio::test(flavor = "multi_thread")]
    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
        // Regression guard: re-ingesting an existing session with NEW
        // messages must surface as sessions_inserted=0, messages_inserted_*>0
        // on `BatchCounts`, and per-row outcomes must mark the new message
        // rows `Inserted` while the session row is `Matched`. The prior
        // implementation derived all per-row statuses from the batch-level
        // session inserted count, which silently flipped the new messages
        // into `Matched` (visible as "up to date" in the CLI bar tail).
        use crate::wire::Provenance;
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = base_session();

        // Builders for conversational text parts and user messages, both
        // bound to `session`.
        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
            session_id: session.id.clone(),
            id: part_id.to_owned(),
            message_id: message_id.to_owned(),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value(body.to_owned())),
            },
        };
        let user_message = |id: &str| Message::User {
            id: id.to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        // First pass: 2 messages land fresh.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(user_message("m1")))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Message(user_message("m2")))
            .await?;
        validator
            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
            .await?;
        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
        assert_eq!(first_counts.sessions_inserted, 1);
        assert_eq!(first_counts.messages_inserted_total, 2);
        assert_eq!(first_counts.messages_inserted_searchable, 2);

        // Second pass: same session id, 3 NEW messages.
        // Event indices continue after the session at 0: message `idx` is
        // pushed at `2 * idx + 1` and its part at `2 * idx + 2`.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
            let pid = format!("p{}", idx + 3);
            validator
                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
                .await?;
            validator
                .push(
                    &store,
                    idx * 2 + 2,
                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
                )
                .await?;
        }
        let (second_outcomes, second_counts) = validator.finish(&store).await?;

        assert_eq!(
            second_counts.sessions_inserted, 0,
            "existing session row must report as Matched, not Inserted",
        );
        assert_eq!(second_counts.sessions_matched, 1);
        assert_eq!(
            second_counts.messages_inserted_total, 3,
            "the three NEW messages must register as Inserted in BatchCounts",
        );
        assert_eq!(
            second_counts.messages_inserted_searchable, 3,
            "all three new messages carry conversational text -> searchable",
        );
        assert_eq!(second_counts.messages_matched_total, 0);
        assert_eq!(second_counts.parts_inserted, 3);
        assert_eq!(second_counts.parts_matched, 0);

        // Per-row outcomes mirror the BatchCounts shape: the session row is
        // Matched, every new message + part row is Inserted.
        let session_outcome = second_outcomes
            .iter()
            .find(|outcome| outcome.kind == "session")
            .expect("session-row outcome present");
        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
        for outcome in &second_outcomes {
            if outcome.kind == "message" || outcome.kind == "part" {
                assert_eq!(
                    outcome.status,
                    OutcomeStatus::Inserted,
                    "new row must be Inserted, got: {outcome:?}",
                );
            }
        }
        Ok(())
    }
4524
4525    /// Ingest `count` synthetic messages spread across a handful of sessions
4526    /// and projects, each with conversational `search_text`. Returns the store
4527    /// and the message keys in `msg-{i}` order; every `vector` starts null.
4528    async fn store_with_messages(
4529        temp: &TempDir,
4530        count: usize,
4531    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4532        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4533    }
4534
4535    /// Same as [`store_with_messages`] but tests optimize with a custom
4536    /// IVF_PQ activation threshold.
4537    async fn store_with_messages_at_threshold(
4538        temp: &TempDir,
4539        count: usize,
4540        _vector_threshold: usize,
4541    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4542        let store = Store::open_local(temp.path()).await?;
4543        let sessions = 8.min(count.max(1));
4544        let mut events = Vec::new();
4545        for s in 0..sessions {
4546            events.push(IngestEvent::Session(Session {
4547                id: format!("session-{s}"),
4548                parent_session_id: None,
4549                parent_message_id: None,
4550                source_agent: "claude-code".to_owned(),
4551                created_at: Utc::now(),
4552                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4553                options: ProviderOptions::new(),
4554            }));
4555            for i in (s..count).step_by(sessions) {
4556                let message_id = format!("msg-{i}");
4557                events.push(IngestEvent::Message(Message::User {
4558                    id: message_id.clone(),
4559                    session_id: format!("session-{s}"),
4560                    timestamp: Utc::now(),
4561                    options: ProviderOptions::new(),
4562                }));
4563                events.push(IngestEvent::Part(Part {
4564                    session_id: format!("session-{s}"),
4565                    id: format!("{message_id}-part"),
4566                    message_id,
4567                    ordinal: 0,
4568                    provenance: crate::wire::Provenance::Conversational,
4569                    options: ProviderOptions::new(),
4570                    kind: PartKind::Text {
4571                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4572                    },
4573                }));
4574            }
4575        }
4576        ingest_events(&store, events).await?;
4577        let keys = (0..count)
4578            .map(|i| MessageKey {
4579                session_id: format!("session-{}", i % sessions),
4580                message_id: format!("msg-{i}"),
4581            })
4582            .collect();
4583        Ok((store, keys))
4584    }
4585
4586    /// A deterministic pseudo-random vector of the production dimension.
4587    fn synthetic_vector(seed: usize) -> Vec<f32> {
4588        let mut state = (seed as u64)
4589            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4590            .wrapping_add(1);
4591        (0..embedding_dim())
4592            .map(|_| {
4593                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4594                #[allow(clippy::cast_precision_loss)]
4595                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4596                unit - 1.0
4597            })
4598            .collect()
4599    }
4600
4601    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
4602    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4603        keys.iter()
4604            .enumerate()
4605            .map(|(seed, key)| EmbeddedMessage {
4606                session_id: key.session_id.clone(),
4607                id: key.message_id.clone(),
4608                vector: synthetic_vector(seed),
4609            })
4610            .collect()
4611    }
4612
4613    fn embedding_update_batch_with_model(
4614        rows: &[EmbeddedMessage],
4615        model: &str,
4616    ) -> Result<RecordBatch> {
4617        let mut batch = embedding_update_batch(rows)?;
4618        let columns = batch
4619            .columns()
4620            .iter()
4621            .take(3)
4622            .cloned()
4623            .chain(std::iter::once(
4624                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4625            ))
4626            .collect::<Vec<_>>();
4627        batch = RecordBatch::try_new(batch.schema(), columns)?;
4628        Ok(batch)
4629    }
4630
4631    #[tokio::test]
4632    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4633        let temp = TempDir::new()?;
4634        // 4 messages cycle session-0..session-3, so `session-3` is a real
4635        // partition. Scalar-index pushdown is volume-independent: the planner
4636        // emits `ScalarIndexQuery` whenever the index exists.
4637        let (store, keys) = store_with_messages(&temp, 4).await?;
4638        store.write_embeddings(&embedded(&keys)).await?;
4639        store
4640            .optimize_indices(None, &MaintenancePolicy::always_compact())
4641            .await?
4642            .into_result()?;
4643
4644        let query = vec![0.01_f32; embedding_dim()];
4645        let plan = store
4646            .explain_vector_plan(
4647                &query,
4648                10,
4649                &Predicate::Eq("session_id", "session-3".into()),
4650                None,
4651            )
4652            .await?;
4653
4654        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
4655        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
4656        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
4657        // and unrelated.)
4658        assert!(
4659            plan.contains("ScalarIndexQuery"),
4660            "expected a ScalarIndexQuery node in the plan:\n{plan}",
4661        );
4662        let predicate_postfiltered = plan
4663            .lines()
4664            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4665        assert!(
4666            !predicate_postfiltered,
4667            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4668        );
4669        Ok(())
4670    }
4671
4672    #[tokio::test]
4673    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4674        let temp = TempDir::new()?;
4675        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4676
4677        // First batch: 255 vectors, one below threshold. Optimize does not
4678        // create the IVF_PQ because the trigger is not met.
4679        store.write_embeddings(&embedded(&keys[..255])).await?;
4680        store
4681            .optimize_indices_with_vector_threshold(256)
4682            .await?
4683            .into_result()?;
4684        assert!(
4685            !store
4686                .handle
4687                .messages_index_names()
4688                .await?
4689                .iter()
4690                .any(|name| name == MESSAGES_VECTOR_INDEX),
4691            "IVF_PQ must not exist below the activation threshold",
4692        );
4693
4694        // Next batch: one more vector. Total reaches 256; optimize creates
4695        // the IVF_PQ.
4696        store.write_embeddings(&embedded(&keys[255..256])).await?;
4697        store
4698            .optimize_indices_with_vector_threshold(256)
4699            .await?
4700            .into_result()?;
4701        assert!(
4702            store
4703                .handle
4704                .messages_index_names()
4705                .await?
4706                .iter()
4707                .any(|name| name == MESSAGES_VECTOR_INDEX),
4708            "optimize must create the IVF_PQ once the threshold is crossed",
4709        );
4710
4711        // The remaining 44 rows stay un-embedded; the IVF_PQ trains over the
4712        // non-null subset and a planted vector is retrievable.
4713        let hits = store
4714            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4715            .await?;
4716        assert!(
4717            hits.iter().any(|(key, _)| key == &keys[0]),
4718            "an embedded row is retrievable via the index",
4719        );
4720        Ok(())
4721    }
4722
4723    #[tokio::test]
4724    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4725    {
4726        let temp = TempDir::new()?;
4727        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4728        let old_rows = embedded(&keys);
4729        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4730        store
4731            .handle
4732            .merge_update(Table::Messages, old_batch, old_rows.len())
4733            .await?;
4734        store
4735            .optimize_indices_with_vector_threshold(256)
4736            .await?
4737            .into_result()?;
4738        assert!(
4739            store
4740                .handle
4741                .messages_index_names()
4742                .await?
4743                .iter()
4744                .any(|name| name == MESSAGES_VECTOR_INDEX),
4745            "IVF_PQ must exist before a model swap",
4746        );
4747        assert_eq!(store.stale_embedding_count().await?, keys.len());
4748
4749        store.drop_vector_index().await?;
4750        let mut pending = Vec::new();
4751        let stream = store.pending_or_stale_messages();
4752        tokio::pin!(stream);
4753        while let Some(row) = stream.next().await {
4754            pending.push(row?);
4755        }
4756        assert_eq!(
4757            pending.len(),
4758            keys.len(),
4759            "force stream should see stale rows"
4760        );
4761        store.write_embeddings(&embedded(&keys)).await?;
4762        assert_eq!(store.stale_embedding_count().await?, 0);
4763        store
4764            .optimize_indices_with_vector_threshold(256)
4765            .await?
4766            .into_result()?;
4767        assert!(
4768            store
4769                .handle
4770                .messages_index_names()
4771                .await?
4772                .iter()
4773                .any(|name| name == MESSAGES_VECTOR_INDEX),
4774            "optimize must rebuild IVF_PQ after force re-embed",
4775        );
4776
4777        let stream = store.pending_or_stale_messages();
4778        tokio::pin!(stream);
4779        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4780        Ok(())
4781    }
4782
4783    #[tokio::test]
4784    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4785        // Regression: `_row_last_updated_at_version` can point at a Lance
4786        // manifest version that `cleanup_old_versions` or the auto_cleanup
4787        // hook has since dropped from `Dataset::versions()`. The old code
4788        // silently dropped any session whose row-version was not in the
4789        // visible list, collapsing the staleness-skip map down to recent
4790        // commits and forcing `pond sync` to re-touch every file. The fix
4791        // falls back to the oldest still-visible commit timestamp - a
4792        // sound upper bound on the row's true ingest time.
4793        let temp = TempDir::new()?;
4794        let (store, _keys) = store_with_messages(&temp, 4).await?;
4795
4796        // Produce several distinct manifest versions on `sessions` so the
4797        // older ones become eligible for cleanup.
4798        for tag in 0..3 {
4799            let extra = synthetic_session(&format!("extra-{tag}"));
4800            store.upsert_sessions(&[extra]).await?;
4801        }
4802
4803        // Prune everything older than ~now, leaving only the latest manifest.
4804        // `delete_unverified=None` and `error_if_tagged=Some(false)` mirror
4805        // Lance's auto-cleanup hook semantics. The chrono 0-duration is fine:
4806        // Lance's `delete_unverified` floor still protects in-flight files.
4807        let dataset = store.handle.dataset(Table::Sessions).await?;
4808        dataset
4809            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4810            .await
4811            .context("cleanup_old_versions failed")?;
4812
4813        let map = store.session_last_ingested_at().await?;
4814        let session_count = store.row_counts().await?.0;
4815        assert!(
4816            map.len() >= session_count,
4817            "watermark map ({}) must still cover every session ({}) after \
4818             version cleanup; an empty fallback regresses pond sync to a \
4819             full re-scan",
4820            map.len(),
4821            session_count,
4822        );
4823        Ok(())
4824    }
4825
4826    #[tokio::test]
4827    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4828        let temp = TempDir::new()?;
4829        let (store, keys) = store_with_messages(&temp, 10).await?;
4830
4831        let before = store.embedding_progress().await?;
4832        assert_eq!(before.embedded, 0);
4833        assert_eq!(before.total, 10);
4834        assert_eq!(before.model, crate::embed::model_id());
4835
4836        store.write_embeddings(&embedded(&keys[..4])).await?;
4837        let partial = store.embedding_progress().await?;
4838        assert_eq!(partial.embedded, 4);
4839        assert_eq!(partial.total, 10);
4840
4841        store.write_embeddings(&embedded(&keys[4..])).await?;
4842        let full = store.embedding_progress().await?;
4843        assert_eq!(full.embedded, 10);
4844        assert_eq!(full.total, 10);
4845        Ok(())
4846    }
4847}