Skip to main content

pond/
sessions.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    path::Path,
4    sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13    Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14    LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15    UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25    config, embed,
26    substrate::{
27        Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, OptimizeProgressFn,
28        PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table, TableOptimizeOutcome, TableSizes,
29        VECTOR_INDEX_ACTIVATION_ROWS,
30    },
31    wire::{FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session},
32};
33use url::Url;
34
/// Store over the pond tables (`sessions`, `messages`, `parts`). Every read
/// and write in this type is routed through the wrapped substrate [`Handle`].
#[derive(Debug)]
pub struct Store {
    /// Substrate handle used for all dataset access.
    handle: Handle,
}
39
/// Per-table row counts reported by the archive export and import paths.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Row count for the `sessions` table.
    pub sessions: usize,
    /// Row count for the `messages` table.
    pub messages: usize,
    /// Row count for the `parts` table.
    pub parts: usize,
}
46
/// Per-table dataset version ids. The export path fills these from
/// `Dataset::version_id()` of each source table at the time it is scanned.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the `sessions` dataset.
    pub sessions: u64,
    /// Version id of the `messages` dataset.
    pub messages: u64,
    /// Version id of the `parts` dataset.
    pub parts: u64,
}
53
/// Summary returned by `Store::export_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Rows written to the archive, per table.
    pub rows: LanceArchiveCounts,
    /// Version id of each source dataset the rows were read from.
    pub source_versions: LanceArchiveVersions,
}
59
/// Summary returned by `Store::import_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Rows read from the archive, per table.
    pub rows: LanceArchiveCounts,
    /// Insert counts reported by the merge-insert calls, per table.
    pub inserted: LanceArchiveCounts,
}
65
/// Index intents grouped by the table they apply to.
#[derive(Debug, Clone, Default)]
pub struct IndexIntents {
    /// Intents for the `sessions` table.
    pub sessions: Vec<IndexIntent>,
    /// Intents for the `messages` table.
    pub messages: Vec<IndexIntent>,
    /// Intents for the `parts` table.
    pub parts: Vec<IndexIntent>,
}
72
73impl IndexIntents {
74    fn all(&self) -> [(Table, &[IndexIntent]); 3] {
75        [
76            (Table::Sessions, &self.sessions),
77            (Table::Messages, &self.messages),
78            (Table::Parts, &self.parts),
79        ]
80    }
81}
82
/// A message awaiting embedding: its primary key plus the `search_text` to
/// embed. The vector lives on the same `messages` row, so no denormalized
/// filter columns are needed (spec.md#session-embed-from-canonical).
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Session half of the message's primary key.
    pub session_id: String,
    /// Message id half of the primary key.
    pub id: String,
    /// Text to embed for this message.
    pub search_text: String,
}
92
/// One embedded message: a primary key and the vector to store. `pond embed`
/// writes a batch of these into `messages.vector` keyed on `(session_id, id)`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Session half of the message's primary key.
    pub session_id: String,
    /// Message id half of the primary key.
    pub id: String,
    /// Embedding vector to store for the message.
    pub vector: Vec<f32>,
}
101
/// Message metadata used to hydrate search hits after retriever ranking.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Id of the message.
    pub message_id: String,
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Message role, carried as a plain string.
    pub role: String,
    /// Project the message is attributed to.
    pub project: String,
    /// Source agent the message is attributed to.
    pub source_agent: String,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// The message's search text.
    pub search_text: String,
}
113
/// Composite key identifying one message: `(session_id, message_id)`.
/// The derived ordering compares `session_id` first, then `message_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the owning session.
    pub session_id: String,
    /// Id of the message within that session.
    pub message_id: String,
}
119
/// Status attached to a row after an upsert.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The write reported the row as newly inserted.
    Inserted,
    /// The write reported no insert for the row (presumably it matched an
    /// existing key - confirm against `statuses_from_inserted`).
    Matched,
}
125
/// What one `Store::optimize_indices` or `Store::build_indices_only` pass did
/// across every table. Each [`TableOptimizeOutcome`] reports phase-by-phase
/// results so the CLI can render compaction-skipped (under writer contention)
/// distinctly from index-build failure (real problem).
#[derive(Debug, Default)]
pub struct OptimizeOutcome {
    /// Per-table outcomes collected during the pass.
    pub tables: Vec<TableOptimizeOutcome>,
}
134
135impl OptimizeOutcome {
136    /// True if any table's indices phase reported a non-conflict failure.
137    /// `SkippedConflict` is expected under contention and does not count.
138    pub fn any_indices_failed(&self) -> bool {
139        self.tables.iter().any(|t| t.indices.is_failed())
140    }
141
142    /// Treat any `Failed` phase as an error. Tests that don't run under
143    /// contention use this to keep their existing `.await?` style: a real
144    /// failure becomes an `Err`, while `SkippedConflict` is impossible there.
145    pub fn into_result(self) -> Result<Self> {
146        for table in &self.tables {
147            if let PhaseOutcome::Failed(error) = &table.indices {
148                anyhow::bail!(
149                    "indices phase failed on {}: {error:#}",
150                    table.table.as_str()
151                );
152            }
153            if let PhaseOutcome::Failed(error) = &table.compaction {
154                anyhow::bail!(
155                    "compaction phase failed on {}: {error:#}",
156                    table.table.as_str()
157                );
158            }
159        }
160        Ok(self)
161    }
162}
163
164/// Default manifest-retention window for `pond index optimize`'s explicit
165/// cleanup pass. Matches LanceDB's recommended OSS-operator practice
166/// (lancedb docs: performance.mdx, tables/update.mdx). Note: with
167/// `delete_unverified=false` (default), Lance protects files newer than
168/// 7 days regardless of this value (`UNVERIFIED_THRESHOLD_DAYS` in
169/// lance/dataset/cleanup.rs).
170pub fn default_cleanup_older_than() -> chrono::Duration {
171    chrono::Duration::days(1)
172}
173
/// Cleanup parameters for the compaction phase of `pond index optimize`.
/// `delete_unverified=true` overrides Lance's 7-day in-progress safety
/// guard - required to reclaim files younger than 7 days, unsafe under
/// concurrent writers.
#[derive(Debug, Clone, Copy)]
pub struct CleanupConfig {
    /// Retention window; the default is [`default_cleanup_older_than`].
    pub older_than: chrono::Duration,
    /// Whether to override Lance's unverified-file guard. Defaults to `false`.
    pub delete_unverified: bool,
}
183
184impl Default for CleanupConfig {
185    fn default() -> Self {
186        Self {
187            older_than: default_cleanup_older_than(),
188            delete_unverified: false,
189        }
190    }
191}
192
/// What `pond status` reports: where the data lives, total rows per table,
/// and a per-(adapter, project) breakdown built from one `messages` scan.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location of the data.
    pub data_url: Url,
    /// Total row counts per table.
    pub totals: RowTotals,
    /// One entry per adapter present in the corpus. When `include_subagents`
    /// is false (the CLI default), sub-agent rows (`source_agent` containing
    /// `/`) are filtered out so only the main-agent sessions appear. When
    /// true, each distinct `source_agent` (e.g. `claude-code/general-purpose`)
    /// gets its own entry. Always in alphabetical order; the CLI re-sorts
    /// this into registry order at render time so the tree matches the
    /// discovery picker.
    pub adapters: Vec<AdapterStats>,
    /// Whether the rollup includes sub-agent sessions. The renderer prints a
    /// hint about `--include-subagents` when this is false so users know the
    /// `totals` row above counts sessions that aren't broken down below.
    pub include_subagents: bool,
}
212
/// Total row counts, one per table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Rows in `sessions`.
    pub sessions: u64,
    /// Rows in `messages`.
    pub messages: u64,
    /// Rows in `parts`.
    pub parts: u64,
}
219
/// Embedding coverage for `pond status` / `pond embed`. `total` is the count of
/// `messages` rows that carry `search_text` (i.e. are eligible to embed); rows
/// without `search_text` produce no vector. `embedded` is the subset of those
/// already carrying a vector under the current [`embed::model_id()`]. The pending
/// backlog is `total - embedded`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Eligible rows that already carry a vector for the current model.
    pub embedded: usize,
    /// Rows eligible for embedding (those with `search_text`).
    pub total: usize,
    /// Identifier of the embedding model the counts refer to.
    pub model: &'static str,
}
231
/// Rollup for one adapter inside [`CorpusStats`].
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// Either the main-agent name (`claude-code`) when sub-agents are filtered
    /// out, or the full `source_agent` (`claude-code/general-purpose`) when
    /// `include_subagents` is on.
    pub adapter: String,
    /// Session count for this adapter.
    pub sessions: u64,
    /// Message count for this adapter.
    pub messages: u64,
    /// Projects under this adapter, sorted by message count desc, then by
    /// project name asc.
    pub projects: Vec<ProjectStats>,
}
244
/// Rollup for one project, listed under an adapter's `projects`.
#[derive(Debug, Clone)]
pub struct ProjectStats {
    /// Project name.
    pub project: String,
    /// Session count for this project.
    pub sessions: u64,
    /// Message count for this project.
    pub messages: u64,
}
251
/// Running tally for one group: a message counter plus the set of session
/// ids seen. Presumably one instance per bucket of the status rollup - its
/// use site is outside this view, so confirm there.
#[derive(Default)]
struct GroupAccumulator {
    /// Number of messages counted so far.
    messages: u64,
    /// Session ids encountered; a set, so each id is held once.
    session_ids: HashSet<String>,
}
257
/// Borrowed view of one message to write: the message itself, its parts, and
/// the optional search text to store with it.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message to write.
    pub message: &'a Message,
    /// Parts belonging to the message.
    pub parts: &'a [Part],
    /// Search text for the message, when it has any.
    pub search_text: Option<&'a str>,
}
264
265impl Store {
266    /// Open against a local filesystem URL or a remote one for which the
267    /// caller has no extra options to pass (env vars suffice). CLI verbs
268    /// that load `[storage]` from config should call
269    /// [`Store::open_with_options`] instead so the same options flow into
270    /// every dataset open and write.
271    pub async fn open(location: &Url) -> Result<Self> {
272        Ok(Self {
273            handle: Handle::open(location).await?,
274        })
275    }
276
277    /// Open with object-store options (S3 creds, region, endpoint, ...)
278    /// threaded through Lance verbatim. Keys are the standard `object_store`
279    /// config names; pond does not parse them. Empty options + default caps
280    /// is equivalent to [`Store::open`]. Cache caps come from the `[runtime]`
281    /// config block via [`crate::substrate::RuntimeCaps`].
282    pub async fn open_with_options(
283        location: &Url,
284        storage_options: std::collections::HashMap<String, String>,
285        caps: crate::substrate::RuntimeCaps,
286    ) -> Result<Self> {
287        Ok(Self {
288            handle: Handle::open_with_options(location, storage_options, caps).await?,
289        })
290    }
291
292    /// Convenience for tests and CLI verbs holding a `&Path`: wraps the path in
293    /// a `file://...` URL via [`config::url_for_path`] before opening. Routes
294    /// through [`Store::open_with_options`] so the production policy is
295    /// applied; tests get the backend-aware local-FS defaults.
296    pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
297        let url = config::url_for_path(path)?;
298        Self::open_with_options(
299            &url,
300            std::collections::HashMap::new(),
301            crate::substrate::RuntimeCaps::default(),
302        )
303        .await
304    }
305
306    /// Export clean, index-free Lance datasets into `dest`.
307    ///
308    /// This rewrites the visible rows of each table instead of copying the
309    /// dataset roots. The resulting manifests therefore contain no references
310    /// to the source store's `_indices`, while `messages.vector` and
311    /// `messages.embedding_model` remain ordinary data columns and are
312    /// preserved.
313    pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
314        std::fs::create_dir_all(dest)
315            .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
316        let (sessions, sessions_version) = self
317            .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
318            .await?;
319        let (messages, messages_version) = self
320            .export_clean_table(Table::Messages, &dest.join("messages.lance"))
321            .await?;
322        let (parts, parts_version) = self
323            .export_clean_table(Table::Parts, &dest.join("parts.lance"))
324            .await?;
325        Ok(LanceArchiveExport {
326            rows: LanceArchiveCounts {
327                sessions,
328                messages,
329                parts,
330            },
331            source_versions: LanceArchiveVersions {
332                sessions: sessions_version,
333                messages: messages_version,
334                parts: parts_version,
335            },
336        })
337    }
338
339    pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
340        let sessions_dataset =
341            open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
342        let messages_dataset =
343            open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
344        let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
345        let (sessions, sessions_inserted) = self
346            .import_clean_table(Table::Sessions, sessions_dataset)
347            .await?;
348        let (messages, messages_inserted) = self
349            .import_clean_table(Table::Messages, messages_dataset)
350            .await?;
351        let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
352        Ok(LanceArchiveImport {
353            rows: LanceArchiveCounts {
354                sessions,
355                messages,
356                parts,
357            },
358            inserted: LanceArchiveCounts {
359                sessions: sessions_inserted,
360                messages: messages_inserted,
361                parts: parts_inserted,
362            },
363        })
364    }
365
366    async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
367        let dataset = self.handle.dataset(table).await?;
368        let source_version = dataset.version_id();
369        let schema = export_schema(table);
370        let mut stream = dataset
371            .scan()
372            .try_into_stream()
373            .await
374            .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
375        let dest_uri = dest
376            .to_str()
377            .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
378
379        let mut rows = 0usize;
380        let mut wrote = false;
381        while let Some(batch) = stream.next().await {
382            let batch = batch
383                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
384            rows += batch.num_rows();
385            let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
386            let mut params = write_params_for_create();
387            if wrote {
388                params.mode = WriteMode::Append;
389            }
390            Dataset::write(reader, dest_uri, Some(params))
391                .await
392                .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
393            wrote = true;
394        }
395
396        if !wrote {
397            let batch = RecordBatch::new_empty(schema.clone());
398            let reader = RecordBatchIterator::new([Ok(batch)], schema);
399            Dataset::write(reader, dest_uri, Some(write_params_for_create()))
400                .await
401                .with_context(|| {
402                    format!("failed to write empty {} archive table", table.as_str())
403                })?;
404        }
405        Ok((rows, source_version))
406    }
407
408    async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
409        let mut stream = dataset
410            .scan()
411            .try_into_stream()
412            .await
413            .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
414        let mut rows = 0usize;
415        let mut inserted = 0usize;
416        while let Some(batch) = stream.next().await {
417            let batch = batch
418                .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
419            let row_count = batch.num_rows();
420            rows += row_count;
421            inserted += self
422                .handle
423                .merge_insert(table, batch, row_count)
424                .await
425                .with_context(|| format!("failed to import {} archive table", table.as_str()))?
426                as usize;
427        }
428        Ok((rows, inserted))
429    }
430
431    pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
432        if sessions.is_empty() {
433            return Ok(Vec::new());
434        }
435        let batches = sessions_batches(sessions)?;
436        let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
437        // Per-row outcomes; no fold here (see `pond index optimize`).
438        Ok(statuses_from_inserted(sessions.len(), inserted))
439    }
440
    /// Batched write path used by the adapter ingest loop and by the wire
    /// handler's final flush. Receives N completed substreams from the
    /// validator and, in this order:
    ///
    ///   1. Deduplicates in-batch at the substream level: when two substreams
    ///      in the same batch share a `session_id` (Claude Code's subagent
    ///      files reuse their parent's id), the first occurrence wins. The
    ///      second is either *merged* (same `source_agent` + `project`:
    ///      messages append, skipping message ids already present) or
    ///      *rejected* (different `source_agent` or `project` - the
    ///      subagent-vs-parent case), producing Error outcomes. Row-level
    ///      duplicates that slip past here are caught downstream by Lance's
    ///      `SourceDedupeBehavior::FirstSeen` in `substrate::merge_insert`
    ///      (invariant 17): this layer's job is preserving substream merge
    ///      semantics, not policing the PK uniqueness Lance handles itself.
    ///   2. Runs the immutable-fields check (spec.md#protocol) against the
    ///      stored row per surviving session, sequentially. Sessions that
    ///      fail produce Error outcomes and are excluded from the write batch.
    ///   3. Builds the combined record batches per table (sessions, messages,
    ///      parts) across every valid substream.
    ///   4. Fires the three `merge_insert` calls in parallel via
    ///      `tokio::try_join!`. Cross-table mutex on `CachedDataset` is
    ///      independent, so these proceed concurrently.
    ///   5. Composes per-session [`RowOutcome`]s and returns them sorted by
    ///      `outcome.index`.
    ///
    /// An empty input returns an empty vector without touching the store.
    async fn upsert_session_batch(
        &self,
        substreams: Vec<CompletedSubstream>,
    ) -> Result<Vec<RowOutcome>> {
        if substreams.is_empty() {
            return Ok(Vec::new());
        }

        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());

        // In-batch dedup. First occurrence of each session_id wins; later
        // occurrences either merge or get rejected. Iteration order preserves
        // original substream order so outcomes index correctly.
        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
        // session_id -> position of its first occurrence inside `merged`.
        let mut by_session_id: std::collections::HashMap<String, usize> =
            std::collections::HashMap::with_capacity(substreams.len());
        for substream in substreams {
            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
                let existing = &merged[existing_idx];
                if existing.session.source_agent != substream.session.source_agent
                    || existing.session.project != substream.session.project
                {
                    // Subagent-vs-parent class. The first occurrence's
                    // metadata stays authoritative; this substream is
                    // rejected on the same immutable-field axis as the
                    // storage-side check.
                    // A `source_agent` mismatch is reported in preference to
                    // a `project` mismatch when both differ.
                    let reason = if existing.session.source_agent != substream.session.source_agent
                    {
                        IngestError::ImmutableField {
                            field: "source_agent",
                            session_id: substream.session.id.clone(),
                            stored: existing.session.source_agent.clone(),
                            attempted: substream.session.source_agent.clone(),
                        }
                    } else {
                        IngestError::ImmutableField {
                            field: "project",
                            session_id: substream.session.id.clone(),
                            stored: (*existing.session.project).clone(),
                            attempted: (*substream.session.project).clone(),
                        }
                    };
                    let field = match &reason {
                        IngestError::ImmutableField { field, .. } => Some(*field),
                    };
                    // Map the offending field onto its drop-reason key.
                    let reason_key = match field {
                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                        _ => DROP_REASON_UNCATEGORIZED,
                    };
                    outcomes.extend(error_outcomes_for_substream(
                        substream.session_index,
                        &substream.session,
                        &substream.messages,
                        reason.to_string(),
                        field,
                        reason_key,
                    ));
                    continue;
                }
                // Same session, same metadata: merge messages. Dedup message
                // ids defensively (within one batch, the validator's seen
                // sets are per-substream so cross-substream dups can happen
                // legally if both files re-emit the same row).
                // NOTE(review): a merged substream emits no outcomes under
                // its own `session_index`; its messages are reported under
                // the first occurrence's index - confirm callers expect this.
                let existing = &mut merged[existing_idx];
                let mut seen: std::collections::HashSet<String> = existing
                    .messages
                    .iter()
                    .map(|m| m.message.id().to_owned())
                    .collect();
                for msg in substream.messages {
                    if seen.insert(msg.message.id().to_owned()) {
                        existing.messages.push(msg);
                    }
                }
                continue;
            }
            by_session_id.insert(substream.session.id.clone(), merged.len());
            merged.push(substream);
        }

        // Storage-side immutable-field check: one `find_session` lookup per
        // surviving substream, run sequentially. Sessions not yet stored
        // pass straight through.
        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
        for substream in merged {
            if let Some(existing) = self.find_session(&substream.session.id).await?
                && let Err(failure) = ensure_immutable_match(&existing, &substream.session)
            {
                let field = match &failure {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    failure.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            writeable.push(substream);
        }

        // Everything was rejected: nothing to write, return the errors.
        if writeable.is_empty() {
            outcomes.sort_by_key(|outcome| outcome.index);
            return Ok(outcomes);
        }

        // Flatten the surviving substreams into per-table row lists.
        let sessions_owned: Vec<Session> = writeable
            .iter()
            .map(|substream| substream.session.clone())
            .collect();
        let message_rows: Vec<MessageBatchRow<'_>> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().map(|buffered| MessageBatchRow {
                    message: &buffered.message,
                    source_agent: &substream.session.source_agent,
                    project: &substream.session.project,
                    search_text: buffered.search_text.as_deref(),
                })
            })
            .collect();
        let part_rows: Vec<Part> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().flat_map(|buffered| {
                    buffered
                        .parts
                        .iter()
                        .map(|buffered_part| buffered_part.part.clone())
                })
            })
            .collect();

        let session_batches = sessions_batches(&sessions_owned)?;
        let message_batches = messages_batches(&message_rows)?;
        let part_batches = parts_batches(&part_rows)?;

        let sessions_count = sessions_owned.len();

        // Any one failure fails the whole call (`try_join!` + `?`).
        let (sessions_inserted, messages_inserted, parts_inserted) = tokio::try_join!(
            merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
            merge_insert_chunks(&self.handle, Table::Messages, message_batches),
            merge_insert_chunks(&self.handle, Table::Parts, part_batches),
        )?;
        // Per-session success outcomes: each substream's own status row plus
        // per-message and per-part rows. The Lance `merge_insert` returns a
        // single batch-level "inserted vs matched" count, not per-row; we
        // can't tell which row matched which, so for the batched path each
        // session/message/part is marked `Inserted` if the batch had any
        // inserts, else `Matched`. This is the same semantic the
        // single-session path used (see `statuses_from_inserted`).
        let sessions_status = if sessions_inserted == sessions_count as u64 {
            UpsertStatus::Inserted
        } else if sessions_inserted == 0 {
            UpsertStatus::Matched
        } else {
            // Mixed - mark per index by re-checking? Cheaper to just count
            // all as Inserted; the operator can read the precise insert
            // count from the `IngestSummary`.
            UpsertStatus::Inserted
        };
        let _ = messages_inserted; // counted via summary, not per-row here
        let _ = parts_inserted;

        for substream in &writeable {
            outcomes.extend(success_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                sessions_status,
            ));
        }

        // Error and success outcomes were appended in different passes;
        // restore a single ordering by index.
        outcomes.sort_by_key(|outcome| outcome.index);
        Ok(outcomes)
    }
646
647    pub async fn upsert_messages(
648        &self,
649        session: &Session,
650        messages: &[MessageWrite<'_>],
651    ) -> Result<Vec<UpsertStatus>> {
652        if messages.is_empty() {
653            return Ok(Vec::new());
654        }
655
656        let rows = messages
657            .iter()
658            .map(|write| MessageBatchRow {
659                message: write.message,
660                source_agent: &session.source_agent,
661                project: &session.project,
662                search_text: write.search_text,
663            })
664            .collect::<Vec<_>>();
665        let batches = messages_batches(&rows)?;
666        let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
667        Ok(statuses_from_inserted(messages.len(), inserted))
668    }
669
670    pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
671        if parts.is_empty() {
672            return Ok(Vec::new());
673        }
674        let batches = parts_batches(parts)?;
675        let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
676        Ok(statuses_from_inserted(parts.len(), inserted))
677    }
678
679    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
680        let Some(session) = self.find_session(session_id).await? else {
681            return Ok(None);
682        };
683        let messages = self.messages_for_session(session_id).await?;
684        Ok(Some(SessionWithMessages { session, messages }))
685    }
686
687    /// Every session id currently in the store, unsorted.
688    pub async fn session_ids(&self) -> Result<Vec<String>> {
689        let batch = self
690            .handle
691            .scan_batch(Table::Sessions, None, &["id"])
692            .await?;
693        let mut ids = Vec::with_capacity(batch.num_rows());
694        for row in 0..batch.num_rows() {
695            if let Some(id) = string(&batch, "id", row)? {
696                ids.push(id);
697            }
698        }
699        Ok(ids)
700    }
701
702    pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
703        let batch = self
704            .handle
705            .scan_batch(
706                Table::Sessions,
707                Some(&Predicate::Eq(
708                    "parent_session_id",
709                    parent_session_id.into(),
710                )),
711                &[
712                    "id",
713                    "parent_session_id",
714                    "parent_message_id",
715                    "source_agent",
716                    "created_at",
717                    "project",
718                    "options",
719                ],
720            )
721            .await?;
722        let mut sessions = Vec::with_capacity(batch.num_rows());
723        for row in 0..batch.num_rows() {
724            sessions.push(session_from_batch(&batch, row)?);
725        }
726        sessions.sort_by(|left, right| left.id.cmp(&right.id));
727        Ok(sessions)
728    }
729
730    /// `session_id -> wall-clock time of the Lance manifest version that
731    /// last wrote the row` for the per-session staleness skip
732    /// (spec.md#adapter-integrity-event-ordering). Reads Lance's `_row_last_updated_at_version` system
733    /// column (available because pond enables stable row ids per spec.md#lance-table-creation-stable-row-ids)
734    /// and joins it against `Dataset::versions()` for commit timestamps.
735    pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
736        use lance::deps::arrow_array::UInt64Array;
737
738        let dataset = self.handle.dataset(Table::Sessions).await?;
739        let version_list = dataset.versions().await?;
740        let versions: HashMap<u64, DateTime<Utc>> = version_list
741            .iter()
742            .map(|v| (v.version, v.timestamp))
743            .collect();
744        // `Dataset::cleanup_old_versions` (and the auto_cleanup hook) drops
745        // pruned versions from the manifest list, leaving rows whose
746        // `_row_last_updated_at_version` points at a version that no longer
747        // resolves. Those rows are still real and were ingested at some time
748        // <= the oldest still-visible version's commit timestamp - so falling
749        // back to that bound preserves a sound `mtime <= ingested` upper edge
750        // and keeps the staleness skip working after cleanup.
751        let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
752
753        let scanner = self
754            .handle
755            .scan(
756                Table::Sessions,
757                ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
758            )
759            .await?;
760        let mut stream = scanner.try_into_stream().await?;
761        let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
762        while let Some(batch) = stream.next().await {
763            let batch = batch?;
764            let version_array = batch
765                .column_by_name("_row_last_updated_at_version")
766                .context("missing _row_last_updated_at_version column")?
767                .as_any()
768                .downcast_ref::<UInt64Array>()
769                .context("_row_last_updated_at_version is not UInt64")?;
770            for row in 0..batch.num_rows() {
771                let Some(id) = string(&batch, "id", row)? else {
772                    continue;
773                };
774                if version_array.is_null(row) {
775                    continue;
776                }
777                let version = version_array.value(row);
778                let ts = versions.get(&version).copied().or(oldest_visible_ts);
779                if let Some(ts) = ts {
780                    out.insert(id, ts);
781                }
782            }
783        }
784        Ok(out)
785    }
786
787    /// Whole-session view for `pond_get` session mode (spec.md#protocol).
788    /// Conversational filters to `search_text IS NOT NULL`; Complete and
789    /// Verbatim scan every message. Every mode attaches compact part summaries;
790    /// Verbatim additionally inlines full parts. `after_id` is an exclusive
791    /// lower bound (a message id); the page is bounded by `limit` and a byte
792    /// budget and never cuts mid-message.
793    pub async fn session_view(
794        &self,
795        session_id: &str,
796        params: SessionViewParams<'_>,
797    ) -> Result<GetLookup<SessionPage>> {
798        let Some(session) = self.find_session(session_id).await? else {
799            return Ok(GetLookup::NotFound);
800        };
801
802        let mut rows = match params.mode {
803            ResponseMode::Conversational => self
804                .scan_conversational_messages(session_id)
805                .await?
806                .into_iter()
807                .map(|row| ScanRow {
808                    id: row.message_id,
809                    role: row.role,
810                    timestamp: row.timestamp,
811                    text: Some(row.text.into_inner()),
812                    content: None,
813                })
814                .collect(),
815            ResponseMode::Complete | ResponseMode::Verbatim => {
816                self.scan_all_messages(session_id).await?
817            }
818        };
819        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
820
821        let start_at = match params.after_id {
822            // Append-only stream: a real anchor never vanishes, so an unknown
823            // `after_id` is a stale/mistyped client cursor, not "start over".
824            Some(after) => match rows.iter().position(|row| row.id == after) {
825                Some(idx) => idx + 1,
826                None => return Ok(GetLookup::UnknownAfterId),
827            },
828            None => 0,
829        };
830        let remaining = rows.get(start_at..).unwrap_or(&[]);
831        let emitted_count = page_by(remaining, params.limit, params.budget_bytes, |row| {
832            row.text.as_deref().map_or(0, str::len)
833        });
834        let emitted = &remaining[..emitted_count];
835        let messages_remaining = remaining.len() - emitted_count;
836        let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
837
838        // Conversational/Complete only summarize parts; Verbatim inlines every
839        // part (blobs included).
840        let mut parts_by_message = match params.mode {
841            ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
842            ResponseMode::Conversational | ResponseMode::Complete => {
843                self.summary_parts_for_messages(session_id, &ids).await?
844            }
845        };
846        let messages = emitted
847            .iter()
848            .map(|row| RetrievedMessage {
849                id: row.id.clone(),
850                role: row.role,
851                timestamp: row.timestamp,
852                text: row.text.clone(),
853                content: row.content.clone(),
854                parts: parts_by_message
855                    .remove(&(session_id.to_owned(), row.id.clone()))
856                    .unwrap_or_default(),
857            })
858            .collect();
859
860        Ok(GetLookup::Found(SessionPage {
861            session,
862            messages,
863            messages_remaining,
864        }))
865    }
866
867    /// Message-scope retrieval for `pond_get` message mode (spec.md#protocol):
868    /// the target with its full parts (paginated by `after_id` over part
869    /// ordinals, then budget) plus up to `2*context_depth` siblings around it.
870    /// `None` when no stored message carries `message_id`. Sibling parts are
871    /// carried for summarizing; the target's parts ride `target_parts`.
872    pub async fn message_view(
873        &self,
874        message_id: &str,
875        params: MessageViewParams<'_>,
876    ) -> Result<GetLookup<MessagePage>> {
877        let Some(session_id) = self.session_id_for_message(message_id).await? else {
878            return Ok(GetLookup::NotFound);
879        };
880        let Some(session) = self.find_session(&session_id).await? else {
881            return Ok(GetLookup::NotFound);
882        };
883        let mut rows = self.scan_all_messages(&session_id).await?;
884        rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
885        let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
886            return Ok(GetLookup::NotFound);
887        };
888
889        let start = target_pos.saturating_sub(params.context_depth);
890        let end = (target_pos + params.context_depth + 1).min(rows.len());
891        let window = &rows[start..end];
892        let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
893        // The target's full parts (blobs included) ride the response; siblings
894        // are only summarized, but they share this one window scan.
895        let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
896
897        let all_parts = parts_by_message
898            .remove(&(session_id.clone(), message_id.to_owned()))
899            .unwrap_or_default();
900        let start_part = match params.after_id {
901            // Exclusive over ordinals: parts are ordinal-sorted, so the first
902            // part past the anchor's ordinal is the page start. An anchor absent
903            // from the target's parts is a stale/mistyped client cursor.
904            Some(after) => match all_parts.iter().find(|part| part.id == after) {
905                Some(anchor) => all_parts
906                    .iter()
907                    .position(|part| part.ordinal > anchor.ordinal)
908                    .unwrap_or(all_parts.len()),
909                None => return Ok(GetLookup::UnknownAfterId),
910            },
911            None => 0,
912        };
913        let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
914        let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
915            serde_json::to_string(part).map_or(0, |json| json.len())
916        });
917        let target_parts = remaining_parts[..part_count].to_vec();
918        let target_parts_remaining = remaining_parts.len() - part_count;
919
920        let target_row = &rows[target_pos];
921        let target = RetrievedMessage {
922            id: target_row.id.clone(),
923            role: target_row.role,
924            timestamp: target_row.timestamp,
925            text: target_row.text.clone(),
926            content: target_row.content.clone(),
927            // Target structure is carried in full by `target_parts`.
928            parts: Vec::new(),
929        };
930        let siblings = window
931            .iter()
932            .enumerate()
933            .filter(|(idx, _)| start + idx != target_pos)
934            .map(|(_, row)| RetrievedMessage {
935                id: row.id.clone(),
936                role: row.role,
937                timestamp: row.timestamp,
938                text: row.text.clone(),
939                content: row.content.clone(),
940                parts: parts_by_message
941                    .get(&(session_id.clone(), row.id.clone()))
942                    .cloned()
943                    .unwrap_or_default(),
944            })
945            .collect();
946
947        Ok(GetLookup::Found(MessagePage {
948            session,
949            target,
950            target_parts,
951            target_parts_remaining,
952            siblings,
953        }))
954    }
955
956    async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
957        let batch = self
958            .handle
959            .scan_batch(
960                Table::Messages,
961                Some(&Predicate::Eq("session_id", session_id.into())),
962                &["id", "timestamp", "role", "search_text", "content"],
963            )
964            .await?;
965        let mut rows = Vec::with_capacity(batch.num_rows());
966        for row in 0..batch.num_rows() {
967            let id = string(&batch, "id", row)?.context("message id is null")?;
968            let role =
969                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
970            let timestamp = datetime(&batch, "timestamp", row)?;
971            rows.push(ScanRow {
972                id,
973                role,
974                timestamp,
975                text: string(&batch, "search_text", row)?,
976                content: string(&batch, "content", row)?,
977            });
978        }
979        Ok(rows)
980    }
981
982    /// Conversational scan over one session: rows ordered by
983    /// `(timestamp, id)`, `IsNotNull("search_text")` pushed down at the
984    /// read seam (spec.md#search-prefilter-pushdown).
985    pub async fn scan_conversational_messages(
986        &self,
987        session_id: &str,
988    ) -> Result<Vec<ConversationalRow>> {
989        let filter = Predicate::And(vec![
990            Predicate::Eq("session_id", session_id.into()),
991            Predicate::IsNotNull("search_text"),
992        ]);
993        let batch = self
994            .handle
995            .scan_batch(
996                Table::Messages,
997                Some(&filter),
998                &["id", "timestamp", "role", "search_text"],
999            )
1000            .await?;
1001
1002        let mut rows = Vec::with_capacity(batch.num_rows());
1003        for row in 0..batch.num_rows() {
1004            let message_id = string(&batch, "id", row)?.context("message id is null")?;
1005            let role =
1006                role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1007            let timestamp = datetime(&batch, "timestamp", row)?;
1008            let text_str = string(&batch, "search_text", row)?.context(
1009                "search_text null after IsNotNull pushdown - storage invariant violated",
1010            )?;
1011            rows.push(ConversationalRow {
1012                session_id: session_id.to_owned(),
1013                message_id,
1014                role,
1015                timestamp,
1016                text: SearchText(text_str),
1017            });
1018        }
1019        rows.sort_by(|a, b| {
1020            a.timestamp
1021                .cmp(&b.timestamp)
1022                .then_with(|| a.message_id.cmp(&b.message_id))
1023        });
1024        Ok(rows)
1025    }
1026
1027    /// Locate the session id for a stored message. Cheap when only the routing
1028    /// hint is needed - callers that need the messages use `scan_all_messages`.
1029    pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1030        let batch = self
1031            .handle
1032            .scan_batch(
1033                Table::Messages,
1034                Some(&Predicate::Eq("id", message_id.into())),
1035                &["session_id"],
1036            )
1037            .await?;
1038        if batch.num_rows() == 0 {
1039            return Ok(None);
1040        }
1041        string(&batch, "session_id", 0)
1042    }
1043
1044    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1045        self.handle.row_counts().await
1046    }
1047
1048    /// Compute the per-adapter / per-project rollup that drives
1049    /// `pond status`. One scan over `messages` projecting the three
1050    /// columns the rollup keys on (`source_agent`, `project`, `session_id`),
1051    /// aggregated in-memory. Bounded by the cross product of adapters and
1052    /// projects, which stays small on real corpora.
1053    pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1054        let scanner = self
1055            .handle
1056            .scan(
1057                Table::Messages,
1058                ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1059            )
1060            .await?;
1061        let mut stream = scanner.try_into_stream().await?;
1062        let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1063        while let Some(batch) = stream.next().await {
1064            let batch = batch?;
1065            for row in 0..batch.num_rows() {
1066                let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1067                let project = string(&batch, "project", row)?.unwrap_or_default();
1068                let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1069                let is_subagent = source_agent.contains('/');
1070                if is_subagent && !include_subagents {
1071                    continue;
1072                }
1073                let entry = groups.entry((source_agent, project)).or_default();
1074                entry.messages += 1;
1075                entry.session_ids.insert(session_id);
1076            }
1077        }
1078
1079        let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1080        let totals = RowTotals {
1081            sessions: totals_sessions as u64,
1082            messages: totals_messages as u64,
1083            parts: totals_parts as u64,
1084        };
1085
1086        let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1087        for ((adapter, project), acc) in groups {
1088            by_adapter.entry(adapter).or_default().push(ProjectStats {
1089                project,
1090                sessions: acc.session_ids.len() as u64,
1091                messages: acc.messages,
1092            });
1093        }
1094
1095        let mut adapters = Vec::with_capacity(by_adapter.len());
1096        for (adapter, mut projects) in by_adapter {
1097            projects.sort_by(|a, b| {
1098                b.messages
1099                    .cmp(&a.messages)
1100                    .then_with(|| a.project.cmp(&b.project))
1101            });
1102            let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1103            let messages: u64 = projects.iter().map(|p| p.messages).sum();
1104            adapters.push(AdapterStats {
1105                adapter,
1106                sessions,
1107                messages,
1108                projects,
1109            });
1110        }
1111
1112        Ok(CorpusStats {
1113            data_url: self.handle.location().clone(),
1114            totals,
1115            adapters,
1116            include_subagents,
1117        })
1118    }
1119
1120    /// Write a batch of embeddings into `messages`: set `vector` and
1121    /// `embedding_model` on each row by `(session_id, id)`
1122    /// (spec.md#session-embed-from-canonical). The column update goes through the
1123    /// write seam and lands as a new manifest version (`append-only`).
1124    pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1125        if rows.is_empty() {
1126            return Ok(());
1127        }
1128        let batch = embedding_update_batch(rows)?;
1129        self.handle
1130            .merge_update(Table::Messages, batch, rows.len())
1131            .await?;
1132        Ok(())
1133    }
1134
1135    /// Stream the backlog of messages needing embedding: rows with `search_text`
1136    /// set whose `vector` is null (spec.md#session-embed-from-canonical).
1137    pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1138        try_stream! {
1139            let filter = Predicate::And(vec![
1140                Predicate::IsNull("vector"),
1141                Predicate::IsNotNull("search_text"),
1142            ]);
1143            let projection: &[&str] = &["session_id", "id", "search_text"];
1144            let scanner = self
1145                .handle
1146                .scan(
1147                    Table::Messages,
1148                    ScanOpts::with_predicate_and_projection(&filter, projection),
1149                )
1150                .await?;
1151            let mut batches = scanner
1152                .try_into_stream()
1153                .await
1154                .context("failed to open messages stream")?;
1155            while let Some(batch) = batches.next().await {
1156                let batch = batch?;
1157                for row in 0..batch.num_rows() {
1158                    yield PendingMessage {
1159                        session_id: string(&batch, "session_id", row)?
1160                            .context("session_id is null")?,
1161                        id: string(&batch, "id", row)?.context("message id is null")?,
1162                        search_text: string(&batch, "search_text", row)?
1163                            .context("search_text is null")?,
1164                    };
1165                }
1166            }
1167        }
1168    }
1169
1170    /// Stream messages that are either never embedded or stale under the
1171    /// current model. `pond embed --force` feeds this to the same unconditional
1172    /// merge_update as the normal backlog; the filter makes that semantically
1173    /// equivalent to the conditional update in spec.md#session-embed-from-canonical.
1174    pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1175        try_stream! {
1176            let filter = Predicate::And(vec![
1177                Predicate::IsNotNull("search_text"),
1178                Predicate::Or(vec![
1179                    Predicate::IsNull("vector"),
1180                    Predicate::Ne("embedding_model", embed::model_id().into()),
1181                ]),
1182            ]);
1183            let projection: &[&str] = &["session_id", "id", "search_text"];
1184            let scanner = self
1185                .handle
1186                .scan(
1187                    Table::Messages,
1188                    ScanOpts::with_predicate_and_projection(&filter, projection),
1189                )
1190                .await?;
1191            let mut batches = scanner
1192                .try_into_stream()
1193                .await
1194                .context("failed to open pending-or-stale messages stream")?;
1195            while let Some(batch) = batches.next().await {
1196                let batch = batch?;
1197                for row in 0..batch.num_rows() {
1198                    yield PendingMessage {
1199                        session_id: string(&batch, "session_id", row)?
1200                            .context("session_id is null")?,
1201                        id: string(&batch, "id", row)?.context("message id is null")?,
1202                        search_text: string(&batch, "search_text", row)?
1203                            .context("search_text is null")?,
1204                    };
1205                }
1206            }
1207        }
1208    }
1209
1210    /// BM25 full-text retriever over `messages.search_text`.
1211    pub async fn fts_search(
1212        &self,
1213        query: &str,
1214        limit: usize,
1215        filter: &Predicate,
1216    ) -> Result<Vec<(MessageKey, f32)>> {
1217        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1218        scanner.full_text_search(
1219            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1220        )?;
1221        // Lance ships an autoprojection that silently appends `_score` to FTS
1222        // output when the projection omits it. That behavior is going away;
1223        // we opt into the future explicit-projection contract here so the
1224        // scanner stops emitting a per-call deprecation warning, and we list
1225        // `_score` ourselves since the loop below reads it.
1226        scanner.disable_scoring_autoprojection();
1227        scanner.project(&["session_id", "id", "_score"])?;
1228        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1229        let batch = scanner.try_into_batch().await?;
1230        let mut hits = Vec::with_capacity(batch.num_rows());
1231        for row in 0..batch.num_rows() {
1232            let key = MessageKey {
1233                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1234                message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1235            };
1236            hits.push((key, float32(&batch, "_score", row)?));
1237        }
1238        // Stable secondary sort: Lance returns tied-BM25-score hits in fragment
1239        // order, which varies between runs and across calls with different pool
1240        // sizes (the hybrid arm's `pool=100` and FTS-only's `limit=20` produce
1241        // different orderings at the same tied score). Without an explicit
1242        // tiebreak the downstream RRF dedup-rank for a tied target session can
1243        // flip session-to-session, making fusion outcomes nondeterministic.
1244        // Sort by `score desc`, then `(session_id, message_id)` asc.
1245        hits.sort_by(|left, right| {
1246            right
1247                .1
1248                .partial_cmp(&left.1)
1249                .unwrap_or(std::cmp::Ordering::Equal)
1250                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1251                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1252        });
1253        Ok(hits)
1254    }
1255
1256    /// Whether any `messages` row carries a vector (spec.md#search) - the
1257    /// signal that flips search from FTS-only to hybrid. The single-active-
1258    /// model invariant (see `MESSAGE_SCALAR_INDICES`) means any non-null
1259    /// vector belongs to the current model.
1260    pub async fn has_embeddings(&self) -> Result<bool> {
1261        let scope = Predicate::IsNotNull("vector");
1262        let mut scanner = self
1263            .handle
1264            .scan(
1265                Table::Messages,
1266                ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1267            )
1268            .await?;
1269        scanner.limit(Some(1), None)?;
1270        let batch = scanner.try_into_batch().await?;
1271        Ok(batch.num_rows() > 0)
1272    }
1273
1274    /// Vector kNN retriever over `messages.vector`, prefiltered by the caller's
1275    /// scalar predicate (spec.md#search-prefilter-pushdown). Combines the caller's
1276    /// filter with `vector IS NOT NULL` to exclude un-embedded rows from the
1277    /// scan; the brute-force kNN path requires this (the IVF_PQ path would
1278    /// skip them anyway). The single-active-model invariant lets pond drop
1279    /// the per-row model filter: every non-null vector belongs to the current
1280    /// model.
1281    pub async fn vector_search(
1282        &self,
1283        query: &[f32],
1284        limit: usize,
1285        filter: &Predicate,
1286        search: Option<&config::SearchConfig>,
1287    ) -> Result<Vec<(MessageKey, f32)>> {
1288        let scope = embedded_scope(filter);
1289        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1290        let key = Float32Array::from(query.to_vec());
1291        scanner.nearest("vector", &key, limit)?;
1292        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1293            scanner.nprobes(nprobes);
1294        }
1295        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1296            scanner.refine(refine_factor);
1297        }
1298        // Mirror the explicit-projection contract from `fts_search`: opt out
1299        // of `_distance` autoprojection and list it ourselves since the loop
1300        // below reads it.
1301        scanner.disable_scoring_autoprojection();
1302        scanner.project(&["session_id", "id", "_distance"])?;
1303        let batch = scanner.try_into_batch().await?;
1304        let mut hits = Vec::with_capacity(batch.num_rows());
1305        for row in 0..batch.num_rows() {
1306            let key = MessageKey {
1307                session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1308                message_id: string(&batch, "id", row)?.context("message id is null")?,
1309            };
1310            hits.push((key, float32(&batch, "_distance", row)?));
1311        }
1312        // Stable secondary sort: same reasoning as `fts_search` - IVF_PQ can
1313        // emit hits with effectively identical `_distance` in fragment-dependent
1314        // order, which makes RRF dedup-ranks nondeterministic for tied
1315        // neighbors. Sort by distance asc (smaller = more similar), then by
1316        // `(session_id, message_id)` asc.
1317        hits.sort_by(|left, right| {
1318            left.1
1319                .partial_cmp(&right.1)
1320                .unwrap_or(std::cmp::Ordering::Equal)
1321                .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1322                .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1323        });
1324        Ok(hits)
1325    }
1326
1327    /// The DataFusion plan string for a filtered vector scan - the
1328    /// `search-prefilter-pushdown` regression guard reads it.
1329    pub async fn explain_vector_plan(
1330        &self,
1331        query: &[f32],
1332        limit: usize,
1333        filter: &Predicate,
1334        search: Option<&config::SearchConfig>,
1335    ) -> Result<String> {
1336        let scope = embedded_scope(filter);
1337        let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1338        let key = Float32Array::from(query.to_vec());
1339        scanner.nearest("vector", &key, limit)?;
1340        if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1341            scanner.nprobes(nprobes);
1342        }
1343        if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1344            scanner.refine(refine_factor);
1345        }
1346        scanner
1347            .explain_plan(true)
1348            .await
1349            .context("explain_plan failed")
1350    }
1351
1352    pub async fn explain_fts_plan(
1353        &self,
1354        query: &str,
1355        limit: usize,
1356        filter: &Predicate,
1357    ) -> Result<String> {
1358        let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1359        scanner.full_text_search(
1360            FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1361        )?;
1362        scanner.project(&["session_id", "id"])?;
1363        scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1364        scanner
1365            .explain_plan(true)
1366            .await
1367            .context("explain_plan failed")
1368    }
1369
1370    /// Hydrate search hits: fetch message metadata for `(session_id, message_id)` keys.
1371    pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1372        if keys.is_empty() {
1373            return Ok(Vec::new());
1374        }
1375        let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1376        let session_ids = keys
1377            .iter()
1378            .map(|key| key.session_id.clone())
1379            .collect::<Vec<_>>();
1380        let message_ids = keys
1381            .iter()
1382            .map(|key| key.message_id.clone())
1383            .collect::<Vec<_>>();
1384        let predicate = Predicate::And(vec![
1385            in_predicate("session_id", &session_ids),
1386            in_predicate("id", &message_ids),
1387        ]);
1388        let batch = self
1389            .handle
1390            .scan_batch(
1391                Table::Messages,
1392                Some(&predicate),
1393                &[
1394                    "id",
1395                    "session_id",
1396                    "role",
1397                    "project",
1398                    "source_agent",
1399                    "timestamp",
1400                    "search_text",
1401                ],
1402            )
1403            .await?;
1404        let mut metas = Vec::with_capacity(batch.num_rows());
1405        for row in 0..batch.num_rows() {
1406            let message_id = string(&batch, "id", row)?.context("id is null")?;
1407            let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1408            if !wanted.contains(&MessageKey {
1409                session_id: session_id.clone(),
1410                message_id: message_id.clone(),
1411            }) {
1412                continue;
1413            }
1414            metas.push(MessageMeta {
1415                message_id,
1416                session_id,
1417                role: string(&batch, "role", row)?.context("role is null")?,
1418                project: string(&batch, "project", row)?.context("project is null")?,
1419                source_agent: string(&batch, "source_agent", row)?
1420                    .context("source_agent is null")?,
1421                timestamp: datetime(&batch, "timestamp", row)?,
1422                search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1423            });
1424        }
1425        Ok(metas)
1426    }
1427
1428    /// Total message count per session, for search session summaries.
1429    pub async fn session_message_counts(
1430        &self,
1431        session_ids: &[String],
1432    ) -> Result<BTreeMap<String, usize>> {
1433        if session_ids.is_empty() {
1434            return Ok(BTreeMap::new());
1435        }
1436        let dataset = self.handle.dataset(Table::Messages).await?;
1437        let mut tasks = tokio::task::JoinSet::new();
1438        for session_id in session_ids {
1439            let dataset = dataset.clone();
1440            let session_id = session_id.clone();
1441            tasks.spawn(async move {
1442                let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1443                let count = dataset.count_rows(Some(filter)).await?;
1444                anyhow::Ok((session_id, count))
1445            });
1446        }
1447        let mut counts = BTreeMap::new();
1448        while let Some(joined) = tasks.join_next().await {
1449            let (session_id, count) = joined.context("session count task panicked")??;
1450            counts.insert(session_id, count);
1451        }
1452        Ok(counts)
1453    }
1454
1455    /// Rows appended to `messages` since the FTS index was last optimized.
1456    /// A missing index reports the whole table; the query is manifest-only.
1457    pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1458        self.handle
1459            .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1460            .await
1461    }
1462
1463    /// Rows added or rewritten in `messages` since the IVF_PQ vector index
1464    /// was last optimized. Below
1465    /// [`VECTOR_INDEX_ACTIVATION_ROWS`] no index exists yet, so the caller
1466    /// must read [`embedding_progress`](Self::embedding_progress) too and
1467    /// distinguish "index not built yet" from "index trails data".
1468    pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1469        self.handle
1470            .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1471            .await
1472    }
1473
1474    /// Embedding coverage: how many `messages` rows carry a vector and how
1475    /// many are still eligible. Drives the `pond status` embeddings line and
1476    /// the `pond embed` progress bar's known total. `embedded` reads the
1477    /// `vector IS NOT NULL` count directly - the single-active-model invariant
1478    /// (see `MESSAGE_SCALAR_INDICES`) means there is no need to scope by the
1479    /// `embedding_model` column.
1480    pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1481        let dataset = self.handle.dataset(Table::Messages).await?;
1482        let embedded = dataset
1483            .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1484            .await?;
1485        let total = dataset
1486            .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1487            .await?;
1488        Ok(EmbeddingProgress {
1489            embedded,
1490            total,
1491            model: embed::model_id(),
1492        })
1493    }
1494
1495    /// Count rows whose `embedding_model` is not the currently configured
1496    /// model AND whose `vector` is still populated - the signal `pond embed`
1497    /// uses to detect a model swap and require `--force`.
1498    pub async fn stale_embedding_count(&self) -> Result<usize> {
1499        let dataset = self.handle.dataset(Table::Messages).await?;
1500        dataset
1501            .count_rows(Some(
1502                Predicate::And(vec![
1503                    Predicate::IsNotNull("vector"),
1504                    Predicate::Ne("embedding_model", embed::model_id().into()),
1505                ])
1506                .to_lance(),
1507            ))
1508            .await
1509            .map_err(Into::into)
1510    }
1511
1512    /// Run the per-table maintenance cycle (compact + indices) across every
1513    /// table, never short-circuiting. spec.md#lance-index-maintenance: indices
1514    /// and compaction commit independently, so a hot writer that starves
1515    /// compaction on one table does not abort the index work the operator
1516    /// asked for on other tables (or even on the same table).
1517    pub async fn optimize_indices(
1518        &self,
1519        progress: Option<OptimizeProgressFn>,
1520        cleanup: Option<CleanupConfig>,
1521    ) -> Result<OptimizeOutcome> {
1522        let cleanup = cleanup.unwrap_or_default();
1523        let policy = pond_index_intents();
1524        let mut tables = Vec::with_capacity(3);
1525        for (table, intents) in policy.all() {
1526            let outcome = self
1527                .handle
1528                .optimize_table(table, intents, progress.as_ref(), cleanup)
1529                .await;
1530            tables.push(outcome);
1531        }
1532        Ok(OptimizeOutcome { tables })
1533    }
1534
1535    /// Fold trailing fragments into existing indices across every table,
1536    /// without running compaction. Used by `pond embed`'s tail so newly
1537    /// written vectors land in the FTS / IVF_PQ / btree / bitmap indices
1538    /// without paying the compaction retry budget while embed itself may
1539    /// still be writing in a sibling process.
1540    pub async fn build_indices_only(
1541        &self,
1542        progress: Option<OptimizeProgressFn>,
1543    ) -> Result<OptimizeOutcome> {
1544        let policy = pond_index_intents();
1545        let mut tables = Vec::with_capacity(3);
1546        for (table, intents) in policy.all() {
1547            let indices = self
1548                .handle
1549                .optimize_table_indices_only(table, intents, progress.as_ref())
1550                .await;
1551            tables.push(TableOptimizeOutcome {
1552                table,
1553                indices,
1554                compaction: PhaseOutcome::NotAttempted,
1555            });
1556        }
1557        Ok(OptimizeOutcome { tables })
1558    }
1559
1560    #[cfg(test)]
1561    async fn optimize_indices_with_vector_threshold(
1562        &self,
1563        vector_threshold: usize,
1564    ) -> Result<OptimizeOutcome> {
1565        let cleanup = CleanupConfig::default();
1566        let policy = pond_index_intents_with_vector_threshold(vector_threshold);
1567        let mut tables = Vec::with_capacity(3);
1568        for (table, intents) in policy.all() {
1569            let outcome = self
1570                .handle
1571                .optimize_table(table, intents, None, cleanup)
1572                .await;
1573            tables.push(outcome);
1574        }
1575        Ok(OptimizeOutcome { tables })
1576    }
1577
1578    pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1579        let policy = pond_index_intents();
1580        let mut matched = false;
1581        for (table, intents) in policy.all() {
1582            for intent in intents {
1583                if intent_name.is_none_or(|name| name == intent.name) {
1584                    matched = true;
1585                    self.handle.rebuild_index(table, intent).await?;
1586                }
1587            }
1588        }
1589        if let Some(name) = intent_name
1590            && !matched
1591        {
1592            anyhow::bail!("unknown index intent {name:?}");
1593        }
1594        Ok(())
1595    }
1596
1597    pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1598        let policy = pond_index_intents();
1599        let mut statuses = Vec::new();
1600        for (table, intents) in policy.all() {
1601            statuses.extend(self.handle.index_status(table, intents).await?);
1602        }
1603        Ok(statuses)
1604    }
1605
1606    /// Drop the IVF_PQ index on `messages.vector`. Used by `pond embed
1607    /// --force` before re-bootstrapping under a different model. Silent
1608    /// when the index does not exist.
1609    pub async fn drop_vector_index(&self) -> Result<()> {
1610        match self
1611            .handle
1612            .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1613            .await
1614        {
1615            Ok(()) => Ok(()),
1616            Err(error) => {
1617                let msg = error.to_string();
1618                if msg.contains("not found") || msg.contains("does not exist") {
1619                    Ok(())
1620                } else {
1621                    Err(error)
1622                }
1623            }
1624        }
1625    }
1626
1627    /// On-disk byte totals per dataset, sized through Lance's object store
1628    /// (spec.md#lance-chokepoints-storage) so `pond status` works on any backend.
1629    pub async fn table_sizes(&self) -> Result<TableSizes> {
1630        self.handle.table_sizes().await
1631    }
1632
1633    /// Histogram of Unicode script classes in `messages.search_text`, computed
1634    /// from a sample of up to `max_messages` non-null rows. Returned classes
1635    /// are sorted descending by character count. Lets `pond status` tell an
1636    /// agent whether the corpus is monolingual or mixed - the agent then knows
1637    /// whether bilingual querying is worth attempting (cross-lingual recall is
1638    /// a caller-layer concern; pond does not translate internally).
1639    pub async fn text_script_histogram(&self, max_messages: usize) -> Result<Vec<(String, usize)>> {
1640        use std::collections::HashMap;
1641        let filter = Predicate::IsNotNull("search_text");
1642        let projection: &[&str] = &["search_text"];
1643        let scanner = self
1644            .handle
1645            .scan(
1646                Table::Messages,
1647                ScanOpts::with_predicate_and_projection(&filter, projection),
1648            )
1649            .await?;
1650        let mut batches = scanner
1651            .try_into_stream()
1652            .await
1653            .context("failed to open messages stream for script histogram")?;
1654        let mut counts: HashMap<&'static str, usize> = HashMap::new();
1655        let mut sampled = 0usize;
1656        'outer: while let Some(batch) = batches.next().await {
1657            let batch = batch?;
1658            for row in 0..batch.num_rows() {
1659                if sampled >= max_messages {
1660                    break 'outer;
1661                }
1662                if let Some(text) = string(&batch, "search_text", row)? {
1663                    for ch in text.chars() {
1664                        if let Some(class) = classify_script(ch) {
1665                            *counts.entry(class).or_default() += 1;
1666                        }
1667                    }
1668                    sampled += 1;
1669                }
1670            }
1671        }
1672        let mut histogram: Vec<(String, usize)> = counts
1673            .into_iter()
1674            .map(|(name, count)| (name.to_owned(), count))
1675            .collect();
1676        histogram.sort_by(|left, right| right.1.cmp(&left.1).then(left.0.cmp(&right.0)));
1677        Ok(histogram)
1678    }
1679
1680    async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1681        let batch = self
1682            .handle
1683            .scan_batch(
1684                Table::Sessions,
1685                Some(&Predicate::Eq("id", session_id.into())),
1686                &[],
1687            )
1688            .await?;
1689        if batch.num_rows() == 0 {
1690            Ok(None)
1691        } else {
1692            Ok(Some(session_from_batch(&batch, 0)?))
1693        }
1694    }
1695
1696    /// Fetch the stored vector for one message, for `similar_to`-style
1697    /// retrieval. Returns `Ok(None)` when the message exists but is not yet
1698    /// embedded, or when no message has that id. Vectors are stored Float16
1699    /// (`embedding_vector_type`); the search path takes `&[f32]`, so we
1700    /// widen here. Cheap rare op - one predicate scan, no index.
1701    pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1702        let batch = self
1703            .handle
1704            .scan_batch(
1705                Table::Messages,
1706                Some(&Predicate::Eq("id", message_id.into())),
1707                &["vector"],
1708            )
1709            .await?;
1710        if batch.num_rows() == 0 {
1711            return Ok(None);
1712        }
1713        let column = batch
1714            .column(0)
1715            .as_any()
1716            .downcast_ref::<FixedSizeListArray>();
1717        let Some(list) = column else {
1718            return Ok(None);
1719        };
1720        if list.is_null(0) {
1721            return Ok(None);
1722        }
1723        let values = list.value(0);
1724        let halves = values
1725            .as_any()
1726            .downcast_ref::<Float16Array>()
1727            .context("messages.vector inner array is not Float16")?;
1728        let widened = (0..halves.len())
1729            .map(|i| halves.value(i).to_f32())
1730            .collect();
1731        Ok(Some(widened))
1732    }
1733
1734    async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1735        let batch = self
1736            .handle
1737            .scan_batch(
1738                Table::Messages,
1739                Some(&Predicate::Eq("session_id", session_id.into())),
1740                &[
1741                    "session_id",
1742                    "id",
1743                    "timestamp",
1744                    "role",
1745                    "content",
1746                    "options",
1747                ],
1748            )
1749            .await?;
1750        let mut messages = Vec::with_capacity(batch.num_rows());
1751        for row in 0..batch.num_rows() {
1752            messages.push(message_from_batch(&batch, row)?);
1753        }
1754        messages.sort_by(|left, right| {
1755            left.timestamp()
1756                .cmp(&right.timestamp())
1757                .then_with(|| left.id().cmp(right.id()))
1758        });
1759
1760        let message_ids = messages
1761            .iter()
1762            .map(|message| message.id().to_owned())
1763            .collect::<Vec<_>>();
1764        let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1765
1766        Ok(messages
1767            .into_iter()
1768            .map(|message| {
1769                let key = (message.session_id().to_owned(), message.id().to_owned());
1770                let parts = parts_by_message.remove(&key).unwrap_or_default();
1771                MessageWithParts { message, parts }
1772            })
1773            .collect())
1774    }
1775
1776    /// Every part of these messages, full fidelity (file blobs included). The
1777    /// canonical read primitive - restore/export, verbatim mode, and the
1778    /// message-mode target all need the complete set.
1779    pub async fn parts_for_messages(
1780        &self,
1781        session_id: &str,
1782        message_ids: &[String],
1783    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1784        self.scan_parts(session_id, message_ids, None).await
1785    }
1786
1787    /// Only the parts that yield a [`PartSummary`] ([`SUMMARY_PART_TYPES`]),
1788    /// skipping `text`/`reasoning` (and their blobs) that would summarize to
1789    /// nothing. For the summary-only reads (conversational/complete session
1790    /// views, search hits) - it never feeds restore/export.
1791    pub async fn summary_parts_for_messages(
1792        &self,
1793        session_id: &str,
1794        message_ids: &[String],
1795    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1796        self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1797            .await
1798    }
1799
1800    async fn scan_parts(
1801        &self,
1802        session_id: &str,
1803        message_ids: &[String],
1804        part_types: Option<&[&str]>,
1805    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1806        if message_ids.is_empty() {
1807            return Ok(BTreeMap::new());
1808        }
1809        let mut clauses = vec![
1810            Predicate::Eq("session_id", session_id.into()),
1811            in_predicate("message_id", message_ids),
1812        ];
1813        if let Some(types) = part_types {
1814            clauses.push(Predicate::In(
1815                "type",
1816                types.iter().map(|&t| t.into()).collect(),
1817            ));
1818        }
1819        let predicate = Predicate::And(clauses);
1820        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
1821        let mut scanner = self
1822            .handle
1823            .scan(
1824                Table::Parts,
1825                ScanOpts::with_predicate_and_projection(
1826                    &predicate,
1827                    &[
1828                        "session_id",
1829                        "message_id",
1830                        "id",
1831                        "ordinal",
1832                        "type",
1833                        "provenance",
1834                        "variant_data",
1835                        "options",
1836                    ],
1837                ),
1838            )
1839            .await?;
1840        scanner.with_row_address();
1841        let batch = scanner.try_into_batch().await.context("scan failed")?;
1842        let row_addresses = uint64(&batch, "_rowaddr")?;
1843        let mut file_payloads = BTreeMap::<usize, FileData>::new();
1844        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
1845        for row in 0..batch.num_rows() {
1846            if string(&batch, "type", row)?.as_deref() == Some("file") {
1847                let variant_data =
1848                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
1849                file_rows.push((row, row_addresses.value(row), variant_data));
1850            }
1851        }
1852        if !file_rows.is_empty() {
1853            let addresses = file_rows
1854                .iter()
1855                .map(|(_, address, _)| *address)
1856                .collect::<Vec<_>>();
1857            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
1858            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
1859                // Legacy blob (lance-encoding:blob): payload is bytes; the
1860                // url variant stored its URL as UTF-8 bytes, recovered via
1861                // `file_data_from_blob`'s `data_kind = "url"` branch.
1862                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
1863                file_payloads.insert(row, payload);
1864            }
1865        }
1866        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
1867        for row in 0..batch.num_rows() {
1868            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
1869            parts_by_message
1870                .entry((part.session_id.clone(), part.message_id.clone()))
1871                .or_default()
1872                .push(part);
1873        }
1874        for parts in parts_by_message.values_mut() {
1875            parts.sort_by_key(|part| part.ordinal);
1876        }
1877        Ok(parts_by_message)
1878    }
1879}
1880
/// One event in an ingest stream: a session, a message, or a part.
///
/// Serialized adjacently tagged: the variant name goes under `kind` in
/// snake_case and the payload under `data`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// A session row.
    Session(Session),
    /// A message row.
    Message(Message),
    /// A part row.
    Part(Part),
}
1888
/// Aggregate accounting for an ingest pass (CLI sync, adapter-driven).
/// The wire layer (`pond_ingest`) instead returns per-row results; the
/// aggregate is derived from those at the wire boundary.
///
/// Fields are bucketed by population so the summary never conflates "100
/// validator-rejected rows in 1 bad session" with "100 separate failures."
/// The shape is set by spec.md#adapter-integrity-event-ordering.
///
/// `Default` yields all-zero counters and an empty `drop_reasons` map.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows actually written to Lance.
    pub inserted: usize,
    /// Rows that already existed (merge_insert no-op match).
    pub matched: usize,
    /// Events the validator dropped under per-event-drop policy (ordering
    /// violation, orphan part, mismatched parent, adapter parse failure,
    /// duplicate-id collision, ...). Counted by event, not by session: a
    /// session with one bad part stays in this bucket as 1, not as "the
    /// whole substream." Per spec.md#adapter-integrity-dedup, adapters SHOULD dedupe their
    /// own emissions upstream when source replay is expected; the
    /// validator's in-batch HashSet is a safety net, not a feature
    /// adapters may rely on. If this bucket grows on a clean adapter,
    /// inspect `drop_reasons` for the top contributors.
    pub dropped_events: usize,
    /// Sessions whose Session-level invariants (immutable `source_agent` /
    /// `project` against the stored row) failed at flush time and
    /// whose substream got rejected wholesale. Always small relative to
    /// `inserted`; if not, there's a real problem to investigate.
    pub dropped_sessions: usize,
    /// Files the adapter couldn't decode at all (no Session header
    /// extractable: empty `.jsonl`, missing required field).
    // NOTE(review): the causes listed here overlap with `skipped_empty`
    // below (empty `.jsonl`, unextractable header) - confirm which bucket
    // each cause actually lands in and tighten one of the two descriptions.
    pub skipped_files: usize,
    /// Files that produced no importable session and were benignly skipped:
    /// empty `.jsonl`, sidecar-only rows (e.g. an `ai-title`/`agent-name`
    /// metadata file), or an unextractable header. Never an error or a drop;
    /// the underlying cause is logged at `POND_LOG=debug`.
    pub skipped_empty: usize,
    /// Sessions short-circuited via the per-session staleness skip
    /// (spec.md#adapter-integrity-event-ordering): file `mtime` was at or before the wall-clock time
    /// pond last wrote that session's row, so re-decode was bypassed.
    pub skipped_fresh: usize,
    /// Storage-layer failures whose retries were exhausted (commit
    /// conflicts, transient IO that didn't recover). Hard zero on healthy
    /// runs.
    pub storage_errors: usize,
    /// Oversized values truncated to a bounded sentinel at the seam
    /// (spec.md#adapter-bounded-values); the rest of each such record is intact.
    pub truncated_values: usize,
    /// Histogram of stable reason keys for the combined `dropped_events +
    /// dropped_sessions` populations. Keys are `&'static str` (see the
    /// `DROP_REASON_*` constants) so consumers can match by identity.
    /// Empty on a clean run. Used by `pond sync` to print the top reasons
    /// and by `benches/ingest_bench.rs` to bucket Partial drops by cause.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1943
// Stable reason keys for the `IngestSummary::drop_reasons` histogram and
// the per-row `RowError::reason_key`. `&'static str` so consumers can
// match by identity rather than prose. Adding a new variant: pick a short
// snake_case identifier, route it from the validator/adapter, and update
// the per-row outcome docs in `docs/spec.md#adapter-integrity-event-ordering`.

/// Reason key `duplicate_message_id`; one of the stable `drop_reasons` keys.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// Reason key `duplicate_part_key`.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// Reason key `message_before_session`.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// Reason key `message_session_mismatch`.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// Reason key `part_before_message`.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// Reason key `part_message_mismatch`.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// Reason key `empty_source_agent`: a Session event whose `source_agent`
/// is empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// Reason key `parent_message_without_session`: a Session event that sets
/// `parent_message_id` but not `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Reason key `immutable_project`.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Reason key `immutable_source_agent`.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback bucket used when a row error carries no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
1960
1961impl IngestSummary {
1962    pub fn accepted(&self) -> usize {
1963        self.inserted + self.matched
1964    }
1965
1966    pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
1967        for outcome in outcomes {
1968            match outcome.status {
1969                OutcomeStatus::Inserted => self.inserted += 1,
1970                OutcomeStatus::Matched => self.matched += 1,
1971                OutcomeStatus::Error => {
1972                    // Session-level rejection: exactly one session-kind Error
1973                    // outcome (see `error_outcomes_for_substream`). Per-event
1974                    // drop: one Error per message/part. The two populations
1975                    // are counted separately so the operator can tell a
1976                    // structural reject from a row-level skip.
1977                    if outcome.kind == "session" {
1978                        self.dropped_sessions += 1;
1979                    } else {
1980                        self.dropped_events += 1;
1981                    }
1982                    let reason = outcome
1983                        .error
1984                        .as_ref()
1985                        .and_then(|e| e.reason_key)
1986                        .unwrap_or(DROP_REASON_UNCATEGORIZED);
1987                    *self.drop_reasons.entry(reason).or_insert(0) += 1;
1988                }
1989            }
1990        }
1991    }
1992}
1993
/// Per-row outcome surfaced by [`IngestValidator`] (spec.md#protocol). One
/// row per input event from the request's `events` array. The validator
/// returns these in array order so the wire layer can pack them directly
/// into [`crate::wire::IngestResult`] entries.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Index of the originating event in the input array.
    pub index: usize,
    /// Row kind label. `"session"` marks a session-level row, which
    /// `IngestSummary::add_outcomes` counts separately from other kinds.
    pub kind: &'static str,
    /// Key identifying the row; for session rows this is the session id as
    /// a JSON string. NOTE(review): shape for message/part rows is not
    /// visible here - confirm.
    pub pk: Value,
    /// Whether the row was inserted, matched, or rejected.
    pub status: OutcomeStatus,
    /// Error body for rejected rows. NOTE(review): presumably `None` for
    /// non-error statuses - confirm.
    pub error: Option<RowError>,
}
2006
/// Result class of a single ingested row, as carried by [`RowOutcome`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// Counted into `IngestSummary::inserted` (rows actually written).
    Inserted,
    /// Counted into `IngestSummary::matched` (rows that already existed).
    Matched,
    /// The row was rejected; counted as a dropped session or dropped event.
    Error,
}
2013
/// Structured per-row error body. Mirrors the wire shape so the handler
/// can pass it straight through.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of what was wrong with the row.
    pub message: String,
    /// Name of the offending field, when the error is tied to one
    /// (e.g. `"source_agent"`).
    pub field: Option<&'static str>,
    /// Human-prose reason; see `reason_key` for the machine bucket.
    pub reason: Option<&'static str>,
    /// Stable key for histogramming - see `DROP_REASON_*` constants. The
    /// `reason` field above is human-prose; `reason_key` is the machine
    /// bucket. `None` means uncategorized; consumers attribute to
    /// `DROP_REASON_UNCATEGORIZED`.
    pub reason_key: Option<&'static str>,
}
2027
/// Buffered session events tagged with their input array index, so the
/// per-row outcomes can be re-attributed once `merge_insert` returns its
/// per-row Inserted/Matched stats.
#[derive(Debug)]
struct BufferedSession {
    /// Index of the Session event in the input array.
    index: usize,
    /// The buffered session payload.
    session: Session,
}
2036
/// A buffered message event together with the parts buffered under it.
#[derive(Debug)]
struct BufferedMessage {
    /// Index of the Message event in the input array.
    index: usize,
    /// The buffered message payload.
    message: Message,
    /// Parts buffered for this message.
    parts: Vec<BufferedPart>,
    /// NOTE(review): presumably the text destined for the
    /// `messages.search_text` column, `None` when there is none - confirm
    /// against the code that fills it.
    search_text: Option<String>,
}
2044
/// A buffered part event tagged with its input array index.
#[derive(Debug)]
struct BufferedPart {
    /// Index of the Part event in the input array.
    index: usize,
    /// The buffered part payload.
    part: Part,
}
2050
/// State machine that turns the `events: Vec<IngestEvent>` array into a
/// flat `Vec<RowOutcome>` matching the array's index space. Buffers a whole
/// session substream so `merge_insert` runs once per substream (three
/// batches: sessions, messages, parts). A validation error on a single event
/// drops *that event* (one [`OutcomeStatus::Error`] outcome) and the substream
/// continues; only Session-level invariants (immutable source_agent / project
/// on re-write) drop the whole substream (spec.md#adapter-integrity-event-ordering).
///
/// Writes are batched at flush time. As complete substreams arrive (a new
/// `Session` event closes out the current one), they accumulate in
/// `completed` rather than each one calling `merge_insert` immediately.
/// The caller drains the buffer via [`Self::flush`] / [`Self::finish`],
/// at which point one batched 3-parallel-merge-insert covers all pending
/// substreams. This is the load-bearing perf change: per-substream commit
/// overhead dominated the ingest profile (see `benches/ingest_bench.rs`),
/// and amortizing it across N sessions cuts wall time materially.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// NOTE(review): presumably the Session header of the in-flight
    /// substream, `None` when no substream is open - confirm.
    session: Option<BufferedSession>,
    /// NOTE(review): presumably the message currently receiving parts -
    /// confirm.
    current_message: Option<BufferedMessage>,
    /// NOTE(review): presumably parts collected for `current_message` -
    /// confirm.
    current_parts: Vec<BufferedPart>,
    /// NOTE(review): presumably the finished messages of the in-flight
    /// substream - confirm.
    messages: Vec<BufferedMessage>,
    /// Message ids already buffered in the current substream. Duplicate ids
    /// drop the offending event in-line rather than failing the whole batch
    /// downstream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` keys already buffered in the current
    /// substream. Same in-line duplicate-drop policy as `seen_message_ids`.
    seen_part_keys: HashSet<(String, String)>,
    /// Substreams whose end-of-stream boundary has been observed but whose
    /// rows haven't been written yet. Flushed in batched mode by
    /// [`Self::flush`].
    completed: Vec<CompletedSubstream>,
}
2085
/// One closed substream ready for the batched flush path.
#[derive(Debug)]
struct CompletedSubstream {
    /// Input array index of the substream's Session event.
    session_index: usize,
    /// The substream's session payload.
    session: Session,
    /// The substream's buffered messages.
    messages: Vec<BufferedMessage>,
}
2093
2094impl IngestValidator {
2095    /// Drive one input event through the validator. Returns the per-row
2096    /// outcomes the event triggered: empty when the event is just buffered,
2097    /// or N entries when a session substream just flushed (success or
2098    /// failure). `Err` is reserved for catastrophic storage failures that
2099    /// should fail the whole `pond_ingest` request.
2100    pub async fn push(
2101        &mut self,
2102        store: &Store,
2103        index: usize,
2104        event: IngestEvent,
2105    ) -> Result<Vec<RowOutcome>> {
2106        match event {
2107            IngestEvent::Session(session) => self.push_session(store, index, session).await,
2108            IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2109            IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2110        }
2111    }
2112
2113    /// Final flush at end-of-batch. Closes the in-flight substream and
2114    /// drains the pending-flush buffer.
2115    pub async fn finish(&mut self, store: &Store) -> Result<Vec<RowOutcome>> {
2116        self.close_current_substream();
2117        self.flush(store).await
2118    }
2119
2120    /// Drain every completed substream into batched 3-parallel-merge_insert
2121    /// writes. Caller invokes this periodically (every N completed
2122    /// substreams) to keep memory bounded; in adapter-driven sync that
2123    /// happens via the BATCH_SIZE check in `ingest_adapter`. The current
2124    /// in-flight substream stays buffered - close it explicitly via
2125    /// [`Self::finish`] or by feeding the next Session event.
2126    pub async fn flush(&mut self, store: &Store) -> Result<Vec<RowOutcome>> {
2127        if self.completed.is_empty() {
2128            return Ok(Vec::new());
2129        }
2130        let completed = std::mem::take(&mut self.completed);
2131        store.upsert_session_batch(completed).await
2132    }
2133
2134    /// Number of fully-buffered substreams awaiting batched write. Used by
2135    /// the adapter caller to decide when to call [`Self::flush`].
2136    pub fn pending_substreams(&self) -> usize {
2137        self.completed.len()
2138    }
2139
2140    async fn push_session(
2141        &mut self,
2142        _store: &Store,
2143        index: usize,
2144        mut session: Session,
2145    ) -> Result<Vec<RowOutcome>> {
2146        // Close out the current substream (if any) - move it to the pending
2147        // buffer instead of writing immediately. The actual write happens
2148        // when the caller invokes `flush` / `finish`.
2149        self.close_current_substream();
2150
2151        // spec.md#datasets: `source_agent` is trimmed at ingest and rejected
2152        // if empty after trim. A Session event with empty source_agent is
2153        // dropped on the spot - the substream that would follow has nothing
2154        // to anchor on, so subsequent message/part events will also drop.
2155        let trimmed = session.source_agent.trim();
2156        if trimmed.is_empty() {
2157            return Ok(vec![RowOutcome {
2158                index,
2159                kind: "session",
2160                pk: Value::String(session.id.clone()),
2161                status: OutcomeStatus::Error,
2162                error: Some(RowError {
2163                    message: format!("session {} has empty source_agent after trim", session.id),
2164                    field: Some("source_agent"),
2165                    reason: None,
2166                    reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2167                }),
2168            }]);
2169        }
2170        if trimmed.len() != session.source_agent.len() {
2171            session.source_agent = trimmed.to_owned();
2172        }
2173
2174        if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2175            return Ok(vec![RowOutcome {
2176                index,
2177                kind: "session",
2178                pk: Value::String(session.id.clone()),
2179                status: OutcomeStatus::Error,
2180                error: Some(RowError {
2181                    message: format!(
2182                        "session {} has parent_message_id without parent_session_id",
2183                        session.id,
2184                    ),
2185                    field: Some("parent_message_id"),
2186                    reason: None,
2187                    reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2188                }),
2189            }]);
2190        }
2191
2192        self.seen_message_ids.clear();
2193        self.seen_part_keys.clear();
2194        self.session = Some(BufferedSession { index, session });
2195        Ok(Vec::new())
2196    }
2197
2198    fn close_current_substream(&mut self) {
2199        self.flush_current_message();
2200        let Some(BufferedSession {
2201            index: session_index,
2202            session,
2203        }) = self.session.take()
2204        else {
2205            return;
2206        };
2207        let messages = std::mem::take(&mut self.messages);
2208        self.seen_message_ids.clear();
2209        self.seen_part_keys.clear();
2210        self.completed.push(CompletedSubstream {
2211            session_index,
2212            session,
2213            messages,
2214        });
2215    }
2216
2217    fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
2218        let pk = Value::Array(vec![
2219            Value::String(message.session_id().to_owned()),
2220            Value::String(message.id().to_owned()),
2221        ]);
2222        let Some(session) = &self.session else {
2223            return vec![error_outcome(
2224                index,
2225                "message",
2226                pk,
2227                "first event in a session stream must be Session",
2228                None,
2229                DROP_REASON_MESSAGE_BEFORE_SESSION,
2230            )];
2231        };
2232        if message.session_id() != session.session.id {
2233            let msg = format!(
2234                "message {} references session {}, expected {}",
2235                message.id(),
2236                message.session_id(),
2237                session.session.id
2238            );
2239            return vec![error_outcome(
2240                index,
2241                "message",
2242                pk,
2243                &msg,
2244                Some("session_id"),
2245                DROP_REASON_MESSAGE_SESSION_MISMATCH,
2246            )];
2247        }
2248        if !self.seen_message_ids.insert(message.id().to_owned()) {
2249            // Keep same-substream duplicate ids visible in `dropped_events`;
2250            // adapters are expected to dedupe upstream (see claude-code's
2251            // per-file `seen_uuids`), so a hit here is worth investigating.
2252            let msg = format!("duplicate message id {} in session substream", message.id());
2253            return vec![error_outcome(
2254                index,
2255                "message",
2256                pk,
2257                &msg,
2258                None,
2259                DROP_REASON_DUPLICATE_MESSAGE_ID,
2260            )];
2261        }
2262        self.flush_current_message();
2263        self.current_message = Some(BufferedMessage {
2264            index,
2265            message,
2266            parts: Vec::new(),
2267            search_text: None,
2268        });
2269        Vec::new()
2270    }
2271
2272    fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2273        let pk = Value::Array(vec![
2274            Value::String(part.session_id.clone()),
2275            Value::String(part.message_id.clone()),
2276            Value::String(part.id.clone()),
2277        ]);
2278        let Some(current) = &self.current_message else {
2279            return vec![error_outcome(
2280                index,
2281                "part",
2282                pk,
2283                "part event appeared before a message",
2284                None,
2285                DROP_REASON_PART_BEFORE_MESSAGE,
2286            )];
2287        };
2288        if part.session_id != current.message.session_id() {
2289            let msg = format!(
2290                "part {} references session {}, expected {}",
2291                part.id,
2292                part.session_id,
2293                current.message.session_id()
2294            );
2295            return vec![error_outcome(
2296                index,
2297                "part",
2298                pk,
2299                &msg,
2300                Some("session_id"),
2301                DROP_REASON_PART_MESSAGE_MISMATCH,
2302            )];
2303        }
2304        if part.message_id != current.message.id() {
2305            let msg = format!(
2306                "part {} references message {}, expected {}",
2307                part.id,
2308                part.message_id,
2309                current.message.id()
2310            );
2311            return vec![error_outcome(
2312                index,
2313                "part",
2314                pk,
2315                &msg,
2316                Some("message_id"),
2317                DROP_REASON_PART_MESSAGE_MISMATCH,
2318            )];
2319        }
2320        let part_key = (part.message_id.clone(), part.id.clone());
2321        if !self.seen_part_keys.insert(part_key) {
2322            let msg = format!(
2323                "duplicate part id {} for message {} in session substream",
2324                part.id, part.message_id
2325            );
2326            return vec![error_outcome(
2327                index,
2328                "part",
2329                pk,
2330                &msg,
2331                None,
2332                DROP_REASON_DUPLICATE_PART_KEY,
2333            )];
2334        }
2335        self.current_parts.push(BufferedPart { index, part });
2336        Vec::new()
2337    }
2338
2339    fn flush_current_message(&mut self) {
2340        let Some(mut buffered) = self.current_message.take() else {
2341            return;
2342        };
2343        let parts = std::mem::take(&mut self.current_parts);
2344        let mut canonical_parts = Vec::with_capacity(parts.len());
2345        for part in &parts {
2346            canonical_parts.push(part.part.clone());
2347        }
2348        buffered.search_text = search_text(&buffered.message, &canonical_parts);
2349        buffered.parts = parts;
2350        self.messages.push(buffered);
2351    }
2352}
2353
2354fn error_outcome(
2355    index: usize,
2356    kind: &'static str,
2357    pk: Value,
2358    message: &str,
2359    field: Option<&'static str>,
2360    reason_key: &'static str,
2361) -> RowOutcome {
2362    RowOutcome {
2363        index,
2364        kind,
2365        pk,
2366        status: OutcomeStatus::Error,
2367        error: Some(RowError {
2368            message: message.to_owned(),
2369            field,
2370            reason: None,
2371            reason_key: Some(reason_key),
2372        }),
2373    }
2374}
2375
2376/// Session-level rejection (immutable `source_agent` / `project` violation):
2377/// emit exactly one Error outcome on the Session row. The buffered messages
2378/// and parts of this substream are *not* surfaced as per-row errors - their
2379/// loss is implied by the single session-rejection (spec.md#adapter-integrity-event-ordering).
2380fn error_outcomes_for_substream(
2381    session_index: usize,
2382    session: &Session,
2383    _messages: &[BufferedMessage],
2384    message: impl Into<String>,
2385    field: Option<&'static str>,
2386    reason_key: &'static str,
2387) -> Vec<RowOutcome> {
2388    let reason = field.map(|_| "immutable");
2389    vec![RowOutcome {
2390        index: session_index,
2391        kind: "session",
2392        pk: Value::String(session.id.clone()),
2393        status: OutcomeStatus::Error,
2394        error: Some(RowError {
2395            message: message.into(),
2396            field,
2397            reason,
2398            reason_key: Some(reason_key),
2399        }),
2400    }]
2401}
2402
2403/// Batched-path success helper: every row in a substream takes the same
2404/// status (the batch-level `Inserted` vs `Matched` decision from
2405/// `merge_insert.num_inserted_rows`).
2406fn success_outcomes_for_substream(
2407    session_index: usize,
2408    session: &Session,
2409    messages: &[BufferedMessage],
2410    status: UpsertStatus,
2411) -> Vec<RowOutcome> {
2412    let mut outcomes = Vec::with_capacity(1 + messages.len());
2413    outcomes.push(success_outcome(
2414        session_index,
2415        "session",
2416        Value::String(session.id.clone()),
2417        status,
2418    ));
2419    for buffered in messages {
2420        let pk = Value::Array(vec![
2421            Value::String(buffered.message.session_id().to_owned()),
2422            Value::String(buffered.message.id().to_owned()),
2423        ]);
2424        outcomes.push(success_outcome(buffered.index, "message", pk, status));
2425        for part in &buffered.parts {
2426            let part_pk = Value::Array(vec![
2427                Value::String(part.part.session_id.clone()),
2428                Value::String(part.part.message_id.clone()),
2429                Value::String(part.part.id.clone()),
2430            ]);
2431            outcomes.push(success_outcome(part.index, "part", part_pk, status));
2432        }
2433    }
2434    outcomes
2435}
2436
2437fn success_outcome(
2438    index: usize,
2439    kind: &'static str,
2440    pk: Value,
2441    status: UpsertStatus,
2442) -> RowOutcome {
2443    let status = match status {
2444        UpsertStatus::Inserted => OutcomeStatus::Inserted,
2445        UpsertStatus::Matched => OutcomeStatus::Matched,
2446    };
2447    RowOutcome {
2448        index,
2449        kind,
2450        pk,
2451        status,
2452        error: None,
2453    }
2454}
2455
/// Errors raised while validating an incoming row against stored state.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// spec.md#protocol: `Session.source_agent` and `Session.project` are
    /// immutable post-first-write because the denormalized copies on
    /// `messages` were stamped from the prior Session at first ingest.
    /// A re-write that changes either would silently desync.
    ImmutableField {
        /// Name of the field that was about to change.
        field: &'static str,
        /// Id of the session the re-write targeted.
        session_id: String,
        /// Value already stored.
        stored: String,
        /// Value the incoming row carried.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the violation with both values in `Debug` (quoted) form.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum, so the pattern is irrefutable; adding a
        // variant turns this into a compile error rather than a silent gap.
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            formatter,
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        )
    }
}

impl std::error::Error for IngestError {}
2487
2488/// Compare an incoming Session row against the stored row on the two
2489/// immutable fields (spec.md#protocol). The `Option<String>` `project` field
2490/// counts a NULL-vs-non-NULL change as a mismatch.
2491fn ensure_immutable_match(
2492    existing: &Session,
2493    incoming: &Session,
2494) -> std::result::Result<(), IngestError> {
2495    if existing.source_agent != incoming.source_agent {
2496        return Err(IngestError::ImmutableField {
2497            field: "source_agent",
2498            session_id: incoming.id.clone(),
2499            stored: existing.source_agent.clone(),
2500            attempted: incoming.source_agent.clone(),
2501        });
2502    }
2503    if existing.project != incoming.project {
2504        return Err(IngestError::ImmutableField {
2505            field: "project",
2506            session_id: incoming.id.clone(),
2507            stored: (*existing.project).clone(),
2508            attempted: (*incoming.project).clone(),
2509        });
2510    }
2511    Ok(())
2512}
2513
2514pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2515    use crate::wire::Provenance;
2516    let mut chunks: Vec<String> = Vec::new();
2517    for part in parts {
2518        // spec.md#search: only conversational parts contribute to the indexed
2519        // text; harness-injected scaffolding is excluded from search.
2520        if part.provenance != Provenance::Conversational {
2521            continue;
2522        }
2523        match (message.role(), &part.kind) {
2524            (Role::User | Role::Assistant, PartKind::Text { text }) => {
2525                if let Some(text) = text {
2526                    chunks.push(text.to_string());
2527                }
2528            }
2529            (
2530                Role::User | Role::Assistant,
2531                PartKind::File {
2532                    media_type,
2533                    file_name,
2534                    data,
2535                },
2536            ) => {
2537                if let Some(file_name) = file_name {
2538                    chunks.push(file_name.clone());
2539                }
2540                chunks.push(media_type.clone());
2541                if let FileData::Url(uri) = data {
2542                    chunks.push(uri.clone());
2543                }
2544            }
2545            (
2546                Role::System | Role::Tool,
2547                PartKind::Text { .. }
2548                | PartKind::Reasoning { .. }
2549                | PartKind::File { .. }
2550                | PartKind::ToolCall { .. }
2551                | PartKind::ToolResult { .. }
2552                | PartKind::ToolApprovalRequest { .. }
2553                | PartKind::ToolApprovalResponse { .. },
2554            )
2555            | (
2556                Role::User | Role::Assistant,
2557                PartKind::Reasoning { .. }
2558                | PartKind::ToolCall { .. }
2559                | PartKind::ToolResult { .. }
2560                | PartKind::ToolApprovalRequest { .. }
2561                | PartKind::ToolApprovalResponse { .. },
2562            ) => {}
2563        }
2564    }
2565
2566    let text = chunks
2567        .into_iter()
2568        .filter(|chunk| !chunk.trim().is_empty())
2569        .collect::<Vec<_>>()
2570        .join("\n");
2571    if text.is_empty() { None } else { Some(text) }
2572}
2573
/// Non-empty conversational text (spec.md#search).
///
/// The wrapped string is private; nothing in this block constructs a value,
/// so the non-empty guarantee rests on whoever builds it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrow the text as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consume the wrapper and return the owned string.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2593
/// A message paired with the parts that belong to it.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    /// The message itself.
    pub message: Message,
    /// Parts of `message`. NOTE(review): ordering is not established here -
    /// confirm against the producer.
    pub parts: Vec<Part>,
}
2599
/// A session paired with its messages, each carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    /// The session itself.
    pub session: Session,
    /// Messages of `session`, each with its parts.
    pub messages: Vec<MessageWithParts>,
}
2605
/// Parameters for a session-mode view.
///
/// NOTE(review): the field meanings below are inferred from names and from
/// the `GetLookup` / `page_by` docs in this file - confirm against the
/// consumer.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Requested response mode.
    pub mode: ResponseMode,
    /// Optional paging anchor; presumably the id after which the page
    /// starts.
    pub after_id: Option<&'a str>,
    /// Presumably the maximum number of items on the page.
    pub limit: usize,
    /// Presumably the byte budget for the page.
    pub budget_bytes: usize,
}
2613
/// Parameters for a message-mode view.
///
/// NOTE(review): the field meanings below are inferred from names and from
/// the `GetLookup` / `MessagePage` docs in this file - confirm against the
/// consumer.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// Presumably how many surrounding messages to include as context.
    pub context_depth: usize,
    /// Optional paging anchor; presumably the id after which the page
    /// starts.
    pub after_id: Option<&'a str>,
    /// Presumably the maximum number of items on the page.
    pub limit: usize,
    /// Presumably the byte budget for the page.
    pub budget_bytes: usize,
}
2621
/// Outcome of a `pond_get` lookup. Separates a missing target (the handler
/// maps it to `not_found`) from a stale/unknown `after_id` (mapped to
/// `validation_failed`): the message/part stream is append-only, so an anchor
/// that was ever valid never disappears - an unknown one is always a client
/// error, never a reason to silently restart the page.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested target does not exist.
    NotFound,
    /// The target exists but the supplied `after_id` is not a known anchor.
    UnknownAfterId,
    /// The lookup succeeded and carries its result.
    Found(T),
}
2633
/// Canonical retrieval result for `pond_get` session mode: the stored session
/// plus the page of messages (each with its `Part`s) and a remaining count.
/// Protocol-shaping into `GetResult`/`MessageView` happens in the handler.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    /// The stored session.
    pub session: Session,
    /// Messages on this page, each with its parts.
    pub messages: Vec<RetrievedMessage>,
    /// Count of messages remaining after this page.
    pub messages_remaining: usize,
}
2643
/// Canonical retrieval result for `pond_get` message mode. `target.parts` is
/// empty - the target's parts ride `target_parts` (paginated); `siblings` carry
/// their parts so the handler can summarize them.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    /// Session the target message belongs to.
    pub session: Session,
    /// The requested message; its own `parts` vector is left empty.
    pub target: RetrievedMessage,
    /// The page of the target's parts.
    pub target_parts: Vec<Part>,
    /// Count of target parts remaining after this page.
    pub target_parts_remaining: usize,
    /// Sibling messages, each carrying its parts.
    pub siblings: Vec<RetrievedMessage>,
}
2655
/// A message as returned by retrieval, together with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    /// Message id.
    pub id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// NOTE(review): presumably the stored `search_text` column - confirm.
    pub text: Option<String>,
    /// NOTE(review): presumably the stored `content` column - confirm.
    pub content: Option<String>,
    /// Parts of this message; may be empty (see `MessagePage::target`).
    pub parts: Vec<Part>,
}
2665
/// Module-private row shape with the same scalar fields as
/// `RetrievedMessage`, minus the parts.
/// NOTE(review): presumably the raw result of a `messages` scan before
/// parts are attached - confirm against the scan code.
#[derive(Debug, Clone)]
struct ScanRow {
    id: String,
    role: Role,
    timestamp: DateTime<Utc>,
    text: Option<String>,
    content: Option<String>,
}
2674
/// One row of the conversational scan. `text` is non-empty by
/// `IsNotNull("search_text")` pushdown (spec.md#search).
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// The message's conversational search text.
    pub text: SearchText,
}
2685
/// Count how many leading `items` fit on one page.
///
/// At most `limit` items are considered, with `limit` clamped to
/// `1..=1000`. Each item is sized with `size` and the running total
/// (saturating) is compared against `budget_bytes`. The first item is
/// always emitted when there is one - a single oversize item never blocks
/// its own page - and the page stops at the first later item that would
/// push the total over the budget.
///
/// Returns 0 only for an empty `items`.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = &items[..items.len().min(limit.clamp(1, 1000))];
    let mut used = 0usize;
    window
        .iter()
        .enumerate()
        .take_while(|&(position, item)| {
            let projected = used.saturating_add(size(item));
            // Position 0 is exempt from the budget check.
            if position > 0 && projected > budget_bytes {
                return false;
            }
            used = projected;
            true
        })
        .count()
}
2704
2705fn role_from_str(value: &str) -> Result<Role> {
2706    match value {
2707        "system" => Ok(Role::System),
2708        "user" => Ok(Role::User),
2709        "assistant" => Ok(Role::Assistant),
2710        "tool" => Ok(Role::Tool),
2711        other => anyhow::bail!("unknown message role {other}"),
2712    }
2713}
2714
/// Scalar indexes on `messages` (spec.md#datasets): BTREE for high-cardinality
/// and range columns, BITMAP for low-cardinality columns. There is no index
/// on `embedding_model`: pond's invariant is one active model at a time
/// (a model swap goes through `pond embed --force` which drops the IVF_PQ,
/// clears stale rows, and re-bootstraps), so `embedding_model` is never a
/// query-time predicate - the only embedding-state filter is `vector IS NOT
/// NULL`. `id` lookups are rare and full-scan.
///
/// Each entry is `(column, index type, index name)`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];
2741
/// Scalar indexes on `parts`: `(session_id, message_id)` is the hot-path lookup key for
/// `parts_for_messages` (hydration on every `get` and grouped search).
///
/// Each entry is `(column, index type, index name)`; the two columns are
/// indexed separately, one BTREE each.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
2756
/// Scalar index on `sessions`: `id` is filtered by `find_session` on every
/// `get` and every grouped search.
///
/// Each entry is `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2761
2762fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
2763    Predicate::In(
2764        column,
2765        values.iter().cloned().map(ScalarValue::String).collect(),
2766    )
2767}
2768
2769/// Combine the caller's filter with `vector IS NOT NULL` so the kNN scanner
2770/// never sees a null-vector row. Under the single-active-model invariant,
2771/// `vector IS NOT NULL` is equivalent to "row is currently embedded under
2772/// the configured model" - no per-row `embedding_model` filter needed.
2773fn embedded_scope(filter: &Predicate) -> Predicate {
2774    Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
2775}
2776
2777fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
2778    let inserted = usize::try_from(inserted_rows)
2779        .unwrap_or(usize::MAX)
2780        .min(total);
2781    let mut statuses = Vec::with_capacity(total);
2782    statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
2783    statuses.extend(std::iter::repeat_n(
2784        UpsertStatus::Matched,
2785        total.saturating_sub(inserted),
2786    ));
2787    statuses
2788}
2789
// Bare logical table names: the lance-namespace Directory impl owns the
// `.lance` directory suffix (spec.md#lance-chokepoints-catalog). No consumer
// reconstructs a `.lance` path.

/// Logical name of the sessions table.
pub(crate) const SESSIONS: &str = "sessions";
/// Logical name of the messages table.
pub(crate) const MESSAGES: &str = "messages";
/// Logical name of the parts table.
pub(crate) const PARTS: &str = "parts";
2796
/// Name of the full-text-search index on `messages.search_text`. Kept
/// stable so status reporting and index creation refer to the same index.
pub(crate) const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";
2800
/// Name of the IVF_PQ index on `messages.vector` (spec.md#search). Kept
/// stable so the activation check and index creation refer to the same
/// index.
pub(crate) const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
2804
/// IVF_PQ tuning constants (spec.md#search):
/// - num_bits = 8 (256 centroids per PQ subspace; needs >= 256 vectors)
/// - sub_vectors = embedding_dim / 8 (8-float PQ subspaces)
/// - max_iters = 15 (kmeans cap)
/// - cosine metric (e5 vectors are L2-normalized)
///
/// Bits per PQ code.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Floats per PQ subspace; the embedding dimension is divided by this to
/// get the sub-vector count.
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// Upper bound on kmeans iterations.
const IVF_PQ_MAX_ITERS: usize = 15;
2813
/// FTS tokenizer constants (spec.md#search-language-neutral-index): character ngrams
/// in `[3, 5]`. 4-5-grams discriminate, min=3 keeps 3-char tokens
/// (`FTS`, `OCC`) searchable.
///
/// Shortest ngram length.
const FTS_NGRAM_MIN: u32 = 3;
/// Longest ngram length.
const FTS_NGRAM_MAX: u32 = 5;
2819
2820/// Pond's production IndexIntents: the per-table intent set
2821/// `Store::open_with_options` registers with the substrate.
2822pub fn pond_index_intents() -> IndexIntents {
2823    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
2824}
2825
2826/// Same as [`pond_index_intents`] but with an overridable IVF_PQ activation
2827/// threshold. Used by tests that need to exercise the activation boundary
2828/// without writing 100k vectors.
2829pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
2830    let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
2831    messages.push(IndexIntent {
2832        name: MESSAGES_FTS_INDEX,
2833        column: "search_text",
2834        trigger: IndexTrigger::OnAnyRows,
2835        params: IndexParamsKind::InvertedFtsNgram {
2836            min: FTS_NGRAM_MIN,
2837            max: FTS_NGRAM_MAX,
2838        },
2839    });
2840    for (column, kind, name) in MESSAGE_SCALAR_INDICES {
2841        messages.push(IndexIntent {
2842            name,
2843            column,
2844            trigger: IndexTrigger::OnAnyRows,
2845            params: IndexParamsKind::Scalar(kind.clone()),
2846        });
2847    }
2848    messages.push(IndexIntent {
2849        name: MESSAGES_VECTOR_INDEX,
2850        column: "vector",
2851        trigger: IndexTrigger::OnNonNullCount {
2852            column: "vector",
2853            threshold: vector_threshold,
2854        },
2855        params: IndexParamsKind::IvfPqCosine {
2856            sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
2857            num_bits: IVF_PQ_NUM_BITS,
2858            max_iters: IVF_PQ_MAX_ITERS,
2859        },
2860    });
2861    let parts = PARTS_SCALAR_INDICES
2862        .iter()
2863        .map(|(column, kind, name)| IndexIntent {
2864            name,
2865            column,
2866            trigger: IndexTrigger::OnAnyRows,
2867            params: IndexParamsKind::Scalar(kind.clone()),
2868        })
2869        .collect();
2870    let sessions = SESSIONS_SCALAR_INDICES
2871        .iter()
2872        .map(|(column, kind, name)| IndexIntent {
2873            name,
2874            column,
2875            trigger: IndexTrigger::OnAnyRows,
2876            params: IndexParamsKind::Scalar(kind.clone()),
2877        })
2878        .collect();
2879    IndexIntents {
2880        sessions,
2881        messages,
2882        parts,
2883    }
2884}
2885
/// Default width of the `messages.vector` embedding column (spec.md#search):
/// matches [`embed::DEFAULT_MODEL_ID`] (`intfloat/multilingual-e5-small`,
/// 384). Used when `[embeddings].dim` is absent.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide vector dimension, seeded once at startup from
/// `[embeddings].dim` via [`init_embedding_dim`]. A `OnceLock` rather than a
/// `const` so a temporary config file can select a model with a different
/// dimension for an experiment without touching every use site. While
/// uninitialized, readers fall back to [`DEFAULT_EMBEDDING_DIM`], which
/// keeps unit tests config-free.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// The active embedding dimension: whatever [`init_embedding_dim`]
/// installed, or [`DEFAULT_EMBEDDING_DIM`] when nothing has been installed.
/// Reading never installs a value, so a later [`init_embedding_dim`] call
/// can still take effect.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(&installed) => installed,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Seed [`embedding_dim`] from config. First call wins; later calls are
/// silently ignored.
pub fn init_embedding_dim(dim: usize) {
    // `set` reports an already-initialized cell as `Err`; dropping that
    // result is what makes repeat calls a no-op.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
2911
2912/// Initial-`CREATE` write params for the namespace-mediated path. The
2913/// substrate seam stamps in `session`, `mode`, and `store_params`.
2914/// `auto_cleanup` is short; long-term recovery is `pond export` snapshots
2915/// plus deferred Lance tags (spec.md#session-durable-copy). `skip_auto_cleanup`
2916/// suppresses the per-commit hook so cleanup stays operator-driven via
2917/// `pond index optimize` (one LIST per command instead of per write).
2918pub(crate) fn write_params_for_create() -> WriteParams {
2919    WriteParams {
2920        data_storage_version: Some(LanceFileVersion::V2_1),
2921        enable_v2_manifest_paths: true,
2922        enable_stable_row_ids: true,
2923        auto_cleanup: Some(AutoCleanupParams {
2924            interval: 20,
2925            older_than: chrono::TimeDelta::days(1),
2926        }),
2927        skip_auto_cleanup: true,
2928        ..WriteParams::default()
2929    }
2930}
2931
2932fn export_schema(table: Table) -> Arc<Schema> {
2933    match table {
2934        Table::Sessions => session_schema(),
2935        Table::Messages => message_schema(),
2936        Table::Parts => part_schema(),
2937    }
2938}
2939
2940fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
2941    let expected = export_schema(table);
2942    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
2943    let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
2944    let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
2945    if actual_names != expected_names {
2946        anyhow::bail!(
2947            "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
2948            table.as_str(),
2949        );
2950    }
2951    Ok(())
2952}
2953
2954async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
2955    let source_uri = source
2956        .to_str()
2957        .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
2958    let dataset = Dataset::open(source_uri)
2959        .await
2960        .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
2961    ensure_schema_matches_archive(&dataset, table)?;
2962    Ok(dataset)
2963}
2964
2965pub(crate) fn session_schema() -> Arc<Schema> {
2966    Arc::new(Schema::new(vec![
2967        primary_field("id", DataType::Utf8, false),
2968        Field::new("parent_session_id", DataType::Utf8, true),
2969        Field::new("parent_message_id", DataType::Utf8, true),
2970        Field::new("source_agent", DataType::Utf8, false),
2971        Field::new(
2972            "created_at",
2973            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
2974            false,
2975        ),
2976        Field::new("project", DataType::Utf8, false),
2977        json_field("options", false),
2978    ]))
2979}
2980
2981pub(crate) fn message_schema() -> Arc<Schema> {
2982    Arc::new(Schema::new(vec![
2983        primary_field("session_id", DataType::Utf8, false),
2984        primary_field("id", DataType::Utf8, false),
2985        Field::new(
2986            "timestamp",
2987            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
2988            false,
2989        ),
2990        Field::new("role", DataType::Utf8, false),
2991        Field::new("source_agent", DataType::Utf8, false),
2992        Field::new("project", DataType::Utf8, false),
2993        Field::new("content", DataType::Utf8, true),
2994        Field::new("search_text", DataType::Utf8, true),
2995        // The message's derived embedding (spec.md#session-embed-from-canonical):
2996        // both null until `pond embed` fills them, set together thereafter.
2997        Field::new("vector", embedding_vector_type(), true),
2998        Field::new("embedding_model", DataType::Utf8, true),
2999        json_field("options", false),
3000    ]))
3001}
3002
3003pub(crate) fn part_schema() -> Arc<Schema> {
3004    Arc::new(Schema::new(vec![
3005        primary_field("session_id", DataType::Utf8, false),
3006        primary_field("message_id", DataType::Utf8, false),
3007        primary_field("id", DataType::Utf8, false),
3008        Field::new("ordinal", DataType::Int32, false),
3009        Field::new("type", DataType::Utf8, false),
3010        // spec.md#model-part-provenance: conversation vs harness-injected; search
3011        // reads this column to exclude injected scaffolding.
3012        Field::new("provenance", DataType::Utf8, false),
3013        json_field("variant_data", false),
3014        legacy_blob_field("data", true),
3015        json_field("options", false),
3016    ]))
3017}
3018
3019pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3020    let arrays = schema
3021        .fields()
3022        .iter()
3023        .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3024        .collect();
3025    RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3026}
3027
3028pub(crate) fn empty_reader(
3029    schema: Arc<Schema>,
3030) -> Result<
3031    RecordBatchIterator<
3032        std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3033    >,
3034> {
3035    let batch = empty_batch(schema.clone())?;
3036    Ok(RecordBatchIterator::new(
3037        vec![Ok(batch)].into_iter(),
3038        schema,
3039    ))
3040}
3041
/// One input row for `messages_batches`: a borrowed message together with the
/// column values that are passed in alongside it rather than read from it.
pub(crate) struct MessageBatchRow<'a> {
    /// Supplies the `session_id`, `id`, `timestamp`, `role`, `content` and
    /// `options` columns.
    pub message: &'a Message,
    /// Written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Written to the `project` column.
    pub project: &'a str,
    /// Written to the nullable `search_text` column; `None` is stored as null.
    pub search_text: Option<&'a str>,
}
3048
3049// Lance v7.0.0-beta.16's IVF_PQ build path (`rust/lance/src/index/vector/utils.rs`
3050// `infer_vector_element_type_impl`) accepts only Float16/Float32/Float64/UInt8/Int8;
3051// `FixedSizeBinary(2)`-backed `lance.bfloat16` is rejected. The format docs list
3052// BFloat16 as a future-supported embedding type; until the Rust IVF_PQ build
3053// path catches up, store as Float16 (half-precision, also 2 bytes/element).
3054fn embedding_vector_type() -> DataType {
3055    DataType::FixedSizeList(
3056        Arc::new(Field::new("item", DataType::Float16, true)),
3057        embedding_dim() as i32,
3058    )
3059}
3060
3061/// The partial-schema source for the embedding column update: the `messages`
3062/// primary key plus the two columns `pond embed` fills. The field definitions
3063/// match `message_schema` exactly so Lance accepts it as a subset upsert.
3064fn embedding_update_schema() -> Arc<Schema> {
3065    Arc::new(Schema::new(vec![
3066        primary_field("session_id", DataType::Utf8, false),
3067        primary_field("id", DataType::Utf8, false),
3068        Field::new("vector", embedding_vector_type(), true),
3069        Field::new("embedding_model", DataType::Utf8, true),
3070    ]))
3071}
3072
3073/// Build the merge-update source batch for [`Store::write_embeddings`]: one row
3074/// per embedded message carrying `(session_id, id, vector, embedding_model)`.
3075pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3076    let dim = embedding_dim();
3077    let mut flat = Vec::with_capacity(rows.len() * dim);
3078    for row in rows {
3079        if row.vector.len() != dim {
3080            anyhow::bail!(
3081                "embedding for message {} has dim {}, expected {dim}",
3082                row.id,
3083                row.vector.len(),
3084            );
3085        }
3086        flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3087    }
3088    let values = Float16Array::from(flat);
3089    let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3090    let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3091        .context("failed to build embedding vector column")?;
3092
3093    RecordBatch::try_new(
3094        embedding_update_schema(),
3095        vec![
3096            Arc::new(StringArray::from(
3097                rows.iter()
3098                    .map(|row| row.session_id.as_str())
3099                    .collect::<Vec<_>>(),
3100            )),
3101            Arc::new(StringArray::from(
3102                rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3103            )),
3104            Arc::new(vectors),
3105            Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3106        ],
3107    )
3108    .context("failed to build embedding update batch")
3109}
3110
/// The runtime backstop against Arrow's 2 GiB `i32` offset wall: a flush batch
/// is split before the running total of its text columns exceeds this, and a
/// single cell at or above it is rejected rather than left to panic inside
/// `StringArray::from` (spec.md#adapter-bounded-values).
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Split rows into contiguous ranges whose summed text-column byte cost each
/// stays within `COLUMN_BYTE_BUDGET`.
///
/// `cells[i]` is row `i`'s byte cost summed across every text column.
/// Budgeting the all-column total bounds every individual column too, since no
/// single column's total can exceed it.
///
/// The returned ranges are non-empty, in order, and together cover
/// `0..cells.len()`; an empty input yields no ranges. A row whose own cost
/// exceeds the budget cannot be split further and is emitted as a one-row
/// range.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        // Saturate instead of overflowing: a wrapped sum would look small,
        // pass the budget check, and produce an over-budget chunk in release
        // builds (or panic in debug builds).
        let would_exceed = running.saturating_add(row) > COLUMN_BYTE_BUDGET;
        // `index > start` keeps every chunk non-empty.
        if would_exceed && index > start {
            chunks.push(start..index);
            start = index;
            running = 0;
        }
        running = running.saturating_add(row);
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
3138
3139fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3140    if bytes >= COLUMN_BYTE_BUDGET {
3141        anyhow::bail!(
3142            "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3143             overflow Arrow's i32 offset buffer"
3144        );
3145    }
3146    Ok(())
3147}
3148
3149async fn merge_insert_chunks(
3150    handle: &Handle,
3151    table: Table,
3152    batches: Vec<RecordBatch>,
3153) -> Result<u64> {
3154    let mut inserted = 0u64;
3155    for batch in batches {
3156        let rows = batch.num_rows();
3157        inserted += handle.merge_insert(table, batch, rows).await?;
3158    }
3159    Ok(inserted)
3160}
3161
3162pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3163    let options = sessions
3164        .iter()
3165        .map(|session| json_bytes(&session.options))
3166        .collect::<Result<Vec<_>>>()?;
3167    let mut cells = Vec::with_capacity(sessions.len());
3168    for (session, encoded) in sessions.iter().zip(&options) {
3169        let columns = [
3170            session.id.len(),
3171            session.parent_session_id.as_deref().map_or(0, str::len),
3172            session.parent_message_id.as_deref().map_or(0, str::len),
3173            session.source_agent.len(),
3174            session.project.as_str().len(),
3175            encoded.len(),
3176        ];
3177        for bytes in columns {
3178            guard_cell("sessions", &session.id, bytes)?;
3179        }
3180        cells.push(columns.iter().sum());
3181    }
3182    chunk_ranges(&cells)
3183        .into_iter()
3184        .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3185        .collect()
3186}
3187
3188fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3189    let schema = session_schema();
3190    RecordBatch::try_new(
3191        schema.clone(),
3192        vec![
3193            Arc::new(StringArray::from(
3194                sessions
3195                    .iter()
3196                    .map(|session| session.id.as_str())
3197                    .collect::<Vec<_>>(),
3198            )),
3199            Arc::new(StringArray::from(
3200                sessions
3201                    .iter()
3202                    .map(|session| session.parent_session_id.as_deref())
3203                    .collect::<Vec<_>>(),
3204            )),
3205            Arc::new(StringArray::from(
3206                sessions
3207                    .iter()
3208                    .map(|session| session.parent_message_id.as_deref())
3209                    .collect::<Vec<_>>(),
3210            )),
3211            Arc::new(StringArray::from(
3212                sessions
3213                    .iter()
3214                    .map(|session| session.source_agent.as_str())
3215                    .collect::<Vec<_>>(),
3216            )),
3217            Arc::new(
3218                TimestampMicrosecondArray::from(
3219                    sessions
3220                        .iter()
3221                        .map(|session| micros(session.created_at))
3222                        .collect::<Vec<_>>(),
3223                )
3224                .with_timezone("UTC"),
3225            ),
3226            Arc::new(StringArray::from(
3227                sessions
3228                    .iter()
3229                    .map(|session| session.project.as_str())
3230                    .collect::<Vec<_>>(),
3231            )),
3232            Arc::new(LargeBinaryArray::from_iter_values(
3233                options.iter().map(Vec::as_slice),
3234            )),
3235        ],
3236    )
3237    .context("failed to build session batch")
3238}
3239
3240pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3241    let options = rows
3242        .iter()
3243        .map(|row| json_bytes(row.message.options()))
3244        .collect::<Result<Vec<_>>>()?;
3245    let mut cells = Vec::with_capacity(rows.len());
3246    for (row, encoded) in rows.iter().zip(&options) {
3247        let columns = [
3248            row.message.session_id().len(),
3249            row.message.id().len(),
3250            row.message.role().as_str().len(),
3251            row.source_agent.len(),
3252            row.project.len(),
3253            row.message.system_content().map_or(0, str::len),
3254            row.search_text.map_or(0, str::len),
3255            encoded.len(),
3256        ];
3257        for bytes in columns {
3258            guard_cell("messages", row.message.id(), bytes)?;
3259        }
3260        cells.push(columns.iter().sum());
3261    }
3262    chunk_ranges(&cells)
3263        .into_iter()
3264        .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3265        .collect()
3266}
3267
3268fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3269    let schema = message_schema();
3270    RecordBatch::try_new(
3271        schema.clone(),
3272        vec![
3273            Arc::new(StringArray::from(
3274                rows.iter()
3275                    .map(|row| row.message.session_id())
3276                    .collect::<Vec<_>>(),
3277            )),
3278            Arc::new(StringArray::from(
3279                rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3280            )),
3281            Arc::new(
3282                TimestampMicrosecondArray::from(
3283                    rows.iter()
3284                        .map(|row| micros(row.message.timestamp()))
3285                        .collect::<Vec<_>>(),
3286                )
3287                .with_timezone("UTC"),
3288            ),
3289            Arc::new(StringArray::from(
3290                rows.iter()
3291                    .map(|row| row.message.role().as_str())
3292                    .collect::<Vec<_>>(),
3293            )),
3294            Arc::new(StringArray::from(
3295                rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3296            )),
3297            Arc::new(StringArray::from(
3298                rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3299            )),
3300            Arc::new(StringArray::from(
3301                rows.iter()
3302                    .map(|row| row.message.system_content())
3303                    .collect::<Vec<_>>(),
3304            )),
3305            Arc::new(StringArray::from(
3306                rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3307            )),
3308            // `vector` / `embedding_model` are written null at ingest; every
3309            // message starts un-embedded and `pond embed` fills them later
3310            // (spec.md#session-embed-from-canonical).
3311            new_null_array(&embedding_vector_type(), rows.len()),
3312            new_null_array(&DataType::Utf8, rows.len()),
3313            Arc::new(LargeBinaryArray::from_iter_values(
3314                options.iter().map(Vec::as_slice),
3315            )),
3316        ],
3317    )
3318    .context("failed to build message batch")
3319}
3320
3321pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3322    let variant_data = parts
3323        .iter()
3324        .map(|part| part_variant_json(&part.kind))
3325        .collect::<Result<Vec<_>>>()?;
3326    let options = parts
3327        .iter()
3328        .map(|part| json_bytes(&part.options))
3329        .collect::<Result<Vec<_>>>()?;
3330    let mut cells = Vec::with_capacity(parts.len());
3331    // The blob column is a BinaryArray, exempt from the text-column bound
3332    // (spec.md#adapter-bounded-values); only the StringArray columns are budgeted.
3333    for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3334        let columns = [
3335            part.session_id.len(),
3336            part.message_id.len(),
3337            part.id.len(),
3338            part.kind.type_name().len(),
3339            part.provenance.as_str().len(),
3340            variant.len(),
3341            encoded.len(),
3342        ];
3343        for bytes in columns {
3344            guard_cell("parts", &part.id, bytes)?;
3345        }
3346        cells.push(columns.iter().sum());
3347    }
3348    chunk_ranges(&cells)
3349        .into_iter()
3350        .map(|range| {
3351            parts_chunk(
3352                &parts[range.clone()],
3353                &variant_data[range.clone()],
3354                &options[range],
3355            )
3356        })
3357        .collect()
3358}
3359
3360fn parts_chunk(
3361    parts: &[Part],
3362    variant_data: &[Vec<u8>],
3363    options: &[Vec<u8>],
3364) -> Result<RecordBatch> {
3365    let schema = part_schema();
3366    // Legacy blob (`legacy_blob_field`) is a plain LargeBinary; the URL
3367    // variant is stored as UTF-8 bytes and recovered through `variant_data`'s
3368    // `data_kind = "url"` discriminator (see `file_data_from_blob`).
3369    let blob_payloads: Vec<Option<&[u8]>> = parts
3370        .iter()
3371        .map(|part| match &part.kind {
3372            PartKind::File { data, .. } => Some(match data {
3373                FileData::String(value) => value.as_bytes(),
3374                FileData::Bytes(value) => value.as_slice(),
3375                FileData::Url(value) => value.as_bytes(),
3376            }),
3377            PartKind::Text { .. }
3378            | PartKind::Reasoning { .. }
3379            | PartKind::ToolCall { .. }
3380            | PartKind::ToolResult { .. }
3381            | PartKind::ToolApprovalRequest { .. }
3382            | PartKind::ToolApprovalResponse { .. } => None,
3383        })
3384        .collect();
3385    let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3386
3387    RecordBatch::try_new(
3388        schema.clone(),
3389        vec![
3390            Arc::new(StringArray::from(
3391                parts
3392                    .iter()
3393                    .map(|part| part.session_id.as_str())
3394                    .collect::<Vec<_>>(),
3395            )),
3396            Arc::new(StringArray::from(
3397                parts
3398                    .iter()
3399                    .map(|part| part.message_id.as_str())
3400                    .collect::<Vec<_>>(),
3401            )),
3402            Arc::new(StringArray::from(
3403                parts
3404                    .iter()
3405                    .map(|part| part.id.as_str())
3406                    .collect::<Vec<_>>(),
3407            )),
3408            Arc::new(Int32Array::from(
3409                parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3410            )),
3411            Arc::new(StringArray::from(
3412                parts
3413                    .iter()
3414                    .map(|part| part.kind.type_name())
3415                    .collect::<Vec<_>>(),
3416            )),
3417            Arc::new(StringArray::from(
3418                parts
3419                    .iter()
3420                    .map(|part| part.provenance.as_str())
3421                    .collect::<Vec<_>>(),
3422            )),
3423            Arc::new(LargeBinaryArray::from_iter_values(
3424                variant_data.iter().map(Vec::as_slice),
3425            )),
3426            Arc::new(blob_array),
3427            Arc::new(LargeBinaryArray::from_iter_values(
3428                options.iter().map(Vec::as_slice),
3429            )),
3430        ],
3431    )
3432    .context("failed to build parts batch")
3433}
3434
3435pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3436    Ok(Session {
3437        id: string(batch, "id", row)?.context("session id is null")?,
3438        parent_session_id: string(batch, "parent_session_id", row)?,
3439        parent_message_id: string(batch, "parent_message_id", row)?,
3440        source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3441        created_at: datetime(batch, "created_at", row)?,
3442        project: crate::adapter::Extracted::from_stored(
3443            string(batch, "project", row)?.context("project is null")?,
3444        ),
3445        options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3446    })
3447}
3448
3449pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3450    let id = string(batch, "id", row)?.context("message id is null")?;
3451    let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3452    let timestamp = datetime(batch, "timestamp", row)?;
3453    let options =
3454        json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3455
3456    match string(batch, "role", row)?
3457        .context("message role is null")?
3458        .as_str()
3459    {
3460        "system" => Ok(Message::System {
3461            id,
3462            session_id,
3463            timestamp,
3464            // `content` is nullable in the schema; preserve the distinction
3465            // between "no content row stored" (`None`) and "empty string
3466            // stored" (`Some(extracted_empty)`). The value originally
3467            // came from a `Source` extraction at ingest time; rewrap via
3468            // the storage-internal `from_stored` so the type-system seal
3469            // for adapters stays intact.
3470            content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3471            options,
3472        }),
3473        "user" => Ok(Message::User {
3474            id,
3475            session_id,
3476            timestamp,
3477            options,
3478        }),
3479        "assistant" => Ok(Message::Assistant {
3480            id,
3481            session_id,
3482            timestamp,
3483            options,
3484        }),
3485        "tool" => Ok(Message::Tool {
3486            id,
3487            session_id,
3488            timestamp,
3489            options,
3490        }),
3491        other => anyhow::bail!("unknown message role {other}"),
3492    }
3493}
3494
3495pub(crate) fn part_from_batch(
3496    batch: &RecordBatch,
3497    row: usize,
3498    file_data: Option<FileData>,
3499) -> Result<Part> {
3500    let type_name = string(batch, "type", row)?.context("part type is null")?;
3501    let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3502    let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3503    Ok(Part {
3504        session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3505        message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3506        id: string(batch, "id", row)?.context("part id is null")?,
3507        ordinal: int32(batch, "ordinal", row)?,
3508        provenance: provenance_from_str(&provenance)?,
3509        options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3510        kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3511    })
3512}
3513
3514fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3515    match value {
3516        "conversational" => Ok(crate::wire::Provenance::Conversational),
3517        "injected" => Ok(crate::wire::Provenance::Injected),
3518        other => anyhow::bail!("unknown part provenance {other}"),
3519    }
3520}
3521
3522fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3523    let kind = file_data_kind(variant_data)?;
3524    match kind.as_str() {
3525        "string" => {
3526            let text = std::str::from_utf8(bytes)
3527                .context("file string payload is not UTF-8")?
3528                .to_owned();
3529            Ok(FileData::String(text))
3530        }
3531        "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3532        "url" => Ok(FileData::Url(
3533            std::str::from_utf8(bytes)
3534                .context("file URL payload is not UTF-8")?
3535                .to_owned(),
3536        )),
3537        other => anyhow::bail!("unknown file data_kind {other}"),
3538    }
3539}
3540
3541fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3542    let value = json_parse::<Value>(variant_data)?;
3543    value
3544        .get("data_kind")
3545        .and_then(Value::as_str)
3546        .map(str::to_owned)
3547        .context("file part variant_data missing data_kind")
3548}
3549
3550fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3551    batch
3552        .column_by_name(name)
3553        .with_context(|| format!("missing column {name}"))?
3554        .as_any()
3555        .downcast_ref::<UInt64Array>()
3556        .with_context(|| format!("column {name} is not UInt64"))
3557}
3558
/// Map a character to its Unicode script class name, or `None` for
/// non-alphabetic characters (digits, punctuation, whitespace). Used by
/// `Store::text_script_histogram` to surface corpus language mix in
/// `pond status`. The ranges cover the scripts most likely to appear in
/// agent-session transcripts; every other alphabetic character collapses to
/// `"Other"` so the histogram stays bounded.
fn classify_script(ch: char) -> Option<&'static str> {
    ch.is_alphabetic().then(|| match ch {
        'A'..='Z' | 'a'..='z' | '\u{00C0}'..='\u{024F}' => "Latin",
        '\u{0370}'..='\u{03FF}' => "Greek",
        '\u{0400}'..='\u{052F}' => "Cyrillic",
        '\u{0590}'..='\u{05FF}' => "Hebrew",
        '\u{0600}'..='\u{06FF}' | '\u{0750}'..='\u{077F}' => "Arabic",
        '\u{0900}'..='\u{097F}' => "Devanagari",
        '\u{0E00}'..='\u{0E7F}' => "Thai",
        '\u{3040}'..='\u{309F}' => "Hiragana",
        '\u{30A0}'..='\u{30FF}' => "Katakana",
        '\u{3400}'..='\u{4DBF}' | '\u{4E00}'..='\u{9FFF}' => "Han",
        '\u{1100}'..='\u{11FF}' | '\u{AC00}'..='\u{D7AF}' => "Hangul",
        _ => "Other",
    })
}
3585
3586pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3587    let array = batch
3588        .column_by_name(name)
3589        .with_context(|| format!("missing column {name}"))?
3590        .as_any()
3591        .downcast_ref::<StringArray>()
3592        .with_context(|| format!("column {name} is not Utf8"))?;
3593    if array.is_null(row) {
3594        Ok(None)
3595    } else {
3596        Ok(Some(array.value(row).to_owned()))
3597    }
3598}
3599
3600fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3601    // Lance can return a `lance.json` column either as raw JSONB bytes
3602    // (LargeBinary) or auto-converted to the Arrow text form (Utf8 /
3603    // LargeUtf8), depending on the read path. Handle both.
3604    let column = batch
3605        .column_by_name(name)
3606        .with_context(|| format!("missing column {name}"))?;
3607    if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3608        return if array.is_null(row) {
3609            Ok(None)
3610        } else {
3611            Ok(Some(
3612                lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3613            ))
3614        };
3615    }
3616    if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3617        return if array.is_null(row) {
3618            Ok(None)
3619        } else {
3620            Ok(Some(array.value(row).as_bytes().to_vec()))
3621        };
3622    }
3623    if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3624        return if array.is_null(row) {
3625            Ok(None)
3626        } else {
3627            Ok(Some(array.value(row).as_bytes().to_vec()))
3628        };
3629    }
3630    anyhow::bail!("column {name} is not a JSON-compatible array")
3631}
3632
3633fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3634    let array = batch
3635        .column_by_name(name)
3636        .with_context(|| format!("missing column {name}"))?
3637        .as_any()
3638        .downcast_ref::<Int32Array>()
3639        .with_context(|| format!("column {name} is not Int32"))?;
3640    Ok(array.value(row))
3641}
3642
3643pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3644    let array = batch
3645        .column_by_name(name)
3646        .with_context(|| format!("missing column {name}"))?
3647        .as_any()
3648        .downcast_ref::<Float32Array>()
3649        .with_context(|| format!("column {name} is not Float32"))?;
3650    Ok(array.value(row))
3651}
3652
3653pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3654    let array = batch
3655        .column_by_name(name)
3656        .with_context(|| format!("missing column {name}"))?
3657        .as_any()
3658        .downcast_ref::<TimestampMicrosecondArray>()
3659        .with_context(|| format!("column {name} is not timestamp_micros"))?;
3660    Utc.timestamp_micros(array.value(row))
3661        .single()
3662        .context("timestamp is out of range")
3663}
3664
3665fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3666    Field::new(name, data_type, nullable).with_metadata(
3667        [(
3668            "lance-schema:unenforced-primary-key".to_owned(),
3669            "true".to_owned(),
3670        )]
3671        .into(),
3672    )
3673}
3674
3675// Legacy blob storage (`LargeBinary` + `lance-encoding:blob=true`). Blob v2's
3676// `Struct<data, uri>` extension requires `data_storage_version >= 2.2`, which
3677// is marked unstable in Lance docs (`format/file/versioning.md`) and at
3678// v7.0.0-beta.16 trips a `compact_files` bug: the AllBinary blob_handling
3679// path leaves the field as a 2-child struct but `BlobV2StructuralEncoder`
3680// allocated only one column_info, so the decoder's second `expect_next()`
3681// fires `"there were more fields in the schema than provided column
3682// indices / infos"`. Legacy blob writes `BlobLayout` pages, which compact
3683// handles correctly (covered by Lance's own `test_compact_blob_columns`).
3684fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3685    Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3686        [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3687            .into_iter()
3688            .collect(),
3689    )
3690}
3691
/// Build a JSON field by delegating to `lance_arrow::json::json_field`, so
/// every schema in this file declares its JSON columns the same way.
fn json_field(name: &str, nullable: bool) -> Field {
    lance_arrow::json::json_field(name, nullable)
}
3695
/// Convert a UTC timestamp to the microsecond count stored in the timestamp
/// columns (the inverse of the conversion done in `datetime`).
fn micros(timestamp: DateTime<Utc>) -> i64 {
    timestamp.timestamp_micros()
}
3699
3700fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3701    // Write JSONB bytes (not plain UTF-8 JSON text) so the on-disk encoding
3702    // matches the `lance.json` extension contract. Lance's compact path
3703    // (`optimize.rs:908`) reads through `DatasetRecordBatchStream` which
3704    // applies `decode_json -> encode_json` on this column; with proper JSONB
3705    // on disk that roundtrip is idempotent, with plain UTF-8 it corrupts
3706    // (the analogous fix landed for `update.rs` in PR #6741 by switching to
3707    // `try_into_dfstream`; compact still goes through the adapter).
3708    let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3709    lance_arrow::json::encode_json(&text)
3710        .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3711}
3712
3713fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3714    serde_json::from_slice(value).context("failed to parse JSON field")
3715}
3716
3717fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3718    if let PartKind::File {
3719        media_type,
3720        file_name,
3721        data,
3722    } = kind
3723    {
3724        let data_kind = match data {
3725            FileData::String(_) => "string",
3726            FileData::Bytes(_) => "bytes",
3727            FileData::Url(_) => "url",
3728        };
3729        return json_bytes(&serde_json::json!({
3730            "media_type": media_type,
3731            "file_name": file_name,
3732            "data_kind": data_kind,
3733        }));
3734    }
3735    let value = serde_json::to_value(kind)?;
3736    let mut object = value
3737        .as_object()
3738        .cloned()
3739        .context("part variant did not serialize to an object")?;
3740    object.remove("type");
3741    json_bytes(&object)
3742}
3743
3744fn part_kind_from_json(
3745    type_name: &str,
3746    variant_data: &[u8],
3747    file_data: Option<FileData>,
3748) -> Result<PartKind> {
3749    let mut value = json_parse::<Value>(variant_data)?;
3750    let object = value
3751        .as_object_mut()
3752        .context("part variant data is not an object")?;
3753    object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3754    if let Some(data) = file_data {
3755        object.remove("data_kind");
3756        object.insert("data".to_owned(), serde_json::to_value(data)?);
3757    }
3758    serde_json::from_value(value).context("failed to parse part kind")
3759}
3760
3761#[cfg(test)]
3762mod tests {
3763    #![allow(clippy::expect_used, clippy::unwrap_used)]
3764
3765    use super::*;
3766    use crate::{
3767        adapter::Extracted,
3768        handlers::ingest_events,
3769        wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3770    };
3771    use chrono::Utc;
3772    use serde_json::json;
3773    use tempfile::TempDir;
3774
3775    fn synthetic_session(id: &str) -> Session {
3776        Session {
3777            id: id.to_owned(),
3778            parent_session_id: None,
3779            parent_message_id: None,
3780            source_agent: "claude-code".to_owned(),
3781            created_at: Utc::now(),
3782            project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
3783            options: ProviderOptions::new(),
3784        }
3785    }
3786
3787    #[test]
3788    fn search_text_excludes_injected_parts() {
3789        use crate::wire::Provenance;
3790        let message = Message::User {
3791            id: "m1".to_owned(),
3792            session_id: "s1".to_owned(),
3793            timestamp: Utc::now(),
3794            options: ProviderOptions::new(),
3795        };
3796        let text_part = |id: &str, text: &str, provenance: Provenance| Part {
3797            session_id: "s1".to_owned(),
3798            id: id.to_owned(),
3799            message_id: "m1".to_owned(),
3800            ordinal: 0,
3801            provenance,
3802            options: ProviderOptions::new(),
3803            kind: PartKind::Text {
3804                text: Some(Extracted::from_test_value(text.to_owned())),
3805            },
3806        };
3807
3808        // A conversational part contributes; an injected one is excluded
3809        // (spec.md#search).
3810        let conversational = search_text(
3811            &message,
3812            &[text_part(
3813                "p1",
3814                "real human prompt",
3815                Provenance::Conversational,
3816            )],
3817        );
3818        assert_eq!(conversational.as_deref(), Some("real human prompt"));
3819
3820        let injected = search_text(
3821            &message,
3822            &[text_part(
3823                "p2",
3824                "<task-notification>...</task-notification>",
3825                Provenance::Injected,
3826            )],
3827        );
3828        assert!(
3829            injected.is_none(),
3830            "a message whose only part is injected has null search_text"
3831        );
3832    }
3833
3834    #[test]
3835    fn chunk_ranges_splits_on_byte_budget() {
3836        assert!(chunk_ranges(&[]).is_empty());
3837        assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
3838
3839        let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
3840        assert_eq!(
3841            chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
3842            vec![0..1, 1..2, 2..3],
3843        );
3844
3845        // An oversized single row gets its own chunk, never an infinite loop.
3846        assert_eq!(
3847            chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
3848            vec![0..1, 1..2, 2..3],
3849        );
3850    }
3851
3852    #[tokio::test]
3853    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
3854        // Per-event drop semantics (spec.md#adapter-integrity-event-ordering): a Part with no preceding
3855        // Message is dropped on the spot, with one Error outcome surfaced. The
3856        // rest of the substream continues normally - subsequent valid messages
3857        // and parts get written.
3858        let temp = TempDir::new()?;
3859        let store = Store::open_local(temp.path()).await?;
3860        let session = synthetic_session("ordering");
3861        let orphan_part = Part {
3862            session_id: session.id.clone(),
3863            id: "orphan-part".to_owned(),
3864            message_id: "missing-message".to_owned(),
3865            ordinal: 0,
3866            provenance: crate::wire::Provenance::Conversational,
3867            options: ProviderOptions::new(),
3868            kind: PartKind::Text {
3869                text: Some(Extracted::from_test_value("orphan".to_owned())),
3870            },
3871        };
3872        let valid_message = Message::User {
3873            id: "valid-message".to_owned(),
3874            session_id: session.id.clone(),
3875            timestamp: Utc::now(),
3876            options: ProviderOptions::new(),
3877        };
3878        let valid_part = Part {
3879            session_id: session.id.clone(),
3880            id: "valid-part".to_owned(),
3881            message_id: valid_message.id().to_owned(),
3882            ordinal: 0,
3883            provenance: crate::wire::Provenance::Conversational,
3884            options: ProviderOptions::new(),
3885            kind: PartKind::Text {
3886                text: Some(Extracted::from_test_value("kept".to_owned())),
3887            },
3888        };
3889
3890        let mut validator = IngestValidator::default();
3891        validator
3892            .push(&store, 0, IngestEvent::Session(session.clone()))
3893            .await?;
3894        let part_outcomes = validator
3895            .push(&store, 1, IngestEvent::Part(orphan_part))
3896            .await?;
3897        assert_eq!(part_outcomes.len(), 1);
3898        assert_eq!(part_outcomes[0].kind, "part");
3899        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
3900        assert!(
3901            part_outcomes[0]
3902                .error
3903                .as_ref()
3904                .map(|e| e.message.contains("part event appeared before a message"))
3905                .unwrap_or(false),
3906            "error message must explain the ordering violation: {part_outcomes:?}"
3907        );
3908        validator
3909            .push(&store, 2, IngestEvent::Message(valid_message))
3910            .await?;
3911        validator
3912            .push(&store, 3, IngestEvent::Part(valid_part))
3913            .await?;
3914        validator.finish(&store).await?;
3915
3916        let (sessions, messages, parts) = store.row_counts().await?;
3917        assert_eq!(sessions, 1, "session committed despite the orphan part");
3918        assert_eq!(messages, 1, "valid message committed");
3919        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
3920
3921        Ok(())
3922    }
3923
3924    #[tokio::test]
3925    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
3926        // Per-event drop: a duplicate message id within a substream drops the
3927        // *duplicate* and surfaces an Error outcome for it. The first wins; the
3928        // session still commits.
3929        let temp = TempDir::new()?;
3930        let store = Store::open_local(temp.path()).await?;
3931        let session = synthetic_session("duplicate-message");
3932        let first = Message::User {
3933            id: "message-1".to_owned(),
3934            session_id: session.id.clone(),
3935            timestamp: Utc::now(),
3936            options: ProviderOptions::new(),
3937        };
3938        let second = Message::Assistant {
3939            id: "message-1".to_owned(),
3940            session_id: session.id.clone(),
3941            timestamp: Utc::now(),
3942            options: ProviderOptions::new(),
3943        };
3944
3945        let mut validator = IngestValidator::default();
3946        validator
3947            .push(&store, 0, IngestEvent::Session(session.clone()))
3948            .await?;
3949        validator
3950            .push(&store, 1, IngestEvent::Message(first))
3951            .await?;
3952        let dup_outcomes = validator
3953            .push(&store, 2, IngestEvent::Message(second))
3954            .await?;
3955        assert_eq!(dup_outcomes.len(), 1);
3956        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
3957        assert!(
3958            dup_outcomes[0]
3959                .error
3960                .as_ref()
3961                .map(|e| e.message.contains("duplicate message id message-1"))
3962                .unwrap_or(false),
3963            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
3964        );
3965
3966        validator.finish(&store).await?;
3967        let (sessions, messages, _) = store.row_counts().await?;
3968        assert_eq!(sessions, 1, "session committed");
3969        assert_eq!(messages, 1, "only the first message committed");
3970
3971        Ok(())
3972    }
3973
3974    /// Regression: compact_files on `parts` with the blob column tripped a
3975    /// Lance v7.0.0-beta.16 dispatch bug under `lance.blob.v2`. Two upsert
3976    /// batches give compact fragments to merge; every `FileData` variant
3977    /// exercises the blob round-trip. All-File batches sidestep a debug-only
3978    /// `debug_assert_eq!` in Lance's legacy blob encoder that trips when one
3979    /// write batch mixes null + valid rows in the blob column - benign in
3980    /// release, irrelevant to this regression's scope.
3981    #[tokio::test(flavor = "multi_thread")]
3982    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
3983        use crate::wire::{FileData, PartKind, Provenance};
3984        let temp = TempDir::new()?;
3985        let store = Store::open_local(temp.path()).await?;
3986
3987        let session = synthetic_session("compact-blob");
3988        store
3989            .upsert_sessions(std::slice::from_ref(&session))
3990            .await?;
3991
3992        let make_part = |idx: usize, kind: PartKind| Part {
3993            session_id: session.id.clone(),
3994            message_id: format!("msg-{idx}"),
3995            id: format!("part-{idx}"),
3996            ordinal: 0,
3997            provenance: Provenance::Conversational,
3998            options: ProviderOptions::new(),
3999            kind,
4000        };
4001
4002        let batch_a = vec![
4003            make_part(
4004                0,
4005                PartKind::File {
4006                    media_type: "text/plain".to_owned(),
4007                    file_name: Some("a.txt".to_owned()),
4008                    data: FileData::Bytes(b"alpha".to_vec()),
4009                },
4010            ),
4011            make_part(
4012                1,
4013                PartKind::File {
4014                    media_type: "text/plain".to_owned(),
4015                    file_name: Some("b.txt".to_owned()),
4016                    data: FileData::String("beta".to_owned()),
4017                },
4018            ),
4019        ];
4020        store.upsert_parts(&batch_a).await?;
4021
4022        let batch_b = vec![
4023            make_part(
4024                2,
4025                PartKind::File {
4026                    media_type: "application/octet-stream".to_owned(),
4027                    file_name: None,
4028                    data: FileData::Url("https://example.com/file".to_owned()),
4029                },
4030            ),
4031            make_part(
4032                3,
4033                PartKind::File {
4034                    media_type: "image/png".to_owned(),
4035                    file_name: Some("c.png".to_owned()),
4036                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4037                },
4038            ),
4039        ];
4040        store.upsert_parts(&batch_b).await?;
4041
4042        store.optimize_indices(None, None).await?.into_result()?;
4043
4044        Ok(())
4045    }
4046
4047    #[tokio::test]
4048    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4049        let temp = TempDir::new()?;
4050        let store = Store::open_local(temp.path()).await?;
4051        let session = synthetic_session("blob");
4052        let message = Message::User {
4053            id: "message-1".to_owned(),
4054            session_id: session.id.clone(),
4055            timestamp: Utc::now(),
4056            options: ProviderOptions::new(),
4057        };
4058        let part = Part {
4059            session_id: session.id.clone(),
4060            id: "part-1".to_owned(),
4061            message_id: message.id().to_owned(),
4062            ordinal: 0,
4063            provenance: crate::wire::Provenance::Conversational,
4064            options: ProviderOptions::new(),
4065            kind: PartKind::File {
4066                media_type: "text/plain".to_owned(),
4067                file_name: Some("payload.txt".to_owned()),
4068                data: FileData::Bytes(b"pond".to_vec()),
4069            },
4070        };
4071
4072        let mut validator = IngestValidator::default();
4073        validator
4074            .push(&store, 0, IngestEvent::Session(session.clone()))
4075            .await?;
4076        validator
4077            .push(&store, 1, IngestEvent::Message(message.clone()))
4078            .await?;
4079        validator
4080            .push(&store, 2, IngestEvent::Part(part.clone()))
4081            .await?;
4082        validator.finish(&store).await?;
4083
4084        let stored = store
4085            .get_session(&session.id)
4086            .await?
4087            .expect("session should exist");
4088        let stored_part = &stored.messages[0].parts[0];
4089        assert_eq!(stored_part, &part);
4090
4091        Ok(())
4092    }
4093
4094    //
4095    // `Session.source_agent` and `Session.project` are immutable
4096    // post-first-write because `messages` denormalizes them at first
4097    // ingest; a silent overwrite would desync the denormalized
4098    // copies. pond core's `IngestValidator` probes the existing session
4099    // before the merge_insert and emits a per-row `validation_failed`
4100    // outcome with the typed field name when either changes. Other Session
4101    // fields (options, parent_session_id, created_at, parent_message_id)
4102    // re-write idempotently via merge_insert.
4103
4104    fn base_session() -> Session {
4105        Session {
4106            id: "01HXY00000000001".to_owned(),
4107            parent_session_id: None,
4108            parent_message_id: None,
4109            source_agent: "claude-code".to_owned(),
4110            created_at: Utc::now(),
4111            project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4112            options: ProviderOptions::new(),
4113        }
4114    }
4115
4116    fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4117        outcomes
4118            .iter()
4119            .filter(|outcome| outcome.status == target)
4120            .count()
4121    }
4122
4123    #[tokio::test(flavor = "multi_thread")]
4124    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4125    -> anyhow::Result<()> {
4126        let temp = TempDir::new()?;
4127        let store = Store::open_local(temp.path()).await?;
4128
4129        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4130        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4131
4132        let mut again = base_session();
4133        again.options.insert("title".to_owned(), json!("renamed"));
4134        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4135        assert_eq!(
4136            count_status(&second, OutcomeStatus::Error),
4137            0,
4138            "options is mutable; the re-ingest must not surface an error: {second:?}",
4139        );
4140        assert_eq!(
4141            count_status(&second, OutcomeStatus::Matched),
4142            1,
4143            "unchanged immutable fields must match-insert via merge_insert",
4144        );
4145
4146        Ok(())
4147    }
4148
4149    #[tokio::test(flavor = "multi_thread")]
4150    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4151        let temp = TempDir::new()?;
4152        let store = Store::open_local(temp.path()).await?;
4153
4154        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4155        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4156
4157        let mut tampered = base_session();
4158        tampered.source_agent = "codex-cli".to_owned();
4159        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4160        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4161        let err_row = second
4162            .iter()
4163            .find(|outcome| outcome.status == OutcomeStatus::Error)
4164            .expect("error outcome present");
4165        let err = err_row.error.as_ref().expect("error body present");
4166        assert_eq!(err.field, Some("source_agent"));
4167        assert_eq!(err.reason, Some("immutable"));
4168
4169        // The stored row stayed on the original adapter - no silent rewrite.
4170        let stored = store
4171            .get_session(&base_session().id)
4172            .await?
4173            .expect("session row survives the rejected re-ingest");
4174        assert_eq!(stored.session.source_agent, "claude-code");
4175
4176        Ok(())
4177    }
4178
4179    #[tokio::test(flavor = "multi_thread")]
4180    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4181        let temp = TempDir::new()?;
4182        let store = Store::open_local(temp.path()).await?;
4183
4184        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4185        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4186
4187        let mut tampered = base_session();
4188        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4189        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4190        let err_row = second
4191            .iter()
4192            .find(|outcome| outcome.status == OutcomeStatus::Error)
4193            .expect("project change must surface an error outcome");
4194        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4195
4196        let stored = store
4197            .get_session(&base_session().id)
4198            .await?
4199            .expect("session row survives");
4200        assert_eq!(
4201            stored.session.project.as_str(),
4202            "/home/me/proj",
4203            "stored project must remain the original",
4204        );
4205
4206        Ok(())
4207    }
4208
4209    /// Ingest `count` synthetic messages spread across a handful of sessions
4210    /// and projects, each with conversational `search_text`. Returns the store
4211    /// and the message keys in `msg-{i}` order; every `vector` starts null.
4212    async fn store_with_messages(
4213        temp: &TempDir,
4214        count: usize,
4215    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4216        store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4217    }
4218
4219    /// Same as [`store_with_messages`] but tests optimize with a custom
4220    /// IVF_PQ activation threshold.
4221    async fn store_with_messages_at_threshold(
4222        temp: &TempDir,
4223        count: usize,
4224        _vector_threshold: usize,
4225    ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4226        let store = Store::open_local(temp.path()).await?;
4227        let sessions = 8.min(count.max(1));
4228        let mut events = Vec::new();
4229        for s in 0..sessions {
4230            events.push(IngestEvent::Session(Session {
4231                id: format!("session-{s}"),
4232                parent_session_id: None,
4233                parent_message_id: None,
4234                source_agent: "claude-code".to_owned(),
4235                created_at: Utc::now(),
4236                project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4237                options: ProviderOptions::new(),
4238            }));
4239            for i in (s..count).step_by(sessions) {
4240                let message_id = format!("msg-{i}");
4241                events.push(IngestEvent::Message(Message::User {
4242                    id: message_id.clone(),
4243                    session_id: format!("session-{s}"),
4244                    timestamp: Utc::now(),
4245                    options: ProviderOptions::new(),
4246                }));
4247                events.push(IngestEvent::Part(Part {
4248                    session_id: format!("session-{s}"),
4249                    id: format!("{message_id}-part"),
4250                    message_id,
4251                    ordinal: 0,
4252                    provenance: crate::wire::Provenance::Conversational,
4253                    options: ProviderOptions::new(),
4254                    kind: PartKind::Text {
4255                        text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4256                    },
4257                }));
4258            }
4259        }
4260        ingest_events(&store, events).await?;
4261        let keys = (0..count)
4262            .map(|i| MessageKey {
4263                session_id: format!("session-{}", i % sessions),
4264                message_id: format!("msg-{i}"),
4265            })
4266            .collect();
4267        Ok((store, keys))
4268    }
4269
4270    /// A deterministic pseudo-random vector of the production dimension.
4271    fn synthetic_vector(seed: usize) -> Vec<f32> {
4272        let mut state = (seed as u64)
4273            .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4274            .wrapping_add(1);
4275        (0..embedding_dim())
4276            .map(|_| {
4277                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4278                #[allow(clippy::cast_precision_loss)]
4279                let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4280                unit - 1.0
4281            })
4282            .collect()
4283    }
4284
4285    /// One [`EmbeddedMessage`] per key, vectors seeded by slice position.
4286    fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4287        keys.iter()
4288            .enumerate()
4289            .map(|(seed, key)| EmbeddedMessage {
4290                session_id: key.session_id.clone(),
4291                id: key.message_id.clone(),
4292                vector: synthetic_vector(seed),
4293            })
4294            .collect()
4295    }
4296
4297    fn embedding_update_batch_with_model(
4298        rows: &[EmbeddedMessage],
4299        model: &str,
4300    ) -> Result<RecordBatch> {
4301        let mut batch = embedding_update_batch(rows)?;
4302        let columns = batch
4303            .columns()
4304            .iter()
4305            .take(3)
4306            .cloned()
4307            .chain(std::iter::once(
4308                Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4309            ))
4310            .collect::<Vec<_>>();
4311        batch = RecordBatch::try_new(batch.schema(), columns)?;
4312        Ok(batch)
4313    }
4314
4315    #[tokio::test]
4316    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4317        let temp = TempDir::new()?;
4318        // 4 messages cycle session-0..session-3, so `session-3` is a real
4319        // partition. Scalar-index pushdown is volume-independent: the planner
4320        // emits `ScalarIndexQuery` whenever the index exists.
4321        let (store, keys) = store_with_messages(&temp, 4).await?;
4322        store.write_embeddings(&embedded(&keys)).await?;
4323        store.optimize_indices(None, None).await?.into_result()?;
4324
4325        let query = vec![0.01_f32; embedding_dim()];
4326        let plan = store
4327            .explain_vector_plan(
4328                &query,
4329                10,
4330                &Predicate::Eq("session_id", "session-3".into()),
4331                None,
4332            )
4333            .await?;
4334
4335        // The load-bearing assertion (spec.md#search-prefilter-pushdown): the predicate
4336        // is served by a scalar-index node, not a postfilter `FilterExec`. (A
4337        // `FilterExec` for the KNN-internal `_distance IS NOT NULL` is expected
4338        // and unrelated.)
4339        assert!(
4340            plan.contains("ScalarIndexQuery"),
4341            "expected a ScalarIndexQuery node in the plan:\n{plan}",
4342        );
4343        let predicate_postfiltered = plan
4344            .lines()
4345            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4346        assert!(
4347            !predicate_postfiltered,
4348            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4349        );
4350        Ok(())
4351    }
4352
4353    #[tokio::test]
4354    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4355        let temp = TempDir::new()?;
4356        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4357
4358        // First batch: 255 vectors, one below threshold. Optimize does not
4359        // create the IVF_PQ because the trigger is not met.
4360        store.write_embeddings(&embedded(&keys[..255])).await?;
4361        store
4362            .optimize_indices_with_vector_threshold(256)
4363            .await?
4364            .into_result()?;
4365        assert!(
4366            !store
4367                .handle
4368                .messages_index_names()
4369                .await?
4370                .iter()
4371                .any(|name| name == MESSAGES_VECTOR_INDEX),
4372            "IVF_PQ must not exist below the activation threshold",
4373        );
4374
4375        // Next batch: one more vector. Total reaches 256; optimize creates
4376        // the IVF_PQ.
4377        store.write_embeddings(&embedded(&keys[255..256])).await?;
4378        store
4379            .optimize_indices_with_vector_threshold(256)
4380            .await?
4381            .into_result()?;
4382        assert!(
4383            store
4384                .handle
4385                .messages_index_names()
4386                .await?
4387                .iter()
4388                .any(|name| name == MESSAGES_VECTOR_INDEX),
4389            "optimize must create the IVF_PQ once the threshold is crossed",
4390        );
4391
4392        // The remaining 44 rows stay un-embedded; the IVF_PQ trains over the
4393        // non-null subset and a planted vector is retrievable.
4394        let hits = store
4395            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4396            .await?;
4397        assert!(
4398            hits.iter().any(|(key, _)| key == &keys[0]),
4399            "an embedded row is retrievable via the index",
4400        );
4401        Ok(())
4402    }
4403
4404    #[tokio::test]
4405    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4406    {
4407        let temp = TempDir::new()?;
4408        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4409        let old_rows = embedded(&keys);
4410        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4411        store
4412            .handle
4413            .merge_update(Table::Messages, old_batch, old_rows.len())
4414            .await?;
4415        store
4416            .optimize_indices_with_vector_threshold(256)
4417            .await?
4418            .into_result()?;
4419        assert!(
4420            store
4421                .handle
4422                .messages_index_names()
4423                .await?
4424                .iter()
4425                .any(|name| name == MESSAGES_VECTOR_INDEX),
4426            "IVF_PQ must exist before a model swap",
4427        );
4428        assert_eq!(store.stale_embedding_count().await?, keys.len());
4429
4430        store.drop_vector_index().await?;
4431        let mut pending = Vec::new();
4432        let stream = store.pending_or_stale_messages();
4433        tokio::pin!(stream);
4434        while let Some(row) = stream.next().await {
4435            pending.push(row?);
4436        }
4437        assert_eq!(
4438            pending.len(),
4439            keys.len(),
4440            "force stream should see stale rows"
4441        );
4442        store.write_embeddings(&embedded(&keys)).await?;
4443        assert_eq!(store.stale_embedding_count().await?, 0);
4444        store
4445            .optimize_indices_with_vector_threshold(256)
4446            .await?
4447            .into_result()?;
4448        assert!(
4449            store
4450                .handle
4451                .messages_index_names()
4452                .await?
4453                .iter()
4454                .any(|name| name == MESSAGES_VECTOR_INDEX),
4455            "optimize must rebuild IVF_PQ after force re-embed",
4456        );
4457
4458        let stream = store.pending_or_stale_messages();
4459        tokio::pin!(stream);
4460        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4461        Ok(())
4462    }
4463
4464    #[tokio::test]
4465    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4466        // Regression: `_row_last_updated_at_version` can point at a Lance
4467        // manifest version that `cleanup_old_versions` or the auto_cleanup
4468        // hook has since dropped from `Dataset::versions()`. The old code
4469        // silently dropped any session whose row-version was not in the
4470        // visible list, collapsing the staleness-skip map down to recent
4471        // commits and forcing `pond sync` to re-touch every file. The fix
4472        // falls back to the oldest still-visible commit timestamp - a
4473        // sound upper bound on the row's true ingest time.
4474        let temp = TempDir::new()?;
4475        let (store, _keys) = store_with_messages(&temp, 4).await?;
4476
4477        // Produce several distinct manifest versions on `sessions` so the
4478        // older ones become eligible for cleanup.
4479        for tag in 0..3 {
4480            let extra = synthetic_session(&format!("extra-{tag}"));
4481            store.upsert_sessions(&[extra]).await?;
4482        }
4483
4484        // Prune everything older than ~now, leaving only the latest manifest.
4485        // `delete_unverified=None` and `error_if_tagged=Some(false)` mirror
4486        // Lance's auto-cleanup hook semantics. The chrono 0-duration is fine:
4487        // Lance's `delete_unverified` floor still protects in-flight files.
4488        let dataset = store.handle.dataset(Table::Sessions).await?;
4489        dataset
4490            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4491            .await
4492            .context("cleanup_old_versions failed")?;
4493
4494        let map = store.session_last_ingested_at().await?;
4495        let session_count = store.row_counts().await?.0;
4496        assert!(
4497            map.len() >= session_count,
4498            "watermark map ({}) must still cover every session ({}) after \
4499             version cleanup; an empty fallback regresses pond sync to a \
4500             full re-scan",
4501            map.len(),
4502            session_count,
4503        );
4504        Ok(())
4505    }
4506
4507    #[tokio::test]
4508    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4509        let temp = TempDir::new()?;
4510        let (store, keys) = store_with_messages(&temp, 10).await?;
4511
4512        let before = store.embedding_progress().await?;
4513        assert_eq!(before.embedded, 0);
4514        assert_eq!(before.total, 10);
4515        assert_eq!(before.model, crate::embed::model_id());
4516
4517        store.write_embeddings(&embedded(&keys[..4])).await?;
4518        let partial = store.embedding_progress().await?;
4519        assert_eq!(partial.embedded, 4);
4520        assert_eq!(partial.total, 10);
4521
4522        store.write_embeddings(&embedded(&keys[4..])).await?;
4523        let full = store.embedding_progress().await?;
4524        assert_eq!(full.embedded, 10);
4525        assert_eq!(full.total, 10);
4526        Ok(())
4527    }
4528}