1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 path::Path,
7 sync::Arc,
8};
9
10use anyhow::{Context, Result};
11use async_stream::try_stream;
12use chrono::{DateTime, TimeZone, Utc};
13use lance::Dataset;
14use lance::dataset::{AutoCleanupParams, WriteMode, WriteParams};
15use lance::deps::arrow_array::{
16 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
17 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
18 UInt64Array, new_null_array,
19};
20use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
21use lance_file::version::LanceFileVersion;
22use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
23use serde::{Deserialize, Serialize, de::DeserializeOwned};
24use serde_json::Value;
25use tokio_stream::{Stream, StreamExt};
26
27use crate::{
28 config, embed,
29 substrate::{
30 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
31 OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
32 TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
33 },
34 wire::{
35 FileData, Message, Part, PartKind, ResponseMode, Role, SUMMARY_PART_TYPES, Session,
36 SessionFrom,
37 },
38};
39use url::Url;
40
/// Storage front-end that wraps a single substrate [`Handle`].
///
/// The `Store` methods visible in this file delegate table scans and writes
/// (sessions, messages, parts) to this handle.
#[derive(Debug)]
pub struct Store {
    /// Substrate handle through which tables are read and written.
    handle: Handle,
}
45
/// Row counts for the three archive tables.
///
/// Used both for "rows seen" and "rows inserted" tallies in
/// [`LanceArchiveExport`] and [`LanceArchiveImport`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Rows counted for the sessions table.
    pub sessions: usize,
    /// Rows counted for the messages table.
    pub messages: usize,
    /// Rows counted for the parts table.
    pub parts: usize,
}
52
/// Dataset version ids of the three source tables.
///
/// The export path fills these from `Dataset::version_id()` of each table it
/// scans.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the sessions dataset.
    pub sessions: u64,
    /// Version id of the messages dataset.
    pub messages: u64,
    /// Version id of the parts dataset.
    pub parts: u64,
}
59
/// Summary returned by `Store::export_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Number of rows written to the archive, per table.
    pub rows: LanceArchiveCounts,
    /// Version id of each source dataset at the time it was opened for export.
    pub source_versions: LanceArchiveVersions,
}
65
/// Summary returned by `Store::import_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Number of rows read from the archive, per table.
    pub rows: LanceArchiveCounts,
    /// Sum of the counts returned by `Handle::merge_insert`, per table.
    pub inserted: LanceArchiveCounts,
}
71
72#[derive(Debug, Clone, Default)]
73pub struct IndexIntents {
74 pub sessions: Vec<IndexIntent>,
75 pub messages: Vec<IndexIntent>,
76 pub parts: Vec<IndexIntent>,
77}
78
79impl IndexIntents {
80 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
81 [
82 (Table::Sessions, &self.sessions),
83 (Table::Messages, &self.messages),
84 (Table::Parts, &self.parts),
85 ]
86 }
87}
88
/// A message selected for embedding: its key plus the text to embed.
///
/// Yielded by the pending-message streams on `Store`, which only select rows
/// whose `search_text` is non-null.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Value of the message's `search_text` column.
    pub search_text: String,
}
98
/// A message key paired with its embedding vector.
///
/// Input row type for `Store::write_embeddings`.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Message id.
    pub id: String,
    /// Embedding values for the message.
    pub vector: Vec<f32>,
}
107
/// Metadata record for a single message.
///
/// NOTE(review): no producer or consumer of this type is visible in this part
/// of the file; the field notes below are inferred from the field names and
/// from the message columns used elsewhere in `Store` — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Message id.
    pub message_id: String,
    /// Id of the owning session.
    pub session_id: String,
    /// Role, kept as a string here.
    pub role: String,
    /// Project name (presumably the owning session's project).
    pub project: String,
    /// Source agent name (presumably the owning session's source agent).
    pub source_agent: String,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Searchable text of the message.
    pub search_text: String,
}
119
/// Composite key identifying a message by session id and message id.
///
/// Derives `Ord` and `Hash`, so it can serve as a key in ordered or hashed
/// collections; the derived ordering compares `session_id` first, then
/// `message_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the owning session.
    pub session_id: String,
    /// Message id.
    pub message_id: String,
}
125
/// Result classification for an upserted row.
///
/// NOTE(review): presumably `Inserted` means the row was new and `Matched`
/// means a row with the same key already existed — confirm against the code
/// that produces these values (not visible in this part of the file).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    Inserted,
    Matched,
}
131
132#[derive(Debug, Default)]
137pub struct OptimizeOutcome {
138 pub tables: Vec<TableOptimizeOutcome>,
139}
140
141impl OptimizeOutcome {
142 pub fn any_indices_failed(&self) -> bool {
145 self.tables.iter().any(|t| t.indices.is_failed())
146 }
147
148 pub fn into_result(self) -> Result<Self> {
152 for table in &self.tables {
153 if let PhaseOutcome::Failed(error) = &table.indices {
154 anyhow::bail!(
155 "indices phase failed on {}: {error:#}",
156 table.table.as_str()
157 );
158 }
159 if let PhaseOutcome::Failed(error) = &table.compaction {
160 anyhow::bail!(
161 "compaction phase failed on {}: {error:#}",
162 table.table.as_str()
163 );
164 }
165 }
166 Ok(self)
167 }
168}
169
/// Snapshot produced by `Store::corpus_stats`.
#[derive(Debug, Clone)]
pub struct CorpusStats {
    /// Location of the store, cloned from the handle.
    pub data_url: Url,
    /// Row counts of the three tables, as reported by the handle.
    pub totals: RowTotals,
    /// Per-adapter breakdown, ordered by adapter name.
    pub adapters: Vec<AdapterStats>,
    /// Echo of the `include_subagents` flag the stats were computed with.
    pub include_subagents: bool,
}
189
/// Total row counts of the sessions, messages, and parts tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Rows in the sessions table.
    pub sessions: u64,
    /// Rows in the messages table.
    pub messages: u64,
    /// Rows in the parts table.
    pub parts: u64,
}
196
/// Progress counters for embedding work.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// type; the field notes are inferred from names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Presumably the number of messages embedded so far.
    pub embedded: usize,
    /// Presumably the number of messages to embed in total.
    pub total: usize,
    /// Model identifier as a static string.
    pub model: &'static str,
}
208
/// Statistics for one adapter within [`CorpusStats`].
///
/// `Store::corpus_stats` keys adapters by the messages' `source_agent` value.
#[derive(Debug, Clone)]
pub struct AdapterStats {
    /// Adapter name (the `source_agent` column value).
    pub adapter: String,
    /// Sum of `sessions` over `projects`.
    pub sessions: u64,
    /// Sum of `messages` over `projects`.
    pub messages: u64,
    /// Per-project breakdown, sorted by message count descending, then project name.
    pub projects: Vec<ProjectStats>,
}
221
/// Statistics for one (adapter, project) group within [`AdapterStats`].
#[derive(Debug, Clone)]
pub struct ProjectStats {
    /// Project name (the `project` column value).
    pub project: String,
    /// Number of distinct session ids seen in the group's message rows.
    pub sessions: u64,
    /// Number of message rows in the group.
    pub messages: u64,
}
228
/// Running tallies for one (source_agent, project) group while scanning messages.
#[derive(Default)]
struct GroupAccumulator {
    /// Message rows counted so far.
    messages: u64,
    /// Distinct session ids seen so far.
    session_ids: HashSet<String>,
}
234
/// Borrowed view of one message to be written, with its parts and optional search text.
///
/// `Store::upsert_messages` reads `message` and `search_text`; it does not
/// read `parts`.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message to write.
    pub message: &'a Message,
    /// Parts belonging to the message.
    pub parts: &'a [Part],
    /// Search text to store alongside the message, if any.
    pub search_text: Option<&'a str>,
}
241
242impl Store {
243 pub async fn open(location: &Url) -> Result<Self> {
249 Ok(Self {
250 handle: Handle::open(location).await?,
251 })
252 }
253
254 pub async fn open_with_options(
260 location: &Url,
261 storage_options: std::collections::HashMap<String, String>,
262 caps: crate::substrate::RuntimeCaps,
263 ) -> Result<Self> {
264 Ok(Self {
265 handle: Handle::open_with_options(location, storage_options, caps).await?,
266 })
267 }
268
269 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
274 let url = config::url_for_path(path)?;
275 Self::open_with_options(
276 &url,
277 std::collections::HashMap::new(),
278 crate::substrate::RuntimeCaps::default(),
279 )
280 .await
281 }
282
283 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
291 std::fs::create_dir_all(dest)
292 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
293 let (sessions, sessions_version) = self
294 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
295 .await?;
296 let (messages, messages_version) = self
297 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
298 .await?;
299 let (parts, parts_version) = self
300 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
301 .await?;
302 Ok(LanceArchiveExport {
303 rows: LanceArchiveCounts {
304 sessions,
305 messages,
306 parts,
307 },
308 source_versions: LanceArchiveVersions {
309 sessions: sessions_version,
310 messages: messages_version,
311 parts: parts_version,
312 },
313 })
314 }
315
316 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
317 let sessions_dataset =
318 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
319 let messages_dataset =
320 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
321 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
322 let (sessions, sessions_inserted) = self
323 .import_clean_table(Table::Sessions, sessions_dataset)
324 .await?;
325 let (messages, messages_inserted) = self
326 .import_clean_table(Table::Messages, messages_dataset)
327 .await?;
328 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
329 Ok(LanceArchiveImport {
330 rows: LanceArchiveCounts {
331 sessions,
332 messages,
333 parts,
334 },
335 inserted: LanceArchiveCounts {
336 sessions: sessions_inserted,
337 messages: messages_inserted,
338 parts: parts_inserted,
339 },
340 })
341 }
342
343 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
344 let dataset = self.handle.dataset(table).await?;
345 let source_version = dataset.version_id();
346 let schema = export_schema(table);
347 let mut scan = dataset.scan();
348 scan.blob_handling(lance::datatypes::BlobHandling::AllBinary);
353 let mut stream = scan
354 .try_into_stream()
355 .await
356 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
357 let dest_uri = dest
358 .to_str()
359 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
360
361 let mut rows = 0usize;
362 let mut wrote = false;
363 while let Some(batch) = stream.next().await {
364 let batch = batch
365 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
366 rows += batch.num_rows();
367 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
368 let mut params = write_params_for_create();
369 if wrote {
370 params.mode = WriteMode::Append;
371 }
372 Dataset::write(reader, dest_uri, Some(params))
373 .await
374 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
375 wrote = true;
376 }
377
378 if !wrote {
379 let batch = RecordBatch::new_empty(schema.clone());
380 let reader = RecordBatchIterator::new([Ok(batch)], schema);
381 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
382 .await
383 .with_context(|| {
384 format!("failed to write empty {} archive table", table.as_str())
385 })?;
386 }
387 Ok((rows, source_version))
388 }
389
390 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
391 let _ = self.handle.dataset(table).await?;
395 let mut scan = dataset.scan();
396 scan.blob_handling(lance::datatypes::BlobHandling::AllBinary);
399 let mut stream = scan
400 .try_into_stream()
401 .await
402 .with_context(|| format!("failed to scan {} archive table", table.as_str()))?;
403 let mut rows = 0usize;
404 let mut inserted = 0usize;
405 while let Some(batch) = stream.next().await {
406 let batch = batch
407 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
408 let row_count = batch.num_rows();
409 rows += row_count;
410 inserted += self
411 .handle
412 .merge_insert(table, batch, row_count)
413 .await
414 .with_context(|| format!("failed to import {} archive table", table.as_str()))?
415 as usize;
416 }
417 Ok((rows, inserted))
418 }
419
420 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<()> {
425 if sessions.is_empty() {
426 return Ok(());
427 }
428 let batches = sessions_batches(sessions)?;
429 merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
430 Ok(())
431 }
432
/// Writes a batch of completed substreams and reports a per-row outcome.
///
/// Steps, in order:
/// 1. Substreams sharing a session id are coalesced into the first one seen.
///    A later substream whose `source_agent` or `project` differs from the
///    first is rejected with error outcomes instead of being merged.
/// 2. Stored session rows, message keys, and part keys for the batch's
///    session ids are loaded.
/// 3. Substreams whose session fails `ensure_immutable_match` against the
///    stored session row are rejected with error outcomes.
/// 4. The remaining sessions, messages, and parts are merge-inserted; the
///    three table writes are driven together through `tokio::try_join!`.
/// 5. Success outcomes are produced for everything written.
///
/// Returns the outcomes sorted by `index`, together with the batch counts.
/// An empty input returns empty outcomes and default counts without touching
/// storage.
///
/// # Errors
/// Fails if any scan, row decode, batch construction, or merge-insert fails.
/// Immutable-field conflicts are reported through outcomes, not as errors.
async fn upsert_session_batch(
    &self,
    substreams: Vec<CompletedSubstream>,
) -> Result<(Vec<RowOutcome>, BatchCounts)> {
    if substreams.is_empty() {
        return Ok((Vec::new(), BatchCounts::default()));
    }

    let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
    let mut counts = BatchCounts::default();

    // Step 1: coalesce substreams by session id. `by_session_id` maps a
    // session id to the position of its first substream in `merged`.
    let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
    let mut by_session_id: std::collections::HashMap<String, usize> =
        std::collections::HashMap::with_capacity(substreams.len());
    for substream in substreams {
        if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
            let existing = &merged[existing_idx];
            if existing.session.source_agent != substream.session.source_agent
                || existing.session.project != substream.session.project
            {
                // `source_agent` takes precedence when both fields differ.
                let reason = if existing.session.source_agent != substream.session.source_agent
                {
                    IngestError::ImmutableField {
                        field: "source_agent",
                        session_id: substream.session.id.clone(),
                        stored: existing.session.source_agent.clone(),
                        attempted: substream.session.source_agent.clone(),
                    }
                } else {
                    IngestError::ImmutableField {
                        field: "project",
                        session_id: substream.session.id.clone(),
                        stored: (*existing.session.project).clone(),
                        attempted: (*substream.session.project).clone(),
                    }
                };
                let field = match &reason {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    reason.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            // Same session, compatible fields: append only messages whose id
            // is not already present in the first substream.
            let existing = &mut merged[existing_idx];
            let mut seen: std::collections::HashSet<String> = existing
                .messages
                .iter()
                .map(|m| m.message.id().to_owned())
                .collect();
            for msg in substream.messages {
                if seen.insert(msg.message.id().to_owned()) {
                    existing.messages.push(msg);
                }
            }
            continue;
        }
        by_session_id.insert(substream.session.id.clone(), merged.len());
        merged.push(substream);
    }

    // Step 2: load what is already stored for the batch's session ids.
    let session_id_values: Vec<ScalarValue> = merged
        .iter()
        .map(|substream| ScalarValue::String(substream.session.id.clone()))
        .collect();
    let existing_sessions: std::collections::HashMap<String, Session> =
        if session_id_values.is_empty() {
            std::collections::HashMap::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Sessions,
                    Some(&Predicate::In("id", session_id_values.clone())),
                    &[],
                )
                .await?;
            let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let session = session_from_batch(&batch, row)?;
                map.insert(session.id.clone(), session);
            }
            map
        };
    let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Messages,
                Some(&Predicate::In("session_id", session_id_values.clone())),
                &["session_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "id", row)?.context("message id is null")?;
            set.insert((sid, mid));
        }
        set
    };
    let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Parts,
                Some(&Predicate::In("session_id", session_id_values)),
                &["session_id", "message_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
            let pid = string(&batch, "id", row)?.context("part id is null")?;
            set.insert((sid, mid, pid));
        }
        set
    };

    // Step 3: drop substreams that conflict with the stored session row.
    let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
    for substream in merged {
        if let Some(existing) = existing_sessions.get(&substream.session.id)
            && let Err(failure) = ensure_immutable_match(existing, &substream.session)
        {
            let field = match &failure {
                IngestError::ImmutableField { field, .. } => Some(*field),
            };
            let reason_key = match field {
                Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                _ => DROP_REASON_UNCATEGORIZED,
            };
            outcomes.extend(error_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                failure.to_string(),
                field,
                reason_key,
            ));
            continue;
        }
        writeable.push(substream);
    }

    // Everything was rejected: report the error outcomes and write nothing.
    if writeable.is_empty() {
        outcomes.sort_by_key(|outcome| outcome.index);
        return Ok((outcomes, counts));
    }

    // Step 4: flatten the surviving substreams into per-table rows.
    let sessions_owned: Vec<Session> = writeable
        .iter()
        .map(|substream| substream.session.clone())
        .collect();
    let message_rows: Vec<MessageBatchRow<'_>> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().map(|buffered| MessageBatchRow {
                message: &buffered.message,
                source_agent: &substream.session.source_agent,
                project: &substream.session.project,
                search_text: buffered.search_text.as_deref(),
            })
        })
        .collect();
    let part_rows: Vec<Part> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().flat_map(|buffered| {
                buffered
                    .parts
                    .iter()
                    .map(|buffered_part| buffered_part.part.clone())
            })
        })
        .collect();

    let session_batches = sessions_batches(&sessions_owned)?;
    let message_batches = messages_batches(&message_rows)?;
    let part_batches = parts_batches(&part_rows)?;

    // The inserted counts returned by the writes are not used; the outcomes
    // below are derived from the key sets loaded in step 2.
    let (_sessions_inserted, _messages_inserted, _parts_inserted) = tokio::try_join!(
        merge_insert_chunks(&self.handle, Table::Sessions, session_batches),
        merge_insert_chunks(&self.handle, Table::Messages, message_batches),
        merge_insert_chunks(&self.handle, Table::Parts, part_batches),
    )?;

    // Step 5: success outcomes, using the pre-write state to tell new rows
    // from already stored ones.
    for substream in &writeable {
        outcomes.extend(success_outcomes_for_substream(
            substream.session_index,
            &substream.session,
            &substream.messages,
            &existing_sessions,
            &existing_message_pks,
            &existing_part_pks,
            &mut counts,
        ));
    }

    outcomes.sort_by_key(|outcome| outcome.index);
    Ok((outcomes, counts))
}
695
696 pub async fn upsert_messages(
697 &self,
698 session: &Session,
699 messages: &[MessageWrite<'_>],
700 ) -> Result<()> {
701 if messages.is_empty() {
702 return Ok(());
703 }
704
705 let rows = messages
706 .iter()
707 .map(|write| MessageBatchRow {
708 message: write.message,
709 source_agent: &session.source_agent,
710 project: &session.project,
711 search_text: write.search_text,
712 })
713 .collect::<Vec<_>>();
714 let batches = messages_batches(&rows)?;
715 merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
716 Ok(())
717 }
718
719 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<()> {
720 if parts.is_empty() {
721 return Ok(());
722 }
723 let batches = parts_batches(parts)?;
724 merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
725 Ok(())
726 }
727
728 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
729 let Some(session) = self.find_session(session_id).await? else {
730 return Ok(None);
731 };
732 let messages = self.messages_for_session(session_id).await?;
733 Ok(Some(SessionWithMessages { session, messages }))
734 }
735
736 pub async fn session_ids(&self) -> Result<Vec<String>> {
738 let batch = self
739 .handle
740 .scan_batch(Table::Sessions, None, &["id"])
741 .await?;
742 let mut ids = Vec::with_capacity(batch.num_rows());
743 for row in 0..batch.num_rows() {
744 if let Some(id) = string(&batch, "id", row)? {
745 ids.push(id);
746 }
747 }
748 Ok(ids)
749 }
750
751 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
752 let batch = self
753 .handle
754 .scan_batch(
755 Table::Sessions,
756 Some(&Predicate::Eq(
757 "parent_session_id",
758 parent_session_id.into(),
759 )),
760 &[
761 "id",
762 "parent_session_id",
763 "parent_message_id",
764 "source_agent",
765 "created_at",
766 "project",
767 "options",
768 ],
769 )
770 .await?;
771 let mut sessions = Vec::with_capacity(batch.num_rows());
772 for row in 0..batch.num_rows() {
773 sessions.push(session_from_batch(&batch, row)?);
774 }
775 sessions.sort_by(|left, right| left.id.cmp(&right.id));
776 Ok(sessions)
777 }
778
779 pub async fn session_last_ingested_at(&self) -> Result<HashMap<String, DateTime<Utc>>> {
785 use lance::deps::arrow_array::UInt64Array;
786
787 let dataset = self.handle.dataset(Table::Sessions).await?;
788 let version_list = dataset.versions().await?;
789 let versions: HashMap<u64, DateTime<Utc>> = version_list
790 .iter()
791 .map(|v| (v.version, v.timestamp))
792 .collect();
793 let oldest_visible_ts = version_list.iter().map(|v| v.timestamp).min();
801
802 let scanner = self
803 .handle
804 .scan(
805 Table::Sessions,
806 ScanOpts::project_only(&["id", "_row_last_updated_at_version"]),
807 )
808 .await?;
809 let mut stream = scanner.try_into_stream().await?;
810 let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
811 while let Some(batch) = stream.next().await {
812 let batch = batch?;
813 let version_array = batch
814 .column_by_name("_row_last_updated_at_version")
815 .context("missing _row_last_updated_at_version column")?
816 .as_any()
817 .downcast_ref::<UInt64Array>()
818 .context("_row_last_updated_at_version is not UInt64")?;
819 for row in 0..batch.num_rows() {
820 let Some(id) = string(&batch, "id", row)? else {
821 continue;
822 };
823 if version_array.is_null(row) {
824 continue;
825 }
826 let version = version_array.value(row);
827 let ts = versions.get(&version).copied().or(oldest_visible_ts);
828 if let Some(ts) = ts {
829 out.insert(id, ts);
830 }
831 }
832 }
833 Ok(out)
834 }
835
/// Returns one page of a session's messages.
///
/// Messages are ordered by timestamp, then id. Paging starts after the
/// message named by `params.after_id` (when given) and takes rows either from
/// the front (`SessionFrom::Start`) or from the back (`SessionFrom::End`) of
/// what remains, bounded by `params.limit` and `params.budget_bytes`. Row
/// size is measured as the byte length of the row's text; rows without text
/// count as zero.
///
/// Returns `GetLookup::NotFound` when the session does not exist and
/// `GetLookup::UnknownAfterId` when `after_id` matches no row in the
/// mode-filtered list.
///
/// # Errors
/// Fails if any underlying scan or part lookup fails.
pub async fn session_view(
    &self,
    session_id: &str,
    params: SessionViewParams<'_>,
) -> Result<GetLookup<SessionPage>> {
    let Some(session) = self.find_session(session_id).await? else {
        return Ok(GetLookup::NotFound);
    };

    // Conversational mode only sees rows with search text and never carries
    // `content`; the other modes load every message row.
    let mut rows = match params.mode {
        ResponseMode::Conversational => self
            .scan_conversational_messages(session_id)
            .await?
            .into_iter()
            .map(|row| ScanRow {
                id: row.message_id,
                role: row.role,
                timestamp: row.timestamp,
                text: Some(row.text.into_inner()),
                content: None,
            })
            .collect(),
        ResponseMode::Complete | ResponseMode::Verbatim => {
            self.scan_all_messages(session_id).await?
        }
    };
    rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));

    // Index of the first row eligible for this page.
    let start_at = match params.after_id {
        Some(after) => match rows.iter().position(|row| row.id == after) {
            Some(idx) => idx + 1,
            None => return Ok(GetLookup::UnknownAfterId),
        },
        None => 0,
    };
    let remaining = rows.get(start_at..).unwrap_or(&[]);
    let (emitted, messages_remaining) = match params.session_from {
        SessionFrom::Start => {
            // `page_by` yields the length of the prefix to emit.
            let n = page_by(remaining, params.limit, params.budget_bytes, |row| {
                row.text.as_deref().map_or(0, str::len)
            });
            (&remaining[..n], remaining.len() - n)
        }
        SessionFrom::End => {
            // Walk backwards from the newest row, growing the tail
            // `remaining[start..]` one row at a time.
            let mut bytes = 0usize;
            let mut start = remaining.len();
            for row in remaining.iter().rev() {
                if remaining.len() - start >= params.limit {
                    break;
                }
                let size = row.text.as_deref().map_or(0, str::len);
                // The budget is only enforced once at least one row has been
                // taken, so a single oversized row is still returned.
                if start < remaining.len() && bytes + size > params.budget_bytes {
                    break;
                }
                bytes += size;
                start -= 1;
            }
            // Rows before the tail are the ones not emitted.
            (&remaining[start..], start)
        }
    };
    let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();

    // Verbatim mode attaches all parts; the other modes attach summary parts.
    let mut parts_by_message = match params.mode {
        ResponseMode::Verbatim => self.parts_for_messages(session_id, &ids).await?,
        ResponseMode::Conversational | ResponseMode::Complete => {
            self.summary_parts_for_messages(session_id, &ids).await?
        }
    };
    let messages = emitted
        .iter()
        .map(|row| RetrievedMessage {
            id: row.id.clone(),
            role: row.role,
            timestamp: row.timestamp,
            text: row.text.clone(),
            content: row.content.clone(),
            // Parts are moved out of the map; messages without parts get an
            // empty list.
            parts: parts_by_message
                .remove(&(session_id.to_owned(), row.id.clone()))
                .unwrap_or_default(),
        })
        .collect();

    Ok(GetLookup::Found(SessionPage {
        session,
        messages,
        messages_remaining,
    }))
}
937
938 pub async fn message_view(
944 &self,
945 message_id: &str,
946 params: MessageViewParams<'_>,
947 ) -> Result<GetLookup<MessagePage>> {
948 let Some(session_id) = self.session_id_for_message(message_id).await? else {
949 return Ok(GetLookup::NotFound);
950 };
951 let Some(session) = self.find_session(&session_id).await? else {
952 return Ok(GetLookup::NotFound);
953 };
954 let mut rows = self.scan_all_messages(&session_id).await?;
955 if matches!(params.mode, ResponseMode::Conversational) {
961 rows.retain(|row| row.text.is_some() || row.id == message_id);
962 }
963 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
964 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
965 return Ok(GetLookup::NotFound);
966 };
967
968 let start = target_pos.saturating_sub(params.context_depth);
969 let end = (target_pos + params.context_depth + 1).min(rows.len());
970 let window = &rows[start..end];
971 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
972 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
975
976 let all_parts = parts_by_message
977 .remove(&(session_id.clone(), message_id.to_owned()))
978 .unwrap_or_default();
979 let start_part = match params.after_id {
980 Some(after) => match all_parts.iter().find(|part| part.id == after) {
984 Some(anchor) => all_parts
985 .iter()
986 .position(|part| part.ordinal > anchor.ordinal)
987 .unwrap_or(all_parts.len()),
988 None => return Ok(GetLookup::UnknownAfterId),
989 },
990 None => 0,
991 };
992 let remaining_parts = all_parts.get(start_part..).unwrap_or(&[]);
993 let part_count = page_by(remaining_parts, params.limit, params.budget_bytes, |part| {
994 serde_json::to_string(part).map_or(0, |json| json.len())
995 });
996 let target_parts = remaining_parts[..part_count].to_vec();
997 let target_parts_remaining = remaining_parts.len() - part_count;
998
999 let target_row = &rows[target_pos];
1000 let target = RetrievedMessage {
1001 id: target_row.id.clone(),
1002 role: target_row.role,
1003 timestamp: target_row.timestamp,
1004 text: target_row.text.clone(),
1005 content: target_row.content.clone(),
1006 parts: Vec::new(),
1008 };
1009 let siblings = window
1010 .iter()
1011 .enumerate()
1012 .filter(|(idx, _)| start + idx != target_pos)
1013 .map(|(_, row)| RetrievedMessage {
1014 id: row.id.clone(),
1015 role: row.role,
1016 timestamp: row.timestamp,
1017 text: row.text.clone(),
1018 content: row.content.clone(),
1019 parts: parts_by_message
1020 .get(&(session_id.clone(), row.id.clone()))
1021 .cloned()
1022 .unwrap_or_default(),
1023 })
1024 .collect();
1025
1026 Ok(GetLookup::Found(MessagePage {
1027 session,
1028 target,
1029 target_parts,
1030 target_parts_remaining,
1031 siblings,
1032 }))
1033 }
1034
1035 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1036 let batch = self
1037 .handle
1038 .scan_batch(
1039 Table::Messages,
1040 Some(&Predicate::Eq("session_id", session_id.into())),
1041 &["id", "timestamp", "role", "search_text", "content"],
1042 )
1043 .await?;
1044 let mut rows = Vec::with_capacity(batch.num_rows());
1045 for row in 0..batch.num_rows() {
1046 let id = string(&batch, "id", row)?.context("message id is null")?;
1047 let role =
1048 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1049 let timestamp = datetime(&batch, "timestamp", row)?;
1050 rows.push(ScanRow {
1051 id,
1052 role,
1053 timestamp,
1054 text: string(&batch, "search_text", row)?,
1055 content: string(&batch, "content", row)?,
1056 });
1057 }
1058 Ok(rows)
1059 }
1060
1061 pub async fn scan_conversational_messages(
1065 &self,
1066 session_id: &str,
1067 ) -> Result<Vec<ConversationalRow>> {
1068 let filter = Predicate::And(vec![
1069 Predicate::Eq("session_id", session_id.into()),
1070 Predicate::IsNotNull("search_text"),
1071 ]);
1072 let batch = self
1073 .handle
1074 .scan_batch(
1075 Table::Messages,
1076 Some(&filter),
1077 &["id", "timestamp", "role", "search_text"],
1078 )
1079 .await?;
1080
1081 let mut rows = Vec::with_capacity(batch.num_rows());
1082 for row in 0..batch.num_rows() {
1083 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1084 let role =
1085 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1086 let timestamp = datetime(&batch, "timestamp", row)?;
1087 let text_str = string(&batch, "search_text", row)?.context(
1088 "search_text null after IsNotNull pushdown - storage invariant violated",
1089 )?;
1090 rows.push(ConversationalRow {
1091 session_id: session_id.to_owned(),
1092 message_id,
1093 role,
1094 timestamp,
1095 text: SearchText(text_str),
1096 });
1097 }
1098 rows.sort_by(|a, b| {
1099 a.timestamp
1100 .cmp(&b.timestamp)
1101 .then_with(|| a.message_id.cmp(&b.message_id))
1102 });
1103 Ok(rows)
1104 }
1105
1106 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1109 let batch = self
1110 .handle
1111 .scan_batch(
1112 Table::Messages,
1113 Some(&Predicate::Eq("id", message_id.into())),
1114 &["session_id"],
1115 )
1116 .await?;
1117 if batch.num_rows() == 0 {
1118 return Ok(None);
1119 }
1120 string(&batch, "session_id", 0)
1121 }
1122
/// Returns the row counts reported by the handle.
///
/// `corpus_stats` reads the tuple as `(sessions, messages, parts)`.
pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
    self.handle.row_counts().await
}
1126
1127 pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1131 Ok(Arc::new(self.handle.dataset(table).await?))
1132 }
1133
/// Forwards `name` and `bytes` to the handle's `export_write`.
///
/// # Errors
/// Propagates any failure from the handle.
pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
    self.handle.export_write(name, bytes).await
}
1138
/// Returns the bytes the handle's `export_read` yields for `name`.
///
/// # Errors
/// Propagates any failure from the handle.
pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
    self.handle.export_read(name).await
}
1143
/// Returns the handle's local path for the export `name`, if it has one.
///
/// NOTE(review): presumably `None` means the store is not backed by a local
/// filesystem — confirm against `Handle::export_local_path`.
pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
    self.handle.export_local_path(name)
}
1148
1149 pub async fn corpus_stats(&self, include_subagents: bool) -> Result<CorpusStats> {
1155 let scanner = self
1156 .handle
1157 .scan(
1158 Table::Messages,
1159 ScanOpts::project_only(&["source_agent", "project", "session_id"]),
1160 )
1161 .await?;
1162 let mut stream = scanner.try_into_stream().await?;
1163 let mut groups: HashMap<(String, String), GroupAccumulator> = HashMap::new();
1164 while let Some(batch) = stream.next().await {
1165 let batch = batch?;
1166 for row in 0..batch.num_rows() {
1167 let source_agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1168 let project = string(&batch, "project", row)?.unwrap_or_default();
1169 let session_id = string(&batch, "session_id", row)?.unwrap_or_default();
1170 let is_subagent = source_agent.contains('/');
1171 if is_subagent && !include_subagents {
1172 continue;
1173 }
1174 let entry = groups.entry((source_agent, project)).or_default();
1175 entry.messages += 1;
1176 entry.session_ids.insert(session_id);
1177 }
1178 }
1179
1180 let (totals_sessions, totals_messages, totals_parts) = self.handle.row_counts().await?;
1181 let totals = RowTotals {
1182 sessions: totals_sessions as u64,
1183 messages: totals_messages as u64,
1184 parts: totals_parts as u64,
1185 };
1186
1187 let mut by_adapter: BTreeMap<String, Vec<ProjectStats>> = BTreeMap::new();
1188 for ((adapter, project), acc) in groups {
1189 by_adapter.entry(adapter).or_default().push(ProjectStats {
1190 project,
1191 sessions: acc.session_ids.len() as u64,
1192 messages: acc.messages,
1193 });
1194 }
1195
1196 let mut adapters = Vec::with_capacity(by_adapter.len());
1197 for (adapter, mut projects) in by_adapter {
1198 projects.sort_by(|a, b| {
1199 b.messages
1200 .cmp(&a.messages)
1201 .then_with(|| a.project.cmp(&b.project))
1202 });
1203 let sessions: u64 = projects.iter().map(|p| p.sessions).sum();
1204 let messages: u64 = projects.iter().map(|p| p.messages).sum();
1205 adapters.push(AdapterStats {
1206 adapter,
1207 sessions,
1208 messages,
1209 projects,
1210 });
1211 }
1212
1213 Ok(CorpusStats {
1214 data_url: self.handle.location().clone(),
1215 totals,
1216 adapters,
1217 include_subagents,
1218 })
1219 }
1220
1221 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1226 if rows.is_empty() {
1227 return Ok(());
1228 }
1229 let batch = embedding_update_batch(rows)?;
1230 self.handle
1231 .merge_update(Table::Messages, batch, rows.len())
1232 .await?;
1233 Ok(())
1234 }
1235
1236 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1239 try_stream! {
1240 let filter = Predicate::And(vec![
1241 Predicate::IsNull("vector"),
1242 Predicate::IsNotNull("search_text"),
1243 ]);
1244 let projection: &[&str] = &["session_id", "id", "search_text"];
1245 let scanner = self
1246 .handle
1247 .scan(
1248 Table::Messages,
1249 ScanOpts::with_predicate_and_projection(&filter, projection),
1250 )
1251 .await?;
1252 let mut batches = scanner
1253 .try_into_stream()
1254 .await
1255 .context("failed to open messages stream")?;
1256 while let Some(batch) = batches.next().await {
1257 let batch = batch?;
1258 for row in 0..batch.num_rows() {
1259 yield PendingMessage {
1260 session_id: string(&batch, "session_id", row)?
1261 .context("session_id is null")?,
1262 id: string(&batch, "id", row)?.context("message id is null")?,
1263 search_text: string(&batch, "search_text", row)?
1264 .context("search_text is null")?,
1265 };
1266 }
1267 }
1268 }
1269 }
1270
1271 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1276 try_stream! {
1277 let filter = Predicate::And(vec![
1278 Predicate::IsNotNull("search_text"),
1279 Predicate::Or(vec![
1280 Predicate::IsNull("vector"),
1281 Predicate::Ne("embedding_model", embed::model_id().into()),
1282 ]),
1283 ]);
1284 let projection: &[&str] = &["session_id", "id", "search_text"];
1285 let scanner = self
1286 .handle
1287 .scan(
1288 Table::Messages,
1289 ScanOpts::with_predicate_and_projection(&filter, projection),
1290 )
1291 .await?;
1292 let mut batches = scanner
1293 .try_into_stream()
1294 .await
1295 .context("failed to open pending-or-stale messages stream")?;
1296 while let Some(batch) = batches.next().await {
1297 let batch = batch?;
1298 for row in 0..batch.num_rows() {
1299 yield PendingMessage {
1300 session_id: string(&batch, "session_id", row)?
1301 .context("session_id is null")?,
1302 id: string(&batch, "id", row)?.context("message id is null")?,
1303 search_text: string(&batch, "search_text", row)?
1304 .context("search_text is null")?,
1305 };
1306 }
1307 }
1308 }
1309 }
1310
1311 pub async fn fts_search(
1313 &self,
1314 query: &str,
1315 limit: usize,
1316 filter: &Predicate,
1317 ) -> Result<Vec<(MessageKey, f32)>> {
1318 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1319 scanner.full_text_search(
1320 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1321 )?;
1322 scanner.disable_scoring_autoprojection();
1328 scanner.project(&["session_id", "id", "_score"])?;
1329 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1330 let batch = scanner.try_into_batch().await?;
1331 let mut hits = Vec::with_capacity(batch.num_rows());
1332 for row in 0..batch.num_rows() {
1333 let key = MessageKey {
1334 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1335 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1336 };
1337 hits.push((key, float32(&batch, "_score", row)?));
1338 }
1339 hits.sort_by(|left, right| {
1347 right
1348 .1
1349 .partial_cmp(&left.1)
1350 .unwrap_or(std::cmp::Ordering::Equal)
1351 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1352 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1353 });
1354 Ok(hits)
1355 }
1356
1357 pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
1364 let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
1365 let dataset = self.handle.dataset(Table::Messages).await?;
1366 dataset
1367 .count_rows(Some(scope.to_lance()))
1368 .await
1369 .map_err(Into::into)
1370 }
1371
1372 pub async fn has_embeddings(&self) -> Result<bool> {
1377 let scope = Predicate::IsNotNull("vector");
1378 let mut scanner = self
1379 .handle
1380 .scan(
1381 Table::Messages,
1382 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
1383 )
1384 .await?;
1385 scanner.limit(Some(1), None)?;
1386 let batch = scanner.try_into_batch().await?;
1387 Ok(batch.num_rows() > 0)
1388 }
1389
1390 pub async fn vector_search(
1398 &self,
1399 query: &[f32],
1400 limit: usize,
1401 filter: &Predicate,
1402 search: Option<&config::SearchConfig>,
1403 ) -> Result<Vec<(MessageKey, f32)>> {
1404 let scope = embedded_scope(filter);
1405 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1406 let key = Float32Array::from(query.to_vec());
1407 scanner.nearest("vector", &key, limit)?;
1408 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1409 scanner.nprobes(nprobes);
1410 }
1411 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1412 scanner.refine(refine_factor);
1413 }
1414 scanner.disable_scoring_autoprojection();
1418 scanner.project(&["session_id", "id", "_distance"])?;
1419 let batch = scanner.try_into_batch().await?;
1420 let mut hits = Vec::with_capacity(batch.num_rows());
1421 for row in 0..batch.num_rows() {
1422 let key = MessageKey {
1423 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1424 message_id: string(&batch, "id", row)?.context("message id is null")?,
1425 };
1426 hits.push((key, float32(&batch, "_distance", row)?));
1427 }
1428 hits.sort_by(|left, right| {
1434 left.1
1435 .partial_cmp(&right.1)
1436 .unwrap_or(std::cmp::Ordering::Equal)
1437 .then_with(|| left.0.session_id.cmp(&right.0.session_id))
1438 .then_with(|| left.0.message_id.cmp(&right.0.message_id))
1439 });
1440 Ok(hits)
1441 }
1442
1443 pub async fn explain_vector_plan(
1446 &self,
1447 query: &[f32],
1448 limit: usize,
1449 filter: &Predicate,
1450 search: Option<&config::SearchConfig>,
1451 ) -> Result<String> {
1452 let scope = embedded_scope(filter);
1453 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
1454 let key = Float32Array::from(query.to_vec());
1455 scanner.nearest("vector", &key, limit)?;
1456 if let Some(nprobes) = search.and_then(|cfg| cfg.nprobes) {
1457 scanner.nprobes(nprobes);
1458 }
1459 if let Some(refine_factor) = search.and_then(|cfg| cfg.refine_factor) {
1460 scanner.refine(refine_factor);
1461 }
1462 scanner
1463 .explain_plan(true)
1464 .await
1465 .context("explain_plan failed")
1466 }
1467
1468 pub async fn explain_fts_plan(
1469 &self,
1470 query: &str,
1471 limit: usize,
1472 filter: &Predicate,
1473 ) -> Result<String> {
1474 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
1475 scanner.full_text_search(
1476 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
1477 )?;
1478 scanner.project(&["session_id", "id"])?;
1479 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
1480 scanner
1481 .explain_plan(true)
1482 .await
1483 .context("explain_plan failed")
1484 }
1485
1486 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
1488 if keys.is_empty() {
1489 return Ok(Vec::new());
1490 }
1491 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
1492 let session_ids = keys
1493 .iter()
1494 .map(|key| key.session_id.clone())
1495 .collect::<Vec<_>>();
1496 let message_ids = keys
1497 .iter()
1498 .map(|key| key.message_id.clone())
1499 .collect::<Vec<_>>();
1500 let predicate = Predicate::And(vec![
1501 in_predicate("session_id", &session_ids),
1502 in_predicate("id", &message_ids),
1503 ]);
1504 let batch = self
1505 .handle
1506 .scan_batch(
1507 Table::Messages,
1508 Some(&predicate),
1509 &[
1510 "id",
1511 "session_id",
1512 "role",
1513 "project",
1514 "source_agent",
1515 "timestamp",
1516 "search_text",
1517 ],
1518 )
1519 .await?;
1520 let mut metas = Vec::with_capacity(batch.num_rows());
1521 for row in 0..batch.num_rows() {
1522 let message_id = string(&batch, "id", row)?.context("id is null")?;
1523 let session_id = string(&batch, "session_id", row)?.context("session_id is null")?;
1524 if !wanted.contains(&MessageKey {
1525 session_id: session_id.clone(),
1526 message_id: message_id.clone(),
1527 }) {
1528 continue;
1529 }
1530 metas.push(MessageMeta {
1531 message_id,
1532 session_id,
1533 role: string(&batch, "role", row)?.context("role is null")?,
1534 project: string(&batch, "project", row)?.context("project is null")?,
1535 source_agent: string(&batch, "source_agent", row)?
1536 .context("source_agent is null")?,
1537 timestamp: datetime(&batch, "timestamp", row)?,
1538 search_text: string(&batch, "search_text", row)?.unwrap_or_default(),
1539 });
1540 }
1541 Ok(metas)
1542 }
1543
1544 pub async fn session_message_counts(
1546 &self,
1547 session_ids: &[String],
1548 ) -> Result<BTreeMap<String, usize>> {
1549 if session_ids.is_empty() {
1550 return Ok(BTreeMap::new());
1551 }
1552 let dataset = self.handle.dataset(Table::Messages).await?;
1553 let mut tasks = tokio::task::JoinSet::new();
1554 for session_id in session_ids {
1555 let dataset = dataset.clone();
1556 let session_id = session_id.clone();
1557 tasks.spawn(async move {
1558 let filter = Predicate::Eq("session_id", session_id.as_str().into()).to_lance();
1559 let count = dataset.count_rows(Some(filter)).await?;
1560 anyhow::Ok((session_id, count))
1561 });
1562 }
1563 let mut counts = BTreeMap::new();
1564 while let Some(joined) = tasks.join_next().await {
1565 let (session_id, count) = joined.context("session count task panicked")??;
1566 counts.insert(session_id, count);
1567 }
1568 Ok(counts)
1569 }
1570
1571 pub async fn unindexed_message_backlog(&self) -> Result<usize> {
1574 self.handle
1575 .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
1576 .await
1577 }
1578
1579 pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
1585 self.handle
1586 .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
1587 .await
1588 }
1589
1590 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
1597 let dataset = self.handle.dataset(Table::Messages).await?;
1598 let embedded = dataset
1599 .count_rows(Some(Predicate::IsNotNull("vector").to_lance()))
1600 .await?;
1601 let total = dataset
1602 .count_rows(Some(Predicate::IsNotNull("search_text").to_lance()))
1603 .await?;
1604 Ok(EmbeddingProgress {
1605 embedded,
1606 total,
1607 model: embed::model_id(),
1608 })
1609 }
1610
1611 pub async fn stale_embedding_count(&self) -> Result<usize> {
1615 let dataset = self.handle.dataset(Table::Messages).await?;
1616 dataset
1617 .count_rows(Some(
1618 Predicate::And(vec![
1619 Predicate::IsNotNull("vector"),
1620 Predicate::Ne("embedding_model", embed::model_id().into()),
1621 ])
1622 .to_lance(),
1623 ))
1624 .await
1625 .map_err(Into::into)
1626 }
1627
1628 pub async fn optimize_indices(
1634 &self,
1635 progress: Option<OptimizeProgressFn>,
1636 maintenance: &MaintenancePolicy,
1637 ) -> Result<OptimizeOutcome> {
1638 let intents = pond_index_intents();
1639 let mut tables = Vec::with_capacity(3);
1640 for (table, intents) in intents.all() {
1641 let outcome = self
1642 .handle
1643 .optimize_table(table, intents, progress.as_ref(), maintenance)
1644 .await;
1645 tables.push(outcome);
1646 }
1647 Ok(OptimizeOutcome { tables })
1648 }
1649
1650 pub async fn build_indices_only(
1656 &self,
1657 progress: Option<OptimizeProgressFn>,
1658 ) -> Result<OptimizeOutcome> {
1659 let policy = pond_index_intents();
1660 let mut tables = Vec::with_capacity(3);
1661 for (table, intents) in policy.all() {
1662 let indices = self
1663 .handle
1664 .optimize_table_indices_only(table, intents, progress.as_ref())
1665 .await;
1666 tables.push(TableOptimizeOutcome {
1667 table,
1668 indices,
1669 compaction: PhaseOutcome::NotAttempted,
1670 });
1671 }
1672 Ok(OptimizeOutcome { tables })
1673 }
1674
1675 #[cfg(test)]
1676 async fn optimize_indices_with_vector_threshold(
1677 &self,
1678 vector_threshold: usize,
1679 ) -> Result<OptimizeOutcome> {
1680 let intents = pond_index_intents_with_vector_threshold(vector_threshold);
1681 let policy = MaintenancePolicy::always_compact();
1682 let mut tables = Vec::with_capacity(3);
1683 for (table, intents) in intents.all() {
1684 let outcome = self
1685 .handle
1686 .optimize_table(table, intents, None, &policy)
1687 .await;
1688 tables.push(outcome);
1689 }
1690 Ok(OptimizeOutcome { tables })
1691 }
1692
1693 pub async fn rebuild_indices(&self, intent_name: Option<&str>) -> Result<()> {
1694 let policy = pond_index_intents();
1695 let mut matched = false;
1696 for (table, intents) in policy.all() {
1697 for intent in intents {
1698 if intent_name.is_none_or(|name| name == intent.name) {
1699 matched = true;
1700 self.handle.rebuild_index(table, intent).await?;
1701 }
1702 }
1703 }
1704 if let Some(name) = intent_name
1705 && !matched
1706 {
1707 anyhow::bail!("unknown index intent {name:?}");
1708 }
1709 Ok(())
1710 }
1711
1712 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
1713 let policy = pond_index_intents();
1714 let mut statuses = Vec::new();
1715 for (table, intents) in policy.all() {
1716 statuses.extend(self.handle.index_status(table, intents).await?);
1717 }
1718 Ok(statuses)
1719 }
1720
1721 pub async fn drop_vector_index(&self) -> Result<()> {
1725 match self
1726 .handle
1727 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
1728 .await
1729 {
1730 Ok(()) => Ok(()),
1731 Err(error) => {
1732 let msg = error.to_string();
1733 if msg.contains("not found") || msg.contains("does not exist") {
1734 Ok(())
1735 } else {
1736 Err(error)
1737 }
1738 }
1739 }
1740 }
1741
1742 pub async fn table_sizes(&self) -> Result<TableSizes> {
1745 self.handle.table_sizes().await
1746 }
1747
1748 pub async fn initialized(&self) -> Result<bool> {
1749 self.handle.initialized().await
1750 }
1751
1752 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
1753 let batch = self
1754 .handle
1755 .scan_batch(
1756 Table::Sessions,
1757 Some(&Predicate::Eq("id", session_id.into())),
1758 &[],
1759 )
1760 .await?;
1761 if batch.num_rows() == 0 {
1762 Ok(None)
1763 } else {
1764 Ok(Some(session_from_batch(&batch, 0)?))
1765 }
1766 }
1767
1768 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
1769 let batch = self
1770 .handle
1771 .scan_batch(
1772 Table::Messages,
1773 Some(&Predicate::Eq("session_id", session_id.into())),
1774 &[
1775 "session_id",
1776 "id",
1777 "timestamp",
1778 "role",
1779 "content",
1780 "options",
1781 ],
1782 )
1783 .await?;
1784 let mut messages = Vec::with_capacity(batch.num_rows());
1785 for row in 0..batch.num_rows() {
1786 messages.push(message_from_batch(&batch, row)?);
1787 }
1788 messages.sort_by(|left, right| {
1789 left.timestamp()
1790 .cmp(&right.timestamp())
1791 .then_with(|| left.id().cmp(right.id()))
1792 });
1793
1794 let message_ids = messages
1795 .iter()
1796 .map(|message| message.id().to_owned())
1797 .collect::<Vec<_>>();
1798 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
1799
1800 Ok(messages
1801 .into_iter()
1802 .map(|message| {
1803 let key = (message.session_id().to_owned(), message.id().to_owned());
1804 let parts = parts_by_message.remove(&key).unwrap_or_default();
1805 MessageWithParts { message, parts }
1806 })
1807 .collect())
1808 }
1809
1810 pub async fn parts_for_messages(
1814 &self,
1815 session_id: &str,
1816 message_ids: &[String],
1817 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1818 self.scan_parts(session_id, message_ids, None).await
1819 }
1820
1821 pub async fn summary_parts_for_messages(
1826 &self,
1827 session_id: &str,
1828 message_ids: &[String],
1829 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
1830 self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
1831 .await
1832 }
1833
    /// Scans the parts table for the given messages and groups the decoded
    /// parts by `(session_id, message_id)`.
    ///
    /// * `session_id` / `message_ids` - restrict the scan to parts of these
    ///   messages in this session; an empty `message_ids` returns an empty map
    ///   without touching storage.
    /// * `part_types` - when `Some`, additionally restricts the scan to rows
    ///   whose `type` is one of the listed values.
    ///
    /// Rows whose `type` is `"file"` have their `data` blob fetched separately
    /// by row address and combined with `variant_data` into a `FileData`
    /// payload before the part is decoded. Each group is sorted by `ordinal`.
    ///
    /// # Errors
    /// Fails if the dataset or scan cannot be opened, a file row has null
    /// `variant_data`, a blob cannot be read, or a row cannot be decoded.
    async fn scan_parts(
        &self,
        session_id: &str,
        message_ids: &[String],
        part_types: Option<&[&str]>,
    ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
        if message_ids.is_empty() {
            return Ok(BTreeMap::new());
        }
        // Build `session_id = .. AND message_id IN (..) [AND type IN (..)]`.
        let mut clauses = vec![
            Predicate::Eq("session_id", session_id.into()),
            in_predicate("message_id", message_ids),
        ];
        if let Some(types) = part_types {
            clauses.push(Predicate::In(
                "type",
                types.iter().map(|&t| t.into()).collect(),
            ));
        }
        let predicate = Predicate::And(clauses);
        // The dataset is used further down for the blob lookup by row address.
        let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
        // The `data` blob column is deliberately not in the projection; it is
        // read below only for file rows.
        let mut scanner = self
            .handle
            .scan(
                Table::Parts,
                ScanOpts::with_predicate_and_projection(
                    &predicate,
                    &[
                        "session_id",
                        "message_id",
                        "id",
                        "ordinal",
                        "type",
                        "provenance",
                        "variant_data",
                        "options",
                    ],
                ),
            )
            .await?;
        // Ask for `_rowaddr` so file rows can be mapped to their blobs.
        scanner.with_row_address();
        let batch = scanner.try_into_batch().await.context("scan failed")?;
        let row_addresses = uint64(&batch, "_rowaddr")?;
        // Decoded file payloads keyed by batch row index.
        let mut file_payloads = BTreeMap::<usize, FileData>::new();
        // (batch row index, row address, raw variant_data) for each file row.
        let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
        for row in 0..batch.num_rows() {
            if string(&batch, "type", row)?.as_deref() == Some("file") {
                let variant_data =
                    json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
                file_rows.push((row, row_addresses.value(row), variant_data));
            }
        }
        if !file_rows.is_empty() {
            let addresses = file_rows
                .iter()
                .map(|(_, address, _)| *address)
                .collect::<Vec<_>>();
            let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
            // NOTE(review): the zip pairs blobs with file rows purely by position
            // and stops at the shorter side. This assumes the lookup returns
            // exactly one blob per address, in the same order - TODO confirm.
            for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
                let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
                file_payloads.insert(row, payload);
            }
        }
        let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
        for row in 0..batch.num_rows() {
            // Non-file rows have no entry in `file_payloads` and get `None`.
            let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
            parts_by_message
                .entry((part.session_id.clone(), part.message_id.clone()))
                .or_default()
                .push(part);
        }
        for parts in parts_by_message.values_mut() {
            parts.sort_by_key(|part| part.ordinal);
        }
        Ok(parts_by_message)
    }
1913}
1914
/// One event of an ingest stream: a session header, a message, or a part.
///
/// Serialized adjacently tagged: the variant name goes into `kind` in
/// snake_case (`"session"`, `"message"`, `"part"`) and the payload into `data`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// A session row.
    Session(Session),
    /// A message row.
    Message(Message),
    /// A part row.
    Part(Part),
}
1922
/// Running totals for an ingest run.
///
/// The insert/match counters are maintained by `add_batch` and `add_outcomes`,
/// the drop counters by `add_outcomes` and `add_outcomes_errors_only`, and two
/// summaries can be combined with `merge`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Total rows counted as inserted (sessions + messages + parts).
    pub inserted: usize,
    /// Total rows counted as matched (sessions + messages + parts).
    pub matched: usize,
    /// Session rows counted as inserted.
    pub sessions_inserted: usize,
    /// Message rows counted as inserted.
    pub messages_inserted_total: usize,
    /// Inserted message rows whose outcome was flagged `searchable`.
    pub messages_inserted_searchable: usize,
    /// Part rows counted as inserted.
    pub parts_inserted: usize,
    /// Session rows counted as matched.
    pub sessions_matched: usize,
    /// Message rows counted as matched.
    pub messages_matched_total: usize,
    /// Matched message rows whose outcome was flagged `searchable`.
    pub messages_matched_searchable: usize,
    /// Part rows counted as matched.
    pub parts_matched: usize,
    /// Error outcomes whose kind is anything other than `"session"`.
    pub dropped_events: usize,
    /// Error outcomes whose kind is `"session"`.
    pub dropped_sessions: usize,
    /// Only summed by `merge` in this impl; set elsewhere.
    /// NOTE(review): presumably input files skipped - confirm against the writer.
    pub skipped_files: usize,
    /// Only summed by `merge` in this impl; set elsewhere.
    /// NOTE(review): presumably empty inputs skipped - confirm against the writer.
    pub skipped_empty: usize,
    /// Only summed by `merge` in this impl; set elsewhere.
    /// NOTE(review): presumably already up-to-date inputs skipped - confirm.
    pub skipped_fresh: usize,
    /// Only summed by `merge` in this impl; set elsewhere.
    pub storage_errors: usize,
    /// Only summed by `merge` in this impl; set elsewhere.
    pub truncated_values: usize,
    /// Number of error outcomes per `reason_key`; outcomes without a key are
    /// counted under `DROP_REASON_UNCATEGORIZED`.
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
1998
// Stable keys stored in `RowError::reason_key` and tallied in
// `IngestSummary::drop_reasons`.

/// A message id was already seen in the current session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part_id)` pair was already seen in the current session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message's `session_id` differs from the open session's id.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was open.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part's `session_id` or `message_id` differs from the open message's.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session's `source_agent` is empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session has `parent_message_id` set but no `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// NOTE(review): not used in this part of the file; presumably raised when an
/// existing session's `project` would change - confirm at the use site.
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// NOTE(review): not used in this part of the file; presumably raised when an
/// existing session's `source_agent` would change - confirm at the use site.
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key for error outcomes that carry no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
2015
/// Per-table insert/match counts for one flushed batch.
///
/// Returned alongside the row outcomes by `IngestValidator::flush` / `finish`
/// and folded into an `IngestSummary` by `IngestSummary::add_batch`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Session rows inserted.
    pub sessions_inserted: usize,
    /// Session rows matched.
    pub sessions_matched: usize,
    /// Message rows inserted.
    pub messages_inserted_total: usize,
    /// Searchable subset of the inserted message rows.
    pub messages_inserted_searchable: usize,
    /// Message rows matched.
    pub messages_matched_total: usize,
    /// Searchable subset of the matched message rows.
    pub messages_matched_searchable: usize,
    /// Part rows inserted.
    pub parts_inserted: usize,
    /// Part rows matched.
    pub parts_matched: usize,
}
2034
2035impl IngestSummary {
2036 pub fn accepted(&self) -> usize {
2037 self.inserted + self.matched
2038 }
2039
2040 pub fn add_batch(&mut self, counts: &BatchCounts) {
2044 self.sessions_inserted += counts.sessions_inserted;
2045 self.sessions_matched += counts.sessions_matched;
2046 self.messages_inserted_total += counts.messages_inserted_total;
2047 self.messages_inserted_searchable += counts.messages_inserted_searchable;
2048 self.messages_matched_total += counts.messages_matched_total;
2049 self.messages_matched_searchable += counts.messages_matched_searchable;
2050 self.parts_inserted += counts.parts_inserted;
2051 self.parts_matched += counts.parts_matched;
2052 self.inserted +=
2053 counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
2054 self.matched +=
2055 counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
2056 }
2057
2058 pub fn merge(&mut self, other: &Self) {
2062 self.inserted += other.inserted;
2063 self.matched += other.matched;
2064 self.sessions_inserted += other.sessions_inserted;
2065 self.messages_inserted_total += other.messages_inserted_total;
2066 self.messages_inserted_searchable += other.messages_inserted_searchable;
2067 self.parts_inserted += other.parts_inserted;
2068 self.sessions_matched += other.sessions_matched;
2069 self.messages_matched_total += other.messages_matched_total;
2070 self.messages_matched_searchable += other.messages_matched_searchable;
2071 self.parts_matched += other.parts_matched;
2072 self.dropped_events += other.dropped_events;
2073 self.dropped_sessions += other.dropped_sessions;
2074 self.skipped_files += other.skipped_files;
2075 self.skipped_empty += other.skipped_empty;
2076 self.skipped_fresh += other.skipped_fresh;
2077 self.storage_errors += other.storage_errors;
2078 self.truncated_values += other.truncated_values;
2079 for (key, value) in &other.drop_reasons {
2080 *self.drop_reasons.entry(key).or_insert(0) += value;
2081 }
2082 }
2083
2084 pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
2089 for outcome in outcomes {
2090 if !matches!(outcome.status, OutcomeStatus::Error) {
2091 continue;
2092 }
2093 if outcome.kind == "session" {
2094 self.dropped_sessions += 1;
2095 } else {
2096 self.dropped_events += 1;
2097 }
2098 let reason = outcome
2099 .error
2100 .as_ref()
2101 .and_then(|error| error.reason_key)
2102 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2103 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2104 }
2105 }
2106
2107 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
2108 for outcome in outcomes {
2109 match outcome.status {
2110 OutcomeStatus::Inserted => {
2111 self.inserted += 1;
2112 match outcome.kind {
2113 "session" => self.sessions_inserted += 1,
2114 "message" => {
2115 self.messages_inserted_total += 1;
2116 if outcome.searchable {
2117 self.messages_inserted_searchable += 1;
2118 }
2119 }
2120 "part" => self.parts_inserted += 1,
2121 _ => {}
2122 }
2123 }
2124 OutcomeStatus::Matched => {
2125 self.matched += 1;
2126 match outcome.kind {
2127 "session" => self.sessions_matched += 1,
2128 "message" => {
2129 self.messages_matched_total += 1;
2130 if outcome.searchable {
2131 self.messages_matched_searchable += 1;
2132 }
2133 }
2134 "part" => self.parts_matched += 1,
2135 _ => {}
2136 }
2137 }
2138 OutcomeStatus::Error => {
2139 if outcome.kind == "session" {
2145 self.dropped_sessions += 1;
2146 } else {
2147 self.dropped_events += 1;
2148 }
2149 let reason = outcome
2150 .error
2151 .as_ref()
2152 .and_then(|e| e.reason_key)
2153 .unwrap_or(DROP_REASON_UNCATEGORIZED);
2154 *self.drop_reasons.entry(reason).or_insert(0) += 1;
2155 }
2156 }
2157 }
2158 }
2159}
2160
/// The result of ingesting one event.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// The index the caller supplied for the event (see `IngestValidator::push`).
    pub index: usize,
    /// The kind of row: `"session"`, `"message"` or `"part"`.
    pub kind: &'static str,
    /// Primary key of the row. The validator uses the session id as a string
    /// for sessions, `[session_id, id]` for messages and
    /// `[session_id, message_id, id]` for parts.
    pub pk: Value,
    /// Whether the row was inserted, matched, or rejected.
    pub status: OutcomeStatus,
    /// Details of the rejection; the validator sets this on every error outcome.
    pub error: Option<RowError>,
    /// Counted towards the `*_searchable` message counters of `IngestSummary`;
    /// `false` on the error outcomes built in this file.
    pub searchable: bool,
}
2178
/// How an ingested row was handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// Counted as an insert by `IngestSummary`.
    Inserted,
    /// Counted as a match by `IngestSummary`.
    Matched,
    /// The row was rejected; counted as a drop by `IngestSummary`.
    Error,
}
2185
/// Why a row was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the problem.
    pub message: String,
    /// Name of the offending field, when the error is tied to one.
    pub field: Option<&'static str>,
    /// Optional short reason; `error_outcomes_for_substream` sets `"immutable"`
    /// when a field is given, the other constructors in this file leave it `None`.
    pub reason: Option<&'static str>,
    /// One of the `DROP_REASON_*` keys, used to tally `IngestSummary::drop_reasons`.
    pub reason_key: Option<&'static str>,
}
2199
/// A session event held by the validator while its substream is open.
#[derive(Debug)]
struct BufferedSession {
    /// The index the caller supplied for the session event.
    index: usize,
    /// The session payload (with `source_agent` trimmed by `push_session`).
    session: Session,
}
2208
/// A message event held by the validator, together with its parts.
#[derive(Debug)]
struct BufferedMessage {
    /// The index the caller supplied for the message event.
    index: usize,
    /// The message payload.
    message: Message,
    /// Parts accepted for this message; filled in when the message is flushed.
    parts: Vec<BufferedPart>,
    /// Result of `search_text` over the message and its parts; `None` until
    /// the message is flushed.
    search_text: Option<String>,
}
2216
/// A part event held by the validator.
#[derive(Debug)]
struct BufferedPart {
    /// The index the caller supplied for the part event.
    index: usize,
    /// The part payload.
    part: Part,
}
2222
/// Validates the ordering and consistency of an ingest event stream and
/// buffers accepted events until they are flushed to the store.
///
/// A substream is a session event followed by its messages, each followed by
/// its parts. A new session event (or `finish`) closes the current substream
/// and queues it in `completed`.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// The session of the currently open substream, if any.
    session: Option<BufferedSession>,
    /// The message currently accepting parts, if any.
    current_message: Option<BufferedMessage>,
    /// Parts accepted so far for `current_message`.
    current_parts: Vec<BufferedPart>,
    /// Finished messages of the currently open substream.
    messages: Vec<BufferedMessage>,
    /// Message ids already accepted in the current substream.
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` pairs already accepted in the current substream.
    seen_part_keys: HashSet<(String, String)>,
    /// Closed substreams waiting to be written by `flush`.
    completed: Vec<CompletedSubstream>,
}
2257
/// A closed substream: one session and all of its buffered messages.
#[derive(Debug)]
struct CompletedSubstream {
    /// The index the caller supplied for the session event.
    session_index: usize,
    /// The session payload.
    session: Session,
    /// The session's messages, each carrying its parts and search text.
    messages: Vec<BufferedMessage>,
}
2265
2266fn ingest_host_stamp() -> Option<&'static Value> {
2271 static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
2272 STAMP
2273 .get_or_init(|| {
2274 let mut host = serde_json::Map::new();
2275 if let Ok(username) = whoami::username() {
2276 host.insert("username".to_owned(), username.into());
2277 }
2278 if let Ok(hostname) = whoami::hostname() {
2279 host.insert("hostname".to_owned(), hostname.into());
2280 }
2281 if let Ok(devicename) = whoami::devicename() {
2282 host.insert("device_name".to_owned(), devicename.into());
2283 }
2284 (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
2285 })
2286 .as_ref()
2287}
2288
2289impl IngestValidator {
2290 pub async fn push(
2296 &mut self,
2297 store: &Store,
2298 index: usize,
2299 event: IngestEvent,
2300 ) -> Result<Vec<RowOutcome>> {
2301 match event {
2302 IngestEvent::Session(session) => self.push_session(store, index, session).await,
2303 IngestEvent::Message(message) => Ok(self.push_message(index, message)),
2304 IngestEvent::Part(part) => Ok(self.push_part(index, part)),
2305 }
2306 }
2307
2308 pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2313 self.close_current_substream();
2314 self.flush(store).await
2315 }
2316
2317 pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
2324 if self.completed.is_empty() {
2325 return Ok((Vec::new(), BatchCounts::default()));
2326 }
2327 let completed = std::mem::take(&mut self.completed);
2328 store.upsert_session_batch(completed).await
2329 }
2330
2331 pub fn pending_substreams(&self) -> usize {
2334 self.completed.len()
2335 }
2336
2337 async fn push_session(
2338 &mut self,
2339 _store: &Store,
2340 index: usize,
2341 mut session: Session,
2342 ) -> Result<Vec<RowOutcome>> {
2343 self.close_current_substream();
2347
2348 let trimmed = session.source_agent.trim();
2353 if trimmed.is_empty() {
2354 return Ok(vec![RowOutcome {
2355 index,
2356 kind: "session",
2357 pk: Value::String(session.id.clone()),
2358 status: OutcomeStatus::Error,
2359 error: Some(RowError {
2360 message: format!("session {} has empty source_agent after trim", session.id),
2361 field: Some("source_agent"),
2362 reason: None,
2363 reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
2364 }),
2365 searchable: false,
2366 }]);
2367 }
2368 if trimmed.len() != session.source_agent.len() {
2369 session.source_agent = trimmed.to_owned();
2370 }
2371
2372 if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
2373 return Ok(vec![RowOutcome {
2374 index,
2375 kind: "session",
2376 pk: Value::String(session.id.clone()),
2377 status: OutcomeStatus::Error,
2378 error: Some(RowError {
2379 message: format!(
2380 "session {} has parent_message_id without parent_session_id",
2381 session.id,
2382 ),
2383 field: Some("parent_message_id"),
2384 reason: None,
2385 reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
2386 }),
2387 searchable: false,
2388 }]);
2389 }
2390
2391 self.seen_message_ids.clear();
2392 self.seen_part_keys.clear();
2393 self.session = Some(BufferedSession { index, session });
2394 Ok(Vec::new())
2395 }
2396
2397 fn close_current_substream(&mut self) {
2398 self.flush_current_message();
2399 let Some(BufferedSession {
2400 index: session_index,
2401 session,
2402 }) = self.session.take()
2403 else {
2404 return;
2405 };
2406 let messages = std::mem::take(&mut self.messages);
2407 self.seen_message_ids.clear();
2408 self.seen_part_keys.clear();
2409 self.completed.push(CompletedSubstream {
2410 session_index,
2411 session,
2412 messages,
2413 });
2414 }
2415
2416 fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
2417 let pk = Value::Array(vec![
2418 Value::String(message.session_id().to_owned()),
2419 Value::String(message.id().to_owned()),
2420 ]);
2421 let Some(session) = &self.session else {
2422 return vec![error_outcome(
2423 index,
2424 "message",
2425 pk,
2426 "first event in a session stream must be Session",
2427 None,
2428 DROP_REASON_MESSAGE_BEFORE_SESSION,
2429 )];
2430 };
2431 if message.session_id() != session.session.id {
2432 let msg = format!(
2433 "message {} references session {}, expected {}",
2434 message.id(),
2435 message.session_id(),
2436 session.session.id
2437 );
2438 return vec![error_outcome(
2439 index,
2440 "message",
2441 pk,
2442 &msg,
2443 Some("session_id"),
2444 DROP_REASON_MESSAGE_SESSION_MISMATCH,
2445 )];
2446 }
2447 if !self.seen_message_ids.insert(message.id().to_owned()) {
2448 let msg = format!("duplicate message id {} in session substream", message.id());
2452 return vec![error_outcome(
2453 index,
2454 "message",
2455 pk,
2456 &msg,
2457 None,
2458 DROP_REASON_DUPLICATE_MESSAGE_ID,
2459 )];
2460 }
2461 match ingest_host_stamp() {
2466 Some(stamp) => {
2467 message
2468 .options_mut()
2469 .insert("pond".to_owned(), stamp.clone());
2470 }
2471 None => {
2472 message.options_mut().remove("pond");
2473 }
2474 }
2475 self.flush_current_message();
2476 self.current_message = Some(BufferedMessage {
2477 index,
2478 message,
2479 parts: Vec::new(),
2480 search_text: None,
2481 });
2482 Vec::new()
2483 }
2484
2485 fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
2486 let pk = Value::Array(vec![
2487 Value::String(part.session_id.clone()),
2488 Value::String(part.message_id.clone()),
2489 Value::String(part.id.clone()),
2490 ]);
2491 let Some(current) = &self.current_message else {
2492 return vec![error_outcome(
2493 index,
2494 "part",
2495 pk,
2496 "part event appeared before a message",
2497 None,
2498 DROP_REASON_PART_BEFORE_MESSAGE,
2499 )];
2500 };
2501 if part.session_id != current.message.session_id() {
2502 let msg = format!(
2503 "part {} references session {}, expected {}",
2504 part.id,
2505 part.session_id,
2506 current.message.session_id()
2507 );
2508 return vec![error_outcome(
2509 index,
2510 "part",
2511 pk,
2512 &msg,
2513 Some("session_id"),
2514 DROP_REASON_PART_MESSAGE_MISMATCH,
2515 )];
2516 }
2517 if part.message_id != current.message.id() {
2518 let msg = format!(
2519 "part {} references message {}, expected {}",
2520 part.id,
2521 part.message_id,
2522 current.message.id()
2523 );
2524 return vec![error_outcome(
2525 index,
2526 "part",
2527 pk,
2528 &msg,
2529 Some("message_id"),
2530 DROP_REASON_PART_MESSAGE_MISMATCH,
2531 )];
2532 }
2533 let part_key = (part.message_id.clone(), part.id.clone());
2534 if !self.seen_part_keys.insert(part_key) {
2535 let msg = format!(
2536 "duplicate part id {} for message {} in session substream",
2537 part.id, part.message_id
2538 );
2539 return vec![error_outcome(
2540 index,
2541 "part",
2542 pk,
2543 &msg,
2544 None,
2545 DROP_REASON_DUPLICATE_PART_KEY,
2546 )];
2547 }
2548 self.current_parts.push(BufferedPart { index, part });
2549 Vec::new()
2550 }
2551
2552 fn flush_current_message(&mut self) {
2553 let Some(mut buffered) = self.current_message.take() else {
2554 return;
2555 };
2556 let parts = std::mem::take(&mut self.current_parts);
2557 let mut canonical_parts = Vec::with_capacity(parts.len());
2558 for part in &parts {
2559 canonical_parts.push(part.part.clone());
2560 }
2561 buffered.search_text = search_text(&buffered.message, &canonical_parts);
2562 buffered.parts = parts;
2563 self.messages.push(buffered);
2564 }
2565}
2566
2567fn error_outcome(
2568 index: usize,
2569 kind: &'static str,
2570 pk: Value,
2571 message: &str,
2572 field: Option<&'static str>,
2573 reason_key: &'static str,
2574) -> RowOutcome {
2575 RowOutcome {
2576 index,
2577 kind,
2578 pk,
2579 status: OutcomeStatus::Error,
2580 error: Some(RowError {
2581 message: message.to_owned(),
2582 field,
2583 reason: None,
2584 reason_key: Some(reason_key),
2585 }),
2586 searchable: false,
2587 }
2588}
2589
2590fn error_outcomes_for_substream(
2595 session_index: usize,
2596 session: &Session,
2597 _messages: &[BufferedMessage],
2598 message: impl Into<String>,
2599 field: Option<&'static str>,
2600 reason_key: &'static str,
2601) -> Vec<RowOutcome> {
2602 let reason = field.map(|_| "immutable");
2603 vec![RowOutcome {
2604 index: session_index,
2605 kind: "session",
2606 pk: Value::String(session.id.clone()),
2607 status: OutcomeStatus::Error,
2608 error: Some(RowError {
2609 message: message.into(),
2610 field,
2611 reason,
2612 reason_key: Some(reason_key),
2613 }),
2614 searchable: false,
2615 }]
2616}
2617
2618fn success_outcomes_for_substream(
2624 session_index: usize,
2625 session: &Session,
2626 messages: &[BufferedMessage],
2627 existing_sessions: &std::collections::HashMap<String, Session>,
2628 existing_message_pks: &HashSet<(String, String)>,
2629 existing_part_pks: &HashSet<(String, String, String)>,
2630 counts: &mut BatchCounts,
2631) -> Vec<RowOutcome> {
2632 let session_was_present = existing_sessions.contains_key(&session.id);
2633 let session_status = if session_was_present {
2634 counts.sessions_matched += 1;
2635 UpsertStatus::Matched
2636 } else {
2637 counts.sessions_inserted += 1;
2638 UpsertStatus::Inserted
2639 };
2640
2641 let mut outcomes = Vec::with_capacity(1 + messages.len());
2642 outcomes.push(success_outcome(
2643 session_index,
2644 "session",
2645 Value::String(session.id.clone()),
2646 session_status,
2647 false,
2648 ));
2649 for buffered in messages {
2650 let key = (
2651 buffered.message.session_id().to_owned(),
2652 buffered.message.id().to_owned(),
2653 );
2654 let searchable = buffered.search_text.is_some();
2655 let message_status = if existing_message_pks.contains(&key) {
2656 counts.messages_matched_total += 1;
2657 if searchable {
2658 counts.messages_matched_searchable += 1;
2659 }
2660 UpsertStatus::Matched
2661 } else {
2662 counts.messages_inserted_total += 1;
2663 if searchable {
2664 counts.messages_inserted_searchable += 1;
2665 }
2666 UpsertStatus::Inserted
2667 };
2668 let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
2669 outcomes.push(success_outcome(
2670 buffered.index,
2671 "message",
2672 pk,
2673 message_status,
2674 searchable,
2675 ));
2676 for part in &buffered.parts {
2677 let part_key = (
2678 part.part.session_id.clone(),
2679 part.part.message_id.clone(),
2680 part.part.id.clone(),
2681 );
2682 let part_status = if existing_part_pks.contains(&part_key) {
2683 counts.parts_matched += 1;
2684 UpsertStatus::Matched
2685 } else {
2686 counts.parts_inserted += 1;
2687 UpsertStatus::Inserted
2688 };
2689 let part_pk = Value::Array(vec![
2690 Value::String(part_key.0),
2691 Value::String(part_key.1),
2692 Value::String(part_key.2),
2693 ]);
2694 outcomes.push(success_outcome(
2695 part.index,
2696 "part",
2697 part_pk,
2698 part_status,
2699 false,
2700 ));
2701 }
2702 }
2703 outcomes
2704}
2705
2706fn success_outcome(
2707 index: usize,
2708 kind: &'static str,
2709 pk: Value,
2710 status: UpsertStatus,
2711 searchable: bool,
2712) -> RowOutcome {
2713 let status = match status {
2714 UpsertStatus::Inserted => OutcomeStatus::Inserted,
2715 UpsertStatus::Matched => OutcomeStatus::Matched,
2716 };
2717 RowOutcome {
2718 index,
2719 kind,
2720 pk,
2721 status,
2722 error: None,
2723 searchable,
2724 }
2725}
2726
/// Typed ingest failures.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// An incoming session tried to change a field that must keep its stored value.
    ImmutableField {
        /// Name of the field that differed.
        field: &'static str,
        /// Id of the incoming session.
        session_id: String,
        /// Value already stored.
        stored: String,
        /// Value the incoming session carried.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the error as a one-line message; stored and attempted values
    /// are shown in their quoted `Debug` form.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable while the enum has a single variant; adding a variant
        // turns this into a compile error, which is the desired reminder.
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            formatter,
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        )
    }
}

impl std::error::Error for IngestError {}
2758
2759fn ensure_immutable_match(
2763 existing: &Session,
2764 incoming: &Session,
2765) -> std::result::Result<(), IngestError> {
2766 if existing.source_agent != incoming.source_agent {
2767 return Err(IngestError::ImmutableField {
2768 field: "source_agent",
2769 session_id: incoming.id.clone(),
2770 stored: existing.source_agent.clone(),
2771 attempted: incoming.source_agent.clone(),
2772 });
2773 }
2774 if existing.project != incoming.project {
2775 return Err(IngestError::ImmutableField {
2776 field: "project",
2777 session_id: incoming.id.clone(),
2778 stored: (*existing.project).clone(),
2779 attempted: (*incoming.project).clone(),
2780 });
2781 }
2782 Ok(())
2783}
2784
2785pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
2786 use crate::wire::Provenance;
2787 let mut chunks: Vec<String> = Vec::new();
2788 for part in parts {
2789 if part.provenance != Provenance::Conversational {
2792 continue;
2793 }
2794 match (message.role(), &part.kind) {
2795 (Role::User | Role::Assistant, PartKind::Text { text }) => {
2796 if let Some(text) = text {
2797 chunks.push(text.to_string());
2798 }
2799 }
2800 (
2801 Role::User | Role::Assistant,
2802 PartKind::File {
2803 media_type,
2804 file_name,
2805 data,
2806 },
2807 ) => {
2808 if let Some(file_name) = file_name {
2809 chunks.push(file_name.clone());
2810 }
2811 if let Some(media_type) = media_type {
2812 chunks.push(media_type.clone());
2813 }
2814 if let FileData::Url(uri) = data {
2815 chunks.push(uri.clone());
2816 }
2817 }
2818 (
2819 Role::System | Role::Tool,
2820 PartKind::Text { .. }
2821 | PartKind::Reasoning { .. }
2822 | PartKind::File { .. }
2823 | PartKind::ToolCall { .. }
2824 | PartKind::ToolResult { .. }
2825 | PartKind::ToolApprovalRequest { .. }
2826 | PartKind::ToolApprovalResponse { .. },
2827 )
2828 | (
2829 Role::User | Role::Assistant,
2830 PartKind::Reasoning { .. }
2831 | PartKind::ToolCall { .. }
2832 | PartKind::ToolResult { .. }
2833 | PartKind::ToolApprovalRequest { .. }
2834 | PartKind::ToolApprovalResponse { .. },
2835 ) => {}
2836 }
2837 }
2838
2839 let text = chunks
2840 .into_iter()
2841 .filter(|chunk| !chunk.trim().is_empty())
2842 .collect::<Vec<_>>()
2843 .join("\n");
2844 if text.is_empty() { None } else { Some(text) }
2845}
2846
/// Owned wrapper marking a string as message search text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrows the wrapped text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consumes the wrapper and returns the owned text.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
2866
/// A message bundled with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    /// The message itself.
    pub message: Message,
    /// Parts grouped with `message`.
    pub parts: Vec<Part>,
}
2872
/// A session bundled with its messages, each carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    /// The session itself.
    pub session: Session,
    /// Messages grouped with `session`.
    pub messages: Vec<MessageWithParts>,
}
2878
/// Parameters for a session view request.
// NOTE(review): field semantics below are inferred from names and types only;
// confirm against the code that consumes this struct.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Requested response mode.
    pub mode: ResponseMode,
    /// Optional cursor id; presumably results resume after this id.
    pub after_id: Option<&'a str>,
    /// Item-count limit for the page (presumably a maximum).
    pub limit: usize,
    /// Byte budget for the page (presumably a soft ceiling on payload size).
    pub budget_bytes: usize,
    /// Session starting-point selector.
    pub session_from: SessionFrom,
}
2887
/// Parameters for a single-message view request.
// NOTE(review): field semantics below are inferred from names and types only;
// confirm against the code that consumes this struct.
#[derive(Debug, Clone)]
pub struct MessageViewParams<'a> {
    /// Context depth; presumably how many neighbouring messages to include.
    pub context_depth: usize,
    /// Requested response mode.
    pub mode: ResponseMode,
    /// Optional cursor id; presumably results resume after this id.
    pub after_id: Option<&'a str>,
    /// Item-count limit for the page (presumably a maximum).
    pub limit: usize,
    /// Byte budget for the page (presumably a soft ceiling on payload size).
    pub budget_bytes: usize,
}
2899
/// Three-way result of a lookup that may also be given an `after_id` cursor.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// The requested entity does not exist.
    NotFound,
    /// The supplied `after_id` was not recognised (presumably the entity
    /// itself exists; confirm against the producer).
    UnknownAfterId,
    /// The lookup succeeded and produced a value.
    Found(T),
}
2911
/// One page of a session view.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    /// The session being viewed.
    pub session: Session,
    /// Messages included in this page.
    pub messages: Vec<RetrievedMessage>,
    /// Count of messages not included in this page (presumably those still
    /// to come after it; confirm against the producer).
    pub messages_remaining: usize,
}
2921
/// One page of a single-message view.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    /// Session that owns the target message.
    pub session: Session,
    /// The message that was asked for.
    pub target: RetrievedMessage,
    /// Parts of the target message included in this page.
    pub target_parts: Vec<Part>,
    /// Count of target parts not included in this page (presumably those
    /// still to come after it; confirm against the producer).
    pub target_parts_remaining: usize,
    /// Other messages returned alongside the target (presumably surrounding
    /// context; confirm against the producer).
    pub siblings: Vec<RetrievedMessage>,
}
2933
/// A message as returned by the retrieval views.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    /// Message id.
    pub id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Optional text for the message (presumably its search text; confirm).
    pub text: Option<String>,
    /// Optional content (presumably the stored `content` column; confirm).
    pub content: Option<String>,
    /// Parts attached to the message in this response.
    pub parts: Vec<Part>,
}
2943
/// Internal row shape holding the same scalar fields as [`RetrievedMessage`]
/// but without parts.
#[derive(Debug, Clone)]
struct ScanRow {
    /// Message id.
    id: String,
    /// Role of the message author.
    role: Role,
    /// Message timestamp (UTC).
    timestamp: DateTime<Utc>,
    /// Optional text (presumably the search text; confirm).
    text: Option<String>,
    /// Optional content (presumably the stored `content` column; confirm).
    content: Option<String>,
}
2952
/// A message row that is guaranteed to carry search text.
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Id of the owning session.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
    /// Role of the message author.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// The message's search text (non-optional here).
    pub text: SearchText,
}
2963
/// Decides how many leading `items` fit on one page.
///
/// At most `limit` items are considered, with `limit` clamped to `1..=1000`.
/// Items are accepted in order while their cumulative `size` stays within
/// `budget_bytes`; the first item is always accepted even if it alone exceeds
/// the budget, so a non-empty input never yields an empty page.
///
/// Returns the number of items to emit (0 only for empty input).
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let max_items = limit.clamp(1, 1000);
    let mut spent = 0usize;
    let mut is_first = true;
    items
        .iter()
        .take(max_items)
        .take_while(|item| {
            let projected = spent.saturating_add(size(*item));
            if !is_first && projected > budget_bytes {
                return false;
            }
            is_first = false;
            spent = projected;
            true
        })
        .count()
}
2982
2983fn role_from_str(value: &str) -> Result<Role> {
2984 match value {
2985 "system" => Ok(Role::System),
2986 "user" => Ok(Role::User),
2987 "assistant" => Ok(Role::Assistant),
2988 "tool" => Ok(Role::Tool),
2989 other => anyhow::bail!("unknown message role {other}"),
2990 }
2991}
2992
/// Scalar indexes declared for the messages table.
///
/// Each entry is `(column, index type, index name)`, the order in which
/// `pond_index_intents_with_vector_threshold` destructures it.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    ("project", BuiltinIndexType::BTree, "messages_project_btree"),
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::BTree,
        "messages_timestamp_btree",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
    ("role", BuiltinIndexType::Bitmap, "messages_role_bitmap"),
];

/// Scalar indexes declared for the parts table, as
/// `(column, index type, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];

/// Scalar indexes declared for the sessions table, as
/// `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
3039
3040fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
3041 Predicate::In(
3042 column,
3043 values.iter().cloned().map(ScalarValue::String).collect(),
3044 )
3045}
3046
3047fn embedded_scope(filter: &Predicate) -> Predicate {
3052 Predicate::And(vec![Predicate::IsNotNull("vector"), filter.clone()])
3053}
3054
/// Name used for the sessions table.
pub(crate) const SESSIONS: &str = "sessions";
/// Name used for the messages table.
pub(crate) const MESSAGES: &str = "messages";
/// Name used for the parts table.
pub(crate) const PARTS: &str = "parts";

/// Name of the full-text index declared on the messages `search_text` column.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the vector index declared on the messages `vector` column.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";

/// `num_bits` passed to the IVF-PQ vector index parameters.
const IVF_PQ_NUM_BITS: u8 = 8;
/// Divisor applied to the embedding dimension to get the IVF-PQ sub-vector
/// count (`embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE`).
const IVF_PQ_SUB_VECTOR_STRIDE: usize = 8;
/// `max_iters` passed to the IVF-PQ vector index parameters.
const IVF_PQ_MAX_ITERS: usize = 15;

/// Minimum n-gram length for the full-text index.
const FTS_NGRAM_MIN: u32 = 3;
/// Maximum n-gram length for the full-text index.
const FTS_NGRAM_MAX: u32 = 5;
3084
/// Returns the index intents for all three tables, using
/// `VECTOR_INDEX_ACTIVATION_ROWS` as the vector-index activation threshold.
pub fn pond_index_intents() -> IndexIntents {
    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
}
3090
3091pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
3095 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
3096 messages.push(IndexIntent {
3097 name: MESSAGES_FTS_INDEX,
3098 column: "search_text",
3099 trigger: IndexTrigger::OnAnyRows,
3100 params: IndexParamsKind::InvertedFtsNgram {
3101 min: FTS_NGRAM_MIN,
3102 max: FTS_NGRAM_MAX,
3103 },
3104 });
3105 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
3106 messages.push(IndexIntent {
3107 name,
3108 column,
3109 trigger: IndexTrigger::OnAnyRows,
3110 params: IndexParamsKind::Scalar(kind.clone()),
3111 });
3112 }
3113 messages.push(IndexIntent {
3114 name: MESSAGES_VECTOR_INDEX,
3115 column: "vector",
3116 trigger: IndexTrigger::OnNonNullCount {
3117 column: "vector",
3118 threshold: vector_threshold,
3119 },
3120 params: IndexParamsKind::IvfPqCosine {
3121 sub_vectors: embedding_dim() / IVF_PQ_SUB_VECTOR_STRIDE,
3122 num_bits: IVF_PQ_NUM_BITS,
3123 max_iters: IVF_PQ_MAX_ITERS,
3124 },
3125 });
3126 let parts = PARTS_SCALAR_INDICES
3127 .iter()
3128 .map(|(column, kind, name)| IndexIntent {
3129 name,
3130 column,
3131 trigger: IndexTrigger::OnAnyRows,
3132 params: IndexParamsKind::Scalar(kind.clone()),
3133 })
3134 .collect();
3135 let sessions = SESSIONS_SCALAR_INDICES
3136 .iter()
3137 .map(|(column, kind, name)| IndexIntent {
3138 name,
3139 column,
3140 trigger: IndexTrigger::OnAnyRows,
3141 params: IndexParamsKind::Scalar(kind.clone()),
3142 })
3143 .collect();
3144 IndexIntents {
3145 sessions,
3146 messages,
3147 parts,
3148 }
3149}
3150
/// Embedding dimension used when no runtime value has been registered.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide embedding dimension; written at most once.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the registered embedding dimension, or [`DEFAULT_EMBEDDING_DIM`]
/// when [`init_embedding_dim`] has not been called yet.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(dim) => *dim,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Registers the embedding dimension for this process.
///
/// Only the first call has any effect; later calls are silently ignored, even
/// when they pass a different value.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails only when a value is already stored, and that case is
    // deliberately ignored: the first caller wins.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
3176
3177pub(crate) fn write_params_for_create() -> WriteParams {
3184 WriteParams {
3185 data_storage_version: Some(LanceFileVersion::V2_1),
3186 enable_v2_manifest_paths: true,
3187 enable_stable_row_ids: true,
3188 auto_cleanup: Some(AutoCleanupParams {
3189 interval: 20,
3190 older_than: chrono::TimeDelta::days(1),
3191 }),
3192 skip_auto_cleanup: true,
3193 ..WriteParams::default()
3194 }
3195}
3196
/// Returns the Arrow schema this build expects for `table`.
fn export_schema(table: Table) -> Arc<Schema> {
    match table {
        Table::Sessions => session_schema(),
        Table::Messages => message_schema(),
        Table::Parts => part_schema(),
    }
}
3204
3205fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
3206 let expected = export_schema(table);
3207 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3208 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
3209 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
3210 if actual_names != expected_names {
3211 anyhow::bail!(
3212 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
3213 table.as_str(),
3214 );
3215 }
3216 Ok(())
3217}
3218
3219async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
3220 let source_uri = source
3221 .to_str()
3222 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
3223 let dataset = Dataset::open(source_uri)
3224 .await
3225 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
3226 ensure_schema_matches_archive(&dataset, table)?;
3227 Ok(dataset)
3228}
3229
3230pub(crate) fn session_schema() -> Arc<Schema> {
3231 Arc::new(Schema::new(vec![
3232 primary_field("id", DataType::Utf8, false),
3233 Field::new("parent_session_id", DataType::Utf8, true),
3234 Field::new("parent_message_id", DataType::Utf8, true),
3235 Field::new("source_agent", DataType::Utf8, false),
3236 Field::new(
3237 "created_at",
3238 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3239 false,
3240 ),
3241 Field::new("project", DataType::Utf8, false),
3242 json_field("options", false),
3243 ]))
3244}
3245
3246pub(crate) fn message_schema() -> Arc<Schema> {
3247 Arc::new(Schema::new(vec![
3248 primary_field("session_id", DataType::Utf8, false),
3249 primary_field("id", DataType::Utf8, false),
3250 Field::new(
3251 "timestamp",
3252 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
3253 false,
3254 ),
3255 Field::new("role", DataType::Utf8, false),
3256 Field::new("source_agent", DataType::Utf8, false),
3257 Field::new("project", DataType::Utf8, false),
3258 Field::new("content", DataType::Utf8, true),
3259 Field::new("search_text", DataType::Utf8, true),
3260 Field::new("vector", embedding_vector_type(), true),
3263 Field::new("embedding_model", DataType::Utf8, true),
3264 json_field("options", false),
3265 ]))
3266}
3267
3268pub(crate) fn part_schema() -> Arc<Schema> {
3269 Arc::new(Schema::new(vec![
3270 primary_field("session_id", DataType::Utf8, false),
3271 primary_field("message_id", DataType::Utf8, false),
3272 primary_field("id", DataType::Utf8, false),
3273 Field::new("ordinal", DataType::Int32, false),
3274 Field::new("type", DataType::Utf8, false),
3275 Field::new("provenance", DataType::Utf8, false),
3278 json_field("variant_data", false),
3279 legacy_blob_field("data", true),
3280 json_field("options", false),
3281 ]))
3282}
3283
3284pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
3285 let arrays = schema
3286 .fields()
3287 .iter()
3288 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
3289 .collect();
3290 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
3291}
3292
3293pub(crate) fn empty_reader(
3294 schema: Arc<Schema>,
3295) -> Result<
3296 RecordBatchIterator<
3297 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
3298 >,
3299> {
3300 let batch = empty_batch(schema.clone())?;
3301 Ok(RecordBatchIterator::new(
3302 vec![Ok(batch)].into_iter(),
3303 schema,
3304 ))
3305}
3306
/// Borrowed inputs for one row of a messages batch.
pub(crate) struct MessageBatchRow<'a> {
    /// The message being written.
    pub message: &'a Message,
    /// Value written to the row's `source_agent` column.
    pub source_agent: &'a str,
    /// Value written to the row's `project` column.
    pub project: &'a str,
    /// Value written to the row's `search_text` column; `None` becomes null.
    pub search_text: Option<&'a str>,
}
3313
3314fn embedding_vector_type() -> DataType {
3320 DataType::FixedSizeList(
3321 Arc::new(Field::new("item", DataType::Float16, true)),
3322 embedding_dim() as i32,
3323 )
3324}
3325
3326fn embedding_update_schema() -> Arc<Schema> {
3330 Arc::new(Schema::new(vec![
3331 primary_field("session_id", DataType::Utf8, false),
3332 primary_field("id", DataType::Utf8, false),
3333 Field::new("vector", embedding_vector_type(), true),
3334 Field::new("embedding_model", DataType::Utf8, true),
3335 ]))
3336}
3337
3338pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
3341 let dim = embedding_dim();
3342 let mut flat = Vec::with_capacity(rows.len() * dim);
3343 for row in rows {
3344 if row.vector.len() != dim {
3345 anyhow::bail!(
3346 "embedding for message {} has dim {}, expected {dim}",
3347 row.id,
3348 row.vector.len(),
3349 );
3350 }
3351 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
3352 }
3353 let values = Float16Array::from(flat);
3354 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
3355 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
3356 .context("failed to build embedding vector column")?;
3357
3358 RecordBatch::try_new(
3359 embedding_update_schema(),
3360 vec![
3361 Arc::new(StringArray::from(
3362 rows.iter()
3363 .map(|row| row.session_id.as_str())
3364 .collect::<Vec<_>>(),
3365 )),
3366 Arc::new(StringArray::from(
3367 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
3368 )),
3369 Arc::new(vectors),
3370 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
3371 ],
3372 )
3373 .context("failed to build embedding update batch")
3374}
3375
/// Byte budget (1 GiB) applied both per chunk by `chunk_ranges` and per cell
/// by `guard_cell`.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits rows into consecutive index ranges whose summed byte sizes stay
/// within [`COLUMN_BYTE_BUDGET`].
///
/// `cells[i]` is the byte size attributed to row `i`. A new chunk starts
/// whenever adding the next row would push the running total over the budget.
/// A chunk always holds at least one row, so a single oversized row gets a
/// chunk of its own. Empty input yields no chunks.
///
/// Sizes are accumulated with saturating arithmetic, so extreme inputs cannot
/// overflow `usize` (which would panic in debug builds and silently wrap,
/// producing oversized chunks, in release builds).
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        let next = running.saturating_add(row);
        if next > COLUMN_BYTE_BUDGET && index > start {
            // Close the current chunk; this row opens the next one.
            chunks.push(start..index);
            start = index;
            running = row;
        } else {
            running = next;
        }
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
3403
3404fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
3405 if bytes >= COLUMN_BYTE_BUDGET {
3406 anyhow::bail!(
3407 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
3408 overflow Arrow's i32 offset buffer"
3409 );
3410 }
3411 Ok(())
3412}
3413
3414async fn merge_insert_chunks(
3415 handle: &Handle,
3416 table: Table,
3417 batches: Vec<RecordBatch>,
3418) -> Result<u64> {
3419 let mut inserted = 0u64;
3420 for batch in batches {
3421 let rows = batch.num_rows();
3422 inserted += handle.merge_insert(table, batch, rows).await?;
3423 }
3424 Ok(inserted)
3425}
3426
3427pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
3428 let options = sessions
3429 .iter()
3430 .map(|session| json_bytes(&session.options))
3431 .collect::<Result<Vec<_>>>()?;
3432 let mut cells = Vec::with_capacity(sessions.len());
3433 for (session, encoded) in sessions.iter().zip(&options) {
3434 let columns = [
3435 session.id.len(),
3436 session.parent_session_id.as_deref().map_or(0, str::len),
3437 session.parent_message_id.as_deref().map_or(0, str::len),
3438 session.source_agent.len(),
3439 session.project.as_str().len(),
3440 encoded.len(),
3441 ];
3442 for bytes in columns {
3443 guard_cell("sessions", &session.id, bytes)?;
3444 }
3445 cells.push(columns.iter().sum());
3446 }
3447 chunk_ranges(&cells)
3448 .into_iter()
3449 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
3450 .collect()
3451}
3452
3453fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
3454 let schema = session_schema();
3455 RecordBatch::try_new(
3456 schema.clone(),
3457 vec![
3458 Arc::new(StringArray::from(
3459 sessions
3460 .iter()
3461 .map(|session| session.id.as_str())
3462 .collect::<Vec<_>>(),
3463 )),
3464 Arc::new(StringArray::from(
3465 sessions
3466 .iter()
3467 .map(|session| session.parent_session_id.as_deref())
3468 .collect::<Vec<_>>(),
3469 )),
3470 Arc::new(StringArray::from(
3471 sessions
3472 .iter()
3473 .map(|session| session.parent_message_id.as_deref())
3474 .collect::<Vec<_>>(),
3475 )),
3476 Arc::new(StringArray::from(
3477 sessions
3478 .iter()
3479 .map(|session| session.source_agent.as_str())
3480 .collect::<Vec<_>>(),
3481 )),
3482 Arc::new(
3483 TimestampMicrosecondArray::from(
3484 sessions
3485 .iter()
3486 .map(|session| micros(session.created_at))
3487 .collect::<Vec<_>>(),
3488 )
3489 .with_timezone("UTC"),
3490 ),
3491 Arc::new(StringArray::from(
3492 sessions
3493 .iter()
3494 .map(|session| session.project.as_str())
3495 .collect::<Vec<_>>(),
3496 )),
3497 Arc::new(LargeBinaryArray::from_iter_values(
3498 options.iter().map(Vec::as_slice),
3499 )),
3500 ],
3501 )
3502 .context("failed to build session batch")
3503}
3504
3505pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
3506 let options = rows
3507 .iter()
3508 .map(|row| json_bytes(row.message.options()))
3509 .collect::<Result<Vec<_>>>()?;
3510 let mut cells = Vec::with_capacity(rows.len());
3511 for (row, encoded) in rows.iter().zip(&options) {
3512 let columns = [
3513 row.message.session_id().len(),
3514 row.message.id().len(),
3515 row.message.role().as_str().len(),
3516 row.source_agent.len(),
3517 row.project.len(),
3518 row.message.system_content().map_or(0, str::len),
3519 row.search_text.map_or(0, str::len),
3520 encoded.len(),
3521 ];
3522 for bytes in columns {
3523 guard_cell("messages", row.message.id(), bytes)?;
3524 }
3525 cells.push(columns.iter().sum());
3526 }
3527 chunk_ranges(&cells)
3528 .into_iter()
3529 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
3530 .collect()
3531}
3532
3533fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
3534 let schema = message_schema();
3535 RecordBatch::try_new(
3536 schema.clone(),
3537 vec![
3538 Arc::new(StringArray::from(
3539 rows.iter()
3540 .map(|row| row.message.session_id())
3541 .collect::<Vec<_>>(),
3542 )),
3543 Arc::new(StringArray::from(
3544 rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
3545 )),
3546 Arc::new(
3547 TimestampMicrosecondArray::from(
3548 rows.iter()
3549 .map(|row| micros(row.message.timestamp()))
3550 .collect::<Vec<_>>(),
3551 )
3552 .with_timezone("UTC"),
3553 ),
3554 Arc::new(StringArray::from(
3555 rows.iter()
3556 .map(|row| row.message.role().as_str())
3557 .collect::<Vec<_>>(),
3558 )),
3559 Arc::new(StringArray::from(
3560 rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
3561 )),
3562 Arc::new(StringArray::from(
3563 rows.iter().map(|row| row.project).collect::<Vec<_>>(),
3564 )),
3565 Arc::new(StringArray::from(
3566 rows.iter()
3567 .map(|row| row.message.system_content())
3568 .collect::<Vec<_>>(),
3569 )),
3570 Arc::new(StringArray::from(
3571 rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
3572 )),
3573 new_null_array(&embedding_vector_type(), rows.len()),
3577 new_null_array(&DataType::Utf8, rows.len()),
3578 Arc::new(LargeBinaryArray::from_iter_values(
3579 options.iter().map(Vec::as_slice),
3580 )),
3581 ],
3582 )
3583 .context("failed to build message batch")
3584}
3585
3586pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
3587 let variant_data = parts
3588 .iter()
3589 .map(|part| part_variant_json(&part.kind))
3590 .collect::<Result<Vec<_>>>()?;
3591 let options = parts
3592 .iter()
3593 .map(|part| json_bytes(&part.options))
3594 .collect::<Result<Vec<_>>>()?;
3595 let mut cells = Vec::with_capacity(parts.len());
3596 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
3599 let columns = [
3600 part.session_id.len(),
3601 part.message_id.len(),
3602 part.id.len(),
3603 part.kind.type_name().len(),
3604 part.provenance.as_str().len(),
3605 variant.len(),
3606 encoded.len(),
3607 ];
3608 for bytes in columns {
3609 guard_cell("parts", &part.id, bytes)?;
3610 }
3611 cells.push(columns.iter().sum());
3612 }
3613 chunk_ranges(&cells)
3614 .into_iter()
3615 .map(|range| {
3616 parts_chunk(
3617 &parts[range.clone()],
3618 &variant_data[range.clone()],
3619 &options[range],
3620 )
3621 })
3622 .collect()
3623}
3624
3625fn parts_chunk(
3626 parts: &[Part],
3627 variant_data: &[Vec<u8>],
3628 options: &[Vec<u8>],
3629) -> Result<RecordBatch> {
3630 let schema = part_schema();
3631 let blob_payloads: Vec<Option<&[u8]>> = parts
3635 .iter()
3636 .map(|part| match &part.kind {
3637 PartKind::File { data, .. } => Some(match data {
3638 FileData::String(value) => value.as_bytes(),
3639 FileData::Bytes(value) => value.as_slice(),
3640 FileData::Url(value) => value.as_bytes(),
3641 }),
3642 PartKind::Text { .. }
3643 | PartKind::Reasoning { .. }
3644 | PartKind::ToolCall { .. }
3645 | PartKind::ToolResult { .. }
3646 | PartKind::ToolApprovalRequest { .. }
3647 | PartKind::ToolApprovalResponse { .. } => None,
3648 })
3649 .collect();
3650 let blob_array = LargeBinaryArray::from_iter(blob_payloads);
3651
3652 RecordBatch::try_new(
3653 schema.clone(),
3654 vec![
3655 Arc::new(StringArray::from(
3656 parts
3657 .iter()
3658 .map(|part| part.session_id.as_str())
3659 .collect::<Vec<_>>(),
3660 )),
3661 Arc::new(StringArray::from(
3662 parts
3663 .iter()
3664 .map(|part| part.message_id.as_str())
3665 .collect::<Vec<_>>(),
3666 )),
3667 Arc::new(StringArray::from(
3668 parts
3669 .iter()
3670 .map(|part| part.id.as_str())
3671 .collect::<Vec<_>>(),
3672 )),
3673 Arc::new(Int32Array::from(
3674 parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
3675 )),
3676 Arc::new(StringArray::from(
3677 parts
3678 .iter()
3679 .map(|part| part.kind.type_name())
3680 .collect::<Vec<_>>(),
3681 )),
3682 Arc::new(StringArray::from(
3683 parts
3684 .iter()
3685 .map(|part| part.provenance.as_str())
3686 .collect::<Vec<_>>(),
3687 )),
3688 Arc::new(LargeBinaryArray::from_iter_values(
3689 variant_data.iter().map(Vec::as_slice),
3690 )),
3691 Arc::new(blob_array),
3692 Arc::new(LargeBinaryArray::from_iter_values(
3693 options.iter().map(Vec::as_slice),
3694 )),
3695 ],
3696 )
3697 .context("failed to build parts batch")
3698}
3699
3700pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
3701 Ok(Session {
3702 id: string(batch, "id", row)?.context("session id is null")?,
3703 parent_session_id: string(batch, "parent_session_id", row)?,
3704 parent_message_id: string(batch, "parent_message_id", row)?,
3705 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
3706 created_at: datetime(batch, "created_at", row)?,
3707 project: crate::adapter::Extracted::from_stored(
3708 string(batch, "project", row)?.context("project is null")?,
3709 ),
3710 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
3711 })
3712}
3713
3714pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
3715 let id = string(batch, "id", row)?.context("message id is null")?;
3716 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
3717 let timestamp = datetime(batch, "timestamp", row)?;
3718 let options =
3719 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
3720
3721 match string(batch, "role", row)?
3722 .context("message role is null")?
3723 .as_str()
3724 {
3725 "system" => Ok(Message::System {
3726 id,
3727 session_id,
3728 timestamp,
3729 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
3736 options,
3737 }),
3738 "user" => Ok(Message::User {
3739 id,
3740 session_id,
3741 timestamp,
3742 options,
3743 }),
3744 "assistant" => Ok(Message::Assistant {
3745 id,
3746 session_id,
3747 timestamp,
3748 options,
3749 }),
3750 "tool" => Ok(Message::Tool {
3751 id,
3752 session_id,
3753 timestamp,
3754 options,
3755 }),
3756 other => anyhow::bail!("unknown message role {other}"),
3757 }
3758}
3759
3760pub(crate) fn part_from_batch(
3761 batch: &RecordBatch,
3762 row: usize,
3763 file_data: Option<FileData>,
3764) -> Result<Part> {
3765 let type_name = string(batch, "type", row)?.context("part type is null")?;
3766 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
3767 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
3768 Ok(Part {
3769 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
3770 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
3771 id: string(batch, "id", row)?.context("part id is null")?,
3772 ordinal: int32(batch, "ordinal", row)?,
3773 provenance: provenance_from_str(&provenance)?,
3774 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
3775 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
3776 })
3777}
3778
3779fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
3780 match value {
3781 "conversational" => Ok(crate::wire::Provenance::Conversational),
3782 "injected" => Ok(crate::wire::Provenance::Injected),
3783 other => anyhow::bail!("unknown part provenance {other}"),
3784 }
3785}
3786
3787fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
3788 let kind = file_data_kind(variant_data)?;
3789 match kind.as_str() {
3790 "string" => {
3791 let text = std::str::from_utf8(bytes)
3792 .context("file string payload is not UTF-8")?
3793 .to_owned();
3794 Ok(FileData::String(text))
3795 }
3796 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
3797 "url" => Ok(FileData::Url(
3798 std::str::from_utf8(bytes)
3799 .context("file URL payload is not UTF-8")?
3800 .to_owned(),
3801 )),
3802 other => anyhow::bail!("unknown file data_kind {other}"),
3803 }
3804}
3805
3806fn file_data_kind(variant_data: &[u8]) -> Result<String> {
3807 let value = json_parse::<Value>(variant_data)?;
3808 value
3809 .get("data_kind")
3810 .and_then(Value::as_str)
3811 .map(str::to_owned)
3812 .context("file part variant_data missing data_kind")
3813}
3814
3815fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
3816 batch
3817 .column_by_name(name)
3818 .with_context(|| format!("missing column {name}"))?
3819 .as_any()
3820 .downcast_ref::<UInt64Array>()
3821 .with_context(|| format!("column {name} is not UInt64"))
3822}
3823
3824pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
3825 let array = batch
3826 .column_by_name(name)
3827 .with_context(|| format!("missing column {name}"))?
3828 .as_any()
3829 .downcast_ref::<StringArray>()
3830 .with_context(|| format!("column {name} is not Utf8"))?;
3831 if array.is_null(row) {
3832 Ok(None)
3833 } else {
3834 Ok(Some(array.value(row).to_owned()))
3835 }
3836}
3837
3838fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
3839 let column = batch
3843 .column_by_name(name)
3844 .with_context(|| format!("missing column {name}"))?;
3845 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
3846 return if array.is_null(row) {
3847 Ok(None)
3848 } else {
3849 Ok(Some(
3850 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
3851 ))
3852 };
3853 }
3854 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
3855 return if array.is_null(row) {
3856 Ok(None)
3857 } else {
3858 Ok(Some(array.value(row).as_bytes().to_vec()))
3859 };
3860 }
3861 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
3862 return if array.is_null(row) {
3863 Ok(None)
3864 } else {
3865 Ok(Some(array.value(row).as_bytes().to_vec()))
3866 };
3867 }
3868 anyhow::bail!("column {name} is not a JSON-compatible array")
3869}
3870
3871fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
3872 let array = batch
3873 .column_by_name(name)
3874 .with_context(|| format!("missing column {name}"))?
3875 .as_any()
3876 .downcast_ref::<Int32Array>()
3877 .with_context(|| format!("column {name} is not Int32"))?;
3878 Ok(array.value(row))
3879}
3880
3881pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
3882 let array = batch
3883 .column_by_name(name)
3884 .with_context(|| format!("missing column {name}"))?
3885 .as_any()
3886 .downcast_ref::<Float32Array>()
3887 .with_context(|| format!("column {name} is not Float32"))?;
3888 Ok(array.value(row))
3889}
3890
3891pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
3892 let array = batch
3893 .column_by_name(name)
3894 .with_context(|| format!("missing column {name}"))?
3895 .as_any()
3896 .downcast_ref::<TimestampMicrosecondArray>()
3897 .with_context(|| format!("column {name} is not timestamp_micros"))?;
3898 Utc.timestamp_micros(array.value(row))
3899 .single()
3900 .context("timestamp is out of range")
3901}
3902
3903fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
3904 Field::new(name, data_type, nullable).with_metadata(
3905 [(
3906 "lance-schema:unenforced-primary-key".to_owned(),
3907 "true".to_owned(),
3908 )]
3909 .into(),
3910 )
3911}
3912
3913fn legacy_blob_field(name: &str, nullable: bool) -> Field {
3923 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
3924 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
3925 .into_iter()
3926 .collect(),
3927 )
3928}
3929
3930fn json_field(name: &str, nullable: bool) -> Field {
3931 lance_arrow::json::json_field(name, nullable)
3932}
3933
3934fn micros(timestamp: DateTime<Utc>) -> i64 {
3935 timestamp.timestamp_micros()
3936}
3937
3938fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
3939 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
3947 lance_arrow::json::encode_json(&text)
3948 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
3949}
3950
3951fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
3952 serde_json::from_slice(value).context("failed to parse JSON field")
3953}
3954
3955fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
3956 if let PartKind::File {
3957 media_type,
3958 file_name,
3959 data,
3960 } = kind
3961 {
3962 let data_kind = match data {
3963 FileData::String(_) => "string",
3964 FileData::Bytes(_) => "bytes",
3965 FileData::Url(_) => "url",
3966 };
3967 return json_bytes(&serde_json::json!({
3968 "media_type": media_type,
3969 "file_name": file_name,
3970 "data_kind": data_kind,
3971 }));
3972 }
3973 let value = serde_json::to_value(kind)?;
3974 let mut object = value
3975 .as_object()
3976 .cloned()
3977 .context("part variant did not serialize to an object")?;
3978 object.remove("type");
3979 json_bytes(&object)
3980}
3981
3982fn part_kind_from_json(
3983 type_name: &str,
3984 variant_data: &[u8],
3985 file_data: Option<FileData>,
3986) -> Result<PartKind> {
3987 let mut value = json_parse::<Value>(variant_data)?;
3988 let object = value
3989 .as_object_mut()
3990 .context("part variant data is not an object")?;
3991 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
3992 if let Some(data) = file_data {
3993 object.remove("data_kind");
3994 object.insert("data".to_owned(), serde_json::to_value(data)?);
3995 }
3996 serde_json::from_value(value).context("failed to parse part kind")
3997}
3998
3999#[cfg(test)]
4000mod tests {
4001 #![allow(clippy::expect_used, clippy::unwrap_used)]
4002
4003 use super::*;
4004 use crate::{
4005 adapter::Extracted,
4006 handlers::ingest_events,
4007 wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
4008 };
4009 use chrono::Utc;
4010 use serde_json::json;
4011 use tempfile::TempDir;
4012
4013 fn synthetic_session(id: &str) -> Session {
4014 Session {
4015 id: id.to_owned(),
4016 parent_session_id: None,
4017 parent_message_id: None,
4018 source_agent: "claude-code".to_owned(),
4019 created_at: Utc::now(),
4020 project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
4021 options: ProviderOptions::new(),
4022 }
4023 }
4024
4025 #[test]
4026 fn search_text_excludes_injected_parts() {
4027 use crate::wire::Provenance;
4028 let message = Message::User {
4029 id: "m1".to_owned(),
4030 session_id: "s1".to_owned(),
4031 timestamp: Utc::now(),
4032 options: ProviderOptions::new(),
4033 };
4034 let text_part = |id: &str, text: &str, provenance: Provenance| Part {
4035 session_id: "s1".to_owned(),
4036 id: id.to_owned(),
4037 message_id: "m1".to_owned(),
4038 ordinal: 0,
4039 provenance,
4040 options: ProviderOptions::new(),
4041 kind: PartKind::Text {
4042 text: Some(Extracted::from_test_value(text.to_owned())),
4043 },
4044 };
4045
4046 let conversational = search_text(
4049 &message,
4050 &[text_part(
4051 "p1",
4052 "real human prompt",
4053 Provenance::Conversational,
4054 )],
4055 );
4056 assert_eq!(conversational.as_deref(), Some("real human prompt"));
4057
4058 let injected = search_text(
4059 &message,
4060 &[text_part(
4061 "p2",
4062 "<task-notification>...</task-notification>",
4063 Provenance::Injected,
4064 )],
4065 );
4066 assert!(
4067 injected.is_none(),
4068 "a message whose only part is injected has null search_text"
4069 );
4070 }
4071
4072 #[test]
4073 fn chunk_ranges_splits_on_byte_budget() {
4074 assert!(chunk_ranges(&[]).is_empty());
4075 assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
4076
4077 let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
4078 assert_eq!(
4079 chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
4080 vec![0..1, 1..2, 2..3],
4081 );
4082
4083 assert_eq!(
4085 chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
4086 vec![0..1, 1..2, 2..3],
4087 );
4088 }
4089
4090 #[tokio::test]
4091 async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
4092 let temp = TempDir::new()?;
4097 let store = Store::open_local(temp.path()).await?;
4098 let session = synthetic_session("ordering");
4099 let orphan_part = Part {
4100 session_id: session.id.clone(),
4101 id: "orphan-part".to_owned(),
4102 message_id: "missing-message".to_owned(),
4103 ordinal: 0,
4104 provenance: crate::wire::Provenance::Conversational,
4105 options: ProviderOptions::new(),
4106 kind: PartKind::Text {
4107 text: Some(Extracted::from_test_value("orphan".to_owned())),
4108 },
4109 };
4110 let valid_message = Message::User {
4111 id: "valid-message".to_owned(),
4112 session_id: session.id.clone(),
4113 timestamp: Utc::now(),
4114 options: ProviderOptions::new(),
4115 };
4116 let valid_part = Part {
4117 session_id: session.id.clone(),
4118 id: "valid-part".to_owned(),
4119 message_id: valid_message.id().to_owned(),
4120 ordinal: 0,
4121 provenance: crate::wire::Provenance::Conversational,
4122 options: ProviderOptions::new(),
4123 kind: PartKind::Text {
4124 text: Some(Extracted::from_test_value("kept".to_owned())),
4125 },
4126 };
4127
4128 let mut validator = IngestValidator::default();
4129 validator
4130 .push(&store, 0, IngestEvent::Session(session.clone()))
4131 .await?;
4132 let part_outcomes = validator
4133 .push(&store, 1, IngestEvent::Part(orphan_part))
4134 .await?;
4135 assert_eq!(part_outcomes.len(), 1);
4136 assert_eq!(part_outcomes[0].kind, "part");
4137 assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
4138 assert!(
4139 part_outcomes[0]
4140 .error
4141 .as_ref()
4142 .map(|e| e.message.contains("part event appeared before a message"))
4143 .unwrap_or(false),
4144 "error message must explain the ordering violation: {part_outcomes:?}"
4145 );
4146 validator
4147 .push(&store, 2, IngestEvent::Message(valid_message))
4148 .await?;
4149 validator
4150 .push(&store, 3, IngestEvent::Part(valid_part))
4151 .await?;
4152 validator.finish(&store).await?;
4153
4154 let (sessions, messages, parts) = store.row_counts().await?;
4155 assert_eq!(sessions, 1, "session committed despite the orphan part");
4156 assert_eq!(messages, 1, "valid message committed");
4157 assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
4158
4159 Ok(())
4160 }
4161
4162 #[tokio::test]
4163 async fn initialized_flips_only_after_first_ingest() -> anyhow::Result<()> {
4164 let temp = TempDir::new()?;
4169 let store = Store::open_local(temp.path()).await?;
4170 assert!(
4171 !store.initialized().await?,
4172 "fresh store has no parts table"
4173 );
4174
4175 let session = synthetic_session("initialized-probe");
4176 let message = Message::User {
4177 id: "message-1".to_owned(),
4178 session_id: session.id.clone(),
4179 timestamp: Utc::now(),
4180 options: ProviderOptions::new(),
4181 };
4182 let part = Part {
4183 session_id: session.id.clone(),
4184 id: "part-1".to_owned(),
4185 message_id: message.id().to_owned(),
4186 ordinal: 0,
4187 provenance: crate::wire::Provenance::Conversational,
4188 options: ProviderOptions::new(),
4189 kind: PartKind::Text {
4190 text: Some(Extracted::from_test_value("hello".to_owned())),
4191 },
4192 };
4193 let mut validator = IngestValidator::default();
4194 validator
4195 .push(&store, 0, IngestEvent::Session(session))
4196 .await?;
4197 validator
4198 .push(&store, 1, IngestEvent::Message(message))
4199 .await?;
4200 validator.push(&store, 2, IngestEvent::Part(part)).await?;
4201 validator.finish(&store).await?;
4202
4203 assert!(store.initialized().await?, "ingest creates the parts table");
4204 Ok(())
4205 }
4206
4207 #[tokio::test]
4208 async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
4209 let temp = TempDir::new()?;
4213 let store = Store::open_local(temp.path()).await?;
4214 let session = synthetic_session("duplicate-message");
4215 let first = Message::User {
4216 id: "message-1".to_owned(),
4217 session_id: session.id.clone(),
4218 timestamp: Utc::now(),
4219 options: ProviderOptions::new(),
4220 };
4221 let second = Message::Assistant {
4222 id: "message-1".to_owned(),
4223 session_id: session.id.clone(),
4224 timestamp: Utc::now(),
4225 options: ProviderOptions::new(),
4226 };
4227
4228 let mut validator = IngestValidator::default();
4229 validator
4230 .push(&store, 0, IngestEvent::Session(session.clone()))
4231 .await?;
4232 validator
4233 .push(&store, 1, IngestEvent::Message(first))
4234 .await?;
4235 let dup_outcomes = validator
4236 .push(&store, 2, IngestEvent::Message(second))
4237 .await?;
4238 assert_eq!(dup_outcomes.len(), 1);
4239 assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
4240 assert!(
4241 dup_outcomes[0]
4242 .error
4243 .as_ref()
4244 .map(|e| e.message.contains("duplicate message id message-1"))
4245 .unwrap_or(false),
4246 "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
4247 );
4248
4249 validator.finish(&store).await?;
4250 let (sessions, messages, _) = store.row_counts().await?;
4251 assert_eq!(sessions, 1, "session committed");
4252 assert_eq!(messages, 1, "only the first message committed");
4253
4254 Ok(())
4255 }
4256
4257 #[tokio::test]
4258 async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
4259 -> anyhow::Result<()> {
4260 let temp = TempDir::new()?;
4264 let store = Store::open_local(temp.path()).await?;
4265 let session = synthetic_session("host-provenance");
4266 let mut spoofed = ProviderOptions::new();
4267 spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
4268 let message = Message::User {
4269 id: "message-1".to_owned(),
4270 session_id: session.id.clone(),
4271 timestamp: Utc::now(),
4272 options: spoofed,
4273 };
4274 let part = Part {
4275 session_id: session.id.clone(),
4276 id: "part-1".to_owned(),
4277 message_id: "message-1".to_owned(),
4278 ordinal: 0,
4279 provenance: crate::wire::Provenance::Conversational,
4280 options: ProviderOptions::new(),
4281 kind: PartKind::Text {
4282 text: Some(Extracted::from_test_value("hello".to_owned())),
4283 },
4284 };
4285
4286 let mut validator = IngestValidator::default();
4287 validator
4288 .push(&store, 0, IngestEvent::Session(session.clone()))
4289 .await?;
4290 validator
4291 .push(&store, 1, IngestEvent::Message(message))
4292 .await?;
4293 validator.push(&store, 2, IngestEvent::Part(part)).await?;
4294 validator.finish(&store).await?;
4295
4296 let stored = store
4297 .get_session(&session.id)
4298 .await?
4299 .expect("ingested session is readable");
4300 assert!(
4301 !stored.session.options.contains_key("pond"),
4302 "session rows are not stamped (attribution derives from messages)"
4303 );
4304 let stored_message = &stored.messages[0].message;
4305 match ingest_host_stamp() {
4306 Some(stamp) => {
4307 assert_eq!(
4308 stored_message.options().get("pond"),
4309 Some(stamp),
4310 "stored message carries the real stamp, never the spoof"
4311 );
4312 let host = stamp
4313 .pointer("/ingest/host")
4314 .and_then(Value::as_object)
4315 .expect("stamp shape is {ingest: {host: {..}}}");
4316 assert!(!host.is_empty(), "an all-empty stamp must be None instead");
4317 assert!(
4318 host.values()
4319 .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
4320 "stamp fields are omitted when unavailable, never empty: {host:?}"
4321 );
4322 }
4323 None => assert!(
4324 stored_message.options().get("pond").is_none(),
4325 "with no resolvable stamp the spoofed key is still stripped"
4326 ),
4327 }
4328 assert!(
4329 !stored.messages[0].parts[0].options.contains_key("pond"),
4330 "part rows are not stamped (covered by their message's stamp)"
4331 );
4332
4333 Ok(())
4334 }
4335
4336 #[tokio::test(flavor = "multi_thread")]
4344 async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
4345 use crate::wire::{FileData, PartKind, Provenance};
4346 let temp = TempDir::new()?;
4347 let store = Store::open_local(temp.path()).await?;
4348
4349 let session = synthetic_session("compact-blob");
4350 store
4351 .upsert_sessions(std::slice::from_ref(&session))
4352 .await?;
4353
4354 let make_part = |idx: usize, kind: PartKind| Part {
4355 session_id: session.id.clone(),
4356 message_id: format!("msg-{idx}"),
4357 id: format!("part-{idx}"),
4358 ordinal: 0,
4359 provenance: Provenance::Conversational,
4360 options: ProviderOptions::new(),
4361 kind,
4362 };
4363
4364 let batch_a = vec![
4365 make_part(
4366 0,
4367 PartKind::File {
4368 media_type: Some("text/plain".to_owned()),
4369 file_name: Some("a.txt".to_owned()),
4370 data: FileData::Bytes(b"alpha".to_vec()),
4371 },
4372 ),
4373 make_part(
4374 1,
4375 PartKind::File {
4376 media_type: Some("text/plain".to_owned()),
4377 file_name: Some("b.txt".to_owned()),
4378 data: FileData::String("beta".to_owned()),
4379 },
4380 ),
4381 ];
4382 store.upsert_parts(&batch_a).await?;
4383
4384 let batch_b = vec![
4385 make_part(
4386 2,
4387 PartKind::File {
4388 media_type: Some("application/octet-stream".to_owned()),
4389 file_name: None,
4390 data: FileData::Url("https://example.com/file".to_owned()),
4391 },
4392 ),
4393 make_part(
4394 3,
4395 PartKind::File {
4396 media_type: Some("image/png".to_owned()),
4397 file_name: Some("c.png".to_owned()),
4398 data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
4399 },
4400 ),
4401 ];
4402 store.upsert_parts(&batch_b).await?;
4403
4404 store
4405 .optimize_indices(None, &MaintenancePolicy::always_compact())
4406 .await?
4407 .into_result()?;
4408
4409 Ok(())
4410 }
4411
4412 #[tokio::test]
4413 async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
4414 let temp = TempDir::new()?;
4415 let store = Store::open_local(temp.path()).await?;
4416 let session = synthetic_session("blob");
4417 let message = Message::User {
4418 id: "message-1".to_owned(),
4419 session_id: session.id.clone(),
4420 timestamp: Utc::now(),
4421 options: ProviderOptions::new(),
4422 };
4423 let part = Part {
4424 session_id: session.id.clone(),
4425 id: "part-1".to_owned(),
4426 message_id: message.id().to_owned(),
4427 ordinal: 0,
4428 provenance: crate::wire::Provenance::Conversational,
4429 options: ProviderOptions::new(),
4430 kind: PartKind::File {
4431 media_type: Some("text/plain".to_owned()),
4432 file_name: Some("payload.txt".to_owned()),
4433 data: FileData::Bytes(b"pond".to_vec()),
4434 },
4435 };
4436
4437 let mut validator = IngestValidator::default();
4438 validator
4439 .push(&store, 0, IngestEvent::Session(session.clone()))
4440 .await?;
4441 validator
4442 .push(&store, 1, IngestEvent::Message(message.clone()))
4443 .await?;
4444 validator
4445 .push(&store, 2, IngestEvent::Part(part.clone()))
4446 .await?;
4447 validator.finish(&store).await?;
4448
4449 let stored = store
4450 .get_session(&session.id)
4451 .await?
4452 .expect("session should exist");
4453 let stored_part = &stored.messages[0].parts[0];
4454 assert_eq!(stored_part, &part);
4455
4456 Ok(())
4457 }
4458
4459 fn base_session() -> Session {
4470 Session {
4471 id: "01HXY00000000001".to_owned(),
4472 parent_session_id: None,
4473 parent_message_id: None,
4474 source_agent: "claude-code".to_owned(),
4475 created_at: Utc::now(),
4476 project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
4477 options: ProviderOptions::new(),
4478 }
4479 }
4480
4481 fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
4482 outcomes
4483 .iter()
4484 .filter(|outcome| outcome.status == target)
4485 .count()
4486 }
4487
4488 #[tokio::test(flavor = "multi_thread")]
4489 async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
4490 -> anyhow::Result<()> {
4491 let temp = TempDir::new()?;
4492 let store = Store::open_local(temp.path()).await?;
4493
4494 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4495 assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
4496
4497 let mut again = base_session();
4498 again.options.insert("title".to_owned(), json!("renamed"));
4499 let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
4500 assert_eq!(
4501 count_status(&second, OutcomeStatus::Error),
4502 0,
4503 "options is mutable; the re-ingest must not surface an error: {second:?}",
4504 );
4505 assert_eq!(
4506 count_status(&second, OutcomeStatus::Matched),
4507 1,
4508 "unchanged immutable fields must match-insert via merge_insert",
4509 );
4510
4511 Ok(())
4512 }
4513
4514 #[tokio::test(flavor = "multi_thread")]
4515 async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
4516 let temp = TempDir::new()?;
4517 let store = Store::open_local(temp.path()).await?;
4518
4519 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4520 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4521
4522 let mut tampered = base_session();
4523 tampered.source_agent = "codex-cli".to_owned();
4524 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4525 assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
4526 let err_row = second
4527 .iter()
4528 .find(|outcome| outcome.status == OutcomeStatus::Error)
4529 .expect("error outcome present");
4530 let err = err_row.error.as_ref().expect("error body present");
4531 assert_eq!(err.field, Some("source_agent"));
4532 assert_eq!(err.reason, Some("immutable"));
4533
4534 let stored = store
4536 .get_session(&base_session().id)
4537 .await?
4538 .expect("session row survives the rejected re-ingest");
4539 assert_eq!(stored.session.source_agent, "claude-code");
4540
4541 Ok(())
4542 }
4543
4544 #[tokio::test(flavor = "multi_thread")]
4545 async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
4546 let temp = TempDir::new()?;
4547 let store = Store::open_local(temp.path()).await?;
4548
4549 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
4550 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
4551
4552 let mut tampered = base_session();
4553 tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
4554 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
4555 let err_row = second
4556 .iter()
4557 .find(|outcome| outcome.status == OutcomeStatus::Error)
4558 .expect("project change must surface an error outcome");
4559 assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
4560
4561 let stored = store
4562 .get_session(&base_session().id)
4563 .await?
4564 .expect("session row survives");
4565 assert_eq!(
4566 stored.session.project.as_str(),
4567 "/home/me/proj",
4568 "stored project must remain the original",
4569 );
4570
4571 Ok(())
4572 }
4573
4574 #[tokio::test(flavor = "multi_thread")]
4575 async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
4576 use crate::wire::Provenance;
4584 let temp = TempDir::new()?;
4585 let store = Store::open_local(temp.path()).await?;
4586 let session = base_session();
4587
4588 let text_part = |part_id: &str, message_id: &str, body: &str| Part {
4589 session_id: session.id.clone(),
4590 id: part_id.to_owned(),
4591 message_id: message_id.to_owned(),
4592 ordinal: 0,
4593 provenance: Provenance::Conversational,
4594 options: ProviderOptions::new(),
4595 kind: PartKind::Text {
4596 text: Some(Extracted::from_test_value(body.to_owned())),
4597 },
4598 };
4599 let user_message = |id: &str| Message::User {
4600 id: id.to_owned(),
4601 session_id: session.id.clone(),
4602 timestamp: Utc::now(),
4603 options: ProviderOptions::new(),
4604 };
4605
4606 let mut validator = IngestValidator::default();
4608 validator
4609 .push(&store, 0, IngestEvent::Session(session.clone()))
4610 .await?;
4611 validator
4612 .push(&store, 1, IngestEvent::Message(user_message("m1")))
4613 .await?;
4614 validator
4615 .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
4616 .await?;
4617 validator
4618 .push(&store, 3, IngestEvent::Message(user_message("m2")))
4619 .await?;
4620 validator
4621 .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
4622 .await?;
4623 let (_first_outcomes, first_counts) = validator.finish(&store).await?;
4624 assert_eq!(first_counts.sessions_inserted, 1);
4625 assert_eq!(first_counts.messages_inserted_total, 2);
4626 assert_eq!(first_counts.messages_inserted_searchable, 2);
4627
4628 let mut validator = IngestValidator::default();
4630 validator
4631 .push(&store, 0, IngestEvent::Session(session.clone()))
4632 .await?;
4633 for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
4634 let pid = format!("p{}", idx + 3);
4635 validator
4636 .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
4637 .await?;
4638 validator
4639 .push(
4640 &store,
4641 idx * 2 + 2,
4642 IngestEvent::Part(text_part(&pid, mid, "gamma")),
4643 )
4644 .await?;
4645 }
4646 let (second_outcomes, second_counts) = validator.finish(&store).await?;
4647
4648 assert_eq!(
4649 second_counts.sessions_inserted, 0,
4650 "existing session row must report as Matched, not Inserted",
4651 );
4652 assert_eq!(second_counts.sessions_matched, 1);
4653 assert_eq!(
4654 second_counts.messages_inserted_total, 3,
4655 "the three NEW messages must register as Inserted in BatchCounts",
4656 );
4657 assert_eq!(
4658 second_counts.messages_inserted_searchable, 3,
4659 "all three new messages carry conversational text -> searchable",
4660 );
4661 assert_eq!(second_counts.messages_matched_total, 0);
4662 assert_eq!(second_counts.parts_inserted, 3);
4663 assert_eq!(second_counts.parts_matched, 0);
4664
4665 let session_outcome = second_outcomes
4668 .iter()
4669 .find(|outcome| outcome.kind == "session")
4670 .expect("session-row outcome present");
4671 assert_eq!(session_outcome.status, OutcomeStatus::Matched);
4672 for outcome in &second_outcomes {
4673 if outcome.kind == "message" || outcome.kind == "part" {
4674 assert_eq!(
4675 outcome.status,
4676 OutcomeStatus::Inserted,
4677 "new row must be Inserted, got: {outcome:?}",
4678 );
4679 }
4680 }
4681 Ok(())
4682 }
4683
4684 async fn store_with_messages(
4688 temp: &TempDir,
4689 count: usize,
4690 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4691 store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
4692 }
4693
4694 async fn store_with_messages_at_threshold(
4697 temp: &TempDir,
4698 count: usize,
4699 _vector_threshold: usize,
4700 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
4701 let store = Store::open_local(temp.path()).await?;
4702 let sessions = 8.min(count.max(1));
4703 let mut events = Vec::new();
4704 for s in 0..sessions {
4705 events.push(IngestEvent::Session(Session {
4706 id: format!("session-{s}"),
4707 parent_session_id: None,
4708 parent_message_id: None,
4709 source_agent: "claude-code".to_owned(),
4710 created_at: Utc::now(),
4711 project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
4712 options: ProviderOptions::new(),
4713 }));
4714 for i in (s..count).step_by(sessions) {
4715 let message_id = format!("msg-{i}");
4716 events.push(IngestEvent::Message(Message::User {
4717 id: message_id.clone(),
4718 session_id: format!("session-{s}"),
4719 timestamp: Utc::now(),
4720 options: ProviderOptions::new(),
4721 }));
4722 events.push(IngestEvent::Part(Part {
4723 session_id: format!("session-{s}"),
4724 id: format!("{message_id}-part"),
4725 message_id,
4726 ordinal: 0,
4727 provenance: crate::wire::Provenance::Conversational,
4728 options: ProviderOptions::new(),
4729 kind: PartKind::Text {
4730 text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
4731 },
4732 }));
4733 }
4734 }
4735 ingest_events(&store, events).await?;
4736 let keys = (0..count)
4737 .map(|i| MessageKey {
4738 session_id: format!("session-{}", i % sessions),
4739 message_id: format!("msg-{i}"),
4740 })
4741 .collect();
4742 Ok((store, keys))
4743 }
4744
4745 fn synthetic_vector(seed: usize) -> Vec<f32> {
4747 let mut state = (seed as u64)
4748 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
4749 .wrapping_add(1);
4750 (0..embedding_dim())
4751 .map(|_| {
4752 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
4753 #[allow(clippy::cast_precision_loss)]
4754 let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
4755 unit - 1.0
4756 })
4757 .collect()
4758 }
4759
4760 fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
4762 keys.iter()
4763 .enumerate()
4764 .map(|(seed, key)| EmbeddedMessage {
4765 session_id: key.session_id.clone(),
4766 id: key.message_id.clone(),
4767 vector: synthetic_vector(seed),
4768 })
4769 .collect()
4770 }
4771
4772 fn embedding_update_batch_with_model(
4773 rows: &[EmbeddedMessage],
4774 model: &str,
4775 ) -> Result<RecordBatch> {
4776 let mut batch = embedding_update_batch(rows)?;
4777 let columns = batch
4778 .columns()
4779 .iter()
4780 .take(3)
4781 .cloned()
4782 .chain(std::iter::once(
4783 Arc::new(StringArray::from(vec![model; rows.len()])) as _,
4784 ))
4785 .collect::<Vec<_>>();
4786 batch = RecordBatch::try_new(batch.schema(), columns)?;
4787 Ok(batch)
4788 }
4789
4790 #[tokio::test]
4791 async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
4792 let temp = TempDir::new()?;
4793 let (store, keys) = store_with_messages(&temp, 4).await?;
4797 store.write_embeddings(&embedded(&keys)).await?;
4798 store
4799 .optimize_indices(None, &MaintenancePolicy::always_compact())
4800 .await?
4801 .into_result()?;
4802
4803 let query = vec![0.01_f32; embedding_dim()];
4804 let plan = store
4805 .explain_vector_plan(
4806 &query,
4807 10,
4808 &Predicate::Eq("session_id", "session-3".into()),
4809 None,
4810 )
4811 .await?;
4812
4813 assert!(
4818 plan.contains("ScalarIndexQuery"),
4819 "expected a ScalarIndexQuery node in the plan:\n{plan}",
4820 );
4821 let predicate_postfiltered = plan
4822 .lines()
4823 .any(|line| line.contains("FilterExec") && line.contains("session_id"));
4824 assert!(
4825 !predicate_postfiltered,
4826 "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
4827 );
4828 Ok(())
4829 }
4830
4831 #[tokio::test]
4832 async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
4833 let temp = TempDir::new()?;
4834 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4835
4836 store.write_embeddings(&embedded(&keys[..255])).await?;
4839 store
4840 .optimize_indices_with_vector_threshold(256)
4841 .await?
4842 .into_result()?;
4843 assert!(
4844 !store
4845 .handle
4846 .messages_index_names()
4847 .await?
4848 .iter()
4849 .any(|name| name == MESSAGES_VECTOR_INDEX),
4850 "IVF_PQ must not exist below the activation threshold",
4851 );
4852
4853 store.write_embeddings(&embedded(&keys[255..256])).await?;
4856 store
4857 .optimize_indices_with_vector_threshold(256)
4858 .await?
4859 .into_result()?;
4860 assert!(
4861 store
4862 .handle
4863 .messages_index_names()
4864 .await?
4865 .iter()
4866 .any(|name| name == MESSAGES_VECTOR_INDEX),
4867 "optimize must create the IVF_PQ once the threshold is crossed",
4868 );
4869
4870 let hits = store
4873 .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
4874 .await?;
4875 assert!(
4876 hits.iter().any(|(key, _)| key == &keys[0]),
4877 "an embedded row is retrievable via the index",
4878 );
4879 Ok(())
4880 }
4881
4882 #[tokio::test]
4883 async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
4884 {
4885 let temp = TempDir::new()?;
4886 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
4887 let old_rows = embedded(&keys);
4888 let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
4889 store
4890 .handle
4891 .merge_update(Table::Messages, old_batch, old_rows.len())
4892 .await?;
4893 store
4894 .optimize_indices_with_vector_threshold(256)
4895 .await?
4896 .into_result()?;
4897 assert!(
4898 store
4899 .handle
4900 .messages_index_names()
4901 .await?
4902 .iter()
4903 .any(|name| name == MESSAGES_VECTOR_INDEX),
4904 "IVF_PQ must exist before a model swap",
4905 );
4906 assert_eq!(store.stale_embedding_count().await?, keys.len());
4907
4908 store.drop_vector_index().await?;
4909 let mut pending = Vec::new();
4910 let stream = store.pending_or_stale_messages();
4911 tokio::pin!(stream);
4912 while let Some(row) = stream.next().await {
4913 pending.push(row?);
4914 }
4915 assert_eq!(
4916 pending.len(),
4917 keys.len(),
4918 "force stream should see stale rows"
4919 );
4920 store.write_embeddings(&embedded(&keys)).await?;
4921 assert_eq!(store.stale_embedding_count().await?, 0);
4922 store
4923 .optimize_indices_with_vector_threshold(256)
4924 .await?
4925 .into_result()?;
4926 assert!(
4927 store
4928 .handle
4929 .messages_index_names()
4930 .await?
4931 .iter()
4932 .any(|name| name == MESSAGES_VECTOR_INDEX),
4933 "optimize must rebuild IVF_PQ after force re-embed",
4934 );
4935
4936 let stream = store.pending_or_stale_messages();
4937 tokio::pin!(stream);
4938 assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
4939 Ok(())
4940 }
4941
4942 #[tokio::test]
4943 async fn session_last_ingested_at_falls_back_when_versions_pruned() -> anyhow::Result<()> {
4944 let temp = TempDir::new()?;
4953 let (store, _keys) = store_with_messages(&temp, 4).await?;
4954
4955 for tag in 0..3 {
4958 let extra = synthetic_session(&format!("extra-{tag}"));
4959 store.upsert_sessions(&[extra]).await?;
4960 }
4961
4962 let dataset = store.handle.dataset(Table::Sessions).await?;
4967 dataset
4968 .cleanup_old_versions(chrono::Duration::zero(), None, Some(false))
4969 .await
4970 .context("cleanup_old_versions failed")?;
4971
4972 let map = store.session_last_ingested_at().await?;
4973 let session_count = store.row_counts().await?.0;
4974 assert!(
4975 map.len() >= session_count,
4976 "watermark map ({}) must still cover every session ({}) after \
4977 version cleanup; an empty fallback regresses pond sync to a \
4978 full re-scan",
4979 map.len(),
4980 session_count,
4981 );
4982 Ok(())
4983 }
4984
4985 #[tokio::test]
4986 async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
4987 let temp = TempDir::new()?;
4988 let (store, keys) = store_with_messages(&temp, 10).await?;
4989
4990 let before = store.embedding_progress().await?;
4991 assert_eq!(before.embedded, 0);
4992 assert_eq!(before.total, 10);
4993 assert_eq!(before.model, crate::embed::model_id());
4994
4995 store.write_embeddings(&embedded(&keys[..4])).await?;
4996 let partial = store.embedding_progress().await?;
4997 assert_eq!(partial.embedded, 4);
4998 assert_eq!(partial.total, 10);
4999
5000 store.write_embeddings(&embedded(&keys[4..])).await?;
5001 let full = store.embedding_progress().await?;
5002 assert_eq!(full.embedded, 10);
5003 assert_eq!(full.total, 10);
5004 Ok(())
5005 }
5006}