1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 path::Path,
4 sync::Arc,
5};
6
7use anyhow::{Context, Result};
8use async_stream::try_stream;
9use chrono::{DateTime, TimeZone, Utc};
10use lance::Dataset;
11use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
12use lance::deps::arrow_array::{
13 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
14 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
15 UInt64Array, new_null_array,
16};
17use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
18use lance_file::version::LanceFileVersion;
19use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use serde_json::Value;
22use tokio_stream::{Stream, StreamExt};
23
24use crate::{
25 config, embed,
26 substrate::{
27 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, OptimizeProgressFn,
28 PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table, TableOptimizeOutcome, TableSizes,
29 VECTOR_INDEX_ACTIVATION_ROWS,
30 },
31 wire::{
32 FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
33 SessionFrom,
34 },
35};
36use url::Url;
37
/// Store over the sessions, messages and parts tables.
///
/// Every method on this type issues its reads and writes through the wrapped substrate
/// [`Handle`].
#[derive(Debug)]
pub struct Store {
    /// Substrate handle all table operations go through.
    handle: Handle,
}
42
/// Per-table row counts reported by archive export and import.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Row count for the sessions table.
    pub sessions: usize,
    /// Row count for the messages table.
    pub messages: usize,
    /// Row count for the parts table.
    pub parts: usize,
}
49
/// Per-table dataset version ids, as read from each source dataset when it was exported.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the sessions dataset.
    pub sessions: u64,
    /// Version id of the messages dataset.
    pub messages: u64,
    /// Version id of the parts dataset.
    pub parts: u64,
}
56
/// Summary returned by `Store::export_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Number of rows read from each source table during the export.
    pub rows: LanceArchiveCounts,
    /// Version id of each source dataset at the time it was scanned.
    pub source_versions: LanceArchiveVersions,
}
62
/// Summary returned by `Store::import_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Number of rows read from each archive table.
    pub rows: LanceArchiveCounts,
    /// Per-table sum of the counts returned by the merge-insert calls; presumably the number
    /// of rows that were newly inserted rather than matched (confirm against `Handle`).
    pub inserted: LanceArchiveCounts,
}
68
/// Index intents grouped by the table they apply to.
#[derive(Debug, Clone, Default)]
pub struct IndexIntents {
    /// Intents for the sessions table.
    pub sessions: Vec<IndexIntent>,
    /// Intents for the messages table.
    pub messages: Vec<IndexIntent>,
    /// Intents for the parts table.
    pub parts: Vec<IndexIntent>,
}
75
76impl IndexIntents {
77 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
78 [
79 (Table::Sessions, &self.sessions),
80 (Table::Messages, &self.messages),
81 (Table::Parts, &self.parts),
82 ]
83 }
84}
85
/// A message row selected for embedding: its key columns plus the text to embed.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Value of the row's `search_text` column.
    pub search_text: String,
}
95
/// A computed embedding for one message, as accepted by `Store::write_embeddings`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Embedding vector components.
    pub vector: Vec<f32>,
}
104
/// Descriptive metadata for a single message.
// NOTE(review): no producer or consumer of this type is visible in this part of the file;
// field meanings below are taken from the names only.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Message id.
    pub message_id: String,
    /// Id of the owning session.
    pub session_id: String,
    /// Message role, kept as a string.
    pub role: String,
    /// Project name.
    pub project: String,
    /// Source agent name.
    pub source_agent: String,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Searchable text of the message.
    pub search_text: String,
}
116
/// Composite key identifying a message: session id plus message id.
///
/// The derived ordering compares `session_id` first, then `message_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the owning session.
    pub session_id: String,
    /// Message id.
    pub message_id: String,
}
122
/// Per-row result of an upsert call.
// NOTE(review): the mapping from merge-insert counts to these variants lives in
// `statuses_from_inserted`, which is not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row was reported as newly inserted.
    Inserted,
    /// The row was reported as matching an existing row.
    Matched,
}
128
/// Collected per-table results of an optimize run.
#[derive(Debug, Default)]
pub struct OptimizeOutcome {
    /// One entry per table that was processed.
    pub tables: Vec<TableOptimizeOutcome>,
}
137
138impl OptimizeOutcome {
139 pub fn any_indices_failed(&self) -> bool {
142 self.tables.iter().any(|t| t.indices.is_failed())
143 }
144
145 pub fn into_result(self) -> Result<Self> {
149 for table in &self.tables {
150 if let PhaseOutcome::Failed(error) = &table.indices {
151 anyhow::bail!(
152 "indices phase failed on {}: {error:#}",
153 table.table.as_str()
154 );
155 }
156 if let PhaseOutcome::Failed(error) = &table.compaction {
157 anyhow::bail!(
158 "compaction phase failed on {}: {error:#}",
159 table.table.as_str()
160 );
161 }
162 }
163 Ok(self)
164 }
165}
166
/// Default value for [`CleanupConfig::older_than`]: one day.
pub fn default_cleanup_older_than() -> chrono::Duration {
    chrono::Duration::days(1)
}
176
/// Settings for a cleanup pass.
// NOTE(review): the code consuming these settings is not visible here; presumably they are
// forwarded to Lance's old-version cleanup. Confirm before relying on the field notes.
#[derive(Debug, Clone, Copy)]
pub struct CleanupConfig {
    /// Age threshold for cleanup; defaults to `default_cleanup_older_than()`.
    pub older_than: chrono::Duration,
    /// Whether unverified data may be deleted as well; defaults to `false`.
    pub delete_unverified: bool,
}
186
187impl Default for CleanupConfig {
188 fn default() -> Self {
189 Self {
190 older_than: default_cleanup_older_than(),
191 delete_unverified: false,
192 }
193 }
194}
195
/// Corpus-wide statistics produced by `Store::corpus_stats`.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location of the store the statistics were computed from.
    pub data_url: Url,
    /// Whole-table row counts.
    pub totals: RowTotals,
    /// Per-adapter breakdown, ordered by adapter name.
    pub adapters: Vec<AdapterStats>,
    /// Echo of the `include_subagents` argument the statistics were computed with.
    pub include_subagents: bool,
}
215
/// Row counts of the three tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Rows in the sessions table.
    pub sessions: u64,
    /// Rows in the messages table.
    pub messages: u64,
    /// Rows in the parts table.
    pub parts: u64,
}
222
/// Snapshot of embedding progress.
// NOTE(review): the producer of this type is not visible here; field notes follow the names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Number of items embedded so far.
    pub embedded: usize,
    /// Total number of items considered.
    pub total: usize,
    /// Identifier of the embedding model.
    pub model: &'static str,
}
234
/// Statistics for one adapter (one distinct `source_agent` value in the messages table).
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// The `source_agent` value this entry aggregates.
    pub adapter: String,
    /// Sum of the per-project session counts in `projects`.
    pub sessions: u64,
    /// Sum of the per-project message counts in `projects`.
    pub messages: u64,
    /// Per-project breakdown, ordered by message count descending, then project name.
    pub projects: Vec<ProjectStats>,
}
247
/// Statistics for one project within an adapter.
#[derive(Debug, Clone)]
pub struct ProjectStats {
    /// Project name as stored on the message rows.
    pub project: String,
    /// Number of distinct session ids seen for this adapter/project pair.
    pub sessions: u64,
    /// Number of message rows counted for this adapter/project pair.
    pub messages: u64,
}
254
/// Running tally for one `(source_agent, project)` group while scanning message rows.
#[derive(Default)]
struct GroupAccumulator {
    /// Message rows counted so far.
    messages: u64,
    /// Distinct session ids seen so far.
    session_ids: HashSet<String>,
}
260
/// Borrowed view of one message to write, as accepted by `Store::upsert_messages`.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message to write.
    pub message: &'a Message,
    /// Parts belonging to the message. `Store::upsert_messages` does not read this field.
    pub parts: &'a [Part],
    /// Optional search text stored alongside the message.
    pub search_text: Option<&'a str>,
}
267
268impl Store {
269 pub async fn open(location: &Url) -> Result<Self> {
275 Ok(Self {
276 handle: Handle::open(location).await?,
277 })
278 }
279
280 pub async fn open_with_options(
286 location: &Url,
287 storage_options: std::collections::HashMap<String, String>,
288 caps: crate::substrate::RuntimeCaps,
289 ) -> Result<Self> {
290 Ok(Self {
291 handle: Handle::open_with_options(location, storage_options, caps).await?,
292 })
293 }
294
295 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
300 let url = config::url_for_path(path)?;
301 Self::open_with_options(
302 &url,
303 std::collections::HashMap::new(),
304 crate::substrate::RuntimeCaps::default(),
305 )
306 .await
307 }
308
309 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
317 std::fs::create_dir_all(dest)
318 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
319 let (sessions, sessions_version) = self
320 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
321 .await?;
322 let (messages, messages_version) = self
323 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
324 .await?;
325 let (parts, parts_version) = self
326 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
327 .await?;
328 Ok(LanceArchiveExport {
329 rows: LanceArchiveCounts {
330 sessions,
331 messages,
332 parts,
333 },
334 source_versions: LanceArchiveVersions {
335 sessions: sessions_version,
336 messages: messages_version,
337 parts: parts_version,
338 },
339 })
340 }
341
342 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
343 let sessions_dataset =
344 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
345 let messages_dataset =
346 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
347 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
348 let (sessions, sessions_inserted) = self
349 .import_clean_table(Table::Sessions, sessions_dataset)
350 .await?;
351 let (messages, messages_inserted) = self
352 .import_clean_table(Table::Messages, messages_dataset)
353 .await?;
354 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
355 Ok(LanceArchiveImport {
356 rows: LanceArchiveCounts {
357 sessions,
358 messages,
359 parts,
360 },
361 inserted: LanceArchiveCounts {
362 sessions: sessions_inserted,
363 messages: messages_inserted,
364 parts: parts_inserted,
365 },
366 })
367 }
368
369 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
370 let dataset = self.handle.dataset(table).await?;
371 let source_version = dataset.version_id();
372 let schema = export_schema(table);
373 let mut stream = dataset
374 .scan()
375 .try_into_stream()
376 .await
377 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
378 let dest_uri = dest
379 .to_str()
380 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
381
382 let mut rows = 0usize;
383 let mut wrote = false;
384 while let Some(batch) = stream.next().await {
385 let batch = batch
386 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
387 rows += batch.num_rows();
388 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
389 let mut params = write_params_for_create();
390 if wrote {
391 params.mode = WriteMode::Append;
392 }
393 Dataset::write(reader, dest_uri, Some(params))
394 .await
395 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
396 wrote = true;
397 }
398
399 if !wrote {
400 let batch = RecordBatch::new_empty(schema.clone());
401 let reader = RecordBatchIterator::new([Ok(batch)], schema);
402 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
403 .await
404 .with_context(|| {
405 format!("failed to write empty {} archive table", table.as_str())
406 })?;
407 }
408 Ok((rows, source_version))
409 }
410
411 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
412 let mut stream = dataset
413 .scan()
414 .try_into_stream()
415 .await
416 .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
417 let mut rows = 0usize;
418 let mut inserted = 0usize;
419 while let Some(batch) = stream.next().await {
420 let batch = batch
421 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
422 let row_count = batch.num_rows();
423 rows += row_count;
424 inserted += self
425 .handle
426 .merge_insert(table, batch, row_count)
427 .await
428 .with_context(|| format!("failed to import {} archive table", table.as_str()))?
429 as usize;
430 }
431 Ok((rows, inserted))
432 }
433
434 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<Vec<UpsertStatus>> {
435 if sessions.is_empty() {
436 return Ok(Vec::new());
437 }
438 let batches = sessions_batches(sessions)?;
439 let inserted = merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
440 Ok(statuses_from_inserted(sessions.len(), inserted))
442 }
443
/// Writes a batch of completed substreams (a session plus its buffered messages and parts)
/// to the sessions, messages and parts tables.
///
/// Returns one list of per-row outcomes, sorted by `index`, together with the aggregate
/// counts filled in by `success_outcomes_for_substream`.
///
/// Steps, in order:
/// 1. Substreams that share a session id are merged within the batch. A later substream
///    whose `source_agent` or `project` differs from the first one seen is dropped with
///    error outcomes.
/// 2. The stored sessions, message keys and part keys for all touched session ids are read.
/// 3. Substreams that fail `ensure_immutable_match` against the stored session are dropped
///    with error outcomes.
/// 4. The remaining rows are merge-inserted into the three tables concurrently.
/// 5. Success outcomes are produced from the pre-write snapshot taken in step 2.
///
/// # Errors
/// Fails when a scan, a batch conversion, or any of the three merge-inserts fails.
async fn upsert_session_batch(
    &self,
    substreams: Vec<CompletedSubstream>,
) -> Result<(Vec<RowOutcome>, BatchCounts)> {
    if substreams.is_empty() {
        return Ok((Vec::new(), BatchCounts::default()));
    }

    let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
    let mut counts = BatchCounts::default();

    // Step 1: collapse substreams that target the same session id. `by_session_id` maps a
    // session id to the index of its first occurrence in `merged`.
    let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
    let mut by_session_id: std::collections::HashMap<String, usize> =
        std::collections::HashMap::with_capacity(substreams.len());
    for substream in substreams {
        if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
            let existing = &merged[existing_idx];
            // The first occurrence in the batch wins; a later one that disagrees on
            // `source_agent` or `project` is rejected as a whole.
            if existing.session.source_agent != substream.session.source_agent
                || existing.session.project != substream.session.project
            {
                // `source_agent` is reported in preference to `project` when both differ.
                let reason = if existing.session.source_agent != substream.session.source_agent
                {
                    IngestError::ImmutableField {
                        field: "source_agent",
                        session_id: substream.session.id.clone(),
                        stored: existing.session.source_agent.clone(),
                        attempted: substream.session.source_agent.clone(),
                    }
                } else {
                    IngestError::ImmutableField {
                        field: "project",
                        session_id: substream.session.id.clone(),
                        stored: (*existing.session.project).clone(),
                        attempted: (*substream.session.project).clone(),
                    }
                };
                let field = match &reason {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    reason.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            // Same session and compatible fields: append only messages whose id has not
            // been buffered for this session yet.
            let existing = &mut merged[existing_idx];
            let mut seen: std::collections::HashSet<String> = existing
                .messages
                .iter()
                .map(|m| m.message.id().to_owned())
                .collect();
            for msg in substream.messages {
                if seen.insert(msg.message.id().to_owned()) {
                    existing.messages.push(msg);
                }
            }
            continue;
        }
        by_session_id.insert(substream.session.id.clone(), merged.len());
        merged.push(substream);
    }

    // Step 2: snapshot what is already stored for the touched session ids. The snapshot is
    // taken before any write and is later handed to `success_outcomes_for_substream`.
    let session_id_values: Vec<ScalarValue> = merged
        .iter()
        .map(|substream| ScalarValue::String(substream.session.id.clone()))
        .collect();
    let existing_sessions: std::collections::HashMap<String, Session> =
        if session_id_values.is_empty() {
            std::collections::HashMap::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Sessions,
                    Some(&Predicate::In("id", session_id_values.clone())),
                    &[],
                )
                .await?;
            let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let session = session_from_batch(&batch, row)?;
                map.insert(session.id.clone(), session);
            }
            map
        };
    // Stored message keys as `(session_id, id)` pairs.
    let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Messages,
                Some(&Predicate::In("session_id", session_id_values.clone())),
                &["session_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "id", row)?.context("message id is null")?;
            set.insert((sid, mid));
        }
        set
    };
    // Stored part keys as `(session_id, message_id, id)` triples.
    let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Parts,
                Some(&Predicate::In("session_id", session_id_values)),
                &["session_id", "message_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
            let pid = string(&batch, "id", row)?.context("part id is null")?;
            set.insert((sid, mid, pid));
        }
        set
    };

    // Step 3: drop substreams whose session conflicts with the stored session row.
    let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
    for substream in merged {
        if let Some(existing) = existing_sessions.get(&substream.session.id)
            && let Err(failure) = ensure_immutable_match(existing, &substream.session)
        {
            let field = match &failure {
                IngestError::ImmutableField { field, .. } => Some(*field),
            };
            let reason_key = match field {
                Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                _ => DROP_REASON_UNCATEGORIZED,
            };
            outcomes.extend(error_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                failure.to_string(),
                field,
                reason_key,
            ));
            continue;
        }
        writeable.push(substream);
    }

    // Nothing survived validation: report the collected error outcomes only.
    if writeable.is_empty() {
        outcomes.sort_by_key(|outcome| outcome.index);
        return Ok((outcomes, counts));
    }

    // Step 4: flatten the surviving substreams into per-table row lists.
    let sessions_owned: Vec<Session> = writeable
        .iter()
        .map(|substream| substream.session.clone())
        .collect();
    let message_rows: Vec<MessageBatchRow<'_>> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().map(|buffered| MessageBatchRow {
                message: &buffered.message,
                source_agent: &substream.session.source_agent,
                project: &substream.session.project,
                search_text: buffered.search_text.as_deref(),
            })
        })
        .collect();
    let part_rows: Vec<Part> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().flat_map(|buffered| {
                buffered
                    .parts
                    .iter()
                    .map(|buffered_part| buffered_part.part.clone())
            })
        })
        .collect();

    let session_batches = sessions_batches(&sessions_owned)?;
    let message_batches = messages_batches(&message_rows)?;
    let part_batches = parts_batches(&part_rows)?;

    // The three tables are written concurrently; the first failure aborts the call. The
    // inserted counts returned by the merges are not used here.
    let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
        merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
        merge_insert_chunks(&self.handle, Table::Messages, message_batches),
        merge_insert_chunks(&self.handle, Table::Parts, part_batches),
    )?;

    // Step 5: derive success outcomes from the pre-write snapshot.
    for substream in &writeable {
        outcomes.extend(success_outcomes_for_substream(
            substream.session_index,
            &substream.session,
            &substream.messages,
            &existing_sessions,
            &existing_message_pks,
            &existing_part_pks,
            &mut counts,
        ));
    }

    outcomes.sort_by_key(|outcome| outcome.index);
    Ok((outcomes, counts))
}
706
707 pub async fn upsert_messages(
708 &self,
709 session: &Session,
710 messages: &[MessageWrite<'_>],
711 ) -> Result<Vec<UpsertStatus>> {
712 if messages.is_empty() {
713 return Ok(Vec::new());
714 }
715
716 let rows = messages
717 .iter()
718 .map(|write| MessageBatchRow {
719 message: write.message,
720 source_agent: &session.source_agent,
721 project: &session.project,
722 search_text: write.search_text,
723 })
724 .collect::<Vec<_>>();
725 let batches = messages_batches(&rows)?;
726 let inserted = merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
727 Ok(statuses_from_inserted(messages.len(), inserted))
728 }
729
730 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<Vec<UpsertStatus>> {
731 if parts.is_empty() {
732 return Ok(Vec::new());
733 }
734 let batches = parts_batches(parts)?;
735 let inserted = merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
736 Ok(statuses_from_inserted(parts.len(), inserted))
737 }
738
739 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
740 let Some(session) = self.find_session(session_id).await? else {
741 return Ok(None);
742 };
743 let messages = self.messages_for_session(session_id).await?;
744 Ok(Some(SessionWithMessages { session, messages }))
745 }
746
747 pub async fn session_ids(&self) -> Result<Vec<String>> {
749 let batch = self
750 .handle
751 .scan_batch(Table::Sessions, None, &["id"])
752 .await?;
753 let mut ids = Vec::with_capacity(batch.num_rows());
754 for row in 0..batch.num_rows() {
755 if let Some(id) = string(&batch, "id", row)? {
756 ids.push(id);
757 }
758 }
759 Ok(ids)
760 }
761
762 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
763 let batch = self
764 .handle
765 .scan_batch(
766 Table::Sessions,
767 Some(&Predicate::Eq(
768 "parent_session_id",
769 parent_session_id.into(),
770 )),
771 &[
772 "id",
773 "parent_session_id",
774 "parent_message_id",
775 "source_agent",
776 "created_at",
777 "project",
778 "options",
779 ],
780 )
781 .await?;
782 let mut sessions = Vec::with_capacity(batch.num_rows());
783 for row in 0..batch.num_rows() {
784 sessions.push(session_from_batch(&batch, row)?);
785 }
786 sessions.sort_by(|left, right| left.id.cmp(&right.id));
787 Ok(sessions)
788 }
789
790 pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
796 use lance::deps::arrow_array::UInt64Array;
797
798 let dataset = self.handle.dataset(Table::Sessions).await?;
799 let version_list = dataset.versions().await?;
800 let versions: HashMap<u64, DateTime<Utc>> = version_list
801 .iter()
802 .map(|v| (v.version, v.timestamp))
803 .collect();
804 let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
812
813 let scanner = self
814 .handle
815 .scan(
816 Table::Sessions,
817 ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
818 )
819 .await?;
820 let mut stream = scanner.try_into_stream().await?;
821 let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
822 while let Some(batch) = stream.next().await {
823 let batch = batch?;
824 let version_array = batch
825 .column_by_name("_row_last_updated_at_version")
826 .context("missing _row_last_updated_at_version column")?
827 .as_any()
828 .downcast_ref::<UInt64Array>()
829 .context("_row_last_updated_at_version is not UInt64")?;
830 for row in 0..batch.num_rows() {
831 let Some(id) = string(&batch, "id", row)? else {
832 continue;
833 };
834 if version_array.is_null(row) {
835 continue;
836 }
837 let version = version_array.value(row);
838 let ts = versions.get(&version).copied().or(oldest_visible_ts);
839 if let Some(ts) = ts {
840 out.insert(id, ts);
841 }
842 }
843 }
844 Ok(out)
845 }
846
/// Returns one page of a session's messages.
///
/// Messages are ordered by timestamp, then id. `params.after_id` resumes just past the
/// message with that id. `params.session_from` selects whether the page is taken from the
/// start or from the end of what remains, bounded by `params.limit` and
/// `params.budget_bytes`.
///
/// Returns `GetLookup::NotFound` when the session does not exist and
/// `GetLookup::UnknownAfterId` when `after_id` matches no message in the selected mode.
pub async fn session_view(
    &self,
    session_id: &str,
    params: SessionViewParams<'_>,
) -> Result<GetLookup<SessionPage>> {
    let Some(session) = self.find_session(session_id).await? else {
        return Ok(GetLookup::NotFound);
    };

    // Conversational mode reads only rows with non-null search text and carries no
    // `content`; the other modes read every message row.
    let mut rows = match params.mode {
        ResponseMode::Conversational => self
            .scan_conversational_messages(session_id)
            .await?
            .into_iter()
            .map(|row| ScanRow {
                id: row.message_id,
                role: row.role,
                timestamp: row.timestamp,
                text: Some(row.text.into_inner()),
                content: None,
            })
            .collect(),
        ResponseMode::Complete | ResponseMode::Verbatim => {
            self.scan_all_messages(session_id).await?
        }
    };
    // Total order for paging: timestamp first, id as the tie-breaker.
    rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));

    let start_at = match params.after_id {
        Some(after) => match rows.iter().position(|row| row.id == after) {
            Some(idx) => idx + 1,
            None => return Ok(GetLookup::UnknownAfterId),
        },
        None => 0,
    };
    let remaining = rows.get(start_at..).unwrap_or(&[]);
    // Row size for budgeting is the byte length of `text`; rows without text count as 0.
    // NOTE(review): `content` is not counted towards the budget in any mode — confirm this
    // is intended for Complete/Verbatim.
    let (emitted, messages_remaining) = match params.session_from {
        SessionFrom::Start => {
            let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
                row.text.as_deref().map_or(0, str::len)
            });
            (&remaining[..n], remaining.len() - n)
        }
        SessionFrom::End => {
            // Walk backwards from the newest row. `start` is the index of the first
            // emitted row, so `remaining.len() - start` is the number taken so far.
            let mut bytes = 0usize;
            let mut start = remaining.len();
            for row in remaining.iter().rev() {
                if remaining.len() - start >= params.limit {
                    break;
                }
                let size = row.text.as_deref().map_or(0, str::len);
                // The budget is enforced only once at least one row has been taken, so
                // the newest row is emitted even when it alone exceeds the budget.
                if start < remaining.len() && bytes + size > params.budget_bytes {
                    break;
                }
                bytes += size;
                start -= 1;
            }
            // Rows before `start` are the ones not emitted.
            (&remaining[start..], start)
        }
    };
    let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();

    // Verbatim loads parts via `parts_for_messages`; the other modes load them via
    // `summary_parts_for_messages`.
    let mut parts_by_message = match params.mode {
        ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
        ResponseMode::Conversational | ResponseMode::Complete => {
            self.summary_parts_for_messages(session_id, &ids).await?
        }
    };
    let messages = emitted
        .iter()
        .map(|row| RetrievedMessage {
            id: row.id.clone(),
            role: row.role,
            timestamp: row.timestamp,
            text: row.text.clone(),
            content: row.content.clone(),
            // Parts are moved out of the map; messages without parts get an empty list.
            parts: parts_by_message
                .remove(&(session_id.to_owned(), row.id.clone()))
                .unwrap_or_default(),
        })
        .collect();

    Ok(GetLookup::Found(SessionPage {
        session,
        messages,
        messages_remaining,
    }))
}
948
949 pub async fn message_view(
955 &self,
956 message_id: &str,
957 params: MessageViewParams<'_>,
958 ) -> Result<GetLookup<MessagePage>> {
959 let Some(session_id) = self.session_id_for_message(message_id).await? else {
960 return Ok(GetLookup::NotFound);
961 };
962 let Some(session) = self.find_session(&session_id).await? else {
963 return Ok(GetLookup::NotFound);
964 };
965 let mut rows = self.scan_all_messages(&session_id).await?;
966 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
967 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
968 return Ok(GetLookup::NotFound);
969 };
970
971 let start = target_pos.saturating_sub(params.context_depth);
972 let end = (target_pos + params.context_depth + 1).min(rows.len());
973 let window = &rows[start..end];
974 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
975 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
978
979 let all_parts = parts_by_message
980 .remove(&(session_id.clone(), message_id.to_owned()))
981 .unwrap_or_default();
982 let start_part = match params.after_id {
983 Some(after) => match all_parts.iter().find(|part| part.id == after) {
987 Some(anchor) => all_parts
988 .iter()
989 .position(|part| part.ordinal > anchor.ordinal)
990 .unwrap_or(all_parts.len()),
991 None => return Ok(GetLookup::UnknownAfterId),
992 },
993 None => 0,
994 };
995 let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
996 let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
997 serde_json::to_string(part).map_or(0, |json| json.len())
998 });
999 let target_parts = remaining_parts[..part_count].to_vec();
1000 let target_parts_remaining = remaining_parts.len() - part_count;
1001
1002 let target_row = &rows[target_pos];
1003 let target = RetrievedMessage {
1004 id: target_row.id.clone(),
1005 role: target_row.role,
1006 timestamp: target_row.timestamp,
1007 text: target_row.text.clone(),
1008 content: target_row.content.clone(),
1009 parts: Vec::new(),
1011 };
1012 let siblings = window
1013 .iter()
1014 .enumerate()
1015 .filter(|(idx, _)| start + idx != target_pos)
1016 .map(|(_, row)| RetrievedMessage {
1017 id: row.id.clone(),
1018 role: row.role,
1019 timestamp: row.timestamp,
1020 text: row.text.clone(),
1021 content: row.content.clone(),
1022 parts: parts_by_message
1023 .get(&(session_id.clone(), row.id.clone()))
1024 .cloned()
1025 .unwrap_or_default(),
1026 })
1027 .collect();
1028
1029 Ok(GetLookup::Found(MessagePage {
1030 session,
1031 target,
1032 target_parts,
1033 target_parts_remaining,
1034 siblings,
1035 }))
1036 }
1037
1038 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1039 let batch = self
1040 .handle
1041 .scan_batch(
1042 Table::Messages,
1043 Some(&Predicate::Eq("session_id", session_id.into())),
1044 &["id", "timestamp", "role", "search_text", "content"],
1045 )
1046 .await?;
1047 let mut rows = Vec::with_capacity(batch.num_rows());
1048 for row in 0..batch.num_rows() {
1049 let id = string(&batch, "id", row)?.context("message id is null")?;
1050 let role =
1051 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1052 let timestamp = datetime(&batch, "timestamp", row)?;
1053 rows.push(ScanRow {
1054 id,
1055 role,
1056 timestamp,
1057 text: string(&batch, "search_text", row)?,
1058 content: string(&batch, "content", row)?,
1059 });
1060 }
1061 Ok(rows)
1062 }
1063
1064 pub async fn scan_conversational_messages(
1068 &self,
1069 session_id: &str,
1070 ) -> Result<Vec<ConversationalRow>> {
1071 let filter = Predicate::And(vec![
1072 Predicate::Eq("session_id", session_id.into()),
1073 Predicate::IsNotNull("search_text"),
1074 ]);
1075 let batch = self
1076 .handle
1077 .scan_batch(
1078 Table::Messages,
1079 Some(&filter),
1080 &["id", "timestamp", "role", "search_text"],
1081 )
1082 .await?;
1083
1084 let mut rows = Vec::with_capacity(batch.num_rows());
1085 for row in 0..batch.num_rows() {
1086 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1087 let role =
1088 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1089 let timestamp = datetime(&batch, "timestamp", row)?;
1090 let text_str = string(&batch, "search_text", row)?.context(
1091 "search_text null after IsNotNull pushdown - storage invariant violated",
1092 )?;
1093 rows.push(ConversationalRow {
1094 session_id: session_id.to_owned(),
1095 message_id,
1096 role,
1097 timestamp,
1098 text: SearchText(text_str),
1099 });
1100 }
1101 rows.sort_by(|a, b| {
1102 a.timestamp
1103 .cmp(&b.timestamp)
1104 .then_with(|| a.message_id.cmp(&b.message_id))
1105 });
1106 Ok(rows)
1107 }
1108
1109 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1112 let batch = self
1113 .handle
1114 .scan_batch(
1115 Table::Messages,
1116 Some(&Predicate::Eq("id", message_id.into())),
1117 &["session_id"],
1118 )
1119 .await?;
1120 if batch.num_rows() == 0 {
1121 return Ok(None);
1122 }
1123 string(&batch, "session_id", 0)
1124 }
1125
/// Returns the row counts reported by the handle.
///
/// `corpus_stats` reads the tuple as `(sessions, messages, parts)`.
pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
    self.handle.row_counts().await
}
1129
1130 pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1136 let scanner = self
1137 .handle
1138 .scan(
1139 Table::Messages,
1140 ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1141 )
1142 .await?;
1143 let mut stream = scanner.try_into_stream().await?;
1144 let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1145 while let Some(batch) = stream.next().await {
1146 let batch = batch?;
1147 for row in 0..batch.num_rows() {
1148 let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1149 let project = string(&batch, "project", row)?.unwrap_or_default();
1150 let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1151 let is_subagent = source_agent.contains('/');
1152 if is_subagent && !include_subagents {
1153 continue;
1154 }
1155 let entry = groups.entry((source_agent, project)).or_default();
1156 entry.messages += 1;
1157 entry.session_ids.insert(session_id);
1158 }
1159 }
1160
1161 let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1162 let totals = RowTotals {
1163 sessions: totals_sessions as u64,
1164 messages: totals_messages as u64,
1165 parts: totals_parts as u64,
1166 };
1167
1168 let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1169 for ((adapter, project), acc) in groups {
1170 by_adapter.entry(adapter).or_default().push(ProjectStats {
1171 project,
1172 sessions: acc.session_ids.len() as u64,
1173 messages: acc.messages,
1174 });
1175 }
1176
1177 let mut adapters = Vec::with_capacity(by_adapter.len());
1178 for (adapter, mut projects) in by_adapter {
1179 projects.sort_by(|a, b| {
1180 b.messages
1181 .cmp(&a.messages)
1182 .then_with(|| a.project.cmp(&b.project))
1183 });
1184 let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1185 let messages: u64 = projects.iter().map(|p| p.messages).sum();
1186 adapters.push(AdapterStats {
1187 adapter,
1188 sessions,
1189 messages,
1190 projects,
1191 });
1192 }
1193
1194 Ok(CorpusStats {
1195 data_url: self.handle.location().clone(),
1196 totals,
1197 adapters,
1198 include_subagents,
1199 })
1200 }
1201
1202 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1207 if rows.is_empty() {
1208 return Ok(());
1209 }
1210 let batch = embedding_update_batch(rows)?;
1211 self.handle
1212 .merge_update(Table::Messages, batch, rows.len())
1213 .await?;
1214 Ok(())
1215 }
1216
1217 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1220 try_stream! {
1221 let filter = Predicate::And(vec![
1222 Predicate::IsNull("vector"),
1223 Predicate::IsNotNull("search_text"),
1224 ]);
1225 let projection: &[&str] = &["session_id", "id", "search_text"];
1226 let scanner = self
1227 .handle
1228 .scan(
1229 Table::Messages,
1230 ScanOpts::with_predicate_and_projection(&filter, projection),
1231 )
1232 .await?;
1233 let mut batches = scanner
1234 .try_into_stream()
1235 .await
1236 .context("failed to open messages stream")?;
1237 while let Some(batch) = batches.next().await {
1238 let batch = batch?;
1239 for row in 0..batch.num_rows() {
1240 yield PendingMessage {
1241 session_id: string(&batch, "session_id", row)?
1242 .context("session_id is null")?,
1243 id: string(&batch, "id", row)?.context("message id is null")?,
1244 search_text: string(&batch, "search_text", row)?
1245 .context("search_text is null")?,
1246 };
1247 }
1248 }
1249 }
1250 }
1251
1252 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1257 try_stream! {
1258 let filter = Predicate::And(vec![
1259 Predicate::IsNotNull("search_text"),
1260 Predicate::Or(vec![
1261 Predicate::IsNull("vector"),
1262 Predicate::Ne("embedding_model", embed::model_id().into()),
1263 ]),
1264 ]);
1265 let projection: &[&str] = &["session_id", "id", "search_text"];
1266 let scanner = self
1267 .handle
1268 .scan(
1269 Table::Messages,
1270 ScanOpts::with_predicate_and_projection(&filter, projection),
1271 )
1272 .await?;
1273 let mut batches = scanner
1274 .try_into_stream()
1275 .await
1276 .context("failed to open pending-or-stale messages stream")?;
1277 while let Some(batch) = batches.next().await {
1278 let batch = batch?;
1279 for row in 0..batch.num_rows() {
1280 yield PendingMessage {
1281 session_id: string(&batch, "session_id", row)?
1282 .context("session_id is null")?,
1283 id: string(&batch, "id", row)?.context("message id is null")?,
1284 search_text: string(&batch, "search_text", row)?
1285 .context("search_text is null")?,
1286 };
1287 }
1288 }
1289 }
1290 }
1291
1292 pub async fn fts_search(
1294 &self,
1295 query: &str,
1296 limit: usize,
1297 filter: &Predicate,
1298 ) -> Result<Vec<(MessageKey, f32)>> {
1299 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1300 scanner.full_text_search(
1301 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1302 )?;
1303 scanner.disable_scoring_autoprojection();
1309 scanner.project(&["session_id", "id", "_score"])?;
1310 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1311 let batch = scanner.try_into_batch().await?;
1312 let mut hits = Vec::with_capacity(batch.num_rows());
1313 for row in 0..batch.num_rows() {
1314 let key = MessageKey {
1315 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1316 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1317 };
1318 hits.push((key, float32(&batch, "_score", row)?));
1319 }
1320 hits.sort_by(|left, right| {
1328 right
1329 .1
1330 .partial_cmp(&left.1)
1331 .unwrap_or(std::cmp::Ordering::Equal)
1332 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1333 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1334 });
1335 Ok(hits)
1336 }
1337
1338 pub async fn has_embeddings(&self) -> Result<bool> {
1343 let scope = Predicate::IsNotNull("vector");
1344 let mut scanner = self
1345 .handle
1346 .scan(
1347 Table::Messages,
1348 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1349 )
1350 .await?;
1351 scanner.limit(Some(1), None)?;
1352 let batch = scanner.try_into_batch().await?;
1353 Ok(batch.num_rows() > 0)
1354 }
1355
1356 pub async fn vector_search(
1364 &self,
1365 query: &[f32],
1366 limit: usize,
1367 filter: &Predicate,
1368 search: Option<&config::SearchConfig>,
1369 ) -> Result<Vec<(MessageKey, f32)>> {
1370 let scope = embedded_scope(filter);
1371 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1372 let key = Float32Array::from(query.to_vec());
1373 scanner.nearest("vector", &key, limit)?;
1374 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1375 scanner.nprobes(nprobes);
1376 }
1377 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1378 scanner.refine(refine_factor);
1379 }
1380 scanner.disable_scoring_autoprojection();
1384 scanner.project(&["session_id", "id", "_distance"])?;
1385 let batch = scanner.try_into_batch().await?;
1386 let mut hits = Vec::with_capacity(batch.num_rows());
1387 for row in 0..batch.num_rows() {
1388 let key = MessageKey {
1389 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1390 message_id: string(&batch, "id", row)?.context("message id is null")?,
1391 };
1392 hits.push((key, float32(&batch, "_distance", row)?));
1393 }
1394 hits.sort_by(|left, right| {
1400 left.1
1401 .partial_cmp(&right.1)
1402 .unwrap_or(std::cmp::Ordering::Equal)
1403 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1404 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1405 });
1406 Ok(hits)
1407 }
1408
1409 pub async fn explain_vector_plan(
1412 &self,
1413 query: &[f32],
1414 limit: usize,
1415 filter: &Predicate,
1416 search: Option<&config::SearchConfig>,
1417 ) -> Result<String> {
1418 let scope = embedded_scope(filter);
1419 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1420 let key = Float32Array::from(query.to_vec());
1421 scanner.nearest("vector", &key, limit)?;
1422 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1423 scanner.nprobes(nprobes);
1424 }
1425 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1426 scanner.refine(refine_factor);
1427 }
1428 scanner
1429 .explain_plan(true)
1430 .await
1431 .context("explain_plan failed")
1432 }
1433
1434 pub async fn explain_fts_plan(
1435 &self,
1436 query: &str,
1437 limit: usize,
1438 filter: &Predicate,
1439 ) -> Result<String> {
1440 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1441 scanner.full_text_search(
1442 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1443 )?;
1444 scanner.project(&["session_id", "id"])?;
1445 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1446 scanner
1447 .explain_plan(true)
1448 .await
1449 .context("explain_plan failed")
1450 }
1451
1452 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1454 if keys.is_empty() {
1455 return Ok(Vec::new());
1456 }
1457 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1458 let session_ids = keys
1459 .iter()
1460 .map(|key| key.session_id.clone())
1461 .collect::<Vec<_>>();
1462 let message_ids = keys
1463 .iter()
1464 .map(|key| key.message_id.clone())
1465 .collect::<Vec<_>>();
1466 let predicate = Predicate::And(vec![
1467 in_predicate("session_id", &session_ids),
1468 in_predicate("id", &message_ids),
1469 ]);
1470 let batch = self
1471 .handle
1472 .scan_batch(
1473 Table::Messages,
1474 Some(&predicate),
1475 &[
1476 "id",
1477 "session_id",
1478 "role",
1479 "project",
1480 "source_agent",
1481 "timestamp",
1482 "search_text",
1483 ],
1484 )
1485 .await?;
1486 let mut metas = Vec::with_capacity(batch.num_rows());
1487 for row in 0..batch.num_rows() {
1488 let message_id = string(&batch, "id", row)?.context("id is null")?;
1489 let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1490 if !wanted.contains(&MessageKey {
1491 session_id: session_id.clone(),
1492 message_id: message_id.clone(),
1493 }) {
1494 continue;
1495 }
1496 metas.push(MessageMeta {
1497 message_id,
1498 session_id,
1499 role: string(&batch, "role", row)?.context("role is null")?,
1500 project: string(&batch, "project", row)?.context("project is null")?,
1501 source_agent: string(&batch, "source_agent", row)?
1502 .context("source_agent is null")?,
1503 timestamp: datetime(&batch, "timestamp", row)?,
1504 search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1505 });
1506 }
1507 Ok(metas)
1508 }
1509
1510 pub async fn session_message_counts(
1512 &self,
1513 session_ids: &[String],
1514 ) -> Result<BTreeMap<String, usize>> {
1515 if session_ids.is_empty() {
1516 return Ok(BTreeMap::new());
1517 }
1518 let dataset = self.handle.dataset(Table::Messages).await?;
1519 let mut tasks = tokio::task::JoinSet::new();
1520 for session_id in session_ids {
1521 let dataset = dataset.clone();
1522 let session_id = session_id.clone();
1523 tasks.spawn(async move {
1524 let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1525 let count = dataset.count_rows(Some(filter)).await?;
1526 anyhow::Ok((session_id, count))
1527 });
1528 }
1529 let mut counts = BTreeMap::new();
1530 while let Some(joined) = tasks.join_next().await {
1531 let (session_id, count) = joined.context("session count task panicked")??;
1532 counts.insert(session_id, count);
1533 }
1534 Ok(counts)
1535 }
1536
1537 pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1540 self.handle
1541 .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1542 .await
1543 }
1544
1545 pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1551 self.handle
1552 .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1553 .await
1554 }
1555
1556 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1563 let dataset = self.handle.dataset(Table::Messages).await?;
1564 let embedded = dataset
1565 .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1566 .await?;
1567 let total = dataset
1568 .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1569 .await?;
1570 Ok(EmbeddingProgress {
1571 embedded,
1572 total,
1573 model: embed::model_id(),
1574 })
1575 }
1576
1577 pub async fn stale_embedding_count(&self) -> Result<usize> {
1581 let dataset = self.handle.dataset(Table::Messages).await?;
1582 dataset
1583 .count_rows(Some(
1584 Predicate::And(vec![
1585 Predicate::IsNotNull("vector"),
1586 Predicate::Ne("embedding_model", embed::model_id().into()),
1587 ])
1588 .to_lance(),
1589 ))
1590 .await
1591 .map_err(Into::into)
1592 }
1593
1594 pub async fn optimize_indices(
1600 &self,
1601 progress: Option<OptimizeProgressFn>,
1602 cleanup: Option<CleanupConfig>,
1603 ) -> Result<OptimizeOutcome> {
1604 let cleanup = cleanup.unwrap_or_default();
1605 let policy = pond_index_intents();
1606 let mut tables = Vec::with_capacity(3);
1607 for (table, intents) in policy.all() {
1608 let outcome = self
1609 .handle
1610 .optimize_table(table, intents, progress.as_ref(), cleanup)
1611 .await;
1612 tables.push(outcome);
1613 }
1614 Ok(OptimizeOutcome { tables })
1615 }
1616
1617 pub async fn build_indices_only(
1623 &self,
1624 progress: Option<OptimizeProgressFn>,
1625 ) -> Result<OptimizeOutcome> {
1626 let policy = pond_index_intents();
1627 let mut tables = Vec::with_capacity(3);
1628 for (table, intents) in policy.all() {
1629 let indices = self
1630 .handle
1631 .optimize_table_indices_only(table, intents, progress.as_ref())
1632 .await;
1633 tables.push(TableOptimizeOutcome {
1634 table,
1635 indices,
1636 compaction: PhaseOutcome::NotAttempted,
1637 });
1638 }
1639 Ok(OptimizeOutcome { tables })
1640 }
1641
1642 #[cfg(test)]
1643 async fn optimize_indices_with_vector_threshold(
1644 &self,
1645 vector_threshold: usize,
1646 ) -> Result<OptimizeOutcome> {
1647 let cleanup = CleanupConfig::default();
1648 let policy = pond_index_intents_with_vector_threshold(vector_threshold);
1649 let mut tables = Vec::with_capacity(3);
1650 for (table, intents) in policy.all() {
1651 let outcome = self
1652 .handle
1653 .optimize_table(table, intents, None, cleanup)
1654 .await;
1655 tables.push(outcome);
1656 }
1657 Ok(OptimizeOutcome { tables })
1658 }
1659
1660 pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1661 let policy = pond_index_intents();
1662 let mut matched = false;
1663 for (table, intents) in policy.all() {
1664 for intent in intents {
1665 if intent_name.is_none_or(|name| name == intent.name) {
1666 matched = true;
1667 self.handle.rebuild_index(table, intent).await?;
1668 }
1669 }
1670 }
1671 if let Some(name) = intent_name
1672 && !matched
1673 {
1674 anyhow::bail!("unknown index intent {name:?}");
1675 }
1676 Ok(())
1677 }
1678
1679 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1680 let policy = pond_index_intents();
1681 let mut statuses = Vec::new();
1682 for (table, intents) in policy.all() {
1683 statuses.extend(self.handle.index_status(table, intents).await?);
1684 }
1685 Ok(statuses)
1686 }
1687
1688 pub async fn drop_vector_index(&self) -> Result<()> {
1692 match self
1693 .handle
1694 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1695 .await
1696 {
1697 Ok(()) => Ok(()),
1698 Err(error) => {
1699 let msg = error.to_string();
1700 if msg.contains("not found") || msg.contains("does not exist") {
1701 Ok(())
1702 } else {
1703 Err(error)
1704 }
1705 }
1706 }
1707 }
1708
1709 pub async fn table_sizes(&self) -> Result<TableSizes> {
1712 self.handle.table_sizes().await
1713 }
1714
1715 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1716 let batch = self
1717 .handle
1718 .scan_batch(
1719 Table::Sessions,
1720 Some(&Predicate::Eq("id", session_id.into())),
1721 &[],
1722 )
1723 .await?;
1724 if batch.num_rows() == 0 {
1725 Ok(None)
1726 } else {
1727 Ok(Some(session_from_batch(&batch, 0)?))
1728 }
1729 }
1730
1731 pub async fn message_vector_by_id(&self, message_id: &str) -> Result<Option<Vec<f32>>> {
1737 let batch = self
1738 .handle
1739 .scan_batch(
1740 Table::Messages,
1741 Some(&Predicate::Eq("id", message_id.into())),
1742 &["vector"],
1743 )
1744 .await?;
1745 if batch.num_rows() == 0 {
1746 return Ok(None);
1747 }
1748 let column = batch
1749 .column(0)
1750 .as_any()
1751 .downcast_ref::<FixedSizeListArray>();
1752 let Some(list) = column else {
1753 return Ok(None);
1754 };
1755 if list.is_null(0) {
1756 return Ok(None);
1757 }
1758 let values = list.value(0);
1759 let halves = values
1760 .as_any()
1761 .downcast_ref::<Float16Array>()
1762 .context("messages.vector inner array is not Float16")?;
1763 let widened = (0..halves.len())
1764 .map(|i| halves.value(i).to_f32())
1765 .collect();
1766 Ok(Some(widened))
1767 }
1768
1769 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1770 let batch = self
1771 .handle
1772 .scan_batch(
1773 Table::Messages,
1774 Some(&Predicate::Eq("session_id", session_id.into())),
1775 &[
1776 "session_id",
1777 "id",
1778 "timestamp",
1779 "role",
1780 "content",
1781 "options",
1782 ],
1783 )
1784 .await?;
1785 let mut messages = Vec::with_capacity(batch.num_rows());
1786 for row in 0..batch.num_rows() {
1787 messages.push(message_from_batch(&batch, row)?);
1788 }
1789 messages.sort_by(|left, right| {
1790 left.timestamp()
1791 .cmp(&right.timestamp())
1792 .then_with(|| left.id().cmp(right.id()))
1793 });
1794
1795 let message_ids = messages
1796 .iter()
1797 .map(|message| message.id().to_owned())
1798 .collect::<Vec<_>>();
1799 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1800
1801 Ok(messages
1802 .into_iter()
1803 .map(|message| {
1804 let key = (message.session_id().to_owned(), message.id().to_owned());
1805 let parts = parts_by_message.remove(&key).unwrap_or_default();
1806 MessageWithParts { message, parts }
1807 })
1808 .collect())
1809 }
1810
1811 pub async fn parts_for_messages(
1815 &self,
1816 session_id: &str,
1817 message_ids: &[String],
1818 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1819 self.scan_parts(session_id, message_ids, None).await
1820 }
1821
1822 pub async fn summary_parts_for_messages(
1827 &self,
1828 session_id: &str,
1829 message_ids: &[String],
1830 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1831 self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1832 .await
1833 }
1834
    /// Scans the parts table for the given messages of one session and groups the decoded parts
    /// by `(session_id, message_id)`.
    ///
    /// * `message_ids` — messages whose parts are wanted; an empty slice returns an empty map
    ///   without touching storage.
    /// * `part_types` — when `Some`, only parts whose `type` is in the list are scanned.
    ///
    /// Parts of type `"file"` additionally have their `data` blob fetched by row address and
    /// combined with `variant_data` into a `FileData` payload. Each group is sorted by `ordinal`.
    ///
    /// # Errors
    /// Fails when the dataset or scan cannot be opened or executed, when a file part has null
    /// `variant_data`, when a blob cannot be read or decoded, or when a row fails to decode.
    async fn scan_parts(
        &self,
        session_id: &str,
        message_ids: &[String],
        part_types: Option<&[&str]>,
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        if message_ids.is_empty() {
            return Ok(BTreeMap::new());
        }
        let mut clauses = vec![
            Predicate::Eq("session_id", session_id.into()),
            in_predicate("message_id", message_ids),
        ];
        if let Some(types) = part_types {
            clauses.push(Predicate::In(
                "type",
                types.iter().map(|&t| t.into()).collect(),
            ));
        }
        let predicate = Predicate::And(clauses);
        // The dataset handle is needed separately from the scanner to fetch blobs by address.
        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
        let mut scanner = self
            .handle
            .scan(
                Table::Parts,
                ScanOpts::with_predicate_and_projection(
                    &predicate,
                    &[
                        "session_id",
                        "message_id",
                        "id",
                        "ordinal",
                        "type",
                        "provenance",
                        "variant_data",
                        "options",
                    ],
                ),
            )
            .await?;
        // Ask for row addresses; they are read back from the `_rowaddr` column below.
        scanner.with_row_address();
        let batch = scanner.try_into_batch().await.context("scan failed")?;
        let row_addresses = uint64(&batch, "_rowaddr")?;
        // Decoded file payloads, keyed by row position within `batch`.
        let mut file_payloads = BTreeMap::<usize, FileData>::new();
        // (row position, row address, raw variant_data) for every "file" part.
        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
        for row in 0..batch.num_rows() {
            if string(&batch, "type", row)?.as_deref() == Some("file") {
                let variant_data =
                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
                file_rows.push((row, row_addresses.value(row), variant_data));
            }
        }
        if !file_rows.is_empty() {
            let addresses = file_rows
                .iter()
                .map(|(_, address, _)| *address)
                .collect::<Vec<_>>();
            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
            // NOTE(review): the zip pairs blobs with rows purely by position and stops at the
            // shorter side. This assumes `take_blobs_by_addresses` returns exactly one blob per
            // address, in request order — confirm; otherwise payloads could be attached to the
            // wrong row or silently dropped.
            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
                file_payloads.insert(row, payload);
            }
        }
        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
        for row in 0..batch.num_rows() {
            // Non-file rows have no entry in `file_payloads` and are decoded with `None`.
            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
            parts_by_message
                .entry((part.session_id.clone(), part.message_id.clone()))
                .or_default()
                .push(part);
        }
        for parts in parts_by_message.values_mut() {
            parts.sort_by_key(|part| part.ordinal);
        }
        Ok(parts_by_message)
    }
1914}
1915
/// One event of an ingest stream: a session header, a message, or a part of a message.
///
/// Serialised adjacently tagged: the variant name (snake_case) goes in `kind` and the payload
/// in `data`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// Opens a new session substream.
    Session(Session),
    /// A message belonging to the currently open session.
    Message(Message),
    /// A part belonging to the current message.
    Part(Part),
}
1923
/// Running totals for an ingest run.
///
/// Counters are accumulated through `add_batch`, `add_outcomes`, `add_outcomes_errors_only`
/// and `merge`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows inserted, summed over sessions, messages and parts.
    pub inserted: usize,
    /// Rows that matched an existing row, summed over sessions, messages and parts.
    pub matched: usize,
    /// Session rows inserted.
    pub sessions_inserted: usize,
    /// Message rows inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Inserted message rows that were flagged searchable.
    pub messages_inserted_searchable: usize,
    /// Part rows inserted.
    pub parts_inserted: usize,
    /// Session rows that matched an existing row.
    pub sessions_matched: usize,
    /// Message rows that matched an existing row, searchable or not.
    pub messages_matched_total: usize,
    /// Matched message rows that were flagged searchable.
    pub messages_matched_searchable: usize,
    /// Part rows that matched an existing row.
    pub parts_matched: usize,
    /// Error outcomes whose kind is anything other than `"session"`.
    pub dropped_events: usize,
    /// Error outcomes whose kind is `"session"`.
    pub dropped_sessions: usize,
    /// Only merged here, never incremented in this impl; presumably counts skipped source
    /// files — TODO confirm against the ingest driver.
    pub skipped_files: usize,
    /// Only merged here; presumably counts sources skipped for being empty — TODO confirm.
    pub skipped_empty: usize,
    /// Only merged here; presumably counts sources skipped as already up to date — TODO confirm.
    pub skipped_fresh: usize,
    /// Only merged here; presumably counts storage-level failures — TODO confirm.
    pub storage_errors: usize,
    /// Only merged here; presumably counts values that were truncated — TODO confirm.
    pub truncated_values: usize,
    /// Number of error outcomes per reason key (one of the `DROP_REASON_*` constants).
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1999
// Reason keys recorded in `IngestSummary::drop_reasons` and `RowError::reason_key`.

/// A message id was seen twice within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part_id)` pair was seen twice within one session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message referenced a session other than the open one.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was current.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part referenced a session or message other than the current one.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session's `source_agent` was empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session carried `parent_message_id` without `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Not used in the validator below; presumably an attempt to change a stored session's
/// project — TODO confirm at the use site.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Not used in the validator below; presumably an attempt to change a stored session's
/// source agent — TODO confirm at the use site.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key for error outcomes that carry no reason key.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
2016
/// Per-table inserted/matched row counts for one flushed batch.
///
/// Folded into an `IngestSummary` by `IngestSummary::add_batch`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows inserted.
    pub sessions_inserted: usize,
    /// Session rows that matched an existing row.
    pub sessions_matched: usize,
    /// Message rows inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Inserted message rows counted as searchable.
    pub messages_inserted_searchable: usize,
    /// Message rows that matched an existing row, searchable or not.
    pub messages_matched_total: usize,
    /// Matched message rows counted as searchable.
    pub messages_matched_searchable: usize,
    /// Part rows inserted.
    pub parts_inserted: usize,
    /// Part rows that matched an existing row.
    pub parts_matched: usize,
}
2035
2036impl IngestSummary {
2037 pub fn accepted(&self) -> usize {
2038 self.inserted + self.matched
2039 }
2040
2041 pub fn add_batch(&mut self, counts: &BatchCounts) {
2045 self.sessions_inserted += counts.sessions_inserted;
2046 self.sessions_matched += counts.sessions_matched;
2047 self.messages_inserted_total += counts.messages_inserted_total;
2048 self.messages_inserted_searchable += counts.messages_inserted_searchable;
2049 self.messages_matched_total += counts.messages_matched_total;
2050 self.messages_matched_searchable += counts.messages_matched_searchable;
2051 self.parts_inserted += counts.parts_inserted;
2052 self.parts_matched += counts.parts_matched;
2053 self.inserted +=
2054 counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2055 self.matched +=
2056 counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2057 }
2058
2059 pub fn merge(&mut self, other: &Self) {
2063 self.inserted += other.inserted;
2064 self.matched += other.matched;
2065 self.sessions_inserted += other.sessions_inserted;
2066 self.messages_inserted_total += other.messages_inserted_total;
2067 self.messages_inserted_searchable += other.messages_inserted_searchable;
2068 self.parts_inserted += other.parts_inserted;
2069 self.sessions_matched += other.sessions_matched;
2070 self.messages_matched_total += other.messages_matched_total;
2071 self.messages_matched_searchable += other.messages_matched_searchable;
2072 self.parts_matched += other.parts_matched;
2073 self.dropped_events += other.dropped_events;
2074 self.dropped_sessions += other.dropped_sessions;
2075 self.skipped_files += other.skipped_files;
2076 self.skipped_empty += other.skipped_empty;
2077 self.skipped_fresh += other.skipped_fresh;
2078 self.storage_errors += other.storage_errors;
2079 self.truncated_values += other.truncated_values;
2080 for (key, value) in &other.drop_reasons {
2081 *self.drop_reasons.entry(key).or_insert(0) += value;
2082 }
2083 }
2084
2085 pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2090 for outcome in outcomes {
2091 if !matches!(outcome.status, OutcomeStatus::Error) {
2092 continue;
2093 }
2094 if outcome.kind == "session" {
2095 self.dropped_sessions += 1;
2096 } else {
2097 self.dropped_events += 1;
2098 }
2099 let reason = outcome
2100 .error
2101 .as_ref()
2102 .and_then(|error| error.reason_key)
2103 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2104 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2105 }
2106 }
2107
2108 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2109 for outcome in outcomes {
2110 match outcome.status {
2111 OutcomeStatus::Inserted => {
2112 self.inserted += 1;
2113 match outcome.kind {
2114 "session" => self.sessions_inserted += 1,
2115 "message" => {
2116 self.messages_inserted_total += 1;
2117 if outcome.searchable {
2118 self.messages_inserted_searchable += 1;
2119 }
2120 }
2121 "part" => self.parts_inserted += 1,
2122 _ => {}
2123 }
2124 }
2125 OutcomeStatus::Matched => {
2126 self.matched += 1;
2127 match outcome.kind {
2128 "session" => self.sessions_matched += 1,
2129 "message" => {
2130 self.messages_matched_total += 1;
2131 if outcome.searchable {
2132 self.messages_matched_searchable += 1;
2133 }
2134 }
2135 "part" => self.parts_matched += 1,
2136 _ => {}
2137 }
2138 }
2139 OutcomeStatus::Error => {
2140 if outcome.kind == "session" {
2146 self.dropped_sessions += 1;
2147 } else {
2148 self.dropped_events += 1;
2149 }
2150 let reason = outcome
2151 .error
2152 .as_ref()
2153 .and_then(|e| e.reason_key)
2154 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2155 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2156 }
2157 }
2158 }
2159 }
2160}
2161
/// Result of processing one ingest event.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Index of the event this outcome refers to, as passed to `IngestValidator::push`.
    pub index: usize,
    /// Row kind; the code in this file uses `"session"`, `"message"` and `"part"`.
    pub kind: &'static str,
    /// Primary key of the row: the id string for sessions, an array of key components for
    /// messages and parts.
    pub pk: Value,
    /// Whether the row was inserted, matched an existing row, or was rejected.
    pub status: OutcomeStatus,
    /// Details of the rejection; set on the error outcomes built in this file.
    pub error: Option<RowError>,
    /// For message rows, whether the message counts as searchable; `false` on error outcomes.
    pub searchable: bool,
}
2179
/// Disposition of a single ingested row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row was newly inserted.
    Inserted,
    /// The row matched one that was already present.
    Matched,
    /// The row was rejected; see `RowOutcome::error`.
    Error,
}
2186
/// Why a row was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the problem.
    pub message: String,
    /// Name of the offending field, when one can be singled out.
    pub field: Option<&'static str>,
    /// Optional short reason tag; the only value set in this part of the file is `"immutable"`.
    pub reason: Option<&'static str>,
    /// Key under which the rejection is counted in `IngestSummary::drop_reasons`.
    pub reason_key: Option<&'static str>,
}
2200
/// A session event held by the validator while its substream is still open.
#[derive(Debug)]
struct BufferedSession {
    /// Index of the session event in the ingest stream.
    index: usize,
    /// The session payload (with `source_agent` already trimmed).
    session: Session,
}
2209
/// A message event buffered by the validator, together with its parts.
#[derive(Debug)]
struct BufferedMessage {
    /// Index of the message event in the ingest stream.
    index: usize,
    /// The message payload.
    message: Message,
    /// Parts attached to this message; filled in when the message is flushed.
    parts: Vec<BufferedPart>,
    /// Search text derived from the message and its parts; computed when the message is
    /// flushed, `None` until then.
    search_text: Option<String>,
}
2217
/// A part event buffered by the validator.
#[derive(Debug)]
struct BufferedPart {
    /// Index of the part event in the ingest stream.
    index: usize,
    /// The part payload.
    part: Part,
}
2223
/// Validates the ordering and consistency of an ingest event stream and buffers accepted
/// events until they are flushed to the store.
///
/// Events are expected as session → message → parts; each session event starts a new
/// substream and closes the previous one.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// The session whose substream is currently open, if any.
    session: Option<BufferedSession>,
    /// The message currently collecting parts, if any.
    current_message: Option<BufferedMessage>,
    /// Parts received so far for `current_message`.
    current_parts: Vec<BufferedPart>,
    /// Finished messages of the open session.
    messages: Vec<BufferedMessage>,
    /// Message ids already seen in the open substream (duplicate detection).
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` pairs already seen in the open substream (duplicate detection).
    seen_part_keys: HashSet<(String, String)>,
    /// Closed substreams waiting to be written by `flush`.
    completed: Vec<CompletedSubstream>,
}
2258
/// A closed session substream: the session plus all of its buffered messages, ready to be
/// handed to the store.
#[derive(Debug)]
struct CompletedSubstream {
    /// Index of the session event in the ingest stream.
    session_index: usize,
    /// The session payload.
    session: Session,
    /// The session's messages in arrival order, each carrying its parts.
    messages: Vec<BufferedMessage>,
}
2266
2267impl IngestValidator {
2268 pub async fn push(
2274 &mut self,
2275 store: &Store,
2276 index: usize,
2277 event: IngestEvent,
2278 ) -> Result<Vec<RowOutcome>> {
2279 match event {
2280 IngestEvent::Session(session) => self.push_session(store, index, session).await,
2281 IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2282 IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2283 }
2284 }
2285
2286 pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2291 self.close_current_substream();
2292 self.flush(store).await
2293 }
2294
2295 pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2302 if self.completed.is_empty() {
2303 return Ok((Vec::new(), BatchCounts::default()));
2304 }
2305 let completed = std::mem::take(&mut self.completed);
2306 store.upsert_session_batch(completed).await
2307 }
2308
2309 pub fn pending_substreams(&self) -> usize {
2312 self.completed.len()
2313 }
2314
2315 async fn push_session(
2316 &mut self,
2317 _store: &Store,
2318 index: usize,
2319 mut session: Session,
2320 ) -> Result<Vec<RowOutcome>> {
2321 self.close_current_substream();
2325
2326 let trimmed = session.source_agent.trim();
2331 if trimmed.is_empty() {
2332 return Ok(vec![RowOutcome {
2333 index,
2334 kind: "session",
2335 pk: Value::String(session.id.clone()),
2336 status: OutcomeStatus::Error,
2337 error: Some(RowError {
2338 message: format!("session {} has empty source_agent after trim", session.id),
2339 field: Some("source_agent"),
2340 reason: None,
2341 reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2342 }),
2343 searchable: false,
2344 }]);
2345 }
2346 if trimmed.len() != session.source_agent.len() {
2347 session.source_agent = trimmed.to_owned();
2348 }
2349
2350 if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2351 return Ok(vec![RowOutcome {
2352 index,
2353 kind: "session",
2354 pk: Value::String(session.id.clone()),
2355 status: OutcomeStatus::Error,
2356 error: Some(RowError {
2357 message: format!(
2358 "session {} has parent_message_id without parent_session_id",
2359 session.id,
2360 ),
2361 field: Some("parent_message_id"),
2362 reason: None,
2363 reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2364 }),
2365 searchable: false,
2366 }]);
2367 }
2368
2369 self.seen_message_ids.clear();
2370 self.seen_part_keys.clear();
2371 self.session = Some(BufferedSession { index, session });
2372 Ok(Vec::new())
2373 }
2374
2375 fn close_current_substream(&mut self) {
2376 self.flush_current_message();
2377 let Some(BufferedSession {
2378 index: session_index,
2379 session,
2380 }) = self.session.take()
2381 else {
2382 return;
2383 };
2384 let messages = std::mem::take(&mut self.messages);
2385 self.seen_message_ids.clear();
2386 self.seen_part_keys.clear();
2387 self.completed.push(CompletedSubstream {
2388 session_index,
2389 session,
2390 messages,
2391 });
2392 }
2393
2394 fn push_message(&mut self, index: usize, message: Message) -> Vec<RowOutcome> {
2395 let pk = Value::Array(vec![
2396 Value::String(message.session_id().to_owned()),
2397 Value::String(message.id().to_owned()),
2398 ]);
2399 let Some(session) = &self.session else {
2400 return vec![error_outcome(
2401 index,
2402 "message",
2403 pk,
2404 "first event in a session stream must be Session",
2405 None,
2406 DROP_REASON_MESSAGE_BEFORE_SESSION,
2407 )];
2408 };
2409 if message.session_id() != session.session.id {
2410 let msg = format!(
2411 "message {} references session {}, expected {}",
2412 message.id(),
2413 message.session_id(),
2414 session.session.id
2415 );
2416 return vec![error_outcome(
2417 index,
2418 "message",
2419 pk,
2420 &msg,
2421 Some("session_id"),
2422 DROP_REASON_MESSAGE_SESSION_MISMATCH,
2423 )];
2424 }
2425 if !self.seen_message_ids.insert(message.id().to_owned()) {
2426 let msg = format!("duplicate message id {} in session substream", message.id());
2430 return vec![error_outcome(
2431 index,
2432 "message",
2433 pk,
2434 &msg,
2435 None,
2436 DROP_REASON_DUPLICATE_MESSAGE_ID,
2437 )];
2438 }
2439 self.flush_current_message();
2440 self.current_message = Some(BufferedMessage {
2441 index,
2442 message,
2443 parts: Vec::new(),
2444 search_text: None,
2445 });
2446 Vec::new()
2447 }
2448
2449 fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2450 let pk = Value::Array(vec![
2451 Value::String(part.session_id.clone()),
2452 Value::String(part.message_id.clone()),
2453 Value::String(part.id.clone()),
2454 ]);
2455 let Some(current) = &self.current_message else {
2456 return vec![error_outcome(
2457 index,
2458 "part",
2459 pk,
2460 "part event appeared before a message",
2461 None,
2462 DROP_REASON_PART_BEFORE_MESSAGE,
2463 )];
2464 };
2465 if part.session_id != current.message.session_id() {
2466 let msg = format!(
2467 "part {} references session {}, expected {}",
2468 part.id,
2469 part.session_id,
2470 current.message.session_id()
2471 );
2472 return vec![error_outcome(
2473 index,
2474 "part",
2475 pk,
2476 &msg,
2477 Some("session_id"),
2478 DROP_REASON_PART_MESSAGE_MISMATCH,
2479 )];
2480 }
2481 if part.message_id != current.message.id() {
2482 let msg = format!(
2483 "part {} references message {}, expected {}",
2484 part.id,
2485 part.message_id,
2486 current.message.id()
2487 );
2488 return vec![error_outcome(
2489 index,
2490 "part",
2491 pk,
2492 &msg,
2493 Some("message_id"),
2494 DROP_REASON_PART_MESSAGE_MISMATCH,
2495 )];
2496 }
2497 let part_key = (part.message_id.clone(), part.id.clone());
2498 if !self.seen_part_keys.insert(part_key) {
2499 let msg = format!(
2500 "duplicate part id {} for message {} in session substream",
2501 part.id, part.message_id
2502 );
2503 return vec![error_outcome(
2504 index,
2505 "part",
2506 pk,
2507 &msg,
2508 None,
2509 DROP_REASON_DUPLICATE_PART_KEY,
2510 )];
2511 }
2512 self.current_parts.push(BufferedPart { index, part });
2513 Vec::new()
2514 }
2515
2516 fn flush_current_message(&mut self) {
2517 let Some(mut buffered) = self.current_message.take() else {
2518 return;
2519 };
2520 let parts = std::mem::take(&mut self.current_parts);
2521 let mut canonical_parts = Vec::with_capacity(parts.len());
2522 for part in &parts {
2523 canonical_parts.push(part.part.clone());
2524 }
2525 buffered.search_text = search_text(&buffered.message, &canonical_parts);
2526 buffered.parts = parts;
2527 self.messages.push(buffered);
2528 }
2529}
2530
2531fn error_outcome(
2532 index: usize,
2533 kind: &'static str,
2534 pk: Value,
2535 message: &str,
2536 field: Option<&'static str>,
2537 reason_key: &'static str,
2538) -> RowOutcome {
2539 RowOutcome {
2540 index,
2541 kind,
2542 pk,
2543 status: OutcomeStatus::Error,
2544 error: Some(RowError {
2545 message: message.to_owned(),
2546 field,
2547 reason: None,
2548 reason_key: Some(reason_key),
2549 }),
2550 searchable: false,
2551 }
2552}
2553
2554fn error_outcomes_for_substream(
2559 session_index: usize,
2560 session: &Session,
2561 _messages: &[BufferedMessage],
2562 message: impl Into<String>,
2563 field: Option<&'static str>,
2564 reason_key: &'static str,
2565) -> Vec<RowOutcome> {
2566 let reason = field.map(|_| "immutable");
2567 vec![RowOutcome {
2568 index: session_index,
2569 kind: "session",
2570 pk: Value::String(session.id.clone()),
2571 status: OutcomeStatus::Error,
2572 error: Some(RowError {
2573 message: message.into(),
2574 field,
2575 reason,
2576 reason_key: Some(reason_key),
2577 }),
2578 searchable: false,
2579 }]
2580}
2581
2582fn success_outcomes_for_substream(
2588 session_index: usize,
2589 session: &Session,
2590 messages: &[BufferedMessage],
2591 existing_sessions: &std::collections::HashMap<String, Session>,
2592 existing_message_pks: &HashSet<(String, String)>,
2593 existing_part_pks: &HashSet<(String, String, String)>,
2594 counts: &mut BatchCounts,
2595) -> Vec<RowOutcome> {
2596 let session_was_present = existing_sessions.contains_key(&session.id);
2597 let session_status = if session_was_present {
2598 counts.sessions_matched += 1;
2599 UpsertStatus::Matched
2600 } else {
2601 counts.sessions_inserted += 1;
2602 UpsertStatus::Inserted
2603 };
2604
2605 let mut outcomes = Vec::with_capacity(1 + messages.len());
2606 outcomes.push(success_outcome(
2607 session_index,
2608 "session",
2609 Value::String(session.id.clone()),
2610 session_status,
2611 false,
2612 ));
2613 for buffered in messages {
2614 let key = (
2615 buffered.message.session_id().to_owned(),
2616 buffered.message.id().to_owned(),
2617 );
2618 let searchable = buffered.search_text.is_some();
2619 let message_status = if existing_message_pks.contains(&key) {
2620 counts.messages_matched_total += 1;
2621 if searchable {
2622 counts.messages_matched_searchable += 1;
2623 }
2624 UpsertStatus::Matched
2625 } else {
2626 counts.messages_inserted_total += 1;
2627 if searchable {
2628 counts.messages_inserted_searchable += 1;
2629 }
2630 UpsertStatus::Inserted
2631 };
2632 let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2633 outcomes.push(success_outcome(
2634 buffered.index,
2635 "message",
2636 pk,
2637 message_status,
2638 searchable,
2639 ));
2640 for part in &buffered.parts {
2641 let part_key = (
2642 part.part.session_id.clone(),
2643 part.part.message_id.clone(),
2644 part.part.id.clone(),
2645 );
2646 let part_status = if existing_part_pks.contains(&part_key) {
2647 counts.parts_matched += 1;
2648 UpsertStatus::Matched
2649 } else {
2650 counts.parts_inserted += 1;
2651 UpsertStatus::Inserted
2652 };
2653 let part_pk = Value::Array(vec![
2654 Value::String(part_key.0),
2655 Value::String(part_key.1),
2656 Value::String(part_key.2),
2657 ]);
2658 outcomes.push(success_outcome(
2659 part.index,
2660 "part",
2661 part_pk,
2662 part_status,
2663 false,
2664 ));
2665 }
2666 }
2667 outcomes
2668}
2669
2670fn success_outcome(
2671 index: usize,
2672 kind: &'static str,
2673 pk: Value,
2674 status: UpsertStatus,
2675 searchable: bool,
2676) -> RowOutcome {
2677 let status = match status {
2678 UpsertStatus::Inserted => OutcomeStatus::Inserted,
2679 UpsertStatus::Matched => OutcomeStatus::Matched,
2680 };
2681 RowOutcome {
2682 index,
2683 kind,
2684 pk,
2685 status,
2686 error: None,
2687 searchable,
2688 }
2689}
2690
/// Errors raised while validating incoming rows against stored ones.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// An incoming session tried to change a field that must keep its stored
    /// value.
    ImmutableField {
        /// Name of the field that differed.
        field: &'static str,
        /// Id of the session being ingested.
        session_id: String,
        /// Value already stored.
        stored: String,
        /// Value the incoming session carried.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the error as a single human-readable sentence; the stored and
    /// attempted values are printed in `Debug` (quoted) form.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        formatter.write_fmt(format_args!(
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        ))
    }
}

impl std::error::Error for IngestError {}
2722
2723fn ensure_immutable_match(
2727 existing: &Session,
2728 incoming: &Session,
2729) -> std::result::Result<(), IngestError> {
2730 if existing.source_agent != incoming.source_agent {
2731 return Err(IngestError::ImmutableField {
2732 field: "source_agent",
2733 session_id: incoming.id.clone(),
2734 stored: existing.source_agent.clone(),
2735 attempted: incoming.source_agent.clone(),
2736 });
2737 }
2738 if existing.project != incoming.project {
2739 return Err(IngestError::ImmutableField {
2740 field: "project",
2741 session_id: incoming.id.clone(),
2742 stored: (*existing.project).clone(),
2743 attempted: (*incoming.project).clone(),
2744 });
2745 }
2746 Ok(())
2747}
2748
2749pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2750 use crate::wire::Provenance;
2751 let mut chunks: Vec<String> = Vec::new();
2752 for part in parts {
2753 if part.provenance != Provenance::Conversational {
2756 continue;
2757 }
2758 match (message.role(), &part.kind) {
2759 (Role::User | Role::Assistant, PartKind::Text { text }) => {
2760 if let Some(text) = text {
2761 chunks.push(text.to_string());
2762 }
2763 }
2764 (
2765 Role::User | Role::Assistant,
2766 PartKind::File {
2767 media_type,
2768 file_name,
2769 data,
2770 },
2771 ) => {
2772 if let Some(file_name) = file_name {
2773 chunks.push(file_name.clone());
2774 }
2775 if let Some(media_type) = media_type {
2776 chunks.push(media_type.clone());
2777 }
2778 if let FileData::Url(uri) = data {
2779 chunks.push(uri.clone());
2780 }
2781 }
2782 (
2783 Role::System | Role::Tool,
2784 PartKind::Text { .. }
2785 | PartKind::Reasoning { .. }
2786 | PartKind::File { .. }
2787 | PartKind::ToolCall { .. }
2788 | PartKind::ToolResult { .. }
2789 | PartKind::ToolApprovalRequest { .. }
2790 | PartKind::ToolApprovalResponse { .. },
2791 )
2792 | (
2793 Role::User | Role::Assistant,
2794 PartKind::Reasoning { .. }
2795 | PartKind::ToolCall { .. }
2796 | PartKind::ToolResult { .. }
2797 | PartKind::ToolApprovalRequest { .. }
2798 | PartKind::ToolApprovalResponse { .. },
2799 ) => {}
2800 }
2801 }
2802
2803 let text = chunks
2804 .into_iter()
2805 .filter(|chunk| !chunk.trim().is_empty())
2806 .collect::<Vec<_>>()
2807 .join("\n");
2808 if text.is_empty() { None } else { Some(text) }
2809}
2810
/// Newtype around an owned search-text string.
///
/// The inner field is private, so code outside this module can only read the
/// text through the accessors below.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrows the text as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consumes the wrapper and returns the owned string.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    /// Same view as [`SearchText::as_str`].
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2830
/// A message bundled with a list of parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    /// The message itself.
    pub message: Message,
    /// Parts associated with `message` (ordering is decided by whoever builds
    /// the value; not visible here).
    pub parts: Vec<Part>,
}
2836
/// A session bundled with its messages, each carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    /// The session itself.
    pub session: Session,
    /// Messages of the session together with their parts.
    pub messages: Vec<MessageWithParts>,
}
2842
/// Parameter bundle for a session view request.
///
/// The consumer of these fields is outside this excerpt, so the notes below
/// are reviewer assumptions to confirm against it.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Requested response mode.
    pub mode: ResponseMode,
    /// Optional borrowed id; presumably a pagination cursor (resume after this
    /// message) — TODO confirm.
    pub after_id: Option<&'a str>,
    /// Presumably the maximum number of items per page — TODO confirm.
    pub limit: usize,
    /// Presumably the byte budget for one page — TODO confirm.
    pub budget_bytes: usize,
    /// Session-origin selector; semantics are defined by `SessionFrom`.
    pub session_from: SessionFrom,
}
2851
/// Parameter bundle for a single-message view request.
///
/// The consumer of these fields is outside this excerpt, so the notes below
/// are reviewer assumptions to confirm against it.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// Presumably how much surrounding context to include — TODO confirm.
    pub context_depth: usize,
    /// Optional borrowed id; presumably a pagination cursor — TODO confirm.
    pub after_id: Option<&'a str>,
    /// Presumably the maximum number of items per page — TODO confirm.
    pub limit: usize,
    /// Presumably the byte budget for one page — TODO confirm.
    pub budget_bytes: usize,
}
2859
/// Three-way result of a lookup that either finds a `T` or fails in one of
/// two distinguishable ways.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested entity was not found.
    NotFound,
    /// Presumably: an `after_id` was supplied but did not match anything —
    /// TODO confirm against the code that produces this variant.
    UnknownAfterId,
    /// The lookup succeeded and produced this value.
    Found(T),
}
2871
/// One page of a session view: the session plus a slice of its messages.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    /// The session being viewed.
    pub session: Session,
    /// Messages included in this page.
    pub messages: Vec<RetrievedMessage>,
    /// Presumably the number of messages not included in this page —
    /// TODO confirm against the producer.
    pub messages_remaining: usize,
}
2881
/// One page of a single-message view: the target message, a slice of its
/// parts, and sibling messages.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    /// Session the target message belongs to.
    pub session: Session,
    /// The message that was requested.
    pub target: RetrievedMessage,
    /// Parts of the target included in this page.
    pub target_parts: Vec<Part>,
    /// Presumably the number of target parts not included in this page —
    /// TODO confirm against the producer.
    pub target_parts_remaining: usize,
    /// Other messages returned alongside the target (presumably surrounding
    /// context — TODO confirm).
    pub siblings: Vec<RetrievedMessage>,
}
2893
/// A message as returned by the view APIs, with optional text fields and its
/// parts.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    /// Message id.
    pub id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Optional text; presumably the derived search text — TODO confirm.
    pub text: Option<String>,
    /// Optional content; presumably the stored `content` column — TODO confirm.
    pub content: Option<String>,
    /// Parts attached to the message.
    pub parts: Vec<Part>,
}
2903
/// Module-private row shape with the same scalar fields as
/// `RetrievedMessage`, minus the parts.
#[derive(Debug, Clone)]
struct ScanRow {
    /// Message id.
    id: String,
    /// Role of the message author.
    role: Role,
    /// Message timestamp in UTC.
    timestamp: DateTime<Utc>,
    /// Optional text; presumably the derived search text — TODO confirm.
    text: Option<String>,
    /// Optional content; presumably the stored `content` column — TODO confirm.
    content: Option<String>,
}
2912
/// A message row that is guaranteed to carry search text (the `text` field is
/// a `SearchText`, not an `Option`).
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Search text of the message.
    pub text: SearchText,
}
2923
/// Decides how many leading `items` fit on one page.
///
/// At most `limit` items are considered, with `limit` clamped to `1..=1000`.
/// Items are admitted in order while the running total of `size(item)` stays
/// within `budget_bytes`; the first item is always admitted, even if it alone
/// exceeds the budget. Returns the number of admitted items (0 only when
/// `items` is empty).
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = limit.clamp(1, 1000).min(items.len());
    let mut used = 0usize;
    let mut count = 0usize;
    for item in items.iter().take(window) {
        let candidate = used.saturating_add(size(item));
        if count != 0 && candidate > budget_bytes {
            return count;
        }
        used = candidate;
        count += 1;
    }
    count
}
2942
2943fn role_from_str(value: &str) -> Result<Role> {
2944 match value {
2945 "system" => Ok(Role::System),
2946 "user" => Ok(Role::User),
2947 "assistant" => Ok(Role::Assistant),
2948 "tool" => Ok(Role::Tool),
2949 other => anyhow::bail!("unknown message role {other}"),
2950 }
2951}
2952
/// Scalar indices declared for the messages table, as
/// `(column, index type, index name)` triples.
///
/// `pond_index_intents_with_vector_threshold` turns each entry into an
/// `IndexIntent` with an `OnAnyRows` trigger.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];
2979
/// Scalar indices declared for the parts table, as
/// `(column, index type, index name)` triples.
///
/// `pond_index_intents_with_vector_threshold` turns each entry into an
/// `IndexIntent` with an `OnAnyRows` trigger.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];
2994
/// Scalar indices declared for the sessions table, as
/// `(column, index type, index name)` triples.
///
/// `pond_index_intents_with_vector_threshold` turns each entry into an
/// `IndexIntent` with an `OnAnyRows` trigger.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
2999
3000fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
3001 Predicate::In(
3002 column,
3003 values.iter().cloned().map(ScalarValue::String).collect(),
3004 )
3005}
3006
3007fn embedded_scope(filter: &Predicate) -> Predicate {
3012 Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3013}
3014
3015fn statuses_from_inserted(total: usize, inserted_rows: u64) -> Vec<UpsertStatus> {
3016 let inserted = usize::try_from(inserted_rows)
3017 .unwrap_or(usize::MAX)
3018 .min(total);
3019 let mut statuses = Vec::with_capacity(total);
3020 statuses.extend(std::iter::repeat_n(UpsertStatus::Inserted, inserted));
3021 statuses.extend(std::iter::repeat_n(
3022 UpsertStatus::Matched,
3023 total.saturating_sub(inserted),
3024 ));
3025 statuses
3026}
3027
/// Name string for sessions (presumably the Lance table name — TODO confirm
/// at the use sites).
pub(crate) const SESSIONS: &str = "sessions";
/// Name string for messages (presumably the Lance table name — TODO confirm).
pub(crate) const MESSAGES: &str = "messages";
/// Name string for parts (presumably the Lance table name — TODO confirm).
pub(crate) const PARTS: &str = "parts";

/// Name of the full-text index declared on the messages `search_text` column.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the IVF-PQ vector index declared on the messages `vector` column.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
3042
/// `num_bits` passed to the IVF-PQ vector index parameters.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Divisor applied to the embedding dimension to get the number of PQ
/// sub-vectors (`embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE`).
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// `max_iters` passed to the IVF-PQ vector index parameters.
const IVF_PQ_MAX_ITERS: usize = 15;

/// Minimum n-gram length for the messages full-text index.
const FTS_NGRAM_MIN: u32 = 3;
/// Maximum n-gram length for the messages full-text index.
const FTS_NGRAM_MAX: u32 = 5;
3057
3058pub fn pond_index_intents() -> IndexIntents {
3061 pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
3062}
3063
3064pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3068 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3069 messages.push(IndexIntent {
3070 name: MESSAGES_FTS_INDEX,
3071 column: "search_text",
3072 trigger: IndexTrigger::OnAnyRows,
3073 params: IndexParamsKind::InvertedFtsNgram {
3074 min: FTS_NGRAM_MIN,
3075 max: FTS_NGRAM_MAX,
3076 },
3077 });
3078 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3079 messages.push(IndexIntent {
3080 name,
3081 column,
3082 trigger: IndexTrigger::OnAnyRows,
3083 params: IndexParamsKind::Scalar(kind.clone()),
3084 });
3085 }
3086 messages.push(IndexIntent {
3087 name: MESSAGES_VECTOR_INDEX,
3088 column: "vector",
3089 trigger: IndexTrigger::OnNonNullCount {
3090 column: "vector",
3091 threshold: vector_threshold,
3092 },
3093 params: IndexParamsKind::IvfPqCosine {
3094 sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3095 num_bits: IVF_PQ_NUM_BITS,
3096 max_iters: IVF_PQ_MAX_ITERS,
3097 },
3098 });
3099 let parts = PARTS_SCALAR_INDICES
3100 .iter()
3101 .map(|(column, kind, name)| IndexIntent {
3102 name,
3103 column,
3104 trigger: IndexTrigger::OnAnyRows,
3105 params: IndexParamsKind::Scalar(kind.clone()),
3106 })
3107 .collect();
3108 let sessions = SESSIONS_SCALAR_INDICES
3109 .iter()
3110 .map(|(column, kind, name)| IndexIntent {
3111 name,
3112 column,
3113 trigger: IndexTrigger::OnAnyRows,
3114 params: IndexParamsKind::Scalar(kind.clone()),
3115 })
3116 .collect();
3117 IndexIntents {
3118 sessions,
3119 messages,
3120 parts,
3121 }
3122}
3123
/// Embedding dimension used when no runtime dimension has been set through
/// `init_embedding_dim`.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide, write-once override for the embedding dimension. Written by
/// `init_embedding_dim` and read by `embedding_dim`.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
3135
3136pub fn embedding_dim() -> usize {
3139 EMBEDDING_DIM_RUNTIME
3140 .get()
3141 .copied()
3142 .unwrap_or(DEFAULT_EMBEDDING_DIM)
3143}
3144
3145pub fn init_embedding_dim(dim: usize) {
3147 EMBEDDING_DIM_RUNTIME.get_or_init(|| dim);
3148}
3149
3150pub(crate) fn write_params_for_create() -> WriteParams {
3157 WriteParams {
3158 data_storage_version: Some(LanceFileVersion::V2_1),
3159 enable_v2_manifest_paths: true,
3160 enable_stable_row_ids: true,
3161 auto_cleanup: Some(AutoCleanupParams {
3162 interval: 20,
3163 older_than: chrono::TimeDelta::days(1),
3164 }),
3165 skip_auto_cleanup: true,
3166 ..WriteParams::default()
3167 }
3168}
3169
3170fn export_schema(table: Table) -> Arc<Schema> {
3171 match table {
3172 Table::Sessions => session_schema(),
3173 Table::Messages => message_schema(),
3174 Table::Parts => part_schema(),
3175 }
3176}
3177
3178fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3179 let expected = export_schema(table);
3180 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3181 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3182 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3183 if actual_names != expected_names {
3184 anyhow::bail!(
3185 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3186 table.as_str(),
3187 );
3188 }
3189 Ok(())
3190}
3191
3192async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3193 let source_uri = source
3194 .to_str()
3195 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3196 let dataset = Dataset::open(source_uri)
3197 .await
3198 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3199 ensure_schema_matches_archive(&dataset, table)?;
3200 Ok(dataset)
3201}
3202
3203pub(crate) fn session_schema() -> Arc<Schema> {
3204 Arc::new(Schema::new(vec![
3205 primary_field("id", DataType::Utf8, false),
3206 Field::new("parent_session_id", DataType::Utf8, true),
3207 Field::new("parent_message_id", DataType::Utf8, true),
3208 Field::new("source_agent", DataType::Utf8, false),
3209 Field::new(
3210 "created_at",
3211 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3212 false,
3213 ),
3214 Field::new("project", DataType::Utf8, false),
3215 json_field("options", false),
3216 ]))
3217}
3218
3219pub(crate) fn message_schema() -> Arc<Schema> {
3220 Arc::new(Schema::new(vec![
3221 primary_field("session_id", DataType::Utf8, false),
3222 primary_field("id", DataType::Utf8, false),
3223 Field::new(
3224 "timestamp",
3225 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3226 false,
3227 ),
3228 Field::new("role", DataType::Utf8, false),
3229 Field::new("source_agent", DataType::Utf8, false),
3230 Field::new("project", DataType::Utf8, false),
3231 Field::new("content", DataType::Utf8, true),
3232 Field::new("search_text", DataType::Utf8, true),
3233 Field::new("vector", embedding_vector_type(), true),
3236 Field::new("embedding_model", DataType::Utf8, true),
3237 json_field("options", false),
3238 ]))
3239}
3240
3241pub(crate) fn part_schema() -> Arc<Schema> {
3242 Arc::new(Schema::new(vec![
3243 primary_field("session_id", DataType::Utf8, false),
3244 primary_field("message_id", DataType::Utf8, false),
3245 primary_field("id", DataType::Utf8, false),
3246 Field::new("ordinal", DataType::Int32, false),
3247 Field::new("type", DataType::Utf8, false),
3248 Field::new("provenance", DataType::Utf8, false),
3251 json_field("variant_data", false),
3252 legacy_blob_field("data", true),
3253 json_field("options", false),
3254 ]))
3255}
3256
3257pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3258 let arrays = schema
3259 .fields()
3260 .iter()
3261 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3262 .collect();
3263 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3264}
3265
3266pub(crate) fn empty_reader(
3267 schema: Arc<Schema>,
3268) -> Result<
3269 RecordBatchIterator<
3270 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3271 >,
3272> {
3273 let batch = empty_batch(schema.clone())?;
3274 Ok(RecordBatchIterator::new(
3275 vec![Ok(batch)].into_iter(),
3276 schema,
3277 ))
3278}
3279
/// Borrowed inputs for one row of a messages record batch.
///
/// Besides the message itself it carries the values written to the
/// `source_agent`, `project`, and `search_text` columns.
pub(crate) struct MessageBatchRow<'a> {
    /// The message to write.
    pub message: &'a Message,
    /// Value for the `source_agent` column.
    pub source_agent: &'a str,
    /// Value for the `project` column.
    pub project: &'a str,
    /// Value for the nullable `search_text` column.
    pub search_text: Option<&'a str>,
}
3286
3287fn embedding_vector_type() -> DataType {
3293 DataType::FixedSizeList(
3294 Arc::new(Field::new("item", DataType::Float16, true)),
3295 embedding_dim() as i32,
3296 )
3297}
3298
3299fn embedding_update_schema() -> Arc<Schema> {
3303 Arc::new(Schema::new(vec![
3304 primary_field("session_id", DataType::Utf8, false),
3305 primary_field("id", DataType::Utf8, false),
3306 Field::new("vector", embedding_vector_type(), true),
3307 Field::new("embedding_model", DataType::Utf8, true),
3308 ]))
3309}
3310
3311pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3314 let dim = embedding_dim();
3315 let mut flat = Vec::with_capacity(rows.len() * dim);
3316 for row in rows {
3317 if row.vector.len() != dim {
3318 anyhow::bail!(
3319 "embedding for message {} has dim {}, expected {dim}",
3320 row.id,
3321 row.vector.len(),
3322 );
3323 }
3324 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3325 }
3326 let values = Float16Array::from(flat);
3327 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3328 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3329 .context("failed to build embedding vector column")?;
3330
3331 RecordBatch::try_new(
3332 embedding_update_schema(),
3333 vec![
3334 Arc::new(StringArray::from(
3335 rows.iter()
3336 .map(|row| row.session_id.as_str())
3337 .collect::<Vec<_>>(),
3338 )),
3339 Arc::new(StringArray::from(
3340 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3341 )),
3342 Arc::new(vectors),
3343 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3344 ],
3345 )
3346 .context("failed to build embedding update batch")
3347}
3348
/// Byte budget (1 GiB) used both as the per-batch size threshold in
/// `chunk_ranges` and as the per-cell ceiling enforced by `guard_cell`.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;
3354
3355fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
3360 let mut chunks = Vec::new();
3361 let mut start = 0usize;
3362 let mut running = 0usize;
3363 for (index, &row) in cells.iter().enumerate() {
3364 if running + row > COLUMN_BYTE_BUDGET && index > start {
3365 chunks.push(start..index);
3366 start = index;
3367 running = 0;
3368 }
3369 running += row;
3370 }
3371 if start < cells.len() {
3372 chunks.push(start..cells.len());
3373 }
3374 chunks
3375}
3376
3377fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3378 if bytes >= COLUMN_BYTE_BUDGET {
3379 anyhow::bail!(
3380 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3381 overflow Arrow's i32 offset buffer"
3382 );
3383 }
3384 Ok(())
3385}
3386
3387async fn merge_insert_chunks(
3388 handle: &Handle,
3389 table: Table,
3390 batches: Vec<RecordBatch>,
3391) -> Result<u64> {
3392 let mut inserted = 0u64;
3393 for batch in batches {
3394 let rows = batch.num_rows();
3395 inserted += handle.merge_insert(table, batch, rows).await?;
3396 }
3397 Ok(inserted)
3398}
3399
3400pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3401 let options = sessions
3402 .iter()
3403 .map(|session| json_bytes(&session.options))
3404 .collect::<Result<Vec<_>>>()?;
3405 let mut cells = Vec::with_capacity(sessions.len());
3406 for (session, encoded) in sessions.iter().zip(&options) {
3407 let columns = [
3408 session.id.len(),
3409 session.parent_session_id.as_deref().map_or(0, str::len),
3410 session.parent_message_id.as_deref().map_or(0, str::len),
3411 session.source_agent.len(),
3412 session.project.as_str().len(),
3413 encoded.len(),
3414 ];
3415 for bytes in columns {
3416 guard_cell("sessions", &session.id, bytes)?;
3417 }
3418 cells.push(columns.iter().sum());
3419 }
3420 chunk_ranges(&cells)
3421 .into_iter()
3422 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3423 .collect()
3424}
3425
3426fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3427 let schema = session_schema();
3428 RecordBatch::try_new(
3429 schema.clone(),
3430 vec![
3431 Arc::new(StringArray::from(
3432 sessions
3433 .iter()
3434 .map(|session| session.id.as_str())
3435 .collect::<Vec<_>>(),
3436 )),
3437 Arc::new(StringArray::from(
3438 sessions
3439 .iter()
3440 .map(|session| session.parent_session_id.as_deref())
3441 .collect::<Vec<_>>(),
3442 )),
3443 Arc::new(StringArray::from(
3444 sessions
3445 .iter()
3446 .map(|session| session.parent_message_id.as_deref())
3447 .collect::<Vec<_>>(),
3448 )),
3449 Arc::new(StringArray::from(
3450 sessions
3451 .iter()
3452 .map(|session| session.source_agent.as_str())
3453 .collect::<Vec<_>>(),
3454 )),
3455 Arc::new(
3456 TimestampMicrosecondArray::from(
3457 sessions
3458 .iter()
3459 .map(|session| micros(session.created_at))
3460 .collect::<Vec<_>>(),
3461 )
3462 .with_timezone("UTC"),
3463 ),
3464 Arc::new(StringArray::from(
3465 sessions
3466 .iter()
3467 .map(|session| session.project.as_str())
3468 .collect::<Vec<_>>(),
3469 )),
3470 Arc::new(LargeBinaryArray::from_iter_values(
3471 options.iter().map(Vec::as_slice),
3472 )),
3473 ],
3474 )
3475 .context("failed to build session batch")
3476}
3477
3478pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3479 let options = rows
3480 .iter()
3481 .map(|row| json_bytes(row.message.options()))
3482 .collect::<Result<Vec<_>>>()?;
3483 let mut cells = Vec::with_capacity(rows.len());
3484 for (row, encoded) in rows.iter().zip(&options) {
3485 let columns = [
3486 row.message.session_id().len(),
3487 row.message.id().len(),
3488 row.message.role().as_str().len(),
3489 row.source_agent.len(),
3490 row.project.len(),
3491 row.message.system_content().map_or(0, str::len),
3492 row.search_text.map_or(0, str::len),
3493 encoded.len(),
3494 ];
3495 for bytes in columns {
3496 guard_cell("messages", row.message.id(), bytes)?;
3497 }
3498 cells.push(columns.iter().sum());
3499 }
3500 chunk_ranges(&cells)
3501 .into_iter()
3502 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3503 .collect()
3504}
3505
3506fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3507 let schema = message_schema();
3508 RecordBatch::try_new(
3509 schema.clone(),
3510 vec![
3511 Arc::new(StringArray::from(
3512 rows.iter()
3513 .map(|row| row.message.session_id())
3514 .collect::<Vec<_>>(),
3515 )),
3516 Arc::new(StringArray::from(
3517 rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3518 )),
3519 Arc::new(
3520 TimestampMicrosecondArray::from(
3521 rows.iter()
3522 .map(|row| micros(row.message.timestamp()))
3523 .collect::<Vec<_>>(),
3524 )
3525 .with_timezone("UTC"),
3526 ),
3527 Arc::new(StringArray::from(
3528 rows.iter()
3529 .map(|row| row.message.role().as_str())
3530 .collect::<Vec<_>>(),
3531 )),
3532 Arc::new(StringArray::from(
3533 rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3534 )),
3535 Arc::new(StringArray::from(
3536 rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3537 )),
3538 Arc::new(StringArray::from(
3539 rows.iter()
3540 .map(|row| row.message.system_content())
3541 .collect::<Vec<_>>(),
3542 )),
3543 Arc::new(StringArray::from(
3544 rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3545 )),
3546 new_null_array(&embedding_vector_type(), rows.len()),
3550 new_null_array(&DataType::Utf8, rows.len()),
3551 Arc::new(LargeBinaryArray::from_iter_values(
3552 options.iter().map(Vec::as_slice),
3553 )),
3554 ],
3555 )
3556 .context("failed to build message batch")
3557}
3558
3559pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3560 let variant_data = parts
3561 .iter()
3562 .map(|part| part_variant_json(&part.kind))
3563 .collect::<Result<Vec<_>>>()?;
3564 let options = parts
3565 .iter()
3566 .map(|part| json_bytes(&part.options))
3567 .collect::<Result<Vec<_>>>()?;
3568 let mut cells = Vec::with_capacity(parts.len());
3569 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3572 let columns = [
3573 part.session_id.len(),
3574 part.message_id.len(),
3575 part.id.len(),
3576 part.kind.type_name().len(),
3577 part.provenance.as_str().len(),
3578 variant.len(),
3579 encoded.len(),
3580 ];
3581 for bytes in columns {
3582 guard_cell("parts", &part.id, bytes)?;
3583 }
3584 cells.push(columns.iter().sum());
3585 }
3586 chunk_ranges(&cells)
3587 .into_iter()
3588 .map(|range| {
3589 parts_chunk(
3590 &parts[range.clone()],
3591 &variant_data[range.clone()],
3592 &options[range],
3593 )
3594 })
3595 .collect()
3596}
3597
3598fn parts_chunk(
3599 parts: &[Part],
3600 variant_data: &[Vec<u8>],
3601 options: &[Vec<u8>],
3602) -> Result<RecordBatch> {
3603 let schema = part_schema();
3604 let blob_payloads: Vec<Option<&[u8]>> = parts
3608 .iter()
3609 .map(|part| match &part.kind {
3610 PartKind::File { data, .. } => Some(match data {
3611 FileData::String(value) => value.as_bytes(),
3612 FileData::Bytes(value) => value.as_slice(),
3613 FileData::Url(value) => value.as_bytes(),
3614 }),
3615 PartKind::Text { .. }
3616 | PartKind::Reasoning { .. }
3617 | PartKind::ToolCall { .. }
3618 | PartKind::ToolResult { .. }
3619 | PartKind::ToolApprovalRequest { .. }
3620 | PartKind::ToolApprovalResponse { .. } => None,
3621 })
3622 .collect();
3623 let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3624
3625 RecordBatch::try_new(
3626 schema.clone(),
3627 vec![
3628 Arc::new(StringArray::from(
3629 parts
3630 .iter()
3631 .map(|part| part.session_id.as_str())
3632 .collect::<Vec<_>>(),
3633 )),
3634 Arc::new(StringArray::from(
3635 parts
3636 .iter()
3637 .map(|part| part.message_id.as_str())
3638 .collect::<Vec<_>>(),
3639 )),
3640 Arc::new(StringArray::from(
3641 parts
3642 .iter()
3643 .map(|part| part.id.as_str())
3644 .collect::<Vec<_>>(),
3645 )),
3646 Arc::new(Int32Array::from(
3647 parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3648 )),
3649 Arc::new(StringArray::from(
3650 parts
3651 .iter()
3652 .map(|part| part.kind.type_name())
3653 .collect::<Vec<_>>(),
3654 )),
3655 Arc::new(StringArray::from(
3656 parts
3657 .iter()
3658 .map(|part| part.provenance.as_str())
3659 .collect::<Vec<_>>(),
3660 )),
3661 Arc::new(LargeBinaryArray::from_iter_values(
3662 variant_data.iter().map(Vec::as_slice),
3663 )),
3664 Arc::new(blob_array),
3665 Arc::new(LargeBinaryArray::from_iter_values(
3666 options.iter().map(Vec::as_slice),
3667 )),
3668 ],
3669 )
3670 .context("failed to build parts batch")
3671}
3672
3673pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3674 Ok(Session {
3675 id: string(batch, "id", row)?.context("session id is null")?,
3676 parent_session_id: string(batch, "parent_session_id", row)?,
3677 parent_message_id: string(batch, "parent_message_id", row)?,
3678 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3679 created_at: datetime(batch, "created_at", row)?,
3680 project: crate::adapter::Extracted::from_stored(
3681 string(batch, "project", row)?.context("project is null")?,
3682 ),
3683 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3684 })
3685}
3686
3687pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3688 let id = string(batch, "id", row)?.context("message id is null")?;
3689 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3690 let timestamp = datetime(batch, "timestamp", row)?;
3691 let options =
3692 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3693
3694 match string(batch, "role", row)?
3695 .context("message role is null")?
3696 .as_str()
3697 {
3698 "system" => Ok(Message::System {
3699 id,
3700 session_id,
3701 timestamp,
3702 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3709 options,
3710 }),
3711 "user" => Ok(Message::User {
3712 id,
3713 session_id,
3714 timestamp,
3715 options,
3716 }),
3717 "assistant" => Ok(Message::Assistant {
3718 id,
3719 session_id,
3720 timestamp,
3721 options,
3722 }),
3723 "tool" => Ok(Message::Tool {
3724 id,
3725 session_id,
3726 timestamp,
3727 options,
3728 }),
3729 other => anyhow::bail!("unknown message role {other}"),
3730 }
3731}
3732
3733pub(crate) fn part_from_batch(
3734 batch: &RecordBatch,
3735 row: usize,
3736 file_data: Option<FileData>,
3737) -> Result<Part> {
3738 let type_name = string(batch, "type", row)?.context("part type is null")?;
3739 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3740 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3741 Ok(Part {
3742 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3743 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3744 id: string(batch, "id", row)?.context("part id is null")?,
3745 ordinal: int32(batch, "ordinal", row)?,
3746 provenance: provenance_from_str(&provenance)?,
3747 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3748 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3749 })
3750}
3751
3752fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3753 match value {
3754 "conversational" => Ok(crate::wire::Provenance::Conversational),
3755 "injected" => Ok(crate::wire::Provenance::Injected),
3756 other => anyhow::bail!("unknown part provenance {other}"),
3757 }
3758}
3759
3760fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3761 let kind = file_data_kind(variant_data)?;
3762 match kind.as_str() {
3763 "string" => {
3764 let text = std::str::from_utf8(bytes)
3765 .context("file string payload is not UTF-8")?
3766 .to_owned();
3767 Ok(FileData::String(text))
3768 }
3769 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3770 "url" => Ok(FileData::Url(
3771 std::str::from_utf8(bytes)
3772 .context("file URL payload is not UTF-8")?
3773 .to_owned(),
3774 )),
3775 other => anyhow::bail!("unknown file data_kind {other}"),
3776 }
3777}
3778
3779fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3780 let value = json_parse::<Value>(variant_data)?;
3781 value
3782 .get("data_kind")
3783 .and_then(Value::as_str)
3784 .map(str::to_owned)
3785 .context("file part variant_data missing data_kind")
3786}
3787
3788fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3789 batch
3790 .column_by_name(name)
3791 .with_context(|| format!("missing column {name}"))?
3792 .as_any()
3793 .downcast_ref::<UInt64Array>()
3794 .with_context(|| format!("column {name} is not UInt64"))
3795}
3796
3797pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3798 let array = batch
3799 .column_by_name(name)
3800 .with_context(|| format!("missing column {name}"))?
3801 .as_any()
3802 .downcast_ref::<StringArray>()
3803 .with_context(|| format!("column {name} is not Utf8"))?;
3804 if array.is_null(row) {
3805 Ok(None)
3806 } else {
3807 Ok(Some(array.value(row).to_owned()))
3808 }
3809}
3810
3811fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3812 let column = batch
3816 .column_by_name(name)
3817 .with_context(|| format!("missing column {name}"))?;
3818 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3819 return if array.is_null(row) {
3820 Ok(None)
3821 } else {
3822 Ok(Some(
3823 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3824 ))
3825 };
3826 }
3827 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3828 return if array.is_null(row) {
3829 Ok(None)
3830 } else {
3831 Ok(Some(array.value(row).as_bytes().to_vec()))
3832 };
3833 }
3834 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3835 return if array.is_null(row) {
3836 Ok(None)
3837 } else {
3838 Ok(Some(array.value(row).as_bytes().to_vec()))
3839 };
3840 }
3841 anyhow::bail!("column {name} is not a JSON-compatible array")
3842}
3843
3844fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3845 let array = batch
3846 .column_by_name(name)
3847 .with_context(|| format!("missing column {name}"))?
3848 .as_any()
3849 .downcast_ref::<Int32Array>()
3850 .with_context(|| format!("column {name} is not Int32"))?;
3851 Ok(array.value(row))
3852}
3853
3854pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3855 let array = batch
3856 .column_by_name(name)
3857 .with_context(|| format!("missing column {name}"))?
3858 .as_any()
3859 .downcast_ref::<Float32Array>()
3860 .with_context(|| format!("column {name} is not Float32"))?;
3861 Ok(array.value(row))
3862}
3863
3864pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3865 let array = batch
3866 .column_by_name(name)
3867 .with_context(|| format!("missing column {name}"))?
3868 .as_any()
3869 .downcast_ref::<TimestampMicrosecondArray>()
3870 .with_context(|| format!("column {name} is not timestamp_micros"))?;
3871 Utc.timestamp_micros(array.value(row))
3872 .single()
3873 .context("timestamp is out of range")
3874}
3875
3876fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3877 Field::new(name, data_type, nullable).with_metadata(
3878 [(
3879 "lance-schema:unenforced-primary-key".to_owned(),
3880 "true".to_owned(),
3881 )]
3882 .into(),
3883 )
3884}
3885
3886fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3896 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3897 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3898 .into_iter()
3899 .collect(),
3900 )
3901}
3902
3903fn json_field(name: &str, nullable: bool) -> Field {
3904 lance_arrow::json::json_field(name, nullable)
3905}
3906
3907fn micros(timestamp: DateTime<Utc>) -> i64 {
3908 timestamp.timestamp_micros()
3909}
3910
3911fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3912 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3920 lance_arrow::json::encode_json(&text)
3921 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3922}
3923
3924fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3925 serde_json::from_slice(value).context("failed to parse JSON field")
3926}
3927
3928fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3929 if let PartKind::File {
3930 media_type,
3931 file_name,
3932 data,
3933 } = kind
3934 {
3935 let data_kind = match data {
3936 FileData::String(_) => "string",
3937 FileData::Bytes(_) => "bytes",
3938 FileData::Url(_) => "url",
3939 };
3940 return json_bytes(&serde_json::json!({
3941 "media_type": media_type,
3942 "file_name": file_name,
3943 "data_kind": data_kind,
3944 }));
3945 }
3946 let value = serde_json::to_value(kind)?;
3947 let mut object = value
3948 .as_object()
3949 .cloned()
3950 .context("part variant did not serialize to an object")?;
3951 object.remove("type");
3952 json_bytes(&object)
3953}
3954
3955fn part_kind_from_json(
3956 type_name: &str,
3957 variant_data: &[u8],
3958 file_data: Option<FileData>,
3959) -> Result<PartKind> {
3960 let mut value = json_parse::<Value>(variant_data)?;
3961 let object = value
3962 .as_object_mut()
3963 .context("part variant data is not an object")?;
3964 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3965 if let Some(data) = file_data {
3966 object.remove("data_kind");
3967 object.insert("data".to_owned(), serde_json::to_value(data)?);
3968 }
3969 serde_json::from_value(value).context("failed to parse part kind")
3970}
3971
3972#[cfg(test)]
3973mod tests {
3974 #![allow(clippy::expect_used, clippy::unwrap_used)]
3975
3976 use super::*;
3977 use crate::{
3978 adapter::Extracted,
3979 handlers::ingest_events,
3980 wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
3981 };
3982 use chrono::Utc;
3983 use serde_json::json;
3984 use tempfile::TempDir;
3985
3986 fn synthetic_session(id: &str) -> Session {
3987 Session {
3988 id: id.to_owned(),
3989 parent_session_id: None,
3990 parent_message_id: None,
3991 source_agent: "claude-code".to_owned(),
3992 created_at: Utc::now(),
3993 project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
3994 options: ProviderOptions::new(),
3995 }
3996 }
3997
3998 #[test]
3999 fn search_text_excludes_injected_parts() {
4000 use crate::wire::Provenance;
4001 let message = Message::User {
4002 id: "m1".to_owned(),
4003 session_id: "s1".to_owned(),
4004 timestamp: Utc::now(),
4005 options: ProviderOptions::new(),
4006 };
4007 let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4008 session_id: "s1".to_owned(),
4009 id: id.to_owned(),
4010 message_id: "m1".to_owned(),
4011 ordinal: 0,
4012 provenance,
4013 options: ProviderOptions::new(),
4014 kind: PartKind::Text {
4015 text: Some(Extracted::from_test_value(text.to_owned())),
4016 },
4017 };
4018
4019 let conversational = search_text(
4022 &message,
4023 &[text_part(
4024 "p1",
4025 "real human prompt",
4026 Provenance::Conversational,
4027 )],
4028 );
4029 assert_eq!(conversational.as_deref(), Some("real human prompt"));
4030
4031 let injected = search_text(
4032 &message,
4033 &[text_part(
4034 "p2",
4035 "<task-notification>...</task-notification>",
4036 Provenance::Injected,
4037 )],
4038 );
4039 assert!(
4040 injected.is_none(),
4041 "a message whose only part is injected has null search_text"
4042 );
4043 }
4044
4045 #[test]
4046 fn chunk_ranges_splits_on_byte_budget() {
4047 assert!(chunk_ranges(&[]).is_empty());
4048 assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4049
4050 let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4051 assert_eq!(
4052 chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4053 vec![0..1, 1..2, 2..3],
4054 );
4055
4056 assert_eq!(
4058 chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4059 vec![0..1, 1..2, 2..3],
4060 );
4061 }
4062
    /// A part pushed before any message event is rejected on its own: the
    /// push returns a single `part` error outcome mentioning the ordering
    /// violation, while the session, the later message and its part are
    /// still committed (row counts 1 / 1 / 1).
    #[tokio::test]
    async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("ordering");
        // References a message id that is never ingested.
        let orphan_part = Part {
            session_id: session.id.clone(),
            id: "orphan-part".to_owned(),
            message_id: "missing-message".to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("orphan".to_owned())),
            },
        };
        let valid_message = Message::User {
            id: "valid-message".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let valid_part = Part {
            session_id: session.id.clone(),
            id: "valid-part".to_owned(),
            message_id: valid_message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value("kept".to_owned())),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        // The orphan part arrives before any message event.
        let part_outcomes = validator
            .push(&store, 1, IngestEvent::Part(orphan_part))
            .await?;
        assert_eq!(part_outcomes.len(), 1);
        assert_eq!(part_outcomes[0].kind, "part");
        assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            part_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("part event appeared before a message"))
                .unwrap_or(false),
            "error message must explain the ordering violation: {part_outcomes:?}"
        );
        // The rejection must not poison the rest of the stream.
        validator
            .push(&store, 2, IngestEvent::Message(valid_message))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Part(valid_part))
            .await?;
        validator.finish(&store).await?;

        let (sessions, messages, parts) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed despite the orphan part");
        assert_eq!(messages, 1, "valid message committed");
        assert_eq!(parts, 1, "valid part committed; the orphan was dropped");

        Ok(())
    }
4134
    /// Two messages sharing the id `message-1` in one ingest stream: the
    /// second push yields a single error outcome naming the duplicate id, and
    /// after `finish` exactly one session and one message are stored.
    #[tokio::test]
    async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("duplicate-message");
        let first = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        // Same id as `first`, but a different role.
        let second = Message::Assistant {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(first))
            .await?;
        let dup_outcomes = validator
            .push(&store, 2, IngestEvent::Message(second))
            .await?;
        assert_eq!(dup_outcomes.len(), 1);
        assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
        assert!(
            dup_outcomes[0]
                .error
                .as_ref()
                .map(|e| e.message.contains("duplicate message id message-1"))
                .unwrap_or(false),
            "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
        );

        validator.finish(&store).await?;
        let (sessions, messages, _) = store.row_counts().await?;
        assert_eq!(sessions, 1, "session committed");
        assert_eq!(messages, 1, "only the first message committed");

        Ok(())
    }
4184
    /// Writes file parts (bytes, string and URL payloads) in two separate
    /// `upsert_parts` calls and then requires `optimize_indices` to finish
    /// without error. The test makes no assertion beyond that success.
    #[tokio::test(flavor = "multi_thread")]
    async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
        use crate::wire::{FileData, PartKind, Provenance};
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let session = synthetic_session("compact-blob");
        store
            .upsert_sessions(std::slice::from_ref(&session))
            .await?;

        // One part per synthetic message id, all in the same session.
        let make_part = |idx: usize, kind: PartKind| Part {
            session_id: session.id.clone(),
            message_id: format!("msg-{idx}"),
            id: format!("part-{idx}"),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind,
        };

        let batch_a = vec![
            make_part(
                0,
                PartKind::File {
                    media_type: Some("text/plain".to_owned()),
                    file_name: Some("a.txt".to_owned()),
                    data: FileData::Bytes(b"alpha".to_vec()),
                },
            ),
            make_part(
                1,
                PartKind::File {
                    media_type: Some("text/plain".to_owned()),
                    file_name: Some("b.txt".to_owned()),
                    data: FileData::String("beta".to_owned()),
                },
            ),
        ];
        store.upsert_parts(&batch_a).await?;

        // Second, separate write.
        let batch_b = vec![
            make_part(
                2,
                PartKind::File {
                    media_type: Some("application/octet-stream".to_owned()),
                    file_name: None,
                    data: FileData::Url("https://example.com/file".to_owned()),
                },
            ),
            make_part(
                3,
                PartKind::File {
                    media_type: Some("image/png".to_owned()),
                    file_name: Some("c.png".to_owned()),
                    data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
                },
            ),
        ];
        store.upsert_parts(&batch_b).await?;

        store.optimize_indices(None, None).await?.into_result()?;

        Ok(())
    }
4257
    /// Ingests a session, a message and one file part carrying a bytes
    /// payload, then reads the session back with `get_session` and requires
    /// the stored part to equal the ingested one.
    #[tokio::test]
    async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = synthetic_session("blob");
        let message = Message::User {
            id: "message-1".to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };
        let part = Part {
            session_id: session.id.clone(),
            id: "part-1".to_owned(),
            message_id: message.id().to_owned(),
            ordinal: 0,
            provenance: crate::wire::Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::File {
                media_type: Some("text/plain".to_owned()),
                file_name: Some("payload.txt".to_owned()),
                data: FileData::Bytes(b"pond".to_vec()),
            },
        };

        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(message.clone()))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(part.clone()))
            .await?;
        validator.finish(&store).await?;

        let stored = store
            .get_session(&session.id)
            .await?
            .expect("session should exist");
        // Whole-part equality covers the file payload as well as the metadata.
        let stored_part = &stored.messages[0].parts[0];
        assert_eq!(stored_part, &part);

        Ok(())
    }
4304
4305 fn base_session() -> Session {
4316 Session {
4317 id: "01HXY00000000001".to_owned(),
4318 parent_session_id: None,
4319 parent_message_id: None,
4320 source_agent: "claude-code".to_owned(),
4321 created_at: Utc::now(),
4322 project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4323 options: ProviderOptions::new(),
4324 }
4325 }
4326
4327 fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4328 outcomes
4329 .iter()
4330 .filter(|outcome| outcome.status == target)
4331 .count()
4332 }
4333
    /// Re-ingesting the base session with only its `options` changed must
    /// produce no error outcome and exactly one `Matched` outcome.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
    -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);

        // Same id, agent and project; only the options map differs.
        let mut again = base_session();
        again.options.insert("title".to_owned(), json!("renamed"));
        let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
        assert_eq!(
            count_status(&second, OutcomeStatus::Error),
            0,
            "options is mutable; the re-ingest must not surface an error: {second:?}",
        );
        assert_eq!(
            count_status(&second, OutcomeStatus::Matched),
            1,
            "unchanged immutable fields must match-insert via merge_insert",
        );

        Ok(())
    }
4359
    /// Re-ingesting the base session with a different `source_agent` must
    /// yield one error outcome tagged `field = "source_agent"`,
    /// `reason = "immutable"`, and leave the stored agent unchanged.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        let mut tampered = base_session();
        tampered.source_agent = "codex-cli".to_owned();
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("error outcome present");
        let err = err_row.error.as_ref().expect("error body present");
        assert_eq!(err.field, Some("source_agent"));
        assert_eq!(err.reason, Some("immutable"));

        // The rejected write must not have altered the stored row.
        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives the rejected re-ingest");
        assert_eq!(stored.session.source_agent, "claude-code");

        Ok(())
    }
4389
    /// Re-ingesting the base session with a different `project` must yield an
    /// error outcome tagged `field = "project"` and leave the stored project
    /// at its original value.
    #[tokio::test(flavor = "multi_thread")]
    async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;

        let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
        assert_eq!(count_status(&first, OutcomeStatus::Error), 0);

        let mut tampered = base_session();
        tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
        let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
        let err_row = second
            .iter()
            .find(|outcome| outcome.status == OutcomeStatus::Error)
            .expect("project change must surface an error outcome");
        assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));

        // The rejected write must not have altered the stored row.
        let stored = store
            .get_session(&base_session().id)
            .await?
            .expect("session row survives");
        assert_eq!(
            stored.session.project.as_str(),
            "/home/me/proj",
            "stored project must remain the original",
        );

        Ok(())
    }
4419
    /// Two ingest rounds against the same session.
    ///
    /// Round one inserts the session with messages `m1`/`m2` and their parts.
    /// Round two re-sends the session together with three new messages
    /// (`m3`..`m5`) and their parts. The second round must report the session
    /// as `Matched` and every new message and part as `Inserted`, both in the
    /// aggregate counts and in the per-row outcomes.
    #[tokio::test(flavor = "multi_thread")]
    async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
        use crate::wire::Provenance;
        let temp = TempDir::new()?;
        let store = Store::open_local(temp.path()).await?;
        let session = base_session();

        let text_part = |part_id: &str, message_id: &str, body: &str| Part {
            session_id: session.id.clone(),
            id: part_id.to_owned(),
            message_id: message_id.to_owned(),
            ordinal: 0,
            provenance: Provenance::Conversational,
            options: ProviderOptions::new(),
            kind: PartKind::Text {
                text: Some(Extracted::from_test_value(body.to_owned())),
            },
        };
        let user_message = |id: &str| Message::User {
            id: id.to_owned(),
            session_id: session.id.clone(),
            timestamp: Utc::now(),
            options: ProviderOptions::new(),
        };

        // Round one: everything is new.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        validator
            .push(&store, 1, IngestEvent::Message(user_message("m1")))
            .await?;
        validator
            .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
            .await?;
        validator
            .push(&store, 3, IngestEvent::Message(user_message("m2")))
            .await?;
        validator
            .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
            .await?;
        let (_first_outcomes, first_counts) = validator.finish(&store).await?;
        assert_eq!(first_counts.sessions_inserted, 1);
        assert_eq!(first_counts.messages_inserted_total, 2);
        assert_eq!(first_counts.messages_inserted_searchable, 2);

        // Round two: fresh validator, same session, three new messages.
        let mut validator = IngestValidator::default();
        validator
            .push(&store, 0, IngestEvent::Session(session.clone()))
            .await?;
        for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
            // Parts continue the numbering from round one: p3, p4, p5.
            let pid = format!("p{}", idx + 3);
            validator
                .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
                .await?;
            validator
                .push(
                    &store,
                    idx * 2 + 2,
                    IngestEvent::Part(text_part(&pid, mid, "gamma")),
                )
                .await?;
        }
        let (second_outcomes, second_counts) = validator.finish(&store).await?;

        assert_eq!(
            second_counts.sessions_inserted, 0,
            "existing session row must report as Matched, not Inserted",
        );
        assert_eq!(second_counts.sessions_matched, 1);
        assert_eq!(
            second_counts.messages_inserted_total, 3,
            "the three NEW messages must register as Inserted in BatchCounts",
        );
        assert_eq!(
            second_counts.messages_inserted_searchable, 3,
            "all three new messages carry conversational text -> searchable",
        );
        assert_eq!(second_counts.messages_matched_total, 0);
        assert_eq!(second_counts.parts_inserted, 3);
        assert_eq!(second_counts.parts_matched, 0);

        // Per-row outcomes must agree with the aggregate counts.
        let session_outcome = second_outcomes
            .iter()
            .find(|outcome| outcome.kind == "session")
            .expect("session-row outcome present");
        assert_eq!(session_outcome.status, OutcomeStatus::Matched);
        for outcome in &second_outcomes {
            if outcome.kind == "message" || outcome.kind == "part" {
                assert_eq!(
                    outcome.status,
                    OutcomeStatus::Inserted,
                    "new row must be Inserted, got: {outcome:?}",
                );
            }
        }
        Ok(())
    }
4529
4530 async fn store_with_messages(
4534 temp: &TempDir,
4535 count: usize,
4536 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4537 store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4538 }
4539
4540 async fn store_with_messages_at_threshold(
4543 temp: &TempDir,
4544 count: usize,
4545 _vector_threshold: usize,
4546 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4547 let store = Store::open_local(temp.path()).await?;
4548 let sessions = 8.min(count.max(1));
4549 let mut events = Vec::new();
4550 for s in 0..sessions {
4551 events.push(IngestEvent::Session(Session {
4552 id: format!("session-{s}"),
4553 parent_session_id: None,
4554 parent_message_id: None,
4555 source_agent: "claude-code".to_owned(),
4556 created_at: Utc::now(),
4557 project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4558 options: ProviderOptions::new(),
4559 }));
4560 for i in (s..count).step_by(sessions) {
4561 let message_id = format!("msg-{i}");
4562 events.push(IngestEvent::Message(Message::User {
4563 id: message_id.clone(),
4564 session_id: format!("session-{s}"),
4565 timestamp: Utc::now(),
4566 options: ProviderOptions::new(),
4567 }));
4568 events.push(IngestEvent::Part(Part {
4569 session_id: format!("session-{s}"),
4570 id: format!("{message_id}-part"),
4571 message_id,
4572 ordinal: 0,
4573 provenance: crate::wire::Provenance::Conversational,
4574 options: ProviderOptions::new(),
4575 kind: PartKind::Text {
4576 text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4577 },
4578 }));
4579 }
4580 }
4581 ingest_events(&store, events).await?;
4582 let keys = (0..count)
4583 .map(|i| MessageKey {
4584 session_id: format!("session-{}", i % sessions),
4585 message_id: format!("msg-{i}"),
4586 })
4587 .collect();
4588 Ok((store, keys))
4589 }
4590
4591 fn synthetic_vector(seed: usize) -> Vec<f32> {
4593 let mut state = (seed as u64)
4594 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4595 .wrapping_add(1);
4596 (0..embedding_dim())
4597 .map(|_| {
4598 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4599 #[allow(clippy::cast_precision_loss)]
4600 let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4601 unit - 1.0
4602 })
4603 .collect()
4604 }
4605
4606 fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4608 keys.iter()
4609 .enumerate()
4610 .map(|(seed, key)| EmbeddedMessage {
4611 session_id: key.session_id.clone(),
4612 id: key.message_id.clone(),
4613 vector: synthetic_vector(seed),
4614 })
4615 .collect()
4616 }
4617
4618 fn embedding_update_batch_with_model(
4619 rows: &[EmbeddedMessage],
4620 model: &str,
4621 ) -> Result<RecordBatch> {
4622 let mut batch = embedding_update_batch(rows)?;
4623 let columns = batch
4624 .columns()
4625 .iter()
4626 .take(3)
4627 .cloned()
4628 .chain(std::iter::once(
4629 Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4630 ))
4631 .collect::<Vec<_>>();
4632 batch = RecordBatch::try_new(batch.schema(), columns)?;
4633 Ok(batch)
4634 }
4635
    /// After embedding four messages and optimizing, the explained plan for a
    /// vector query filtered on `session_id = "session-3"` must contain a
    /// `ScalarIndexQuery` node and must not contain a `FilterExec` line that
    /// mentions `session_id`.
    #[tokio::test]
    async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages(&temp, 4).await?;
        store.write_embeddings(&embedded(&keys)).await?;
        store.optimize_indices(None, None).await?.into_result()?;

        let query = vec![0.01_f32; embedding_dim()];
        let plan = store
            .explain_vector_plan(
                &query,
                10,
                &Predicate::Eq("session_id", "session-3".into()),
                None,
            )
            .await?;

        assert!(
            plan.contains("ScalarIndexQuery"),
            "expected a ScalarIndexQuery node in the plan:\n{plan}",
        );
        // A FilterExec line naming the column means the predicate was applied
        // as a postfilter.
        let predicate_postfiltered = plan
            .lines()
            .any(|line| line.contains("FilterExec") && line.contains("session_id"));
        assert!(
            !predicate_postfiltered,
            "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
        );
        Ok(())
    }
4673
    /// With an activation threshold of 256: optimizing after 255 embedded
    /// rows must not create `MESSAGES_VECTOR_INDEX`; optimizing after the
    /// 256th row must create it, and a vector search seeded like `keys[0]`
    /// must then return `keys[0]` among its hits.
    #[tokio::test]
    async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;

        // One row short of the threshold.
        store.write_embeddings(&embedded(&keys[..255])).await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            !store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "IVF_PQ must not exist below the activation threshold",
        );

        // The 256th embedded row reaches the threshold.
        store.write_embeddings(&embedded(&keys[255..256])).await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "optimize must create the IVF_PQ once the threshold is crossed",
        );

        // `synthetic_vector(0)` is the vector `embedded` assigned to keys[0].
        let hits = store
            .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
            .await?;
        assert!(
            hits.iter().any(|(key, _)| key == &keys[0]),
            "an embedded row is retrievable via the index",
        );
        Ok(())
    }
4724
    /// Model-swap scenario.
    ///
    /// 1. Every row is embedded under the model name `old-model` and the
    ///    vector index is built; all rows then count as stale.
    /// 2. The index is dropped, and `pending_or_stale_messages` must yield
    ///    every row.
    /// 3. After `write_embeddings` the stale count is zero, optimize rebuilds
    ///    the index, and the pending/stale stream is empty.
    #[tokio::test]
    async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
    {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
        let old_rows = embedded(&keys);
        // Written through the raw handle so the rows carry the old model name.
        let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
        store
            .handle
            .merge_update(Table::Messages, old_batch, old_rows.len())
            .await?;
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "IVF_PQ must exist before a model swap",
        );
        assert_eq!(store.stale_embedding_count().await?, keys.len());

        store.drop_vector_index().await?;
        let mut pending = Vec::new();
        let stream = store.pending_or_stale_messages();
        tokio::pin!(stream);
        while let Some(row) = stream.next().await {
            pending.push(row?);
        }
        assert_eq!(
            pending.len(),
            keys.len(),
            "force stream should see stale rows"
        );
        // Re-embed through the regular write path.
        store.write_embeddings(&embedded(&keys)).await?;
        assert_eq!(store.stale_embedding_count().await?, 0);
        store
            .optimize_indices_with_vector_threshold(256)
            .await?
            .into_result()?;
        assert!(
            store
                .handle
                .messages_index_names()
                .await?
                .iter()
                .any(|name| name == MESSAGES_VECTOR_INDEX),
            "optimize must rebuild IVF_PQ after force re-embed",
        );

        let stream = store.pending_or_stale_messages();
        tokio::pin!(stream);
        assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
        Ok(())
    }
4784
    /// After several writes to the sessions table followed by
    /// `cleanup_old_versions` with a zero age threshold,
    /// `session_last_ingested_at` must still return an entry for at least
    /// every stored session.
    #[tokio::test]
    async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, _keys) = store_with_messages(&temp, 4).await?;

        // Three further, separate writes to the sessions table.
        for tag in 0..3 {
            let extra = synthetic_session(&format!("extra-{tag}"));
            store.upsert_sessions(&[extra]).await?;
        }

        let dataset = store.handle.dataset(Table::Sessions).await?;
        dataset
            .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
            .await
            .context("cleanup_old_versions failed")?;

        let map = store.session_last_ingested_at().await?;
        let session_count = store.row_counts().await?.0;
        assert!(
            map.len() >= session_count,
            "watermark map ({}) must still cover every session ({}) after \
             version cleanup; an empty fallback regresses pond sync to a \
             full re-scan",
            map.len(),
            session_count,
        );
        Ok(())
    }
4827
    /// `embedding_progress` must report `embedded` / `total` as 0/10 before
    /// any embeddings are written, 4/10 after the first four rows, and 10/10
    /// after the rest. The reported model must equal
    /// `crate::embed::model_id()`.
    #[tokio::test]
    async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
        let temp = TempDir::new()?;
        let (store, keys) = store_with_messages(&temp, 10).await?;

        let before = store.embedding_progress().await?;
        assert_eq!(before.embedded, 0);
        assert_eq!(before.total, 10);
        assert_eq!(before.model, crate::embed::model_id());

        store.write_embeddings(&embedded(&keys[..4])).await?;
        let partial = store.embedding_progress().await?;
        assert_eq!(partial.embedded, 4);
        assert_eq!(partial.total, 10);

        store.write_embeddings(&embedded(&keys[4..])).await?;
        let full = store.embedding_progress().await?;
        assert_eq!(full.embedded, 10);
        assert_eq!(full.total, 10);
        Ok(())
    }
4849}