1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 path::Path,
7 sync::Arc,
8};
9
10use anyhow::{Context, Result};
11use arc_swap::ArcSwapOption;
12use async_stream::try_stream;
13use chrono::{DateTime, TimeZone, Utc};
14use lance::Dataset;
15use lance::dataset::{AutoCleanupParams, ProjectionRequest, WriteMode, WriteParams};
16use lance::deps::arrow_array::{
17 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
18 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
19 UInt64Array, new_null_array,
20};
21use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
22use lance::deps::datafusion::physical_plan::SendableRecordBatchStream;
23use lance::index::DatasetIndexExt;
24use lance_file::version::LanceFileVersion;
25use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
26use serde::{Deserialize, Serialize, de::DeserializeOwned};
27use serde_json::Value;
28use tokio_stream::{Stream, StreamExt};
29
30use crate::{
31 config, embed,
32 rowmap::{RowMetaEntry, RowMetaMap, RowMetaSet, discover_chain},
33 substrate::{
34 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
35 OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
36 TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
37 },
38 wire::{FileData, Message, Part, PartKind, Role, SUMMARY_PART_TYPES, Session, SessionFrom},
39};
40use url::Url;
41
/// Store over the sessions, messages and parts tables, reached through a substrate [`Handle`].
#[derive(Debug)]
pub struct Store {
    /// Substrate handle that every read and write in this type goes through.
    handle: Handle,
    /// Optional, atomically swappable [`RowMetaSet`]; the constructors in this file start it
    /// empty.
    rowmap: ArcSwapOption<RowMetaSet>,
}
52
/// Per-table row counts, used for both the `rows` and `inserted` figures of an archive
/// export, archive import or delta copy.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Rows counted for the sessions table.
    pub sessions: usize,
    /// Rows counted for the messages table.
    pub messages: usize,
    /// Rows counted for the parts table.
    pub parts: usize,
}
59
/// Per-table dataset version ids, captured from the source datasets when they are exported.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the sessions dataset.
    pub sessions: u64,
    /// Version id of the messages dataset.
    pub messages: u64,
    /// Version id of the parts dataset.
    pub parts: u64,
}
66
/// Result of exporting the three tables to an archive directory.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Number of rows scanned from each table and written to the archive.
    pub rows: LanceArchiveCounts,
    /// Version id of each source dataset, read before that table was scanned.
    pub source_versions: LanceArchiveVersions,
}
72
/// Result of an archive import or a delta copy into this store.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Number of rows read from the source for each table.
    pub rows: LanceArchiveCounts,
    /// Number of rows the destination reported as written for each table. For merges this is
    /// inserted plus updated rows; for appends it is the appended row count.
    pub inserted: LanceArchiveCounts,
}
78
/// Ids of one table that a delta copy has to transfer, split by write strategy.
#[derive(Debug, Clone, Default)]
pub struct TablePlan {
    /// Ids whose rows are copied with a plain append.
    pub append: Vec<String>,
    /// Ids whose rows are copied through a merge-insert.
    pub merge: Vec<String>,
}

impl TablePlan {
    /// Returns `true` when neither the append list nor the merge list names any id.
    pub fn is_empty(&self) -> bool {
        [&self.append, &self.merge].iter().all(|ids| ids.is_empty())
    }
}
101
102#[derive(Debug, Clone, Default)]
108pub struct DeltaPlan {
109 pub sessions: TablePlan,
110 pub messages: TablePlan,
111 pub parts: TablePlan,
112 pub source_sessions: usize,
113}
114
115impl DeltaPlan {
116 pub fn is_empty(&self) -> bool {
117 self.sessions.is_empty() && self.messages.is_empty() && self.parts.is_empty()
118 }
119
120 pub fn new_sessions(&self) -> usize {
124 self.sessions.append.len()
125 }
126
127 pub fn total(&self) -> usize {
130 let mut seen = std::collections::HashSet::new();
131 for plan in [&self.sessions, &self.messages, &self.parts] {
132 seen.extend(plan.append.iter());
133 seen.extend(plan.merge.iter());
134 }
135 seen.len()
136 }
137}
138
139#[derive(Debug, Clone, Default)]
140pub struct IndexIntents {
141 pub sessions: Vec<IndexIntent>,
142 pub messages: Vec<IndexIntent>,
143 pub parts: Vec<IndexIntent>,
144}
145
146impl IndexIntents {
147 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
148 [
149 (Table::Sessions, &self.sessions),
150 (Table::Messages, &self.messages),
151 (Table::Parts, &self.parts),
152 ]
153 }
154}
155
/// A message identified by `(session_id, id)` together with its search text.
///
/// NOTE(review): the name suggests these are messages waiting to be embedded; the consumer is
/// not visible in this section, so confirm before relying on that.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub id: String,
    /// Search text stored for the message.
    pub search_text: String,
}
165
/// A message identified by `(session_id, id)` together with a vector of `f32` values.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub id: String,
    /// Vector for the message (presumably its embedding; dimension is not fixed here).
    pub vector: Vec<f32>,
}
174
/// Metadata about a single message, including the session, project and agent it belongs to.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Id of the message.
    pub message_id: String,
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Role of the message, kept as a string.
    pub role: String,
    /// Project the message is attributed to.
    pub project: String,
    /// Source agent the message is attributed to.
    pub source_agent: String,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Search text stored for the message.
    pub search_text: String,
}
186
/// Composite key of a message: the owning session id plus the message id.
///
/// The derived ordering compares `session_id` first and `message_id` second.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
}
192
/// One search result: the matched message's key, its score and an optional row id.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchHit {
    /// Row id of the hit when one is available, otherwise `None`.
    pub rowid: Option<u64>,
    /// Key of the matched message.
    pub key: MessageKey,
    /// Score of the hit (scale and direction are set by the producing search, not visible
    /// here).
    pub score: f32,
}
203
/// Outcome of upserting a single row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row was newly inserted.
    Inserted,
    /// The row matched an existing one.
    Matched,
}
209
210#[derive(Debug, Default)]
215pub struct OptimizeOutcome {
216 pub tables: Vec<TableOptimizeOutcome>,
217}
218
219impl OptimizeOutcome {
220 pub fn any_indices_failed(&self) -> bool {
223 self.tables.iter().any(|t| t.indices.is_failed())
224 }
225
226 pub fn into_result(self) -> Result<Self> {
230 for table in &self.tables {
231 if let PhaseOutcome::Failed(error) = &table.indices {
232 anyhow::bail!(
233 "indices phase failed on {}: {error:#}",
234 table.table.as_str()
235 );
236 }
237 if let PhaseOutcome::Failed(error) = &table.compaction {
238 anyhow::bail!(
239 "compaction phase failed on {}: {error:#}",
240 table.table.as_str()
241 );
242 }
243 }
244 Ok(self)
245 }
246}
247
/// Row totals for the three tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Total rows in the sessions table.
    pub sessions: u64,
    /// Total rows in the messages table.
    pub messages: u64,
    /// Total rows in the parts table.
    pub parts: u64,
}
254
/// Snapshot of embedding progress for one model.
///
/// NOTE(review): the exact relationship between `embedded`, `total` and `backlog` is defined
/// by the producer, which is not visible in this section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Count of items already embedded.
    pub embedded: usize,
    /// Count of items overall.
    pub total: usize,
    /// Count of items still outstanding.
    pub backlog: usize,
    /// Name of the embedding model, as a static string.
    pub model: &'static str,
}
270
/// Borrowed bundle describing one message to write: the message, its parts and optional
/// search text.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message itself.
    pub message: &'a Message,
    /// Parts belonging to the message.
    pub parts: &'a [Part],
    /// Search text to store with the message, if any.
    pub search_text: Option<&'a str>,
}
277
278impl Store {
279 pub async fn open(location: &Url) -> Result<Self> {
285 Ok(Self {
286 handle: Handle::open(location).await?,
287 rowmap: ArcSwapOption::empty(),
288 })
289 }
290
291 pub fn lance_cache_bytes(&self) -> u64 {
294 self.handle.lance_cache_bytes()
295 }
296
297 pub async fn open_with_options(
303 location: &Url,
304 storage_options: std::collections::HashMap<String, String>,
305 caps: crate::substrate::RuntimeCaps,
306 ) -> Result<Self> {
307 Ok(Self {
308 handle: Handle::open_with_options(location, storage_options, caps).await?,
309 rowmap: ArcSwapOption::empty(),
310 })
311 }
312
313 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
318 let url = config::url_for_path(path)?;
319 Self::open_with_options(
320 &url,
321 std::collections::HashMap::new(),
322 crate::substrate::RuntimeCaps::default(),
323 )
324 .await
325 }
326
327 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
335 std::fs::create_dir_all(dest)
336 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
337 let (sessions, sessions_version) = self
338 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
339 .await?;
340 let (messages, messages_version) = self
341 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
342 .await?;
343 let (parts, parts_version) = self
344 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
345 .await?;
346 Ok(LanceArchiveExport {
347 rows: LanceArchiveCounts {
348 sessions,
349 messages,
350 parts,
351 },
352 source_versions: LanceArchiveVersions {
353 sessions: sessions_version,
354 messages: messages_version,
355 parts: parts_version,
356 },
357 })
358 }
359
360 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
361 let sessions_dataset =
362 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
363 let messages_dataset =
364 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
365 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
366 let (sessions, sessions_inserted) = self
367 .import_clean_table(Table::Sessions, sessions_dataset)
368 .await?;
369 let (messages, messages_inserted) = self
370 .import_clean_table(Table::Messages, messages_dataset)
371 .await?;
372 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
373 Ok(LanceArchiveImport {
374 rows: LanceArchiveCounts {
375 sessions,
376 messages,
377 parts,
378 },
379 inserted: LanceArchiveCounts {
380 sessions: sessions_inserted,
381 messages: messages_inserted,
382 parts: parts_inserted,
383 },
384 })
385 }
386
387 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
388 let dataset = self.handle.dataset(table).await?;
389 let source_version = dataset.version_id();
390 let schema = export_schema(table);
391 let mut scan = dataset.scan();
392 scan.blob_handling(lance::datatypes::BlobHandling::AllBinary);
397 let mut stream = scan
398 .try_into_stream()
399 .await
400 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
401 let dest_uri = dest
402 .to_str()
403 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
404
405 let mut rows = 0usize;
406 let mut wrote = false;
407 while let Some(batch) = stream.next().await {
408 let batch = batch
409 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
410 rows += batch.num_rows();
411 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
412 let mut params = write_params_for_create();
413 if wrote {
414 params.mode = WriteMode::Append;
415 }
416 Dataset::write(reader, dest_uri, Some(params))
417 .await
418 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
419 wrote = true;
420 }
421
422 if !wrote {
423 let batch = RecordBatch::new_empty(schema.clone());
424 let reader = RecordBatchIterator::new([Ok(batch)], schema);
425 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
426 .await
427 .with_context(|| {
428 format!("failed to write empty {} archive table", table.as_str())
429 })?;
430 }
431 Ok((rows, source_version))
432 }
433
434 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
435 let _ = self.handle.dataset(table).await?;
439 self.merge_scanner(table, dataset.scan(), "archive import")
440 .await
441 }
442
443 async fn merge_scanner(
449 &self,
450 table: Table,
451 mut scanner: lance::dataset::scanner::Scanner,
452 context: &'static str,
453 ) -> Result<(usize, usize)> {
454 scanner.blob_handling(lance::datatypes::BlobHandling::AllBinary);
455 let mut stream = scanner
456 .try_into_stream()
457 .await
458 .with_context(|| format!("failed to scan {} for {context}", table.as_str()))?;
459 let mut rows = 0usize;
460 let mut inserted = 0usize;
461 while let Some(batch) = stream.next().await {
462 let batch = batch
463 .with_context(|| format!("failed to read {} {context} batch", table.as_str()))?;
464 let row_count = batch.num_rows();
465 rows += row_count;
466 let stats = self
467 .handle
468 .merge_insert_stats(table, batch, row_count)
469 .await
470 .with_context(|| format!("failed to merge {} during {context}", table.as_str()))?;
471 inserted += (stats.num_inserted_rows + stats.num_updated_rows) as usize;
472 }
473 Ok((rows, inserted))
474 }
475
476 pub async fn all_session_message_counts(&self) -> Result<HashMap<String, usize>> {
489 self.all_session_row_counts(Table::Messages).await
490 }
491
492 pub async fn all_session_part_counts(&self) -> Result<HashMap<String, usize>> {
493 self.all_session_row_counts(Table::Parts).await
494 }
495
496 async fn all_session_row_counts(&self, table: Table) -> Result<HashMap<String, usize>> {
501 let scanner = self
502 .handle
503 .scan(table, ScanOpts::project_only(&["session_id"]))
504 .await?;
505 let mut stream = scanner.try_into_stream().await?;
506 let mut out: HashMap<String, usize> = HashMap::new();
507 while let Some(batch) = stream.next().await {
508 let batch = batch?;
509 let session_ids = batch
510 .column_by_name("session_id")
511 .context("scan projection dropped the session_id column")?
512 .as_any()
513 .downcast_ref::<StringArray>()
514 .context("session_id column is not Utf8")?;
515 for row in 0..batch.num_rows() {
516 if session_ids.is_null(row) {
517 continue;
518 }
519 let session_id = session_ids.value(row);
520 if let Some(count) = out.get_mut(session_id) {
521 *count += 1;
522 } else {
523 out.insert(session_id.to_owned(), 1);
524 }
525 }
526 }
527 Ok(out)
528 }
529
530 pub async fn plan_incremental_from(&self, source: &Store) -> Result<DeltaPlan> {
539 let (
540 source_ids,
541 dest_ids,
542 source_msg_counts,
543 dest_msg_counts,
544 source_part_counts,
545 dest_part_counts,
546 ) = tokio::try_join!(
547 source.collect_ids(Table::Sessions),
548 self.collect_ids(Table::Sessions),
549 source.all_session_message_counts(),
550 self.all_session_message_counts(),
551 source.all_session_part_counts(),
552 self.all_session_part_counts(),
553 )?;
554 let source_sessions = source_ids.len();
555 let mut plan = DeltaPlan {
556 source_sessions,
557 ..DeltaPlan::default()
558 };
559 for id in &source_ids {
560 if !dest_ids.contains(id) {
563 plan.sessions.append.push(id.clone());
564 }
565 let source_msgs = source_msg_counts.get(id).copied().unwrap_or(0);
566 let dest_msgs = dest_msg_counts.get(id).copied().unwrap_or(0);
567 if dest_msgs == 0 {
568 if source_msgs > 0 {
569 plan.messages.append.push(id.clone());
570 }
571 } else if source_msgs > dest_msgs {
572 plan.messages.merge.push(id.clone());
573 }
574 let source_parts = source_part_counts.get(id).copied().unwrap_or(0);
575 let dest_parts = dest_part_counts.get(id).copied().unwrap_or(0);
576 if dest_parts == 0 {
577 if source_parts > 0 {
578 plan.parts.append.push(id.clone());
579 }
580 } else if source_parts > dest_parts {
581 plan.parts.merge.push(id.clone());
582 }
583 }
584 Ok(plan)
585 }
586
587 pub async fn copy_delta_from(
598 &self,
599 source: &Store,
600 plan: &DeltaPlan,
601 ) -> Result<LanceArchiveImport> {
602 let ((sessions, sessions_inserted), (messages, messages_inserted), (parts, parts_inserted)) =
606 tokio::try_join!(
607 self.copy_table(
608 source,
609 Table::Sessions,
610 "id",
611 &plan.sessions,
612 plan.source_sessions,
613 ),
614 self.copy_table(
615 source,
616 Table::Messages,
617 "session_id",
618 &plan.messages,
619 plan.source_sessions,
620 ),
621 self.copy_table(
622 source,
623 Table::Parts,
624 "session_id",
625 &plan.parts,
626 plan.source_sessions,
627 ),
628 )?;
629 Ok(LanceArchiveImport {
630 rows: LanceArchiveCounts {
631 sessions,
632 messages,
633 parts,
634 },
635 inserted: LanceArchiveCounts {
636 sessions: sessions_inserted,
637 messages: messages_inserted,
638 parts: parts_inserted,
639 },
640 })
641 }
642
643 async fn copy_table(
649 &self,
650 source: &Store,
651 table: Table,
652 key_column: &'static str,
653 table_plan: &TablePlan,
654 source_sessions: usize,
655 ) -> Result<(usize, usize)> {
656 let _ = self.handle.dataset(table).await?;
660
661 let appended = self
662 .append_sessions(
663 source,
664 table,
665 key_column,
666 &table_plan.append,
667 source_sessions,
668 )
669 .await?;
670
671 let mut merged_rows = 0usize;
675 let mut merged_inserted = 0usize;
676 for chunk in table_plan.merge.chunks(COPY_SESSION_IN_CHUNK) {
677 let predicate = in_predicate(key_column, chunk);
678 let scanner = source
679 .handle
680 .scan(
681 table,
682 ScanOpts {
683 predicate: Some(&predicate),
684 projection: None,
685 },
686 )
687 .await?;
688 let (r, i) = self.merge_scanner(table, scanner, "copy").await?;
689 merged_rows += r;
690 merged_inserted += i;
691 }
692
693 Ok((appended + merged_rows, appended + merged_inserted))
694 }
695
696 async fn append_sessions(
703 &self,
704 source: &Store,
705 table: Table,
706 key_column: &'static str,
707 session_ids: &[String],
708 source_sessions: usize,
709 ) -> Result<usize> {
710 if session_ids.is_empty() {
711 return Ok(0);
712 }
713 if session_ids.len() == source_sessions {
714 return self.append_scanner(source, table, None).await;
715 }
716 let mut rows = 0usize;
717 for chunk in session_ids.chunks(COPY_SESSION_IN_CHUNK) {
718 let predicate = in_predicate(key_column, chunk);
719 rows += self.append_scanner(source, table, Some(&predicate)).await?;
720 }
721 Ok(rows)
722 }
723
724 async fn append_scanner(
730 &self,
731 source: &Store,
732 table: Table,
733 predicate: Option<&Predicate>,
734 ) -> Result<usize> {
735 let make_source = || async {
736 let mut scanner = source
737 .handle
738 .scan(
739 table,
740 ScanOpts {
741 predicate,
742 projection: None,
743 },
744 )
745 .await?;
746 scanner.blob_handling(lance::datatypes::BlobHandling::AllBinary);
747 let stream: SendableRecordBatchStream = scanner
748 .try_into_stream()
749 .await
750 .with_context(|| format!("failed to scan {} for copy", table.as_str()))?
751 .into();
752 Ok(stream)
753 };
754 let stats = self.handle.append_stream(table, make_source).await?;
755 Ok(stats.rows as usize)
756 }
757
758 pub async fn append_absent_rows(
763 &self,
764 source: &Store,
765 table: Table,
766 filter_column: &'static str,
767 values: &[String],
768 ) -> Result<usize> {
769 if values.is_empty() {
770 return Ok(0);
771 }
772 let _ = self.handle.dataset(table).await?;
773 let mut rows = 0usize;
774 for chunk in values.chunks(COPY_SESSION_IN_CHUNK) {
775 let predicate = in_predicate(filter_column, chunk);
776 rows += self.append_scanner(source, table, Some(&predicate)).await?;
777 }
778 Ok(rows)
779 }
780
781 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<()> {
786 if sessions.is_empty() {
787 return Ok(());
788 }
789 let batches = sessions_batches(sessions)?;
790 merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
791 Ok(())
792 }
793
/// Writes a batch of completed substreams and reports one outcome per row.
///
/// The work happens in phases:
/// 1. Substreams sharing a session id are folded into one. A later substream whose
///    `source_agent` or `project` disagrees with the first is rejected with error outcomes.
/// 2. The stored sessions, message keys and part keys for the involved session ids are
///    loaded from the destination tables.
/// 3. Substreams whose session fails `ensure_immutable_match` against the stored session are
///    rejected with error outcomes.
/// 4. Message and part rows are built, skipping keys that are already stored or that repeat
///    within the batch.
/// 5. Messages and parts are appended concurrently; only after both succeed are the sessions
///    merge-inserted.
/// 6. Success outcomes are produced for every written substream.
///
/// # Returns
/// The outcomes sorted by `index`, plus the batch counters filled in by
/// `success_outcomes_for_substream`.
///
/// # Errors
/// Any scan, batch-building or write error aborts the call. Because messages and parts are
/// appended before sessions are merged, a failure in the final session merge leaves the
/// appended rows in place.
async fn upsert_session_batch(
    &self,
    substreams: Vec<CompletedSubstream>,
) -> Result<(Vec<RowOutcome>, BatchCounts)> {
    if substreams.is_empty() {
        return Ok((Vec::new(), BatchCounts::default()));
    }

    let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
    let mut counts = BatchCounts::default();

    // Phase 1: fold substreams that share a session id. `by_session_id` maps a session id to
    // the index of its first occurrence in `merged`.
    let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
    let mut by_session_id: std::collections::HashMap<String, usize> =
        std::collections::HashMap::with_capacity(substreams.len());
    for substream in substreams {
        if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
            let existing = &merged[existing_idx];
            if existing.session.source_agent != substream.session.source_agent
                || existing.session.project != substream.session.project
            {
                // The later substream disagrees with the first one on a field treated as
                // immutable. `source_agent` is reported in preference to `project` when both
                // differ.
                let reason = if existing.session.source_agent != substream.session.source_agent
                {
                    IngestError::ImmutableField {
                        field: "source_agent",
                        session_id: substream.session.id.clone(),
                        stored: existing.session.source_agent.clone(),
                        attempted: substream.session.source_agent.clone(),
                    }
                } else {
                    IngestError::ImmutableField {
                        field: "project",
                        session_id: substream.session.id.clone(),
                        stored: (*existing.session.project).clone(),
                        attempted: (*substream.session.project).clone(),
                    }
                };
                let field = match &reason {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    reason.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            // Compatible duplicate: keep the first substream's session and add only the
            // messages whose id it does not already hold.
            let existing = &mut merged[existing_idx];
            let mut seen: std::collections::HashSet<String> = existing
                .messages
                .iter()
                .map(|m| m.message.id().to_owned())
                .collect();
            for msg in substream.messages {
                if seen.insert(msg.message.id().to_owned()) {
                    existing.messages.push(msg);
                }
            }
            continue;
        }
        by_session_id.insert(substream.session.id.clone(), merged.len());
        merged.push(substream);
    }

    // Phase 2: load what the destination already stores for these session ids.
    let session_id_values: Vec<ScalarValue> = merged
        .iter()
        .map(|substream| ScalarValue::String(substream.session.id.clone()))
        .collect();
    // Full session rows (empty projection list), keyed by session id.
    let existing_sessions: std::collections::HashMap<String, Session> =
        if session_id_values.is_empty() {
            std::collections::HashMap::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Sessions,
                    Some(&Predicate::In("id", session_id_values.clone())),
                    &[],
                )
                .await?;
            let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let session = session_from_batch(&batch, row)?;
                map.insert(session.id.clone(), session);
            }
            map
        };
    // Stored message keys as `(session_id, message_id)`.
    let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Messages,
                Some(&Predicate::In("session_id", session_id_values.clone())),
                &["session_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "id", row)?.context("message id is null")?;
            set.insert((sid, mid));
        }
        set
    };
    // Stored part keys as `(session_id, message_id, part_id)`.
    let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
        HashSet::new()
    } else {
        let batch = self
            .handle
            .scan_batch(
                Table::Parts,
                Some(&Predicate::In("session_id", session_id_values)),
                &["session_id", "message_id", "id"],
            )
            .await?;
        let mut set = HashSet::with_capacity(batch.num_rows());
        for row in 0..batch.num_rows() {
            let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
            let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
            let pid = string(&batch, "id", row)?.context("part id is null")?;
            set.insert((sid, mid, pid));
        }
        set
    };

    // Phase 3: reject substreams whose session conflicts with the stored session.
    let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
    for substream in merged {
        if let Some(existing) = existing_sessions.get(&substream.session.id)
            && let Err(failure) = ensure_immutable_match(existing, &substream.session)
        {
            let field = match &failure {
                IngestError::ImmutableField { field, .. } => Some(*field),
            };
            let reason_key = match field {
                Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                _ => DROP_REASON_UNCATEGORIZED,
            };
            outcomes.extend(error_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                failure.to_string(),
                field,
                reason_key,
            ));
            continue;
        }
        writeable.push(substream);
    }

    // Nothing survived: report the rejections only.
    if writeable.is_empty() {
        outcomes.sort_by_key(|outcome| outcome.index);
        return Ok((outcomes, counts));
    }

    // Phase 4: build the rows to write. Messages and parts are appended rather than merged,
    // so keys that are already stored or that repeat within this batch are filtered out here.
    let sessions_owned: Vec<Session> = writeable
        .iter()
        .map(|substream| substream.session.clone())
        .collect();
    let mut seen_messages: HashSet<(String, String)> = HashSet::new();
    let message_rows: Vec<MessageBatchRow<'_>> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().map(|buffered| MessageBatchRow {
                message: &buffered.message,
                source_agent: &substream.session.source_agent,
                project: &substream.session.project,
                search_text: buffered.search_text.as_deref(),
            })
        })
        .filter(|row| {
            let key = (
                row.message.session_id().to_owned(),
                row.message.id().to_owned(),
            );
            !existing_message_pks.contains(&key) && seen_messages.insert(key)
        })
        .collect();
    let mut seen_parts: HashSet<(String, String, String)> = HashSet::new();
    let part_rows: Vec<Part> = writeable
        .iter()
        .flat_map(|substream| {
            substream.messages.iter().flat_map(|buffered| {
                buffered
                    .parts
                    .iter()
                    .map(|buffered_part| buffered_part.part.clone())
            })
        })
        .filter(|part| {
            let key = (
                part.session_id.clone(),
                part.message_id.clone(),
                part.id.clone(),
            );
            !existing_part_pks.contains(&key) && seen_parts.insert(key)
        })
        .collect();

    let session_batches = sessions_batches(&sessions_owned)?;
    let message_batches = messages_batches(&message_rows)?;
    let part_batches = parts_batches(&part_rows)?;

    // Phase 5: append messages and parts concurrently, then merge-insert the sessions. The
    // session merge runs only if both appends succeeded, because `?` returns early.
    let (_messages_appended, _parts_appended) = tokio::try_join!(
        self.handle.append_batches(Table::Messages, message_batches),
        self.handle.append_batches(Table::Parts, part_batches),
    )?;
    let _sessions_inserted =
        merge_insert_chunks(&self.handle, Table::Sessions, session_batches).await?;

    // Phase 6: success outcomes, computed against the pre-write snapshot of stored rows.
    for substream in &writeable {
        outcomes.extend(success_outcomes_for_substream(
            substream.session_index,
            &substream.session,
            &substream.messages,
            &existing_sessions,
            &existing_message_pks,
            &existing_part_pks,
            &mut counts,
        ));
    }

    outcomes.sort_by_key(|outcome| outcome.index);
    Ok((outcomes, counts))
}
1070
1071 pub async fn upsert_messages(
1072 &self,
1073 session: &Session,
1074 messages: &[MessageWrite<'_>],
1075 ) -> Result<()> {
1076 if messages.is_empty() {
1077 return Ok(());
1078 }
1079
1080 let rows = messages
1081 .iter()
1082 .map(|write| MessageBatchRow {
1083 message: write.message,
1084 source_agent: &session.source_agent,
1085 project: &session.project,
1086 search_text: write.search_text,
1087 })
1088 .collect::<Vec<_>>();
1089 let batches = messages_batches(&rows)?;
1090 merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
1091 Ok(())
1092 }
1093
1094 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<()> {
1095 if parts.is_empty() {
1096 return Ok(());
1097 }
1098 let batches = parts_batches(parts)?;
1099 merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
1100 Ok(())
1101 }
1102
1103 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
1104 let Some(session) = self.find_session(session_id).await? else {
1105 return Ok(None);
1106 };
1107 let messages = self.messages_for_session(session_id).await?;
1108 Ok(Some(SessionWithMessages { session, messages }))
1109 }
1110
1111 pub async fn session_ids(&self) -> Result<Vec<String>> {
1113 let batch = self
1114 .handle
1115 .scan_batch(Table::Sessions, None, &["id"])
1116 .await?;
1117 let mut ids = Vec::with_capacity(batch.num_rows());
1118 for row in 0..batch.num_rows() {
1119 if let Some(id) = string(&batch, "id", row)? {
1120 ids.push(id);
1121 }
1122 }
1123 Ok(ids)
1124 }
1125
1126 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
1127 let batch = self
1128 .handle
1129 .scan_batch(
1130 Table::Sessions,
1131 Some(&Predicate::Eq(
1132 "parent_session_id",
1133 parent_session_id.into(),
1134 )),
1135 &[
1136 "id",
1137 "parent_session_id",
1138 "parent_message_id",
1139 "source_agent",
1140 "created_at",
1141 "project",
1142 "options",
1143 ],
1144 )
1145 .await?;
1146 let mut sessions = Vec::with_capacity(batch.num_rows());
1147 for row in 0..batch.num_rows() {
1148 sessions.push(session_from_batch(&batch, row)?);
1149 }
1150 sessions.sort_by(|left, right| left.id.cmp(&right.id));
1151 Ok(sessions)
1152 }
1153
1154 pub async fn session_last_message_ids(&self) -> Result<HashMap<String, String>> {
1167 let (session_ids, latest) = tokio::try_join!(self.collect_ids(Table::Sessions), async {
1168 let scanner = self
1169 .handle
1170 .scan(
1171 Table::Messages,
1172 ScanOpts::project_only(&["session_id", "id", "timestamp"]),
1173 )
1174 .await?;
1175 let mut stream = scanner.try_into_stream().await?;
1176 let mut latest: HashMap<String, (DateTime<Utc>, String)> = HashMap::new();
1177 while let Some(batch) = stream.next().await {
1178 let batch = batch?;
1179 let session_ids = batch
1180 .column_by_name("session_id")
1181 .context("scan projection dropped the session_id column")?
1182 .as_any()
1183 .downcast_ref::<StringArray>()
1184 .context("session_id column is not Utf8")?;
1185 for row in 0..batch.num_rows() {
1186 if session_ids.is_null(row) {
1187 continue;
1188 }
1189 let session_id = session_ids.value(row);
1190 let Some(id) = string(&batch, "id", row)? else {
1191 continue;
1192 };
1193 let timestamp = datetime(&batch, "timestamp", row)?;
1194 match latest.get_mut(session_id) {
1195 Some((stored_ts, stored_id))
1196 if timestamp > *stored_ts
1197 || (timestamp == *stored_ts
1198 && id.as_str() > stored_id.as_str()) =>
1199 {
1200 *stored_ts = timestamp;
1201 *stored_id = id;
1202 }
1203 None => {
1204 latest.insert(session_id.to_owned(), (timestamp, id));
1205 }
1206 _ => {}
1207 }
1208 }
1209 }
1210 Ok::<_, anyhow::Error>(latest)
1211 })?;
1212 Ok(latest
1213 .into_iter()
1214 .filter(|(session_id, _)| session_ids.contains(session_id))
1215 .map(|(session_id, (_, message_id))| (session_id, message_id))
1216 .collect())
1217 }
1218
1219 pub async fn session_view(
1228 &self,
1229 session_id: &str,
1230 params: SessionViewParams<'_>,
1231 ) -> Result<GetLookup<SessionPage>> {
1232 let Some(session) = self.find_session(session_id).await? else {
1233 return Ok(GetLookup::NotFound);
1234 };
1235 let mut rows: Vec<ScanRow> = self
1236 .scan_conversational_messages(session_id)
1237 .await?
1238 .into_iter()
1239 .map(|row| ScanRow {
1240 id: row.message_id,
1241 role: row.role,
1242 timestamp: row.timestamp,
1243 text: Some(row.text.into_inner()),
1244 content: None,
1245 })
1246 .collect();
1247 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
1248
1249 let size = |row: &ScanRow| row.text.as_deref().map_or(0, str::len);
1250 let total = rows.len();
1251 let (win_start, win_end) = match (params.after_message_id, params.before_message_id) {
1254 (Some(after), _) => {
1255 let pos = match rows.iter().position(|row| row.id == after) {
1256 Some(idx) => idx + 1,
1257 None => return Ok(GetLookup::UnknownAnchor),
1258 };
1259 let n = page_by(&rows[pos..], params.limit, params.budget_bytes, size);
1260 (pos, pos + n)
1261 }
1262 (None, Some(before)) => {
1263 let pos = match rows.iter().position(|row| row.id == before) {
1264 Some(idx) => idx,
1265 None => return Ok(GetLookup::UnknownAnchor),
1266 };
1267 let n = page_tail(&rows[..pos], params.limit, params.budget_bytes, size);
1268 (pos - n, pos)
1269 }
1270 (None, None) => match params.session_from {
1271 SessionFrom::Start => (0, page_by(&rows, params.limit, params.budget_bytes, size)),
1272 SessionFrom::End => {
1273 let n = page_tail(&rows, params.limit, params.budget_bytes, size);
1274 (total - n, total)
1275 }
1276 },
1277 };
1278 let emitted = &rows[win_start..win_end];
1279 let before_remaining = win_start;
1280 let after_remaining = total - win_end;
1281 let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
1282
1283 let mut parts_by_message = self.summary_parts_for_messages(session_id, &ids).await?;
1284 let messages = emitted
1285 .iter()
1286 .map(|row| RetrievedMessage {
1287 id: row.id.clone(),
1288 role: row.role,
1289 timestamp: row.timestamp,
1290 text: row.text.clone(),
1291 content: row.content.clone(),
1292 parts: parts_by_message
1293 .remove(&(session_id.to_owned(), row.id.clone()))
1294 .unwrap_or_default(),
1295 })
1296 .collect();
1297
1298 Ok(GetLookup::Found(SessionPage {
1299 session,
1300 messages,
1301 before_remaining,
1302 after_remaining,
1303 }))
1304 }
1305
1306 pub async fn message_view(
1312 &self,
1313 message_id: &str,
1314 params: MessageViewParams,
1315 ) -> Result<GetLookup<MessagePage>> {
1316 let Some(session_id) = self.session_id_for_message(message_id).await? else {
1317 return Ok(GetLookup::NotFound);
1318 };
1319 let Some(session) = self.find_session(&session_id).await? else {
1320 return Ok(GetLookup::NotFound);
1321 };
1322 let mut rows = self.scan_all_messages(&session_id).await?;
1323 rows.retain(|row| row.text.is_some() || row.id == message_id);
1328 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
1329 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
1330 return Ok(GetLookup::NotFound);
1331 };
1332
1333 let start = target_pos.saturating_sub(params.context_before);
1334 let end = (target_pos + params.context_after + 1).min(rows.len());
1335 let window = &rows[start..end];
1336 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
1337 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
1340
1341 let all_parts = parts_by_message
1342 .remove(&(session_id.clone(), message_id.to_owned()))
1343 .unwrap_or_default();
1344 let part_count = page_by(&all_parts, 1000, params.budget_bytes, |part| {
1347 serde_json::to_string(part).map_or(0, |json| json.len())
1348 });
1349 let target_parts = all_parts[..part_count].to_vec();
1350 let target_parts_remaining = all_parts.len() - part_count;
1351
1352 let target_row = &rows[target_pos];
1353 let target = RetrievedMessage {
1354 id: target_row.id.clone(),
1355 role: target_row.role,
1356 timestamp: target_row.timestamp,
1357 text: target_row.text.clone(),
1358 content: target_row.content.clone(),
1359 parts: Vec::new(),
1361 };
1362 let siblings = window
1363 .iter()
1364 .enumerate()
1365 .filter(|(idx, _)| start + idx != target_pos)
1366 .map(|(_, row)| RetrievedMessage {
1367 id: row.id.clone(),
1368 role: row.role,
1369 timestamp: row.timestamp,
1370 text: row.text.clone(),
1371 content: row.content.clone(),
1372 parts: parts_by_message
1373 .get(&(session_id.clone(), row.id.clone()))
1374 .cloned()
1375 .unwrap_or_default(),
1376 })
1377 .collect();
1378
1379 Ok(GetLookup::Found(MessagePage {
1380 session,
1381 target,
1382 target_parts,
1383 target_parts_remaining,
1384 siblings,
1385 }))
1386 }
1387
1388 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1389 let batch = self
1390 .handle
1391 .scan_batch(
1392 Table::Messages,
1393 Some(&Predicate::Eq("session_id", session_id.into())),
1394 &["id", "timestamp", "role", "search_text", "content"],
1395 )
1396 .await?;
1397 let mut rows = Vec::with_capacity(batch.num_rows());
1398 for row in 0..batch.num_rows() {
1399 let id = string(&batch, "id", row)?.context("message id is null")?;
1400 let role =
1401 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1402 let timestamp = datetime(&batch, "timestamp", row)?;
1403 rows.push(ScanRow {
1404 id,
1405 role,
1406 timestamp,
1407 text: string(&batch, "search_text", row)?,
1408 content: string(&batch, "content", row)?,
1409 });
1410 }
1411 Ok(rows)
1412 }
1413
1414 pub async fn scan_conversational_messages(
1418 &self,
1419 session_id: &str,
1420 ) -> Result<Vec<ConversationalRow>> {
1421 let filter = Predicate::And(vec![
1422 Predicate::Eq("session_id", session_id.into()),
1423 Predicate::IsNotNull("search_text"),
1424 ]);
1425 let batch = self
1426 .handle
1427 .scan_batch(
1428 Table::Messages,
1429 Some(&filter),
1430 &["id", "timestamp", "role", "search_text"],
1431 )
1432 .await?;
1433
1434 let mut rows = Vec::with_capacity(batch.num_rows());
1435 for row in 0..batch.num_rows() {
1436 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1437 let role =
1438 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1439 let timestamp = datetime(&batch, "timestamp", row)?;
1440 let text_str = string(&batch, "search_text", row)?.context(
1441 "search_text null after IsNotNull pushdown - storage invariant violated",
1442 )?;
1443 rows.push(ConversationalRow {
1444 session_id: session_id.to_owned(),
1445 message_id,
1446 role,
1447 timestamp,
1448 text: SearchText(text_str),
1449 });
1450 }
1451 rows.sort_by(|a, b| {
1452 a.timestamp
1453 .cmp(&b.timestamp)
1454 .then_with(|| a.message_id.cmp(&b.message_id))
1455 });
1456 Ok(rows)
1457 }
1458
1459 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1462 let batch = self
1463 .handle
1464 .scan_batch(
1465 Table::Messages,
1466 Some(&Predicate::Eq("id", message_id.into())),
1467 &["session_id"],
1468 )
1469 .await?;
1470 if batch.num_rows() == 0 {
1471 return Ok(None);
1472 }
1473 string(&batch, "session_id", 0)
1474 }
1475
1476 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1477 self.handle.row_counts().await
1478 }
1479
1480 pub async fn collect_ids(&self, table: Table) -> Result<std::collections::HashSet<String>> {
1483 self.handle.collect_ids(table).await
1484 }
1485
1486 pub async fn id_diff_against(
1491 &self,
1492 table: Table,
1493 present: &std::collections::HashSet<String>,
1494 ) -> Result<(usize, usize)> {
1495 let scanner = self
1496 .handle
1497 .scan(table, ScanOpts::project_only(&["id"]))
1498 .await?;
1499 let mut stream = scanner.try_into_stream().await?;
1500 let (mut rows, mut absent) = (0usize, 0usize);
1501 while let Some(batch) = stream.next().await {
1502 let batch = batch?;
1503 let ids = batch
1504 .column_by_name("id")
1505 .context("scan projection dropped the id column")?
1506 .as_any()
1507 .downcast_ref::<StringArray>()
1508 .context("id column is not Utf8")?;
1509 for id in ids.iter().flatten() {
1510 rows += 1;
1511 if !present.contains(id) {
1512 absent += 1;
1513 }
1514 }
1515 }
1516 Ok((rows, absent))
1517 }
1518
1519 pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1523 Ok(Arc::new(self.handle.dataset(table).await?))
1524 }
1525
1526 pub async fn prewarm(&self, cache_dir: &Path) -> Result<()> {
1537 let messages = self.dataset(Table::Messages).await?;
1538 if let Err(error) = messages.prewarm_index(MESSAGES_VECTOR_INDEX).await {
1539 tracing::debug!(%error, "vector index prewarm skipped");
1540 }
1541 if let Err(error) = self.ensure_rowmap(cache_dir).await {
1544 tracing::warn!(%error, "rowmap build skipped; arms fall back to data-take resolution");
1545 }
1546 if let Err(error) = self
1549 .fts_search("pond", 1, &Predicate::And(Vec::new()))
1550 .await
1551 {
1552 tracing::debug!(%error, "fts index prewarm skipped");
1553 }
1554 Ok(())
1555 }
1556
1557 fn store_key(&self) -> String {
1560 let digest = blake3::hash(self.handle.location().as_str().as_bytes());
1561 digest.to_hex()[..16].to_owned()
1562 }
1563
1564 const MAX_ROWMAP_DELTAS: usize = 16;
1566
1567 const ROW_META_COLUMNS: [&str; 7] = [
1572 "session_id",
1573 "id",
1574 "role",
1575 "project",
1576 "source_agent",
1577 "timestamp",
1578 "search_text",
1579 ];
1580
1581 pub async fn ensure_rowmap(&self, cache_dir: &Path) -> Result<()> {
1588 let version = self.messages_version().await?;
1589 if let Some(current) = self.rowmap.load_full()
1590 && current.version() == version
1591 {
1592 return Ok(());
1593 }
1594 std::fs::create_dir_all(cache_dir)
1595 .with_context(|| format!("create cache dir {}", cache_dir.display()))?;
1596 let store_key = self.store_key();
1597
1598 if let Some(chain) = discover_chain(cache_dir, &store_key)
1601 && chain.version() == version
1602 && let Ok(set) = RowMetaSet::open(&chain)
1603 {
1604 self.rowmap.store(Some(Arc::new(set)));
1605 Self::sweep_stale_rowmaps(cache_dir, &store_key, chain.base_version);
1606 return Ok(());
1607 }
1608 if let Some(set) = self
1609 .extend_rowmap_coordinated(cache_dir, &store_key, version)
1610 .await?
1611 {
1612 self.rowmap.store(Some(Arc::new(set)));
1613 }
1614 Ok(())
1615 }
1616
1617 async fn extend_rowmap_coordinated(
1623 &self,
1624 cache_dir: &Path,
1625 store_key: &str,
1626 version: u64,
1627 ) -> Result<Option<RowMetaSet>> {
1628 let lock_path = cache_dir.join(format!("rowmetamap-{store_key}.lock"));
1629 let lock = std::fs::File::create(&lock_path)
1630 .with_context(|| format!("create rowmap build lock {}", lock_path.display()))?;
1631 match lock.try_lock() {
1632 Ok(()) => {}
1633 Err(std::fs::TryLockError::WouldBlock) => return Ok(None),
1634 Err(std::fs::TryLockError::Error(error)) => {
1635 return Err(error).context("lock rowmap build");
1636 }
1637 }
1638
1639 if let Some(chain) = discover_chain(cache_dir, store_key)
1643 && chain.version() == version
1644 && let Ok(set) = RowMetaSet::open(&chain)
1645 {
1646 return Ok(Some(set));
1647 }
1648
1649 Self::sweep_orphan_temps(cache_dir, store_key);
1652
1653 let chain = discover_chain(cache_dir, store_key);
1659 let existing = match &chain {
1660 Some(paths) => match RowMetaSet::open(paths) {
1661 Ok(set) => Some((paths, set)),
1662 Err(error) => {
1663 tracing::warn!(%error, store = store_key, "rowmap unreadable; purging and rebuilding");
1664 Self::purge_rowmaps(cache_dir, store_key);
1665 None
1666 }
1667 },
1668 None => None,
1669 };
1670 let delta = match &existing {
1673 Some((_, set)) => {
1674 self.collect_row_metas_delta(
1675 set.version(),
1676 set.max_row_id().unwrap_or(0),
1677 set.len(),
1678 )
1679 .await?
1680 }
1681 None => None,
1682 };
1683
1684 let base_version = match (&existing, delta) {
1685 (Some((paths, _)), Some(entries)) if paths.deltas.len() < Self::MAX_ROWMAP_DELTAS => {
1687 let path = RowMetaMap::delta_path(cache_dir, store_key, version);
1688 RowMetaMap::build(&path, version, entries)?;
1689 paths.base_version
1690 }
1691 (Some((_, set)), Some(entries)) => {
1695 let mut merged = set.merged_entries();
1696 merged.extend(entries);
1697 let path = RowMetaMap::path_for(cache_dir, store_key, version);
1698 RowMetaMap::build(&path, version, merged)?;
1699 version
1700 }
1701 _ => {
1703 let entries = self.collect_row_metas().await?;
1704 let path = RowMetaMap::path_for(cache_dir, store_key, version);
1705 RowMetaMap::build(&path, version, entries)?;
1706 version
1707 }
1708 };
1709
1710 let chain =
1711 discover_chain(cache_dir, store_key).context("rowmap chain missing after build")?;
1712 let set = RowMetaSet::open(&chain)?;
1713 Self::sweep_stale_rowmaps(cache_dir, store_key, base_version);
1714 Ok(Some(set))
1715 }
1716
1717 fn sweep_stale_rowmaps(cache_dir: &Path, store_key: &str, keep: u64) {
1722 let prefix = format!("rowmetamap-{store_key}-");
1723 let Ok(entries) = std::fs::read_dir(cache_dir) else {
1724 return;
1725 };
1726 for entry in entries.flatten() {
1727 let name = entry.file_name();
1728 let Some(rest) = name
1729 .to_str()
1730 .and_then(|name| name.strip_prefix(&prefix))
1731 .and_then(|rest| rest.strip_suffix(".rmm"))
1732 else {
1733 continue;
1734 };
1735 let version = rest
1736 .strip_prefix('v')
1737 .or_else(|| rest.strip_prefix('d'))
1738 .and_then(|digits| digits.parse::<u64>().ok());
1739 if let Some(version) = version
1740 && version < keep
1741 {
1742 let _ = std::fs::remove_file(entry.path());
1743 }
1744 }
1745 }
1746
1747 fn purge_rowmaps(cache_dir: &Path, store_key: &str) {
1752 let prefix = format!("rowmetamap-{store_key}-");
1753 let Ok(entries) = std::fs::read_dir(cache_dir) else {
1754 return;
1755 };
1756 for entry in entries.flatten() {
1757 if let Some(name) = entry.file_name().to_str()
1758 && name.starts_with(&prefix)
1759 && name.ends_with(".rmm")
1760 {
1761 let _ = std::fs::remove_file(entry.path());
1762 }
1763 }
1764 }
1765
1766 fn sweep_orphan_temps(cache_dir: &Path, store_key: &str) {
1770 let prefix = format!("rowmetamap-{store_key}-");
1771 let Ok(entries) = std::fs::read_dir(cache_dir) else {
1772 return;
1773 };
1774 for entry in entries.flatten() {
1775 let name = entry.file_name();
1776 let Some(name) = name.to_str() else { continue };
1777 if name.starts_with(&prefix) && name.contains(".tmp-") {
1778 let _ = std::fs::remove_file(entry.path());
1779 }
1780 }
1781 }
1782
1783 #[cfg(test)]
1784 pub(crate) fn rowmap_delta_count(&self) -> Option<usize> {
1785 self.rowmap.load_full().map(|set| set.delta_count())
1786 }
1787
1788 pub fn rowmap_snapshot(&self) -> Option<Arc<RowMetaSet>> {
1792 self.rowmap.load_full()
1793 }
1794
1795 async fn resolve_rowid_hits(
1799 &self,
1800 map: &RowMetaSet,
1801 hits: Vec<(u64, f32)>,
1802 ) -> Result<Vec<SearchHit>> {
1803 let mut resolved = Vec::with_capacity(hits.len());
1804 let mut misses: Vec<(u64, f32)> = Vec::new();
1805 for (rowid, score) in hits {
1806 match map.lookup(rowid) {
1807 Some((session_id, message_id)) => resolved.push(SearchHit {
1808 rowid: Some(rowid),
1809 key: MessageKey {
1810 session_id: session_id.to_owned(),
1811 message_id: message_id.to_owned(),
1812 },
1813 score,
1814 }),
1815 None => misses.push((rowid, score)),
1816 }
1817 }
1818 if !misses.is_empty() {
1821 let rowids: Vec<u64> = misses.iter().map(|(rowid, _)| *rowid).collect();
1822 let keys = self.message_keys_by_rowids(&rowids).await?;
1823 for ((rowid, score), key) in misses.into_iter().zip(keys) {
1824 resolved.push(SearchHit {
1825 rowid: Some(rowid),
1826 key,
1827 score,
1828 });
1829 }
1830 }
1831 Ok(resolved)
1832 }
1833
1834 async fn message_keys_by_rowids(&self, rowids: &[u64]) -> Result<Vec<MessageKey>> {
1837 let dataset = self.handle.dataset(Table::Messages).await?;
1838 let projection = ProjectionRequest::from_columns(["session_id", "id"], dataset.schema());
1839 let batch = dataset.take_rows(rowids, projection).await?;
1840 let mut keys = Vec::with_capacity(batch.num_rows());
1841 for row in 0..batch.num_rows() {
1842 keys.push(MessageKey {
1843 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1844 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1845 });
1846 }
1847 Ok(keys)
1848 }
1849
1850 pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1852 self.handle.export_write(name, bytes).await
1853 }
1854
1855 pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1857 self.handle.export_read(name).await
1858 }
1859
1860 pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1862 self.handle.export_local_path(name)
1863 }
1864
1865 pub async fn adapter_names(&self, include_subagents: bool) -> Result<Vec<String>> {
1871 let scanner = self
1872 .handle
1873 .scan(Table::Sessions, ScanOpts::project_only(&["source_agent"]))
1874 .await?;
1875 let mut stream = scanner.try_into_stream().await?;
1876 let mut names: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
1877 while let Some(batch) = stream.next().await {
1878 let batch = batch?;
1879 for row in 0..batch.num_rows() {
1880 let agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1881 if !include_subagents && agent.contains('/') {
1882 continue;
1883 }
1884 names.insert(agent);
1885 }
1886 }
1887 Ok(names.into_iter().collect())
1888 }
1889
1890 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1895 if rows.is_empty() {
1896 return Ok(());
1897 }
1898 let batch = embedding_update_batch(rows)?;
1899 self.handle
1900 .merge_update(Table::Messages, batch, rows.len())
1901 .await?;
1902 Ok(())
1903 }
1904
1905 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1908 try_stream! {
1909 let filter = Predicate::And(vec![
1915 Predicate::IsNull("embedding_model"),
1916 Predicate::IsNotNull("search_text"),
1917 ]);
1918 let projection: &[&str] = &["session_id", "id", "search_text"];
1919 let scanner = self
1920 .handle
1921 .scan(
1922 Table::Messages,
1923 ScanOpts::with_predicate_and_projection(&filter, projection),
1924 )
1925 .await?;
1926 let mut batches = scanner
1927 .try_into_stream()
1928 .await
1929 .context("failed to open messages stream")?;
1930 while let Some(batch) = batches.next().await {
1931 let batch = batch?;
1932 for row in 0..batch.num_rows() {
1933 yield PendingMessage {
1934 session_id: string(&batch, "session_id", row)?
1935 .context("session_id is null")?,
1936 id: string(&batch, "id", row)?.context("message id is null")?,
1937 search_text: string(&batch, "search_text", row)?
1938 .context("search_text is null")?,
1939 };
1940 }
1941 }
1942 }
1943 }
1944
1945 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1950 try_stream! {
1951 let filter = Predicate::And(vec![
1955 Predicate::IsNotNull("search_text"),
1956 Predicate::Or(vec![
1957 Predicate::IsNull("embedding_model"),
1958 Predicate::Ne("embedding_model", embed::model_id().into()),
1959 ]),
1960 ]);
1961 let projection: &[&str] = &["session_id", "id", "search_text"];
1962 let scanner = self
1963 .handle
1964 .scan(
1965 Table::Messages,
1966 ScanOpts::with_predicate_and_projection(&filter, projection),
1967 )
1968 .await?;
1969 let mut batches = scanner
1970 .try_into_stream()
1971 .await
1972 .context("failed to open pending-or-stale messages stream")?;
1973 while let Some(batch) = batches.next().await {
1974 let batch = batch?;
1975 for row in 0..batch.num_rows() {
1976 yield PendingMessage {
1977 session_id: string(&batch, "session_id", row)?
1978 .context("session_id is null")?,
1979 id: string(&batch, "id", row)?.context("message id is null")?,
1980 search_text: string(&batch, "search_text", row)?
1981 .context("search_text is null")?,
1982 };
1983 }
1984 }
1985 }
1986 }
1987
1988 pub async fn fts_search(
1993 &self,
1994 query: &str,
1995 limit: usize,
1996 filter: &Predicate,
1997 ) -> Result<Vec<SearchHit>> {
1998 let mut hits = if let Some(map) = self.rowmap.load_full() {
1999 let rowid_hits = self.fts_search_rowids(query, limit, filter).await?;
2000 self.resolve_rowid_hits(&map, rowid_hits).await?
2001 } else {
2002 self.fts_search_keys(query, limit, filter).await?
2003 };
2004 hits.sort_by(|left, right| {
2010 right
2011 .score
2012 .partial_cmp(&left.score)
2013 .unwrap_or(std::cmp::Ordering::Equal)
2014 .then_with(|| left.key.session_id.cmp(&right.key.session_id))
2015 .then_with(|| left.key.message_id.cmp(&right.key.message_id))
2016 });
2017 Ok(hits)
2018 }
2019
2020 async fn fts_scanner(
2023 &self,
2024 query: &str,
2025 limit: usize,
2026 filter: &Predicate,
2027 ) -> Result<lance::dataset::scanner::Scanner> {
2028 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
2029 scanner.full_text_search(
2030 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
2031 )?;
2032 if self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2033 scanner.fast_search();
2034 }
2035 scanner.disable_scoring_autoprojection();
2041 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
2042 Ok(scanner)
2043 }
2044
2045 async fn fts_search_keys(
2048 &self,
2049 query: &str,
2050 limit: usize,
2051 filter: &Predicate,
2052 ) -> Result<Vec<SearchHit>> {
2053 let mut scanner = self.fts_scanner(query, limit, filter).await?;
2054 scanner.project(&["session_id", "id", "_score"])?;
2055 let batch = scanner.try_into_batch().await?;
2056 let mut hits = Vec::with_capacity(batch.num_rows());
2057 for row in 0..batch.num_rows() {
2058 let key = MessageKey {
2059 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
2060 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
2061 };
2062 hits.push(SearchHit {
2063 rowid: None,
2064 key,
2065 score: float32(&batch, "_score", row)?,
2066 });
2067 }
2068 Ok(hits)
2069 }
2070
2071 pub async fn messages_version(&self) -> Result<u64> {
2074 Ok(self
2075 .handle
2076 .dataset(Table::Messages)
2077 .await?
2078 .version()
2079 .version)
2080 }
2081
2082 pub async fn collect_row_metas(&self) -> Result<Vec<RowMetaEntry>> {
2086 let mut scanner = self.handle.scanner(Table::Messages, None).await?;
2087 scanner.with_row_id();
2088 scanner.project(&Self::ROW_META_COLUMNS)?;
2089 let mut stream = scanner.try_into_stream().await?;
2090 let mut out = Vec::new();
2091 while let Some(batch) = stream.next().await {
2092 let batch = batch?;
2093 let rowids = uint64(&batch, "_rowid")?;
2094 for row in 0..batch.num_rows() {
2095 out.push(row_meta_entry(&batch, rowids.value(row), row)?);
2096 }
2097 }
2098 Ok(out)
2099 }
2100
2101 async fn collect_row_metas_delta(
2117 &self,
2118 base_version: u64,
2119 base_max_row_id: u64,
2120 base_row_count: usize,
2121 ) -> Result<Option<Vec<RowMetaEntry>>> {
2122 let dataset = self.handle.dataset(Table::Messages).await?;
2123 let Ok(old) = dataset.checkout_version(base_version).await else {
2124 return Ok(None);
2125 };
2126 if dataset.count_rows(None).await? < base_row_count {
2127 return Ok(None);
2128 }
2129 let old_ids: HashSet<u64> = old.get_fragments().iter().map(|f| f.id() as u64).collect();
2134 let added: Vec<_> = dataset
2135 .get_fragments()
2136 .iter()
2137 .filter(|fragment| !old_ids.contains(&(fragment.id() as u64)))
2138 .map(|fragment| fragment.metadata().clone())
2139 .collect();
2140 if added.is_empty() {
2141 return Ok(Some(Vec::new()));
2142 }
2143 let mut scanner = dataset.scan();
2144 scanner.with_fragments(added);
2145 scanner.with_row_id();
2146 scanner.project(&Self::ROW_META_COLUMNS)?;
2147 let mut stream = scanner.try_into_stream().await?;
2148 let mut out = Vec::new();
2149 while let Some(batch) = stream.next().await {
2150 let batch = batch?;
2151 let rowids = uint64(&batch, "_rowid")?;
2152 for row in 0..batch.num_rows() {
2153 let row_id = rowids.value(row);
2154 if row_id > base_max_row_id {
2155 out.push(row_meta_entry(&batch, row_id, row)?);
2156 }
2157 }
2158 }
2159 Ok(Some(out))
2160 }
2161
2162 async fn fts_search_rowids(
2165 &self,
2166 query: &str,
2167 limit: usize,
2168 filter: &Predicate,
2169 ) -> Result<Vec<(u64, f32)>> {
2170 let mut scanner = self.fts_scanner(query, limit, filter).await?;
2171 scanner.with_row_id();
2172 scanner.project(&["_score"])?;
2173 let batch = scanner.try_into_batch().await?;
2174 let rowids = uint64(&batch, "_rowid")?;
2175 let mut hits = Vec::with_capacity(batch.num_rows());
2176 for row in 0..batch.num_rows() {
2177 hits.push((rowids.value(row), float32(&batch, "_score", row)?));
2178 }
2179 Ok(hits)
2180 }
2181
2182 pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
2189 if matches!(filter, Predicate::And(clauses) if clauses.is_empty())
2195 && let Some(count) = self.fts_num_docs().await?
2196 {
2197 return Ok(count);
2198 }
2199 let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
2200 let dataset = self.handle.dataset(Table::Messages).await?;
2201 let count = dataset.count_rows(Some(scope.to_lance())).await?;
2202 Ok(count)
2203 }
2204
2205 async fn fts_num_docs(&self) -> Result<Option<usize>> {
2209 if !self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2210 return Ok(None);
2211 }
2212 let dataset = self.handle.dataset(Table::Messages).await?;
2213 let json = dataset.index_statistics(MESSAGES_FTS_INDEX).await?;
2214 let parsed: Value =
2215 serde_json::from_str(&json).context("failed to parse FTS index_statistics")?;
2216 let total: u64 = parsed["indices"]
2217 .as_array()
2218 .map(|segments| {
2219 segments
2220 .iter()
2221 .filter_map(|segment| segment["num_docs"].as_u64())
2222 .sum()
2223 })
2224 .unwrap_or(0);
2225 Ok(Some(usize::try_from(total).unwrap_or(usize::MAX)))
2226 }
2227
2228 pub async fn has_embeddings(&self) -> Result<bool> {
2233 let scope = Predicate::IsNotNull("vector");
2234 let mut scanner = self
2235 .handle
2236 .scan(
2237 Table::Messages,
2238 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
2239 )
2240 .await?;
2241 scanner.limit(Some(1), None)?;
2242 let batch = scanner.try_into_batch().await?;
2243 Ok(batch.num_rows() > 0)
2244 }
2245
2246 pub async fn sample_embedded_model(&self) -> Result<Option<String>> {
2252 let scope = Predicate::IsNotNull("embedding_model");
2253 let mut scanner = self
2254 .handle
2255 .scan(
2256 Table::Messages,
2257 ScanOpts::with_predicate_and_projection(&scope, &["embedding_model"]),
2258 )
2259 .await?;
2260 scanner.limit(Some(1), None)?;
2261 let batch = scanner.try_into_batch().await?;
2262 if batch.num_rows() == 0 {
2263 return Ok(None);
2264 }
2265 string(&batch, "embedding_model", 0)
2266 }
2267
2268 pub async fn vector_search(
2276 &self,
2277 query: &[f32],
2278 limit: usize,
2279 filter: &Predicate,
2280 search: Option<&config::SearchConfig>,
2281 ) -> Result<Vec<SearchHit>> {
2282 let mut hits = if let Some(map) = self.rowmap.load_full() {
2283 let rowid_hits = self
2284 .vector_search_rowids(query, limit, filter, search)
2285 .await?;
2286 self.resolve_rowid_hits(&map, rowid_hits).await?
2287 } else {
2288 self.vector_search_keys(query, limit, filter, search)
2289 .await?
2290 };
2291 hits.sort_by(|left, right| {
2297 left.score
2298 .partial_cmp(&right.score)
2299 .unwrap_or(std::cmp::Ordering::Equal)
2300 .then_with(|| left.key.session_id.cmp(&right.key.session_id))
2301 .then_with(|| left.key.message_id.cmp(&right.key.message_id))
2302 });
2303 Ok(hits)
2304 }
2305
2306 async fn vector_scanner(
2308 &self,
2309 query: &[f32],
2310 limit: usize,
2311 filter: &Predicate,
2312 search: Option<&config::SearchConfig>,
2313 ) -> Result<lance::dataset::scanner::Scanner> {
2314 let scope = embedded_scope(filter);
2315 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
2316 let key = Float32Array::from(query.to_vec());
2317 scanner.nearest("vector", &key, limit)?;
2318 apply_vector_search_knobs(&mut scanner, search);
2319 if self
2320 .handle
2321 .messages_has_index(MESSAGES_VECTOR_INDEX)
2322 .await?
2323 {
2324 scanner.fast_search();
2325 }
2326 scanner.disable_scoring_autoprojection();
2327 Ok(scanner)
2328 }
2329
2330 async fn vector_search_rowids(
2333 &self,
2334 query: &[f32],
2335 limit: usize,
2336 filter: &Predicate,
2337 search: Option<&config::SearchConfig>,
2338 ) -> Result<Vec<(u64, f32)>> {
2339 let mut scanner = self.vector_scanner(query, limit, filter, search).await?;
2340 scanner.with_row_id();
2341 scanner.project(&["_distance"])?;
2342 let batch = scanner.try_into_batch().await?;
2343 let rowids = uint64(&batch, "_rowid")?;
2344 let mut hits = Vec::with_capacity(batch.num_rows());
2345 for row in 0..batch.num_rows() {
2346 hits.push((rowids.value(row), float32(&batch, "_distance", row)?));
2347 }
2348 Ok(hits)
2349 }
2350
2351 async fn vector_search_keys(
2354 &self,
2355 query: &[f32],
2356 limit: usize,
2357 filter: &Predicate,
2358 search: Option<&config::SearchConfig>,
2359 ) -> Result<Vec<SearchHit>> {
2360 let mut scanner = self.vector_scanner(query, limit, filter, search).await?;
2361 scanner.project(&["session_id", "id", "_distance"])?;
2362 let batch = scanner.try_into_batch().await?;
2363 let mut hits = Vec::with_capacity(batch.num_rows());
2364 for row in 0..batch.num_rows() {
2365 let key = MessageKey {
2366 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
2367 message_id: string(&batch, "id", row)?.context("message id is null")?,
2368 };
2369 hits.push(SearchHit {
2370 rowid: None,
2371 key,
2372 score: float32(&batch, "_distance", row)?,
2373 });
2374 }
2375 Ok(hits)
2376 }
2377
2378 pub async fn explain_vector_plan(
2381 &self,
2382 query: &[f32],
2383 limit: usize,
2384 filter: &Predicate,
2385 search: Option<&config::SearchConfig>,
2386 ) -> Result<String> {
2387 let scope = embedded_scope(filter);
2388 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
2389 let key = Float32Array::from(query.to_vec());
2390 scanner.nearest("vector", &key, limit)?;
2391 apply_vector_search_knobs(&mut scanner, search);
2392 if self
2393 .handle
2394 .messages_has_index(MESSAGES_VECTOR_INDEX)
2395 .await?
2396 {
2397 scanner.fast_search();
2398 }
2399 scanner
2400 .explain_plan(true)
2401 .await
2402 .context("explain_plan failed")
2403 }
2404
2405 pub async fn explain_fts_plan(
2406 &self,
2407 query: &str,
2408 limit: usize,
2409 filter: &Predicate,
2410 ) -> Result<String> {
2411 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
2412 scanner.full_text_search(
2413 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
2414 )?;
2415 if self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2416 scanner.fast_search();
2417 }
2418 scanner.project(&["session_id", "id"])?;
2419 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
2420 scanner
2421 .explain_plan(true)
2422 .await
2423 .context("explain_plan failed")
2424 }
2425
2426 pub async fn message_metas_by_rowids(&self, rowids: &[u64]) -> Result<Vec<MessageMeta>> {
2434 if rowids.is_empty() {
2435 return Ok(Vec::new());
2436 }
2437 let mut metas = Vec::with_capacity(rowids.len());
2438 let misses: Vec<u64> = if let Some(map) = self.rowmap.load_full() {
2439 let (hits, misses) = map.hydrate(rowids);
2440 metas.extend(hits.into_iter().map(|entry| MessageMeta {
2441 message_id: entry.message_id,
2442 session_id: entry.session_id,
2443 role: entry.role,
2444 project: entry.project,
2445 source_agent: entry.source_agent,
2446 timestamp:
2447 DateTime::from_timestamp_micros(entry.timestamp_micros).unwrap_or_default(),
2448 search_text: entry.search_text,
2449 }));
2450 misses
2451 } else {
2452 rowids.to_vec()
2453 };
2454 if !misses.is_empty() {
2455 metas.extend(self.message_metas_by_rowids_take(&misses).await?);
2456 }
2457 Ok(metas)
2458 }
2459
2460 async fn message_metas_by_rowids_take(&self, rowids: &[u64]) -> Result<Vec<MessageMeta>> {
2465 let dataset = self.handle.dataset(Table::Messages).await?;
2466 let projection = ProjectionRequest::from_columns(
2467 [
2468 "id",
2469 "session_id",
2470 "role",
2471 "project",
2472 "source_agent",
2473 "timestamp",
2474 "search_text",
2475 ],
2476 dataset.schema(),
2477 );
2478 let batch = dataset.take_rows(rowids, projection).await?;
2479 let mut metas = Vec::with_capacity(batch.num_rows());
2480 for row in 0..batch.num_rows() {
2481 metas.push(message_meta_from_batch(&batch, row)?);
2482 }
2483 Ok(metas)
2484 }
2485
2486 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
2488 if keys.is_empty() {
2489 return Ok(Vec::new());
2490 }
2491 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
2492 let session_ids = keys
2493 .iter()
2494 .map(|key| key.session_id.clone())
2495 .collect::<Vec<_>>();
2496 let message_ids = keys
2497 .iter()
2498 .map(|key| key.message_id.clone())
2499 .collect::<Vec<_>>();
2500 let predicate = Predicate::And(vec![
2501 in_predicate("session_id", &session_ids),
2502 in_predicate("id", &message_ids),
2503 ]);
2504 let batch = self
2505 .handle
2506 .scan_batch(
2507 Table::Messages,
2508 Some(&predicate),
2509 &[
2510 "id",
2511 "session_id",
2512 "role",
2513 "project",
2514 "source_agent",
2515 "timestamp",
2516 "search_text",
2517 ],
2518 )
2519 .await?;
2520 let mut metas = Vec::with_capacity(batch.num_rows());
2521 for row in 0..batch.num_rows() {
2522 let meta = message_meta_from_batch(&batch, row)?;
2525 if wanted.contains(&MessageKey {
2526 session_id: meta.session_id.clone(),
2527 message_id: meta.message_id.clone(),
2528 }) {
2529 metas.push(meta);
2530 }
2531 }
2532 Ok(metas)
2533 }
2534
2535 pub async fn session_message_counts(
2544 &self,
2545 session_ids: &[String],
2546 ) -> Result<BTreeMap<String, usize>> {
2547 if session_ids.is_empty() {
2548 return Ok(BTreeMap::new());
2549 }
2550 if let Some(map) = self.rowmap.load_full()
2557 && map.version() == self.messages_version().await?
2558 {
2559 return Ok(session_ids
2560 .iter()
2561 .map(|id| (id.clone(), map.lookup_count(id).unwrap_or(0)))
2562 .collect());
2563 }
2564 let predicate = in_predicate("session_id", session_ids);
2565 let scanner = self
2566 .handle
2567 .scan(
2568 Table::Messages,
2569 ScanOpts::with_predicate_and_projection(&predicate, &["session_id"]),
2570 )
2571 .await?;
2572 let mut stream = scanner
2573 .try_into_stream()
2574 .await
2575 .context("failed to open session_message_counts stream")?;
2576 let mut counts: BTreeMap<String, usize> =
2577 session_ids.iter().map(|id| (id.clone(), 0)).collect();
2578 while let Some(batch) = stream.next().await {
2579 let batch = batch.context("failed to read session_message_counts batch")?;
2580 let column = batch
2581 .column_by_name("session_id")
2582 .context("session_message_counts: session_id column missing")?
2583 .as_any()
2584 .downcast_ref::<StringArray>()
2585 .context("session_message_counts: session_id column is not Utf8")?;
2586 for value in column.iter().flatten() {
2587 if let Some(entry) = counts.get_mut(value) {
2588 *entry += 1;
2589 }
2590 }
2591 }
2592 Ok(counts)
2593 }
2594
2595 pub async fn unindexed_message_backlog(&self) -> Result<usize> {
2598 self.handle
2599 .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
2600 .await
2601 }
2602
2603 pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
2609 self.handle
2610 .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
2611 .await
2612 }
2613
2614 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
2618 let dataset = self.handle.dataset(Table::Messages).await?;
2619 let embedded = dataset
2625 .count_rows(Some(Predicate::IsNotNull("embedding_model").to_lance()))
2626 .await?;
2627 let backlog = self.embed_backlog_count().await?;
2634 Ok(EmbeddingProgress {
2635 embedded,
2636 total: embedded + backlog,
2637 backlog,
2638 model: embed::model_id(),
2639 })
2640 }
2641
2642 pub async fn embed_backlog_count(&self) -> Result<usize> {
2648 let dataset = self.handle.dataset(Table::Messages).await?;
2649 let filter = Predicate::And(vec![
2650 Predicate::IsNull("embedding_model"),
2651 Predicate::IsNotNull("search_text"),
2652 ]);
2653 Ok(dataset.count_rows(Some(filter.to_lance())).await?)
2654 }
2655
2656 pub async fn stale_embedding_count(&self) -> Result<usize> {
2660 let dataset = self.handle.dataset(Table::Messages).await?;
2661 dataset
2667 .count_rows(Some(
2668 Predicate::And(vec![
2669 Predicate::IsNotNull("embedding_model"),
2670 Predicate::Ne("embedding_model", embed::model_id().into()),
2671 ])
2672 .to_lance(),
2673 ))
2674 .await
2675 .map_err(Into::into)
2676 }
2677
2678 pub async fn optimize_indices(
2684 &self,
2685 progress: Option<OptimizeProgressFn>,
2686 maintenance: &MaintenancePolicy,
2687 ) -> Result<OptimizeOutcome> {
2688 let intents = pond_index_intents();
2689 let mut tables = Vec::with_capacity(3);
2690 for (table, intents) in intents.all() {
2691 let outcome = self
2692 .handle
2693 .optimize_table(table, intents, progress.as_ref(), maintenance)
2694 .await;
2695 tables.push(outcome);
2696 }
2697 Ok(OptimizeOutcome { tables })
2698 }
2699
2700 pub async fn build_indices_only(
2706 &self,
2707 progress: Option<OptimizeProgressFn>,
2708 ) -> Result<OptimizeOutcome> {
2709 let policy = pond_index_intents();
2710 let mut tables = Vec::with_capacity(3);
2711 for (table, intents) in policy.all() {
2712 let indices = self
2713 .handle
2714 .optimize_table_indices_only(table, intents, progress.as_ref())
2715 .await;
2716 tables.push(TableOptimizeOutcome {
2717 table,
2718 indices,
2719 compaction: PhaseOutcome::NotAttempted,
2720 });
2721 }
2722 Ok(OptimizeOutcome { tables })
2723 }
2724
2725 #[cfg(test)]
2726 async fn optimize_indices_with_vector_threshold(
2727 &self,
2728 vector_threshold: usize,
2729 ) -> Result<OptimizeOutcome> {
2730 let intents = pond_index_intents_with_vector_threshold(vector_threshold);
2731 let policy = MaintenancePolicy::always_compact();
2732 let mut tables = Vec::with_capacity(3);
2733 for (table, intents) in intents.all() {
2734 let outcome = self
2735 .handle
2736 .optimize_table(table, intents, None, &policy)
2737 .await;
2738 tables.push(outcome);
2739 }
2740 Ok(OptimizeOutcome { tables })
2741 }
2742
2743 pub async fn cleanup_old_versions(&self, older_than: chrono::Duration) -> Result<()> {
2749 for (table, _) in pond_index_intents().all() {
2750 self.handle
2751 .cleanup_table_versions(table, older_than)
2752 .await?;
2753 }
2754 Ok(())
2755 }
2756
2757 pub async fn rebuild_indices(
2758 &self,
2759 intent_name: Option<&str>,
2760 progress: Option<OptimizeProgressFn>,
2761 ) -> Result<()> {
2762 let policy = pond_index_intents();
2763 let mut matched = false;
2764 for (table, intents) in policy.all() {
2765 for intent in intents {
2766 if intent_name.is_none_or(|name| name == intent.name) {
2767 matched = true;
2768 self.handle
2769 .rebuild_index(table, intent, progress.as_ref())
2770 .await?;
2771 }
2772 }
2773 }
2774 if let Some(name) = intent_name
2775 && !matched
2776 {
2777 anyhow::bail!("unknown index intent {name:?}");
2778 }
2779 Ok(())
2780 }
2781
2782 pub async fn drop_index_by_name(&self, name: &str) -> Result<()> {
2789 let Some(owner) = self.handle.find_index_owner(name).await? else {
2790 anyhow::bail!("no index named {name:?} found on any table");
2791 };
2792 self.handle.drop_index(owner, name).await
2793 }
2794
2795 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
2796 let policy = pond_index_intents();
2797 let mut statuses = Vec::new();
2798 for (table, intents) in policy.all() {
2799 statuses.extend(self.handle.index_status(table, intents).await?);
2800 }
2801 Ok(statuses)
2802 }
2803
2804 pub async fn drop_vector_index(&self) -> Result<()> {
2808 match self
2809 .handle
2810 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
2811 .await
2812 {
2813 Ok(()) => Ok(()),
2814 Err(error) => {
2815 let msg = error.to_string();
2816 if msg.contains("not found") || msg.contains("does not exist") {
2817 Ok(())
2818 } else {
2819 Err(error)
2820 }
2821 }
2822 }
2823 }
2824
2825 pub async fn table_sizes(&self) -> Result<TableSizes> {
2828 self.handle.table_sizes().await
2829 }
2830
2831 pub async fn initialized(&self) -> Result<bool> {
2832 self.handle.initialized().await
2833 }
2834
2835 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
2836 let batch = self
2837 .handle
2838 .scan_batch(
2839 Table::Sessions,
2840 Some(&Predicate::Eq("id", session_id.into())),
2841 &[],
2842 )
2843 .await?;
2844 if batch.num_rows() == 0 {
2845 Ok(None)
2846 } else {
2847 Ok(Some(session_from_batch(&batch, 0)?))
2848 }
2849 }
2850
2851 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
2852 let batch = self
2853 .handle
2854 .scan_batch(
2855 Table::Messages,
2856 Some(&Predicate::Eq("session_id", session_id.into())),
2857 &[
2858 "session_id",
2859 "id",
2860 "timestamp",
2861 "role",
2862 "content",
2863 "options",
2864 ],
2865 )
2866 .await?;
2867 let mut messages = Vec::with_capacity(batch.num_rows());
2868 for row in 0..batch.num_rows() {
2869 messages.push(message_from_batch(&batch, row)?);
2870 }
2871 messages.sort_by(|left, right| {
2872 left.timestamp()
2873 .cmp(&right.timestamp())
2874 .then_with(|| left.id().cmp(right.id()))
2875 });
2876
2877 let message_ids = messages
2878 .iter()
2879 .map(|message| message.id().to_owned())
2880 .collect::<Vec<_>>();
2881 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
2882
2883 Ok(messages
2884 .into_iter()
2885 .map(|message| {
2886 let key = (message.session_id().to_owned(), message.id().to_owned());
2887 let parts = parts_by_message.remove(&key).unwrap_or_default();
2888 MessageWithParts { message, parts }
2889 })
2890 .collect())
2891 }
2892
2893 pub async fn parts_for_messages(
2897 &self,
2898 session_id: &str,
2899 message_ids: &[String],
2900 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
2901 self.scan_parts(session_id, message_ids, None).await
2902 }
2903
2904 pub async fn summary_parts_for_messages(
2909 &self,
2910 session_id: &str,
2911 message_ids: &[String],
2912 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
2913 self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
2914 .await
2915 }
2916
2917 async fn scan_parts(
2918 &self,
2919 session_id: &str,
2920 message_ids: &[String],
2921 part_types: Option<&[&str]>,
2922 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
2923 if message_ids.is_empty() {
2924 return Ok(BTreeMap::new());
2925 }
2926 let mut clauses = vec![
2927 Predicate::Eq("session_id", session_id.into()),
2928 in_predicate("message_id", message_ids),
2929 ];
2930 if let Some(types) = part_types {
2931 clauses.push(Predicate::In(
2932 "type",
2933 types.iter().map(|&t| t.into()).collect(),
2934 ));
2935 }
2936 let predicate = Predicate::And(clauses);
2937 let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
2938 let mut scanner = self
2939 .handle
2940 .scan(
2941 Table::Parts,
2942 ScanOpts::with_predicate_and_projection(
2943 &predicate,
2944 &[
2945 "session_id",
2946 "message_id",
2947 "id",
2948 "ordinal",
2949 "type",
2950 "provenance",
2951 "variant_data",
2952 "options",
2953 ],
2954 ),
2955 )
2956 .await?;
2957 scanner.with_row_address();
2958 let batch = scanner.try_into_batch().await.context("scan failed")?;
2959 let row_addresses = uint64(&batch, "_rowaddr")?;
2960 let mut file_payloads = BTreeMap::<usize, FileData>::new();
2961 let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
2962 for row in 0..batch.num_rows() {
2963 if string(&batch, "type", row)?.as_deref() == Some("file") {
2964 let variant_data =
2965 json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
2966 file_rows.push((row, row_addresses.value(row), variant_data));
2967 }
2968 }
2969 if !file_rows.is_empty() {
2970 let addresses = file_rows
2971 .iter()
2972 .map(|(_, address, _)| *address)
2973 .collect::<Vec<_>>();
2974 let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
2975 for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
2976 let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
2980 file_payloads.insert(row, payload);
2981 }
2982 }
2983 let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
2984 for row in 0..batch.num_rows() {
2985 let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
2986 parts_by_message
2987 .entry((part.session_id.clone(), part.message_id.clone()))
2988 .or_default()
2989 .push(part);
2990 }
2991 for parts in parts_by_message.values_mut() {
2992 parts.sort_by_key(|part| part.ordinal);
2993 }
2994 Ok(parts_by_message)
2995 }
2996}
2997
2998#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
2999#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
3000pub enum IngestEvent {
3001 Session(Session),
3002 Message(Message),
3003 Part(Part),
3004}
3005
/// Running totals for an ingest run.
///
/// The inserted/matched counters are fed by `add_batch` / `add_outcomes`;
/// the dropped counters and `drop_reasons` by the outcome-based methods.
/// The `skipped_*`, `storage_errors` and `truncated_values` fields are only
/// summed by `merge` in the impl beside this struct — presumably the ingest
/// driver sets them (confirm against callers).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows inserted across sessions, messages and parts.
    pub inserted: usize,
    /// Rows that already existed across sessions, messages and parts.
    pub matched: usize,
    /// Sessions inserted.
    pub sessions_inserted: usize,
    /// Messages inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Messages inserted whose outcome was flagged searchable.
    pub messages_inserted_searchable: usize,
    /// Parts inserted.
    pub parts_inserted: usize,
    /// Sessions that already existed.
    pub sessions_matched: usize,
    /// Messages that already existed, searchable or not.
    pub messages_matched_total: usize,
    /// Messages that already existed and were flagged searchable.
    pub messages_matched_searchable: usize,
    /// Parts that already existed.
    pub parts_matched: usize,
    /// Error outcomes whose kind is not `"session"`.
    pub dropped_events: usize,
    /// Error outcomes whose kind is `"session"`.
    pub dropped_sessions: usize,
    /// Skipped-file counter.
    pub skipped_files: usize,
    /// Skipped-empty counter.
    pub skipped_empty: usize,
    /// Skipped-fresh counter.
    pub skipped_fresh: usize,
    /// Storage-error counter.
    pub storage_errors: usize,
    /// Truncated-value counter.
    pub truncated_values: usize,
    /// Error outcomes per reason key (one of the `DROP_REASON_*` constants).
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
3081
// Reason keys recorded in `RowError::reason_key` and tallied in
// `IngestSummary::drop_reasons`.

/// A message id appeared twice within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part_id)` pair appeared twice within one session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message named a session other than the open one.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was open.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part named a session or message other than the open message's.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session's `source_agent` was empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session carried `parent_message_id` without `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// Presumably: an upsert tried to change a stored session's `project` (confirm at use site).
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// Presumably: an upsert tried to change a stored session's `source_agent` (confirm at use site).
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback used when an error outcome carries no reason key.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
3098
/// Per-batch inserted/matched counters, split by row kind.
///
/// Folded into an `IngestSummary` by `IngestSummary::add_batch`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Sessions inserted by the batch.
    pub sessions_inserted: usize,
    /// Sessions that already existed.
    pub sessions_matched: usize,
    /// Messages inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Messages inserted that had search text.
    pub messages_inserted_searchable: usize,
    /// Messages that already existed, searchable or not.
    pub messages_matched_total: usize,
    /// Messages that already existed and had search text.
    pub messages_matched_searchable: usize,
    /// Parts inserted by the batch.
    pub parts_inserted: usize,
    /// Parts that already existed.
    pub parts_matched: usize,
}
3117
3118impl IngestSummary {
3119 pub fn accepted(&self) -> usize {
3120 self.inserted + self.matched
3121 }
3122
3123 pub fn add_batch(&mut self, counts: &BatchCounts) {
3127 self.sessions_inserted += counts.sessions_inserted;
3128 self.sessions_matched += counts.sessions_matched;
3129 self.messages_inserted_total += counts.messages_inserted_total;
3130 self.messages_inserted_searchable += counts.messages_inserted_searchable;
3131 self.messages_matched_total += counts.messages_matched_total;
3132 self.messages_matched_searchable += counts.messages_matched_searchable;
3133 self.parts_inserted += counts.parts_inserted;
3134 self.parts_matched += counts.parts_matched;
3135 self.inserted +=
3136 counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
3137 self.matched +=
3138 counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
3139 }
3140
3141 pub fn merge(&mut self, other: &Self) {
3145 self.inserted += other.inserted;
3146 self.matched += other.matched;
3147 self.sessions_inserted += other.sessions_inserted;
3148 self.messages_inserted_total += other.messages_inserted_total;
3149 self.messages_inserted_searchable += other.messages_inserted_searchable;
3150 self.parts_inserted += other.parts_inserted;
3151 self.sessions_matched += other.sessions_matched;
3152 self.messages_matched_total += other.messages_matched_total;
3153 self.messages_matched_searchable += other.messages_matched_searchable;
3154 self.parts_matched += other.parts_matched;
3155 self.dropped_events += other.dropped_events;
3156 self.dropped_sessions += other.dropped_sessions;
3157 self.skipped_files += other.skipped_files;
3158 self.skipped_empty += other.skipped_empty;
3159 self.skipped_fresh += other.skipped_fresh;
3160 self.storage_errors += other.storage_errors;
3161 self.truncated_values += other.truncated_values;
3162 for (key, value) in &other.drop_reasons {
3163 *self.drop_reasons.entry(key).or_insert(0) += value;
3164 }
3165 }
3166
3167 pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
3172 for outcome in outcomes {
3173 if !matches!(outcome.status, OutcomeStatus::Error) {
3174 continue;
3175 }
3176 if outcome.kind == "session" {
3177 self.dropped_sessions += 1;
3178 } else {
3179 self.dropped_events += 1;
3180 }
3181 let reason = outcome
3182 .error
3183 .as_ref()
3184 .and_then(|error| error.reason_key)
3185 .unwrap_or(DROP_REASON_UNCATEGORIZED);
3186 *self.drop_reasons.entry(reason).or_insert(0) += 1;
3187 }
3188 }
3189
3190 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
3191 for outcome in outcomes {
3192 match outcome.status {
3193 OutcomeStatus::Inserted => {
3194 self.inserted += 1;
3195 match outcome.kind {
3196 "session" => self.sessions_inserted += 1,
3197 "message" => {
3198 self.messages_inserted_total += 1;
3199 if outcome.searchable {
3200 self.messages_inserted_searchable += 1;
3201 }
3202 }
3203 "part" => self.parts_inserted += 1,
3204 _ => {}
3205 }
3206 }
3207 OutcomeStatus::Matched => {
3208 self.matched += 1;
3209 match outcome.kind {
3210 "session" => self.sessions_matched += 1,
3211 "message" => {
3212 self.messages_matched_total += 1;
3213 if outcome.searchable {
3214 self.messages_matched_searchable += 1;
3215 }
3216 }
3217 "part" => self.parts_matched += 1,
3218 _ => {}
3219 }
3220 }
3221 OutcomeStatus::Error => {
3222 if outcome.kind == "session" {
3228 self.dropped_sessions += 1;
3229 } else {
3230 self.dropped_events += 1;
3231 }
3232 let reason = outcome
3233 .error
3234 .as_ref()
3235 .and_then(|e| e.reason_key)
3236 .unwrap_or(DROP_REASON_UNCATEGORIZED);
3237 *self.drop_reasons.entry(reason).or_insert(0) += 1;
3238 }
3239 }
3240 }
3241 }
3242}
3243
3244#[derive(Debug, Clone, PartialEq)]
3249pub struct RowOutcome {
3250 pub index: usize,
3251 pub kind: &'static str,
3252 pub pk: Value,
3253 pub status: OutcomeStatus,
3254 pub error: Option<RowError>,
3255 pub searchable: bool,
3260}
3261
/// Final status of one ingested row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row did not exist before and was written.
    Inserted,
    /// The row already existed.
    Matched,
    /// The row was rejected; see `RowOutcome::error`.
    Error,
}
3268
/// Details attached to a rejected row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the rejection.
    pub message: String,
    /// Name of the offending field, when one can be singled out.
    pub field: Option<&'static str>,
    /// Short reason tag; in this file only ever `"immutable"` or `None`.
    pub reason: Option<&'static str>,
    /// One of the `DROP_REASON_*` keys used for per-reason tallies.
    pub reason_key: Option<&'static str>,
}
3282
3283#[derive(Debug)]
3287struct BufferedSession {
3288 index: usize,
3289 session: Session,
3290}
3291
3292#[derive(Debug)]
3293struct BufferedMessage {
3294 index: usize,
3295 message: Message,
3296 parts: Vec<BufferedPart>,
3297 search_text: Option<String>,
3298}
3299
3300#[derive(Debug)]
3301struct BufferedPart {
3302 index: usize,
3303 part: Part,
3304}
3305
3306#[derive(Debug, Default)]
3323pub struct IngestValidator {
3324 session: Option<BufferedSession>,
3325 current_message: Option<BufferedMessage>,
3326 current_parts: Vec<BufferedPart>,
3327 messages: Vec<BufferedMessage>,
3328 seen_message_ids: HashSet<String>,
3332 seen_part_keys: HashSet<(String, String)>,
3335 completed: Vec<CompletedSubstream>,
3339}
3340
3341#[derive(Debug)]
3343struct CompletedSubstream {
3344 session_index: usize,
3345 session: Session,
3346 messages: Vec<BufferedMessage>,
3347}
3348
3349fn ingest_host_stamp() -> Option<&'static Value> {
3354 static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
3355 STAMP
3356 .get_or_init(|| {
3357 let mut host = serde_json::Map::new();
3358 if let Ok(username) = whoami::username() {
3359 host.insert("username".to_owned(), username.into());
3360 }
3361 if let Ok(hostname) = whoami::hostname() {
3362 host.insert("hostname".to_owned(), hostname.into());
3363 }
3364 if let Ok(devicename) = whoami::devicename() {
3365 host.insert("device_name".to_owned(), devicename.into());
3366 }
3367 (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
3368 })
3369 .as_ref()
3370}
3371
3372impl IngestValidator {
3373 pub async fn push(
3379 &mut self,
3380 store: &Store,
3381 index: usize,
3382 event: IngestEvent,
3383 ) -> Result<Vec<RowOutcome>> {
3384 match event {
3385 IngestEvent::Session(session) => self.push_session(store, index, session).await,
3386 IngestEvent::Message(message) => Ok(self.push_message(index, message)),
3387 IngestEvent::Part(part) => Ok(self.push_part(index, part)),
3388 }
3389 }
3390
3391 pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
3396 self.close_current_substream();
3397 self.flush(store).await
3398 }
3399
3400 pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
3407 if self.completed.is_empty() {
3408 return Ok((Vec::new(), BatchCounts::default()));
3409 }
3410 let completed = std::mem::take(&mut self.completed);
3411 store.upsert_session_batch(completed).await
3412 }
3413
3414 pub fn pending_substreams(&self) -> usize {
3417 self.completed.len()
3418 }
3419
3420 async fn push_session(
3421 &mut self,
3422 _store: &Store,
3423 index: usize,
3424 mut session: Session,
3425 ) -> Result<Vec<RowOutcome>> {
3426 self.close_current_substream();
3430
3431 let trimmed = session.source_agent.trim();
3436 if trimmed.is_empty() {
3437 return Ok(vec![RowOutcome {
3438 index,
3439 kind: "session",
3440 pk: Value::String(session.id.clone()),
3441 status: OutcomeStatus::Error,
3442 error: Some(RowError {
3443 message: format!("session {} has empty source_agent after trim", session.id),
3444 field: Some("source_agent"),
3445 reason: None,
3446 reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
3447 }),
3448 searchable: false,
3449 }]);
3450 }
3451 if trimmed.len() != session.source_agent.len() {
3452 session.source_agent = trimmed.to_owned();
3453 }
3454
3455 if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
3456 return Ok(vec![RowOutcome {
3457 index,
3458 kind: "session",
3459 pk: Value::String(session.id.clone()),
3460 status: OutcomeStatus::Error,
3461 error: Some(RowError {
3462 message: format!(
3463 "session {} has parent_message_id without parent_session_id",
3464 session.id,
3465 ),
3466 field: Some("parent_message_id"),
3467 reason: None,
3468 reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
3469 }),
3470 searchable: false,
3471 }]);
3472 }
3473
3474 self.seen_message_ids.clear();
3475 self.seen_part_keys.clear();
3476 self.session = Some(BufferedSession { index, session });
3477 Ok(Vec::new())
3478 }
3479
3480 fn close_current_substream(&mut self) {
3481 self.flush_current_message();
3482 let Some(BufferedSession {
3483 index: session_index,
3484 session,
3485 }) = self.session.take()
3486 else {
3487 return;
3488 };
3489 let messages = std::mem::take(&mut self.messages);
3490 self.seen_message_ids.clear();
3491 self.seen_part_keys.clear();
3492 self.completed.push(CompletedSubstream {
3493 session_index,
3494 session,
3495 messages,
3496 });
3497 }
3498
3499 fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
3500 let pk = Value::Array(vec![
3501 Value::String(message.session_id().to_owned()),
3502 Value::String(message.id().to_owned()),
3503 ]);
3504 let Some(session) = &self.session else {
3505 return vec![error_outcome(
3506 index,
3507 "message",
3508 pk,
3509 "first event in a session stream must be Session",
3510 None,
3511 DROP_REASON_MESSAGE_BEFORE_SESSION,
3512 )];
3513 };
3514 if message.session_id() != session.session.id {
3515 let msg = format!(
3516 "message {} references session {}, expected {}",
3517 message.id(),
3518 message.session_id(),
3519 session.session.id
3520 );
3521 return vec![error_outcome(
3522 index,
3523 "message",
3524 pk,
3525 &msg,
3526 Some("session_id"),
3527 DROP_REASON_MESSAGE_SESSION_MISMATCH,
3528 )];
3529 }
3530 if !self.seen_message_ids.insert(message.id().to_owned()) {
3531 let msg = format!("duplicate message id {} in session substream", message.id());
3535 return vec![error_outcome(
3536 index,
3537 "message",
3538 pk,
3539 &msg,
3540 None,
3541 DROP_REASON_DUPLICATE_MESSAGE_ID,
3542 )];
3543 }
3544 match ingest_host_stamp() {
3549 Some(stamp) => {
3550 message
3551 .options_mut()
3552 .insert("pond".to_owned(), stamp.clone());
3553 }
3554 None => {
3555 message.options_mut().remove("pond");
3556 }
3557 }
3558 self.flush_current_message();
3559 self.current_message = Some(BufferedMessage {
3560 index,
3561 message,
3562 parts: Vec::new(),
3563 search_text: None,
3564 });
3565 Vec::new()
3566 }
3567
3568 fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
3569 let pk = Value::Array(vec![
3570 Value::String(part.session_id.clone()),
3571 Value::String(part.message_id.clone()),
3572 Value::String(part.id.clone()),
3573 ]);
3574 let Some(current) = &self.current_message else {
3575 return vec![error_outcome(
3576 index,
3577 "part",
3578 pk,
3579 "part event appeared before a message",
3580 None,
3581 DROP_REASON_PART_BEFORE_MESSAGE,
3582 )];
3583 };
3584 if part.session_id != current.message.session_id() {
3585 let msg = format!(
3586 "part {} references session {}, expected {}",
3587 part.id,
3588 part.session_id,
3589 current.message.session_id()
3590 );
3591 return vec![error_outcome(
3592 index,
3593 "part",
3594 pk,
3595 &msg,
3596 Some("session_id"),
3597 DROP_REASON_PART_MESSAGE_MISMATCH,
3598 )];
3599 }
3600 if part.message_id != current.message.id() {
3601 let msg = format!(
3602 "part {} references message {}, expected {}",
3603 part.id,
3604 part.message_id,
3605 current.message.id()
3606 );
3607 return vec![error_outcome(
3608 index,
3609 "part",
3610 pk,
3611 &msg,
3612 Some("message_id"),
3613 DROP_REASON_PART_MESSAGE_MISMATCH,
3614 )];
3615 }
3616 let part_key = (part.message_id.clone(), part.id.clone());
3617 if !self.seen_part_keys.insert(part_key) {
3618 let msg = format!(
3619 "duplicate part id {} for message {} in session substream",
3620 part.id, part.message_id
3621 );
3622 return vec![error_outcome(
3623 index,
3624 "part",
3625 pk,
3626 &msg,
3627 None,
3628 DROP_REASON_DUPLICATE_PART_KEY,
3629 )];
3630 }
3631 self.current_parts.push(BufferedPart { index, part });
3632 Vec::new()
3633 }
3634
3635 fn flush_current_message(&mut self) {
3636 let Some(mut buffered) = self.current_message.take() else {
3637 return;
3638 };
3639 let parts = std::mem::take(&mut self.current_parts);
3640 let mut canonical_parts = Vec::with_capacity(parts.len());
3641 for part in &parts {
3642 canonical_parts.push(part.part.clone());
3643 }
3644 buffered.search_text = search_text(&buffered.message, &canonical_parts);
3645 buffered.parts = parts;
3646 self.messages.push(buffered);
3647 }
3648}
3649
3650fn error_outcome(
3651 index: usize,
3652 kind: &'static str,
3653 pk: Value,
3654 message: &str,
3655 field: Option<&'static str>,
3656 reason_key: &'static str,
3657) -> RowOutcome {
3658 RowOutcome {
3659 index,
3660 kind,
3661 pk,
3662 status: OutcomeStatus::Error,
3663 error: Some(RowError {
3664 message: message.to_owned(),
3665 field,
3666 reason: None,
3667 reason_key: Some(reason_key),
3668 }),
3669 searchable: false,
3670 }
3671}
3672
3673fn error_outcomes_for_substream(
3678 session_index: usize,
3679 session: &Session,
3680 _messages: &[BufferedMessage],
3681 message: impl Into<String>,
3682 field: Option<&'static str>,
3683 reason_key: &'static str,
3684) -> Vec<RowOutcome> {
3685 let reason = field.map(|_| "immutable");
3686 vec![RowOutcome {
3687 index: session_index,
3688 kind: "session",
3689 pk: Value::String(session.id.clone()),
3690 status: OutcomeStatus::Error,
3691 error: Some(RowError {
3692 message: message.into(),
3693 field,
3694 reason,
3695 reason_key: Some(reason_key),
3696 }),
3697 searchable: false,
3698 }]
3699}
3700
3701fn success_outcomes_for_substream(
3707 session_index: usize,
3708 session: &Session,
3709 messages: &[BufferedMessage],
3710 existing_sessions: &std::collections::HashMap<String, Session>,
3711 existing_message_pks: &HashSet<(String, String)>,
3712 existing_part_pks: &HashSet<(String, String, String)>,
3713 counts: &mut BatchCounts,
3714) -> Vec<RowOutcome> {
3715 let session_was_present = existing_sessions.contains_key(&session.id);
3716 let session_status = if session_was_present {
3717 counts.sessions_matched += 1;
3718 UpsertStatus::Matched
3719 } else {
3720 counts.sessions_inserted += 1;
3721 UpsertStatus::Inserted
3722 };
3723
3724 let mut outcomes = Vec::with_capacity(1 + messages.len());
3725 outcomes.push(success_outcome(
3726 session_index,
3727 "session",
3728 Value::String(session.id.clone()),
3729 session_status,
3730 false,
3731 ));
3732 for buffered in messages {
3733 let key = (
3734 buffered.message.session_id().to_owned(),
3735 buffered.message.id().to_owned(),
3736 );
3737 let searchable = buffered.search_text.is_some();
3738 let message_status = if existing_message_pks.contains(&key) {
3739 counts.messages_matched_total += 1;
3740 if searchable {
3741 counts.messages_matched_searchable += 1;
3742 }
3743 UpsertStatus::Matched
3744 } else {
3745 counts.messages_inserted_total += 1;
3746 if searchable {
3747 counts.messages_inserted_searchable += 1;
3748 }
3749 UpsertStatus::Inserted
3750 };
3751 let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
3752 outcomes.push(success_outcome(
3753 buffered.index,
3754 "message",
3755 pk,
3756 message_status,
3757 searchable,
3758 ));
3759 for part in &buffered.parts {
3760 let part_key = (
3761 part.part.session_id.clone(),
3762 part.part.message_id.clone(),
3763 part.part.id.clone(),
3764 );
3765 let part_status = if existing_part_pks.contains(&part_key) {
3766 counts.parts_matched += 1;
3767 UpsertStatus::Matched
3768 } else {
3769 counts.parts_inserted += 1;
3770 UpsertStatus::Inserted
3771 };
3772 let part_pk = Value::Array(vec![
3773 Value::String(part_key.0),
3774 Value::String(part_key.1),
3775 Value::String(part_key.2),
3776 ]);
3777 outcomes.push(success_outcome(
3778 part.index,
3779 "part",
3780 part_pk,
3781 part_status,
3782 false,
3783 ));
3784 }
3785 }
3786 outcomes
3787}
3788
3789fn success_outcome(
3790 index: usize,
3791 kind: &'static str,
3792 pk: Value,
3793 status: UpsertStatus,
3794 searchable: bool,
3795) -> RowOutcome {
3796 let status = match status {
3797 UpsertStatus::Inserted => OutcomeStatus::Inserted,
3798 UpsertStatus::Matched => OutcomeStatus::Matched,
3799 };
3800 RowOutcome {
3801 index,
3802 kind,
3803 pk,
3804 status,
3805 error: None,
3806 searchable,
3807 }
3808}
3809
/// Typed ingest failures.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// An incoming session disagrees with the stored one on a field that
    /// must not change.
    ImmutableField {
        /// Name of the field that differs.
        field: &'static str,
        /// Id of the session being ingested.
        session_id: String,
        /// Value currently stored.
        stored: String,
        /// Value the incoming session carried.
        attempted: String,
    },
}

impl std::fmt::Display for IngestError {
    /// Renders the error as
    /// `session <id> <field> is immutable: stored "<old>", attempted "<new>"`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable while the enum has a single variant; adding a variant
        // makes this a compile error, forcing the new case to be formatted.
        let Self::ImmutableField {
            field,
            session_id,
            stored,
            attempted,
        } = self;
        write!(
            formatter,
            "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
        )
    }
}

impl std::error::Error for IngestError {}
3841
3842fn ensure_immutable_match(
3846 existing: &Session,
3847 incoming: &Session,
3848) -> std::result::Result<(), IngestError> {
3849 if existing.source_agent != incoming.source_agent {
3850 return Err(IngestError::ImmutableField {
3851 field: "source_agent",
3852 session_id: incoming.id.clone(),
3853 stored: existing.source_agent.clone(),
3854 attempted: incoming.source_agent.clone(),
3855 });
3856 }
3857 if existing.project != incoming.project {
3858 return Err(IngestError::ImmutableField {
3859 field: "project",
3860 session_id: incoming.id.clone(),
3861 stored: (*existing.project).clone(),
3862 attempted: (*incoming.project).clone(),
3863 });
3864 }
3865 Ok(())
3866}
3867
3868pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
3869 use crate::wire::Provenance;
3870 let mut chunks: Vec<String> = Vec::new();
3871 for part in parts {
3872 if part.provenance != Provenance::Conversational {
3875 continue;
3876 }
3877 match (message.role(), &part.kind) {
3878 (Role::User | Role::Assistant, PartKind::Text { text }) => {
3879 if let Some(text) = text {
3880 chunks.push(text.to_string());
3881 }
3882 }
3883 (
3884 Role::User | Role::Assistant,
3885 PartKind::File {
3886 media_type,
3887 file_name,
3888 data,
3889 },
3890 ) => {
3891 if let Some(file_name) = file_name {
3892 chunks.push(file_name.clone());
3893 }
3894 if let Some(media_type) = media_type {
3895 chunks.push(media_type.clone());
3896 }
3897 if let FileData::Url(uri) = data {
3898 chunks.push(uri.clone());
3899 }
3900 }
3901 (
3902 Role::System | Role::Tool,
3903 PartKind::Text { .. }
3904 | PartKind::Reasoning { .. }
3905 | PartKind::File { .. }
3906 | PartKind::ToolCall { .. }
3907 | PartKind::ToolResult { .. }
3908 | PartKind::ToolApprovalRequest { .. }
3909 | PartKind::ToolApprovalResponse { .. },
3910 )
3911 | (
3912 Role::User | Role::Assistant,
3913 PartKind::Reasoning { .. }
3914 | PartKind::ToolCall { .. }
3915 | PartKind::ToolResult { .. }
3916 | PartKind::ToolApprovalRequest { .. }
3917 | PartKind::ToolApprovalResponse { .. },
3918 ) => {}
3919 }
3920 }
3921
3922 let text = chunks
3923 .into_iter()
3924 .filter(|chunk| !chunk.trim().is_empty())
3925 .collect::<Vec<_>>()
3926 .join("\n");
3927 if text.is_empty() { None } else { Some(text) }
3928}
3929
/// Newtype around the search text of a message.
///
/// The inner string is private; read it through the accessors below.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);

impl SearchText {
    /// Borrows the text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consumes the wrapper and returns the owned text.
    pub fn into_inner(self) -> String {
        let Self(text) = self;
        text
    }
}

impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
3949
/// A message bundled with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    /// The message itself.
    pub message: Message,
    /// Parts carried alongside `message`.
    pub parts: Vec<Part>,
}
3955
/// A session bundled with its messages, each message carrying its own parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    /// The session itself.
    pub session: Session,
    /// Messages carried alongside `session`.
    pub messages: Vec<MessageWithParts>,
}
3961
/// Borrowed parameters for a session view request.
///
/// NOTE(review): the consumer of these fields is outside this block; the
/// per-field notes below are inferred from the names and should be confirmed.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Optional message id anchor; presumably "return messages after this one".
    pub after_message_id: Option<&'a str>,
    /// Optional message id anchor; presumably "return messages before this one".
    pub before_message_id: Option<&'a str>,
    /// Presumably the maximum number of messages per page.
    pub limit: usize,
    /// Presumably the byte budget for the page.
    pub budget_bytes: usize,
    /// Session selector from the wire layer.
    pub session_from: SessionFrom,
}
3973
/// Parameters for a single-message view request.
///
/// NOTE(review): the consumer of these fields is outside this block; the
/// per-field notes below are inferred from the names and should be confirmed.
#[derive(Debug, Clone)]
pub struct MessageViewParams {
    /// Presumably how many messages preceding the target to include.
    pub context_before: usize,
    /// Presumably how many messages following the target to include.
    pub context_after: usize,
    /// Presumably the byte budget for the response.
    pub budget_bytes: usize,
}
3982
/// Three-way outcome of a lookup that yields a `T` on success.
///
/// NOTE(review): the exact conditions producing each variant live in the
/// callers, which are outside this block.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// Presumably: the requested entity does not exist.
    NotFound,
    /// Presumably: the entity exists but a supplied anchor id was not recognised.
    UnknownAnchor,
    /// The lookup succeeded and carries its result.
    Found(T),
}
3994
/// One page of a session view: the session, the messages on this page, and
/// two "remaining" counters.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    /// The session the page belongs to.
    pub session: Session,
    /// Messages included in this page.
    pub messages: Vec<RetrievedMessage>,
    /// Presumably the number of messages not returned before this page — confirm.
    pub before_remaining: usize,
    /// Presumably the number of messages not returned after this page — confirm.
    pub after_remaining: usize,
}
4005
/// Result of a single-message view: the owning session, the target message
/// with its parts, and sibling messages.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    /// The session the target message belongs to.
    pub session: Session,
    /// The requested message.
    pub target: RetrievedMessage,
    /// Parts returned for the target message.
    pub target_parts: Vec<Part>,
    /// Presumably the number of target parts left out of `target_parts` — confirm.
    pub target_parts_remaining: usize,
    /// Other messages returned alongside the target.
    pub siblings: Vec<RetrievedMessage>,
}
4017
/// A message as returned to readers, together with any parts loaded for it.
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    /// Message id.
    pub id: String,
    /// Message role.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Optional text; presumably sourced from the `search_text` column — confirm.
    pub text: Option<String>,
    /// Optional content; presumably sourced from the `content` column — confirm.
    pub content: Option<String>,
    /// Parts attached to this message.
    pub parts: Vec<Part>,
}
4027
/// Module-private row shape holding the same scalar fields as
/// `RetrievedMessage` but no parts.
#[derive(Debug, Clone)]
struct ScanRow {
    /// Message id.
    id: String,
    /// Message role.
    role: Role,
    /// Message timestamp (UTC).
    timestamp: DateTime<Utc>,
    /// Optional text; presumably the `search_text` column — confirm.
    text: Option<String>,
    /// Optional content; presumably the `content` column — confirm.
    content: Option<String>,
}
4036
/// A message row identified by session and message id, carrying its role,
/// timestamp and search text.
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Id of the owning session.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
    /// Message role.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Search text of the message.
    pub text: SearchText,
}
4047
/// Returns how many leading `items` fit on one page.
///
/// At most `limit` items are considered, with `limit` clamped to `1..=1000`.
/// Items are accepted front to back while their accumulated `size` stays
/// within `budget_bytes`; the first item is always accepted, even when it
/// alone exceeds the budget. Size accumulation saturates instead of
/// overflowing.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = limit.clamp(1, 1000);
    let mut used = 0usize;
    let mut count = 0usize;
    for (index, item) in items.iter().take(window).enumerate() {
        let total = used.saturating_add(size(item));
        // `index` equals the number of items accepted so far.
        if index != 0 && total > budget_bytes {
            return index;
        }
        used = total;
        count = index + 1;
    }
    count
}
4066
/// Returns how many trailing `items` fit on one page.
///
/// Walks `items` from the back, considering at most `limit` items (clamped to
/// `1..=1000`) and accepting them while their accumulated `size` stays within
/// `budget_bytes`. The last item is always accepted, even when it alone
/// exceeds the budget. Size accumulation saturates instead of overflowing.
fn page_tail<T>(
    items: &[T],
    limit: usize,
    budget_bytes: usize,
    size: impl Fn(&T) -> usize,
) -> usize {
    let window = limit.clamp(1, 1000);
    let mut used = 0usize;
    let mut count = 0usize;
    for item in items.iter().rev().take(window) {
        let total = used.saturating_add(size(item));
        if count != 0 && total > budget_bytes {
            break;
        }
        used = total;
        count += 1;
    }
    count
}
4093
4094fn role_from_str(value: &str) -> Result<Role> {
4095 match value {
4096 "system" => Ok(Role::System),
4097 "user" => Ok(Role::User),
4098 "assistant" => Ok(Role::Assistant),
4099 "tool" => Ok(Role::Tool),
4100 other => anyhow::bail!("unknown message role {other}"),
4101 }
4102}
4103
/// Scalar indices declared for the messages table, as
/// `(column, index type, index name)` triples. Each entry becomes one
/// `IndexIntent` in `pond_index_intents_with_vector_threshold`.
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::ZoneMap,
        "messages_timestamp_zonemap",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
];

/// Scalar indices declared for the parts table, in the same
/// `(column, index type, index name)` layout.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];

/// Scalar indices declared for the sessions table, in the same
/// `(column, index type, index name)` layout.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
4155
// NOTE(review): not used in the visible part of the file; presumably the
// maximum number of ids placed in one `IN` predicate when copying sessions —
// confirm against the call sites.
const COPY_SESSION_IN_CHUNK: usize = 512;
4160
4161fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
4162 Predicate::In(
4163 column,
4164 values.iter().cloned().map(ScalarValue::String).collect(),
4165 )
4166}
4167
4168fn embedded_scope(filter: &Predicate) -> Predicate {
4181 filter.clone()
4182}
4183
/// `nprobes` value applied to vector scans by `apply_vector_search_knobs`
/// when the search configuration does not supply one.
pub const DEFAULT_NPROBES: usize = 32;
4189
4190fn apply_vector_search_knobs(
4197 scanner: &mut lance::dataset::scanner::Scanner,
4198 search: Option<&config::SearchConfig>,
4199) {
4200 let nprobes = search
4201 .and_then(|cfg| cfg.nprobes)
4202 .unwrap_or(DEFAULT_NPROBES);
4203 scanner.nprobes(nprobes);
4204}
4205
// String names for the three tables. NOTE(review): presumably the on-disk
// dataset names — confirm where they are consumed.
pub(crate) const SESSIONS: &str = "sessions";
pub(crate) const MESSAGES: &str = "messages";
pub(crate) const PARTS: &str = "parts";
4212
/// Name of the full-text index declared on `messages.search_text`.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the vector index declared on `messages.vector`.
///
/// NOTE(review): the name ends in `ivfpq`, but the intent built in
/// `pond_index_intents_with_vector_threshold` uses `IvfSqCosine` parameters.
/// Presumably the name is kept for compatibility with existing datasets —
/// confirm before renaming.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";
4224
/// `num_bits` passed to the `IvfSqCosine` parameters of the messages vector index.
const IVF_SQ_NUM_BITS: u16 = 8;
/// `max_iters` passed to the `IvfSqCosine` parameters of the messages vector index.
const IVF_SQ_MAX_ITERS: usize = 15;
4231
4232pub fn pond_index_intents() -> IndexIntents {
4235 pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
4236}
4237
4238pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
4242 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
4243 messages.push(IndexIntent {
4244 name: MESSAGES_FTS_INDEX,
4245 column: "search_text",
4246 trigger: IndexTrigger::OnAnyRows,
4247 params: IndexParamsKind::InvertedFtsWord,
4248 });
4249 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
4250 messages.push(IndexIntent {
4251 name,
4252 column,
4253 trigger: IndexTrigger::OnAnyRows,
4254 params: IndexParamsKind::Scalar(kind.clone()),
4255 });
4256 }
4257 messages.push(IndexIntent {
4258 name: MESSAGES_VECTOR_INDEX,
4259 column: "vector",
4260 trigger: IndexTrigger::OnNonNullCount {
4261 column: "vector",
4262 threshold: vector_threshold,
4263 },
4264 params: IndexParamsKind::IvfSqCosine {
4265 num_bits: IVF_SQ_NUM_BITS,
4266 max_iters: IVF_SQ_MAX_ITERS,
4267 },
4268 });
4269 let parts = PARTS_SCALAR_INDICES
4270 .iter()
4271 .map(|(column, kind, name)| IndexIntent {
4272 name,
4273 column,
4274 trigger: IndexTrigger::OnAnyRows,
4275 params: IndexParamsKind::Scalar(kind.clone()),
4276 })
4277 .collect();
4278 let sessions = SESSIONS_SCALAR_INDICES
4279 .iter()
4280 .map(|(column, kind, name)| IndexIntent {
4281 name,
4282 column,
4283 trigger: IndexTrigger::OnAnyRows,
4284 params: IndexParamsKind::Scalar(kind.clone()),
4285 })
4286 .collect();
4287 IndexIntents {
4288 sessions,
4289 messages,
4290 parts,
4291 }
4292}
4293
/// Embedding dimension used until a runtime value has been installed.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide embedding dimension; written at most once.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the installed embedding dimension, or [`DEFAULT_EMBEDDING_DIM`]
/// when [`init_embedding_dim`] has not been called yet.
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(dim) => *dim,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Installs the process-wide embedding dimension.
///
/// Only the first call takes effect; later calls are silently ignored, even
/// when they pass a different value.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails once a value is present, which is exactly the
    // "first writer wins" behaviour wanted here.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
4319
4320pub(crate) fn write_params_for_create() -> WriteParams {
4327 WriteParams {
4328 data_storage_version: Some(LanceFileVersion::V2_1),
4329 enable_v2_manifest_paths: true,
4330 enable_stable_row_ids: true,
4331 auto_cleanup: Some(AutoCleanupParams {
4332 interval: 20,
4333 older_than: chrono::TimeDelta::days(1),
4334 }),
4335 skip_auto_cleanup: true,
4336 ..WriteParams::default()
4337 }
4338}
4339
4340fn export_schema(table: Table) -> Arc<Schema> {
4341 match table {
4342 Table::Sessions => session_schema(),
4343 Table::Messages => message_schema(),
4344 Table::Parts => part_schema(),
4345 }
4346}
4347
4348fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
4349 let expected = export_schema(table);
4350 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
4351 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
4352 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
4353 if actual_names != expected_names {
4354 anyhow::bail!(
4355 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
4356 table.as_str(),
4357 );
4358 }
4359 Ok(())
4360}
4361
4362async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
4363 let source_uri = source
4364 .to_str()
4365 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
4366 let dataset = Dataset::open(source_uri)
4367 .await
4368 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
4369 ensure_schema_matches_archive(&dataset, table)?;
4370 Ok(dataset)
4371}
4372
4373pub(crate) fn session_schema() -> Arc<Schema> {
4374 Arc::new(Schema::new(vec![
4375 primary_field("id", DataType::Utf8, false),
4376 Field::new("parent_session_id", DataType::Utf8, true),
4377 Field::new("parent_message_id", DataType::Utf8, true),
4378 Field::new("source_agent", DataType::Utf8, false),
4379 Field::new(
4380 "created_at",
4381 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
4382 false,
4383 ),
4384 Field::new("project", DataType::Utf8, false),
4385 json_field("options", false),
4386 ]))
4387}
4388
4389pub(crate) fn message_schema() -> Arc<Schema> {
4390 Arc::new(Schema::new(vec![
4391 primary_field("session_id", DataType::Utf8, false),
4392 primary_field("id", DataType::Utf8, false),
4393 Field::new(
4394 "timestamp",
4395 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
4396 false,
4397 ),
4398 Field::new("role", DataType::Utf8, false),
4399 Field::new("source_agent", DataType::Utf8, false),
4400 Field::new("project", DataType::Utf8, false),
4401 Field::new("content", DataType::Utf8, true),
4402 Field::new("search_text", DataType::Utf8, true),
4403 Field::new("vector", embedding_vector_type(), true),
4406 Field::new("embedding_model", DataType::Utf8, true),
4407 json_field("options", false),
4408 ]))
4409}
4410
4411pub(crate) fn part_schema() -> Arc<Schema> {
4412 Arc::new(Schema::new(vec![
4413 primary_field("session_id", DataType::Utf8, false),
4414 primary_field("message_id", DataType::Utf8, false),
4415 primary_field("id", DataType::Utf8, false),
4416 Field::new("ordinal", DataType::Int32, false),
4417 Field::new("type", DataType::Utf8, false),
4418 Field::new("provenance", DataType::Utf8, false),
4421 json_field("variant_data", false),
4422 legacy_blob_field("data", true),
4423 json_field("options", false),
4424 ]))
4425}
4426
4427pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
4428 let arrays = schema
4429 .fields()
4430 .iter()
4431 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
4432 .collect();
4433 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
4434}
4435
4436pub(crate) fn empty_reader(
4437 schema: Arc<Schema>,
4438) -> Result<
4439 RecordBatchIterator<
4440 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
4441 >,
4442> {
4443 let batch = empty_batch(schema.clone())?;
4444 Ok(RecordBatchIterator::new(
4445 vec![Ok(batch)].into_iter(),
4446 schema,
4447 ))
4448}
4449
/// Borrowed inputs for one row of a messages record batch, consumed by
/// `messages_batches` and `messages_chunk`.
pub(crate) struct MessageBatchRow<'a> {
    /// The message supplying id, session id, timestamp, role, content and options.
    pub message: &'a Message,
    /// Value written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Value written to the `project` column.
    pub project: &'a str,
    /// Value written to the nullable `search_text` column.
    pub search_text: Option<&'a str>,
}
4456
4457fn embedding_vector_type() -> DataType {
4463 DataType::FixedSizeList(
4464 Arc::new(Field::new("item", DataType::Float16, true)),
4465 embedding_dim() as i32,
4466 )
4467}
4468
4469fn embedding_update_schema() -> Arc<Schema> {
4473 Arc::new(Schema::new(vec![
4474 primary_field("session_id", DataType::Utf8, false),
4475 primary_field("id", DataType::Utf8, false),
4476 Field::new("vector", embedding_vector_type(), true),
4477 Field::new("embedding_model", DataType::Utf8, true),
4478 ]))
4479}
4480
4481pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
4484 let dim = embedding_dim();
4485 let mut flat = Vec::with_capacity(rows.len() * dim);
4486 for row in rows {
4487 if row.vector.len() != dim {
4488 anyhow::bail!(
4489 "embedding for message {} has dim {}, expected {dim}",
4490 row.id,
4491 row.vector.len(),
4492 );
4493 }
4494 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
4495 }
4496 let values = Float16Array::from(flat);
4497 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
4498 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
4499 .context("failed to build embedding vector column")?;
4500
4501 RecordBatch::try_new(
4502 embedding_update_schema(),
4503 vec![
4504 Arc::new(StringArray::from(
4505 rows.iter()
4506 .map(|row| row.session_id.as_str())
4507 .collect::<Vec<_>>(),
4508 )),
4509 Arc::new(StringArray::from(
4510 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
4511 )),
4512 Arc::new(vectors),
4513 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
4514 ],
4515 )
4516 .context("failed to build embedding update batch")
4517}
4518
/// Byte budget (1 GiB) used both as the per-chunk size target in
/// `chunk_ranges` and as the per-cell ceiling in `guard_cell`.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits rows into consecutive index ranges whose summed sizes stay within
/// `COLUMN_BYTE_BUDGET`.
///
/// `cells[i]` is the byte size of row `i`. A new range starts whenever adding
/// the next row would push the running total over the budget. Every range
/// holds at least one row, so a single oversized row gets a range of its own.
/// Returns an empty vector for empty input.
///
/// Sizes are accumulated with saturating arithmetic: an overflowing total is
/// treated as "over budget" instead of panicking (debug builds) or wrapping
/// and producing wrong chunk boundaries (release builds).
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        // `index > start` keeps at least one row in every chunk.
        if running.saturating_add(row) > COLUMN_BYTE_BUDGET && index > start {
            chunks.push(start..index);
            start = index;
            running = 0;
        }
        running = running.saturating_add(row);
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
4546
4547fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
4548 if bytes >= COLUMN_BYTE_BUDGET {
4549 anyhow::bail!(
4550 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
4551 overflow Arrow's i32 offset buffer"
4552 );
4553 }
4554 Ok(())
4555}
4556
4557async fn merge_insert_chunks(
4558 handle: &Handle,
4559 table: Table,
4560 batches: Vec<RecordBatch>,
4561) -> Result<u64> {
4562 let mut inserted = 0u64;
4563 for batch in batches {
4564 let rows = batch.num_rows();
4565 inserted += handle.merge_insert(table, batch, rows).await?;
4566 }
4567 Ok(inserted)
4568}
4569
4570pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
4571 let options = sessions
4572 .iter()
4573 .map(|session| json_bytes(&session.options))
4574 .collect::<Result<Vec<_>>>()?;
4575 let mut cells = Vec::with_capacity(sessions.len());
4576 for (session, encoded) in sessions.iter().zip(&options) {
4577 let columns = [
4578 session.id.len(),
4579 session.parent_session_id.as_deref().map_or(0, str::len),
4580 session.parent_message_id.as_deref().map_or(0, str::len),
4581 session.source_agent.len(),
4582 session.project.as_str().len(),
4583 encoded.len(),
4584 ];
4585 for bytes in columns {
4586 guard_cell("sessions", &session.id, bytes)?;
4587 }
4588 cells.push(columns.iter().sum());
4589 }
4590 chunk_ranges(&cells)
4591 .into_iter()
4592 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
4593 .collect()
4594}
4595
4596fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
4597 let schema = session_schema();
4598 RecordBatch::try_new(
4599 schema.clone(),
4600 vec![
4601 Arc::new(StringArray::from(
4602 sessions
4603 .iter()
4604 .map(|session| session.id.as_str())
4605 .collect::<Vec<_>>(),
4606 )),
4607 Arc::new(StringArray::from(
4608 sessions
4609 .iter()
4610 .map(|session| session.parent_session_id.as_deref())
4611 .collect::<Vec<_>>(),
4612 )),
4613 Arc::new(StringArray::from(
4614 sessions
4615 .iter()
4616 .map(|session| session.parent_message_id.as_deref())
4617 .collect::<Vec<_>>(),
4618 )),
4619 Arc::new(StringArray::from(
4620 sessions
4621 .iter()
4622 .map(|session| session.source_agent.as_str())
4623 .collect::<Vec<_>>(),
4624 )),
4625 Arc::new(
4626 TimestampMicrosecondArray::from(
4627 sessions
4628 .iter()
4629 .map(|session| micros(session.created_at))
4630 .collect::<Vec<_>>(),
4631 )
4632 .with_timezone("UTC"),
4633 ),
4634 Arc::new(StringArray::from(
4635 sessions
4636 .iter()
4637 .map(|session| session.project.as_str())
4638 .collect::<Vec<_>>(),
4639 )),
4640 Arc::new(LargeBinaryArray::from_iter_values(
4641 options.iter().map(Vec::as_slice),
4642 )),
4643 ],
4644 )
4645 .context("failed to build session batch")
4646}
4647
4648pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
4649 let options = rows
4650 .iter()
4651 .map(|row| json_bytes(row.message.options()))
4652 .collect::<Result<Vec<_>>>()?;
4653 let mut cells = Vec::with_capacity(rows.len());
4654 for (row, encoded) in rows.iter().zip(&options) {
4655 let columns = [
4656 row.message.session_id().len(),
4657 row.message.id().len(),
4658 row.message.role().as_str().len(),
4659 row.source_agent.len(),
4660 row.project.len(),
4661 row.message.system_content().map_or(0, str::len),
4662 row.search_text.map_or(0, str::len),
4663 encoded.len(),
4664 ];
4665 for bytes in columns {
4666 guard_cell("messages", row.message.id(), bytes)?;
4667 }
4668 cells.push(columns.iter().sum());
4669 }
4670 chunk_ranges(&cells)
4671 .into_iter()
4672 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
4673 .collect()
4674}
4675
4676fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
4677 let schema = message_schema();
4678 RecordBatch::try_new(
4679 schema.clone(),
4680 vec![
4681 Arc::new(StringArray::from(
4682 rows.iter()
4683 .map(|row| row.message.session_id())
4684 .collect::<Vec<_>>(),
4685 )),
4686 Arc::new(StringArray::from(
4687 rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
4688 )),
4689 Arc::new(
4690 TimestampMicrosecondArray::from(
4691 rows.iter()
4692 .map(|row| micros(row.message.timestamp()))
4693 .collect::<Vec<_>>(),
4694 )
4695 .with_timezone("UTC"),
4696 ),
4697 Arc::new(StringArray::from(
4698 rows.iter()
4699 .map(|row| row.message.role().as_str())
4700 .collect::<Vec<_>>(),
4701 )),
4702 Arc::new(StringArray::from(
4703 rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
4704 )),
4705 Arc::new(StringArray::from(
4706 rows.iter().map(|row| row.project).collect::<Vec<_>>(),
4707 )),
4708 Arc::new(StringArray::from(
4709 rows.iter()
4710 .map(|row| row.message.system_content())
4711 .collect::<Vec<_>>(),
4712 )),
4713 Arc::new(StringArray::from(
4714 rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
4715 )),
4716 new_null_array(&embedding_vector_type(), rows.len()),
4720 new_null_array(&DataType::Utf8, rows.len()),
4721 Arc::new(LargeBinaryArray::from_iter_values(
4722 options.iter().map(Vec::as_slice),
4723 )),
4724 ],
4725 )
4726 .context("failed to build message batch")
4727}
4728
4729pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
4730 let variant_data = parts
4731 .iter()
4732 .map(|part| part_variant_json(&part.kind))
4733 .collect::<Result<Vec<_>>>()?;
4734 let options = parts
4735 .iter()
4736 .map(|part| json_bytes(&part.options))
4737 .collect::<Result<Vec<_>>>()?;
4738 let mut cells = Vec::with_capacity(parts.len());
4739 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
4742 let columns = [
4743 part.session_id.len(),
4744 part.message_id.len(),
4745 part.id.len(),
4746 part.kind.type_name().len(),
4747 part.provenance.as_str().len(),
4748 variant.len(),
4749 encoded.len(),
4750 ];
4751 for bytes in columns {
4752 guard_cell("parts", &part.id, bytes)?;
4753 }
4754 cells.push(columns.iter().sum());
4755 }
4756 chunk_ranges(&cells)
4757 .into_iter()
4758 .map(|range| {
4759 parts_chunk(
4760 &parts[range.clone()],
4761 &variant_data[range.clone()],
4762 &options[range],
4763 )
4764 })
4765 .collect()
4766}
4767
4768fn parts_chunk(
4769 parts: &[Part],
4770 variant_data: &[Vec<u8>],
4771 options: &[Vec<u8>],
4772) -> Result<RecordBatch> {
4773 let schema = part_schema();
4774 let blob_payloads: Vec<Option<&[u8]>> = parts
4778 .iter()
4779 .map(|part| match &part.kind {
4780 PartKind::File { data, .. } => Some(match data {
4781 FileData::String(value) => value.as_bytes(),
4782 FileData::Bytes(value) => value.as_slice(),
4783 FileData::Url(value) => value.as_bytes(),
4784 }),
4785 PartKind::Text { .. }
4786 | PartKind::Reasoning { .. }
4787 | PartKind::ToolCall { .. }
4788 | PartKind::ToolResult { .. }
4789 | PartKind::ToolApprovalRequest { .. }
4790 | PartKind::ToolApprovalResponse { .. } => None,
4791 })
4792 .collect();
4793 let blob_array = LargeBinaryArray::from_iter(blob_payloads);
4794
4795 RecordBatch::try_new(
4796 schema.clone(),
4797 vec![
4798 Arc::new(StringArray::from(
4799 parts
4800 .iter()
4801 .map(|part| part.session_id.as_str())
4802 .collect::<Vec<_>>(),
4803 )),
4804 Arc::new(StringArray::from(
4805 parts
4806 .iter()
4807 .map(|part| part.message_id.as_str())
4808 .collect::<Vec<_>>(),
4809 )),
4810 Arc::new(StringArray::from(
4811 parts
4812 .iter()
4813 .map(|part| part.id.as_str())
4814 .collect::<Vec<_>>(),
4815 )),
4816 Arc::new(Int32Array::from(
4817 parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
4818 )),
4819 Arc::new(StringArray::from(
4820 parts
4821 .iter()
4822 .map(|part| part.kind.type_name())
4823 .collect::<Vec<_>>(),
4824 )),
4825 Arc::new(StringArray::from(
4826 parts
4827 .iter()
4828 .map(|part| part.provenance.as_str())
4829 .collect::<Vec<_>>(),
4830 )),
4831 Arc::new(LargeBinaryArray::from_iter_values(
4832 variant_data.iter().map(Vec::as_slice),
4833 )),
4834 Arc::new(blob_array),
4835 Arc::new(LargeBinaryArray::from_iter_values(
4836 options.iter().map(Vec::as_slice),
4837 )),
4838 ],
4839 )
4840 .context("failed to build parts batch")
4841}
4842
4843pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
4844 Ok(Session {
4845 id: string(batch, "id", row)?.context("session id is null")?,
4846 parent_session_id: string(batch, "parent_session_id", row)?,
4847 parent_message_id: string(batch, "parent_message_id", row)?,
4848 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4849 created_at: datetime(batch, "created_at", row)?,
4850 project: crate::adapter::Extracted::from_stored(
4851 string(batch, "project", row)?.context("project is null")?,
4852 ),
4853 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
4854 })
4855}
4856
4857pub struct RowmapOracle(pub Option<Arc<RowMetaSet>>);
4864
4865impl crate::adapter::SkipOracle for RowmapOracle {
4866 fn session_max_ts(&self, session_id: &str) -> Option<i64> {
4867 self.0.as_ref()?.lookup_max_ts(session_id)
4868 }
4869
4870 fn is_empty(&self) -> bool {
4871 self.0.as_ref().is_none_or(|set| set.is_empty())
4872 }
4873}
4874
4875fn row_meta_entry(batch: &RecordBatch, row_id: u64, row: usize) -> Result<RowMetaEntry> {
4876 Ok(RowMetaEntry {
4877 row_id,
4878 session_id: string(batch, "session_id", row)?.context("session_id is null")?,
4879 message_id: string(batch, "id", row)?.context("message id is null")?,
4880 role: string(batch, "role", row)?.context("role is null")?,
4881 project: string(batch, "project", row)?.context("project is null")?,
4882 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4883 timestamp_micros: datetime(batch, "timestamp", row)?.timestamp_micros(),
4884 search_text: string(batch, "search_text", row)?.unwrap_or_default(),
4885 })
4886}
4887
4888pub(crate) fn message_meta_from_batch(batch: &RecordBatch, row: usize) -> Result<MessageMeta> {
4889 Ok(MessageMeta {
4890 message_id: string(batch, "id", row)?.context("id is null")?,
4891 session_id: string(batch, "session_id", row)?.context("session_id is null")?,
4892 role: string(batch, "role", row)?.context("role is null")?,
4893 project: string(batch, "project", row)?.context("project is null")?,
4894 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4895 timestamp: datetime(batch, "timestamp", row)?,
4896 search_text: string(batch, "search_text", row)?.unwrap_or_default(),
4897 })
4898}
4899
4900pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
4901 let id = string(batch, "id", row)?.context("message id is null")?;
4902 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
4903 let timestamp = datetime(batch, "timestamp", row)?;
4904 let options =
4905 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
4906
4907 match string(batch, "role", row)?
4908 .context("message role is null")?
4909 .as_str()
4910 {
4911 "system" => Ok(Message::System {
4912 id,
4913 session_id,
4914 timestamp,
4915 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
4922 options,
4923 }),
4924 "user" => Ok(Message::User {
4925 id,
4926 session_id,
4927 timestamp,
4928 options,
4929 }),
4930 "assistant" => Ok(Message::Assistant {
4931 id,
4932 session_id,
4933 timestamp,
4934 options,
4935 }),
4936 "tool" => Ok(Message::Tool {
4937 id,
4938 session_id,
4939 timestamp,
4940 options,
4941 }),
4942 other => anyhow::bail!("unknown message role {other}"),
4943 }
4944}
4945
4946pub(crate) fn part_from_batch(
4947 batch: &RecordBatch,
4948 row: usize,
4949 file_data: Option<FileData>,
4950) -> Result<Part> {
4951 let type_name = string(batch, "type", row)?.context("part type is null")?;
4952 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
4953 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
4954 Ok(Part {
4955 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
4956 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
4957 id: string(batch, "id", row)?.context("part id is null")?,
4958 ordinal: int32(batch, "ordinal", row)?,
4959 provenance: provenance_from_str(&provenance)?,
4960 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
4961 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
4962 })
4963}
4964
4965fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
4966 match value {
4967 "conversational" => Ok(crate::wire::Provenance::Conversational),
4968 "injected" => Ok(crate::wire::Provenance::Injected),
4969 other => anyhow::bail!("unknown part provenance {other}"),
4970 }
4971}
4972
4973fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
4974 let kind = file_data_kind(variant_data)?;
4975 match kind.as_str() {
4976 "string" => {
4977 let text = std::str::from_utf8(bytes)
4978 .context("file string payload is not UTF-8")?
4979 .to_owned();
4980 Ok(FileData::String(text))
4981 }
4982 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
4983 "url" => Ok(FileData::Url(
4984 std::str::from_utf8(bytes)
4985 .context("file URL payload is not UTF-8")?
4986 .to_owned(),
4987 )),
4988 other => anyhow::bail!("unknown file data_kind {other}"),
4989 }
4990}
4991
4992fn file_data_kind(variant_data: &[u8]) -> Result<String> {
4993 let value = json_parse::<Value>(variant_data)?;
4994 value
4995 .get("data_kind")
4996 .and_then(Value::as_str)
4997 .map(str::to_owned)
4998 .context("file part variant_data missing data_kind")
4999}
5000
5001fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
5002 batch
5003 .column_by_name(name)
5004 .with_context(|| format!("missing column {name}"))?
5005 .as_any()
5006 .downcast_ref::<UInt64Array>()
5007 .with_context(|| format!("column {name} is not UInt64"))
5008}
5009
5010pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
5011 let array = batch
5012 .column_by_name(name)
5013 .with_context(|| format!("missing column {name}"))?
5014 .as_any()
5015 .downcast_ref::<StringArray>()
5016 .with_context(|| format!("column {name} is not Utf8"))?;
5017 if array.is_null(row) {
5018 Ok(None)
5019 } else {
5020 Ok(Some(array.value(row).to_owned()))
5021 }
5022}
5023
5024fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
5025 let column = batch
5029 .column_by_name(name)
5030 .with_context(|| format!("missing column {name}"))?;
5031 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
5032 return if array.is_null(row) {
5033 Ok(None)
5034 } else {
5035 Ok(Some(
5036 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
5037 ))
5038 };
5039 }
5040 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
5041 return if array.is_null(row) {
5042 Ok(None)
5043 } else {
5044 Ok(Some(array.value(row).as_bytes().to_vec()))
5045 };
5046 }
5047 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
5048 return if array.is_null(row) {
5049 Ok(None)
5050 } else {
5051 Ok(Some(array.value(row).as_bytes().to_vec()))
5052 };
5053 }
5054 anyhow::bail!("column {name} is not a JSON-compatible array")
5055}
5056
5057fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
5058 let array = batch
5059 .column_by_name(name)
5060 .with_context(|| format!("missing column {name}"))?
5061 .as_any()
5062 .downcast_ref::<Int32Array>()
5063 .with_context(|| format!("column {name} is not Int32"))?;
5064 Ok(array.value(row))
5065}
5066
5067pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
5068 let array = batch
5069 .column_by_name(name)
5070 .with_context(|| format!("missing column {name}"))?
5071 .as_any()
5072 .downcast_ref::<Float32Array>()
5073 .with_context(|| format!("column {name} is not Float32"))?;
5074 Ok(array.value(row))
5075}
5076
5077pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
5078 let array = batch
5079 .column_by_name(name)
5080 .with_context(|| format!("missing column {name}"))?
5081 .as_any()
5082 .downcast_ref::<TimestampMicrosecondArray>()
5083 .with_context(|| format!("column {name} is not timestamp_micros"))?;
5084 Utc.timestamp_micros(array.value(row))
5085 .single()
5086 .context("timestamp is out of range")
5087}
5088
5089fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
5090 Field::new(name, data_type, nullable).with_metadata(
5091 [(
5092 "lance-schema:unenforced-primary-key".to_owned(),
5093 "true".to_owned(),
5094 )]
5095 .into(),
5096 )
5097}
5098
5099fn legacy_blob_field(name: &str, nullable: bool) -> Field {
5109 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
5110 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
5111 .into_iter()
5112 .collect(),
5113 )
5114}
5115
5116fn json_field(name: &str, nullable: bool) -> Field {
5117 lance_arrow::json::json_field(name, nullable)
5118}
5119
5120fn micros(timestamp: DateTime<Utc>) -> i64 {
5121 timestamp.timestamp_micros()
5122}
5123
5124fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
5125 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
5133 lance_arrow::json::encode_json(&text)
5134 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
5135}
5136
5137fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
5138 serde_json::from_slice(value).context("failed to parse JSON field")
5139}
5140
5141fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
5142 if let PartKind::File {
5143 media_type,
5144 file_name,
5145 data,
5146 } = kind
5147 {
5148 let data_kind = match data {
5149 FileData::String(_) => "string",
5150 FileData::Bytes(_) => "bytes",
5151 FileData::Url(_) => "url",
5152 };
5153 return json_bytes(&serde_json::json!({
5154 "media_type": media_type,
5155 "file_name": file_name,
5156 "data_kind": data_kind,
5157 }));
5158 }
5159 let value = serde_json::to_value(kind)?;
5160 let mut object = value
5161 .as_object()
5162 .cloned()
5163 .context("part variant did not serialize to an object")?;
5164 object.remove("type");
5165 json_bytes(&object)
5166}
5167
5168fn part_kind_from_json(
5169 type_name: &str,
5170 variant_data: &[u8],
5171 file_data: Option<FileData>,
5172) -> Result<PartKind> {
5173 let mut value = json_parse::<Value>(variant_data)?;
5174 let object = value
5175 .as_object_mut()
5176 .context("part variant data is not an object")?;
5177 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
5178 if let Some(data) = file_data {
5179 object.remove("data_kind");
5180 object.insert("data".to_owned(), serde_json::to_value(data)?);
5181 }
5182 serde_json::from_value(value).context("failed to parse part kind")
5183}
5184
5185#[cfg(test)]
5186mod tests {
5187 #![allow(clippy::expect_used, clippy::unwrap_used)]
5188
5189 use super::*;
5190 use crate::{
5191 adapter::Extracted,
5192 handlers::ingest_events,
5193 wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
5194 };
5195 use chrono::Utc;
5196 use serde_json::json;
5197 use tempfile::TempDir;
5198
5199 fn synthetic_session(id: &str) -> Session {
5200 Session {
5201 id: id.to_owned(),
5202 parent_session_id: None,
5203 parent_message_id: None,
5204 source_agent: "claude-code".to_owned(),
5205 created_at: Utc::now(),
5206 project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
5207 options: ProviderOptions::new(),
5208 }
5209 }
5210
5211 #[test]
5212 fn search_text_excludes_injected_parts() {
5213 use crate::wire::Provenance;
5214 let message = Message::User {
5215 id: "m1".to_owned(),
5216 session_id: "s1".to_owned(),
5217 timestamp: Utc::now(),
5218 options: ProviderOptions::new(),
5219 };
5220 let text_part = |id: &str, text: &str, provenance: Provenance| Part {
5221 session_id: "s1".to_owned(),
5222 id: id.to_owned(),
5223 message_id: "m1".to_owned(),
5224 ordinal: 0,
5225 provenance,
5226 options: ProviderOptions::new(),
5227 kind: PartKind::Text {
5228 text: Some(Extracted::from_test_value(text.to_owned())),
5229 },
5230 };
5231
5232 let conversational = search_text(
5235 &message,
5236 &[text_part(
5237 "p1",
5238 "real human prompt",
5239 Provenance::Conversational,
5240 )],
5241 );
5242 assert_eq!(conversational.as_deref(), Some("real human prompt"));
5243
5244 let injected = search_text(
5245 &message,
5246 &[text_part(
5247 "p2",
5248 "<task-notification>...</task-notification>",
5249 Provenance::Injected,
5250 )],
5251 );
5252 assert!(
5253 injected.is_none(),
5254 "a message whose only part is injected has null search_text"
5255 );
5256 }
5257
5258 #[test]
5259 fn chunk_ranges_splits_on_byte_budget() {
5260 assert!(chunk_ranges(&[]).is_empty());
5261 assert_eq!(chunk_ranges(&[10, 10, 10]), vec![0..3]);
5262
5263 let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
5264 assert_eq!(
5265 chunk_ranges(&[two_thirds, two_thirds, two_thirds]),
5266 vec![0..1, 1..2, 2..3],
5267 );
5268
5269 assert_eq!(
5271 chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]),
5272 vec![0..1, 1..2, 2..3],
5273 );
5274 }
5275
5276 #[tokio::test]
5277 async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
5278 let temp = TempDir::new()?;
5283 let store = Store::open_local(temp.path()).await?;
5284 let session = synthetic_session("ordering");
5285 let orphan_part = Part {
5286 session_id: session.id.clone(),
5287 id: "orphan-part".to_owned(),
5288 message_id: "missing-message".to_owned(),
5289 ordinal: 0,
5290 provenance: crate::wire::Provenance::Conversational,
5291 options: ProviderOptions::new(),
5292 kind: PartKind::Text {
5293 text: Some(Extracted::from_test_value("orphan".to_owned())),
5294 },
5295 };
5296 let valid_message = Message::User {
5297 id: "valid-message".to_owned(),
5298 session_id: session.id.clone(),
5299 timestamp: Utc::now(),
5300 options: ProviderOptions::new(),
5301 };
5302 let valid_part = Part {
5303 session_id: session.id.clone(),
5304 id: "valid-part".to_owned(),
5305 message_id: valid_message.id().to_owned(),
5306 ordinal: 0,
5307 provenance: crate::wire::Provenance::Conversational,
5308 options: ProviderOptions::new(),
5309 kind: PartKind::Text {
5310 text: Some(Extracted::from_test_value("kept".to_owned())),
5311 },
5312 };
5313
5314 let mut validator = IngestValidator::default();
5315 validator
5316 .push(&store, 0, IngestEvent::Session(session.clone()))
5317 .await?;
5318 let part_outcomes = validator
5319 .push(&store, 1, IngestEvent::Part(orphan_part))
5320 .await?;
5321 assert_eq!(part_outcomes.len(), 1);
5322 assert_eq!(part_outcomes[0].kind, "part");
5323 assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
5324 assert!(
5325 part_outcomes[0]
5326 .error
5327 .as_ref()
5328 .map(|e| e.message.contains("part event appeared before a message"))
5329 .unwrap_or(false),
5330 "error message must explain the ordering violation: {part_outcomes:?}"
5331 );
5332 validator
5333 .push(&store, 2, IngestEvent::Message(valid_message))
5334 .await?;
5335 validator
5336 .push(&store, 3, IngestEvent::Part(valid_part))
5337 .await?;
5338 validator.finish(&store).await?;
5339
5340 let (sessions, messages, parts) = store.row_counts().await?;
5341 assert_eq!(sessions, 1, "session committed despite the orphan part");
5342 assert_eq!(messages, 1, "valid message committed");
5343 assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
5344
5345 Ok(())
5346 }
5347
5348 #[tokio::test]
5349 async fn resident_meta_map_hydration_matches_take_rows_fallback() -> anyhow::Result<()> {
5350 let temp = TempDir::new()?;
5354 let store = Store::open_local(temp.path()).await?;
5355 let session = synthetic_session("hydration-parity");
5356
5357 let messages = [
5358 (
5359 "m1",
5360 "the auth refactor landed cleanly",
5361 1_700_000_000_123_456_i64,
5362 ),
5363 (
5364 "m2",
5365 "balance handler now retries on rpc timeout",
5366 1_700_000_050_654_321,
5367 ),
5368 ];
5369 let mut validator = IngestValidator::default();
5370 validator
5371 .push(&store, 0, IngestEvent::Session(session.clone()))
5372 .await?;
5373 let mut seq = 1;
5374 for (mid, text, micros) in messages {
5375 let message = Message::User {
5376 id: mid.to_owned(),
5377 session_id: session.id.clone(),
5378 timestamp: DateTime::from_timestamp_micros(micros).unwrap(),
5379 options: ProviderOptions::new(),
5380 };
5381 validator
5382 .push(&store, seq, IngestEvent::Message(message))
5383 .await?;
5384 seq += 1;
5385 let part = Part {
5386 session_id: session.id.clone(),
5387 id: format!("{mid}-p0"),
5388 message_id: mid.to_owned(),
5389 ordinal: 0,
5390 provenance: crate::wire::Provenance::Conversational,
5391 options: ProviderOptions::new(),
5392 kind: PartKind::Text {
5393 text: Some(Extracted::from_test_value(text.to_owned())),
5394 },
5395 };
5396 validator.push(&store, seq, IngestEvent::Part(part)).await?;
5397 seq += 1;
5398 }
5399 validator.finish(&store).await?;
5400
5401 let rowids: Vec<u64> = store
5402 .collect_row_metas()
5403 .await?
5404 .into_iter()
5405 .map(|entry| entry.row_id)
5406 .collect();
5407 assert_eq!(rowids.len(), 2);
5408
5409 let sort_by_id = |mut metas: Vec<MessageMeta>| {
5410 metas.sort_by(|left, right| left.message_id.cmp(&right.message_id));
5411 metas
5412 };
5413
5414 let fallback = sort_by_id(store.message_metas_by_rowids(&rowids).await?);
5415
5416 store.ensure_rowmap(&temp.path().join("cache")).await?;
5419 let resident = sort_by_id(store.message_metas_by_rowids(&rowids).await?);
5420
5421 assert_eq!(
5422 resident, fallback,
5423 "resident-map hydration must match the take_rows fallback"
5424 );
5425 assert_eq!(
5426 resident[0].timestamp.timestamp_micros(),
5427 1_700_000_000_123_456
5428 );
5429 Ok(())
5430 }
5431
5432 #[tokio::test]
5433 async fn initialized_flips_only_after_first_ingest() -> anyhow::Result<()> {
5434 let temp = TempDir::new()?;
5439 let store = Store::open_local(temp.path()).await?;
5440 assert!(
5441 !store.initialized().await?,
5442 "fresh store has no parts table"
5443 );
5444
5445 let session = synthetic_session("initialized-probe");
5446 let message = Message::User {
5447 id: "message-1".to_owned(),
5448 session_id: session.id.clone(),
5449 timestamp: Utc::now(),
5450 options: ProviderOptions::new(),
5451 };
5452 let part = Part {
5453 session_id: session.id.clone(),
5454 id: "part-1".to_owned(),
5455 message_id: message.id().to_owned(),
5456 ordinal: 0,
5457 provenance: crate::wire::Provenance::Conversational,
5458 options: ProviderOptions::new(),
5459 kind: PartKind::Text {
5460 text: Some(Extracted::from_test_value("hello".to_owned())),
5461 },
5462 };
5463 let mut validator = IngestValidator::default();
5464 validator
5465 .push(&store, 0, IngestEvent::Session(session))
5466 .await?;
5467 validator
5468 .push(&store, 1, IngestEvent::Message(message))
5469 .await?;
5470 validator.push(&store, 2, IngestEvent::Part(part)).await?;
5471 validator.finish(&store).await?;
5472
5473 assert!(store.initialized().await?, "ingest creates the parts table");
5474 Ok(())
5475 }
5476
5477 #[tokio::test]
5478 async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
5479 let temp = TempDir::new()?;
5483 let store = Store::open_local(temp.path()).await?;
5484 let session = synthetic_session("duplicate-message");
5485 let first = Message::User {
5486 id: "message-1".to_owned(),
5487 session_id: session.id.clone(),
5488 timestamp: Utc::now(),
5489 options: ProviderOptions::new(),
5490 };
5491 let second = Message::Assistant {
5492 id: "message-1".to_owned(),
5493 session_id: session.id.clone(),
5494 timestamp: Utc::now(),
5495 options: ProviderOptions::new(),
5496 };
5497
5498 let mut validator = IngestValidator::default();
5499 validator
5500 .push(&store, 0, IngestEvent::Session(session.clone()))
5501 .await?;
5502 validator
5503 .push(&store, 1, IngestEvent::Message(first))
5504 .await?;
5505 let dup_outcomes = validator
5506 .push(&store, 2, IngestEvent::Message(second))
5507 .await?;
5508 assert_eq!(dup_outcomes.len(), 1);
5509 assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
5510 assert!(
5511 dup_outcomes[0]
5512 .error
5513 .as_ref()
5514 .map(|e| e.message.contains("duplicate message id message-1"))
5515 .unwrap_or(false),
5516 "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
5517 );
5518
5519 validator.finish(&store).await?;
5520 let (sessions, messages, _) = store.row_counts().await?;
5521 assert_eq!(sessions, 1, "session committed");
5522 assert_eq!(messages, 1, "only the first message committed");
5523
5524 Ok(())
5525 }
5526
5527 #[tokio::test]
5528 async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
5529 -> anyhow::Result<()> {
5530 let temp = TempDir::new()?;
5534 let store = Store::open_local(temp.path()).await?;
5535 let session = synthetic_session("host-provenance");
5536 let mut spoofed = ProviderOptions::new();
5537 spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
5538 let message = Message::User {
5539 id: "message-1".to_owned(),
5540 session_id: session.id.clone(),
5541 timestamp: Utc::now(),
5542 options: spoofed,
5543 };
5544 let part = Part {
5545 session_id: session.id.clone(),
5546 id: "part-1".to_owned(),
5547 message_id: "message-1".to_owned(),
5548 ordinal: 0,
5549 provenance: crate::wire::Provenance::Conversational,
5550 options: ProviderOptions::new(),
5551 kind: PartKind::Text {
5552 text: Some(Extracted::from_test_value("hello".to_owned())),
5553 },
5554 };
5555
5556 let mut validator = IngestValidator::default();
5557 validator
5558 .push(&store, 0, IngestEvent::Session(session.clone()))
5559 .await?;
5560 validator
5561 .push(&store, 1, IngestEvent::Message(message))
5562 .await?;
5563 validator.push(&store, 2, IngestEvent::Part(part)).await?;
5564 validator.finish(&store).await?;
5565
5566 let stored = store
5567 .get_session(&session.id)
5568 .await?
5569 .expect("ingested session is readable");
5570 assert!(
5571 !stored.session.options.contains_key("pond"),
5572 "session rows are not stamped (attribution derives from messages)"
5573 );
5574 let stored_message = &stored.messages[0].message;
5575 match ingest_host_stamp() {
5576 Some(stamp) => {
5577 assert_eq!(
5578 stored_message.options().get("pond"),
5579 Some(stamp),
5580 "stored message carries the real stamp, never the spoof"
5581 );
5582 let host = stamp
5583 .pointer("/ingest/host")
5584 .and_then(Value::as_object)
5585 .expect("stamp shape is {ingest: {host: {..}}}");
5586 assert!(!host.is_empty(), "an all-empty stamp must be None instead");
5587 assert!(
5588 host.values()
5589 .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
5590 "stamp fields are omitted when unavailable, never empty: {host:?}"
5591 );
5592 }
5593 None => assert!(
5594 stored_message.options().get("pond").is_none(),
5595 "with no resolvable stamp the spoofed key is still stripped"
5596 ),
5597 }
5598 assert!(
5599 !stored.messages[0].parts[0].options.contains_key("pond"),
5600 "part rows are not stamped (covered by their message's stamp)"
5601 );
5602
5603 Ok(())
5604 }
5605
5606 #[tokio::test(flavor = "multi_thread")]
5614 async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
5615 use crate::wire::{FileData, PartKind, Provenance};
5616 let temp = TempDir::new()?;
5617 let store = Store::open_local(temp.path()).await?;
5618
5619 let session = synthetic_session("compact-blob");
5620 store
5621 .upsert_sessions(std::slice::from_ref(&session))
5622 .await?;
5623
5624 let make_part = |idx: usize, kind: PartKind| Part {
5625 session_id: session.id.clone(),
5626 message_id: format!("msg-{idx}"),
5627 id: format!("part-{idx}"),
5628 ordinal: 0,
5629 provenance: Provenance::Conversational,
5630 options: ProviderOptions::new(),
5631 kind,
5632 };
5633
5634 let batch_a = vec![
5635 make_part(
5636 0,
5637 PartKind::File {
5638 media_type: Some("text/plain".to_owned()),
5639 file_name: Some("a.txt".to_owned()),
5640 data: FileData::Bytes(b"alpha".to_vec()),
5641 },
5642 ),
5643 make_part(
5644 1,
5645 PartKind::File {
5646 media_type: Some("text/plain".to_owned()),
5647 file_name: Some("b.txt".to_owned()),
5648 data: FileData::String("beta".to_owned()),
5649 },
5650 ),
5651 ];
5652 store.upsert_parts(&batch_a).await?;
5653
5654 let batch_b = vec![
5655 make_part(
5656 2,
5657 PartKind::File {
5658 media_type: Some("application/octet-stream".to_owned()),
5659 file_name: None,
5660 data: FileData::Url("https://example.com/file".to_owned()),
5661 },
5662 ),
5663 make_part(
5664 3,
5665 PartKind::File {
5666 media_type: Some("image/png".to_owned()),
5667 file_name: Some("c.png".to_owned()),
5668 data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
5669 },
5670 ),
5671 ];
5672 store.upsert_parts(&batch_b).await?;
5673
5674 store
5675 .optimize_indices(None, &MaintenancePolicy::always_compact())
5676 .await?
5677 .into_result()?;
5678
5679 Ok(())
5680 }
5681
5682 #[tokio::test]
5683 async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
5684 let temp = TempDir::new()?;
5685 let store = Store::open_local(temp.path()).await?;
5686 let session = synthetic_session("blob");
5687 let message = Message::User {
5688 id: "message-1".to_owned(),
5689 session_id: session.id.clone(),
5690 timestamp: Utc::now(),
5691 options: ProviderOptions::new(),
5692 };
5693 let part = Part {
5694 session_id: session.id.clone(),
5695 id: "part-1".to_owned(),
5696 message_id: message.id().to_owned(),
5697 ordinal: 0,
5698 provenance: crate::wire::Provenance::Conversational,
5699 options: ProviderOptions::new(),
5700 kind: PartKind::File {
5701 media_type: Some("text/plain".to_owned()),
5702 file_name: Some("payload.txt".to_owned()),
5703 data: FileData::Bytes(b"pond".to_vec()),
5704 },
5705 };
5706
5707 let mut validator = IngestValidator::default();
5708 validator
5709 .push(&store, 0, IngestEvent::Session(session.clone()))
5710 .await?;
5711 validator
5712 .push(&store, 1, IngestEvent::Message(message.clone()))
5713 .await?;
5714 validator
5715 .push(&store, 2, IngestEvent::Part(part.clone()))
5716 .await?;
5717 validator.finish(&store).await?;
5718
5719 let stored = store
5720 .get_session(&session.id)
5721 .await?
5722 .expect("session should exist");
5723 let stored_part = &stored.messages[0].parts[0];
5724 assert_eq!(stored_part, &part);
5725
5726 Ok(())
5727 }
5728
5729 fn base_session() -> Session {
5740 Session {
5741 id: "01HXY00000000001".to_owned(),
5742 parent_session_id: None,
5743 parent_message_id: None,
5744 source_agent: "claude-code".to_owned(),
5745 created_at: Utc::now(),
5746 project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
5747 options: ProviderOptions::new(),
5748 }
5749 }
5750
5751 fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
5752 outcomes
5753 .iter()
5754 .filter(|outcome| outcome.status == target)
5755 .count()
5756 }
5757
5758 #[tokio::test(flavor = "multi_thread")]
5759 async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
5760 -> anyhow::Result<()> {
5761 let temp = TempDir::new()?;
5762 let store = Store::open_local(temp.path()).await?;
5763
5764 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5765 assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
5766
5767 let mut again = base_session();
5768 again.options.insert("title".to_owned(), json!("renamed"));
5769 let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
5770 assert_eq!(
5771 count_status(&second, OutcomeStatus::Error),
5772 0,
5773 "options is mutable; the re-ingest must not surface an error: {second:?}",
5774 );
5775 assert_eq!(
5776 count_status(&second, OutcomeStatus::Matched),
5777 1,
5778 "unchanged immutable fields must match-insert via merge_insert",
5779 );
5780
5781 Ok(())
5782 }
5783
5784 #[tokio::test(flavor = "multi_thread")]
5785 async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
5786 let temp = TempDir::new()?;
5787 let store = Store::open_local(temp.path()).await?;
5788
5789 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5790 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
5791
5792 let mut tampered = base_session();
5793 tampered.source_agent = "codex-cli".to_owned();
5794 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
5795 assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
5796 let err_row = second
5797 .iter()
5798 .find(|outcome| outcome.status == OutcomeStatus::Error)
5799 .expect("error outcome present");
5800 let err = err_row.error.as_ref().expect("error body present");
5801 assert_eq!(err.field, Some("source_agent"));
5802 assert_eq!(err.reason, Some("immutable"));
5803
5804 let stored = store
5806 .get_session(&base_session().id)
5807 .await?
5808 .expect("session row survives the rejected re-ingest");
5809 assert_eq!(stored.session.source_agent, "claude-code");
5810
5811 Ok(())
5812 }
5813
5814 #[tokio::test(flavor = "multi_thread")]
5815 async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
5816 let temp = TempDir::new()?;
5817 let store = Store::open_local(temp.path()).await?;
5818
5819 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5820 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
5821
5822 let mut tampered = base_session();
5823 tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
5824 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
5825 let err_row = second
5826 .iter()
5827 .find(|outcome| outcome.status == OutcomeStatus::Error)
5828 .expect("project change must surface an error outcome");
5829 assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
5830
5831 let stored = store
5832 .get_session(&base_session().id)
5833 .await?
5834 .expect("session row survives");
5835 assert_eq!(
5836 stored.session.project.as_str(),
5837 "/home/me/proj",
5838 "stored project must remain the original",
5839 );
5840
5841 Ok(())
5842 }
5843
5844 #[tokio::test(flavor = "multi_thread")]
5845 async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
5846 use crate::wire::Provenance;
5854 let temp = TempDir::new()?;
5855 let store = Store::open_local(temp.path()).await?;
5856 let session = base_session();
5857
5858 let text_part = |part_id: &str, message_id: &str, body: &str| Part {
5859 session_id: session.id.clone(),
5860 id: part_id.to_owned(),
5861 message_id: message_id.to_owned(),
5862 ordinal: 0,
5863 provenance: Provenance::Conversational,
5864 options: ProviderOptions::new(),
5865 kind: PartKind::Text {
5866 text: Some(Extracted::from_test_value(body.to_owned())),
5867 },
5868 };
5869 let user_message = |id: &str| Message::User {
5870 id: id.to_owned(),
5871 session_id: session.id.clone(),
5872 timestamp: Utc::now(),
5873 options: ProviderOptions::new(),
5874 };
5875
5876 let mut validator = IngestValidator::default();
5878 validator
5879 .push(&store, 0, IngestEvent::Session(session.clone()))
5880 .await?;
5881 validator
5882 .push(&store, 1, IngestEvent::Message(user_message("m1")))
5883 .await?;
5884 validator
5885 .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
5886 .await?;
5887 validator
5888 .push(&store, 3, IngestEvent::Message(user_message("m2")))
5889 .await?;
5890 validator
5891 .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
5892 .await?;
5893 let (_first_outcomes, first_counts) = validator.finish(&store).await?;
5894 assert_eq!(first_counts.sessions_inserted, 1);
5895 assert_eq!(first_counts.messages_inserted_total, 2);
5896 assert_eq!(first_counts.messages_inserted_searchable, 2);
5897
5898 let mut validator = IngestValidator::default();
5900 validator
5901 .push(&store, 0, IngestEvent::Session(session.clone()))
5902 .await?;
5903 for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
5904 let pid = format!("p{}", idx + 3);
5905 validator
5906 .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
5907 .await?;
5908 validator
5909 .push(
5910 &store,
5911 idx * 2 + 2,
5912 IngestEvent::Part(text_part(&pid, mid, "gamma")),
5913 )
5914 .await?;
5915 }
5916 let (second_outcomes, second_counts) = validator.finish(&store).await?;
5917
5918 assert_eq!(
5919 second_counts.sessions_inserted, 0,
5920 "existing session row must report as Matched, not Inserted",
5921 );
5922 assert_eq!(second_counts.sessions_matched, 1);
5923 assert_eq!(
5924 second_counts.messages_inserted_total, 3,
5925 "the three NEW messages must register as Inserted in BatchCounts",
5926 );
5927 assert_eq!(
5928 second_counts.messages_inserted_searchable, 3,
5929 "all three new messages carry conversational text -> searchable",
5930 );
5931 assert_eq!(second_counts.messages_matched_total, 0);
5932 assert_eq!(second_counts.parts_inserted, 3);
5933 assert_eq!(second_counts.parts_matched, 0);
5934
5935 let session_outcome = second_outcomes
5938 .iter()
5939 .find(|outcome| outcome.kind == "session")
5940 .expect("session-row outcome present");
5941 assert_eq!(session_outcome.status, OutcomeStatus::Matched);
5942 for outcome in &second_outcomes {
5943 if outcome.kind == "message" || outcome.kind == "part" {
5944 assert_eq!(
5945 outcome.status,
5946 OutcomeStatus::Inserted,
5947 "new row must be Inserted, got: {outcome:?}",
5948 );
5949 }
5950 }
5951 Ok(())
5952 }
5953
5954 async fn store_with_messages(
5958 temp: &TempDir,
5959 count: usize,
5960 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
5961 store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
5962 }
5963
5964 async fn store_with_messages_at_threshold(
5967 temp: &TempDir,
5968 count: usize,
5969 _vector_threshold: usize,
5970 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
5971 let store = Store::open_local(temp.path()).await?;
5972 let sessions = 8.min(count.max(1));
5973 let mut events = Vec::new();
5974 for s in 0..sessions {
5975 events.push(IngestEvent::Session(Session {
5976 id: format!("session-{s}"),
5977 parent_session_id: None,
5978 parent_message_id: None,
5979 source_agent: "claude-code".to_owned(),
5980 created_at: Utc::now(),
5981 project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
5982 options: ProviderOptions::new(),
5983 }));
5984 for i in (s..count).step_by(sessions) {
5985 let message_id = format!("msg-{i}");
5986 events.push(IngestEvent::Message(Message::User {
5987 id: message_id.clone(),
5988 session_id: format!("session-{s}"),
5989 timestamp: Utc::now(),
5990 options: ProviderOptions::new(),
5991 }));
5992 events.push(IngestEvent::Part(Part {
5993 session_id: format!("session-{s}"),
5994 id: format!("{message_id}-part"),
5995 message_id,
5996 ordinal: 0,
5997 provenance: crate::wire::Provenance::Conversational,
5998 options: ProviderOptions::new(),
5999 kind: PartKind::Text {
6000 text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
6001 },
6002 }));
6003 }
6004 }
6005 ingest_events(&store, events).await?;
6006 let keys = (0..count)
6007 .map(|i| MessageKey {
6008 session_id: format!("session-{}", i % sessions),
6009 message_id: format!("msg-{i}"),
6010 })
6011 .collect();
6012 Ok((store, keys))
6013 }
6014
6015 fn synthetic_vector(seed: usize) -> Vec<f32> {
6017 let mut state = (seed as u64)
6018 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
6019 .wrapping_add(1);
6020 (0..embedding_dim())
6021 .map(|_| {
6022 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
6023 #[allow(clippy::cast_precision_loss)]
6024 let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
6025 unit - 1.0
6026 })
6027 .collect()
6028 }
6029
6030 fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
6032 keys.iter()
6033 .enumerate()
6034 .map(|(seed, key)| EmbeddedMessage {
6035 session_id: key.session_id.clone(),
6036 id: key.message_id.clone(),
6037 vector: synthetic_vector(seed),
6038 })
6039 .collect()
6040 }
6041
6042 fn embedding_update_batch_with_model(
6043 rows: &[EmbeddedMessage],
6044 model: &str,
6045 ) -> Result<RecordBatch> {
6046 let mut batch = embedding_update_batch(rows)?;
6047 let columns = batch
6048 .columns()
6049 .iter()
6050 .take(3)
6051 .cloned()
6052 .chain(std::iter::once(
6053 Arc::new(StringArray::from(vec![model; rows.len()])) as _,
6054 ))
6055 .collect::<Vec<_>>();
6056 batch = RecordBatch::try_new(batch.schema(), columns)?;
6057 Ok(batch)
6058 }
6059
6060 #[tokio::test]
6061 async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
6062 let temp = TempDir::new()?;
6063 let (store, keys) = store_with_messages(&temp, 4).await?;
6067 store.write_embeddings(&embedded(&keys)).await?;
6068 store
6069 .optimize_indices(None, &MaintenancePolicy::always_compact())
6070 .await?
6071 .into_result()?;
6072
6073 let query = vec![0.01_f32; embedding_dim()];
6074 let plan = store
6075 .explain_vector_plan(
6076 &query,
6077 10,
6078 &Predicate::Eq("session_id", "session-3".into()),
6079 None,
6080 )
6081 .await?;
6082
6083 assert!(
6088 plan.contains("ScalarIndexQuery"),
6089 "expected a ScalarIndexQuery node in the plan:\n{plan}",
6090 );
6091 let predicate_postfiltered = plan
6092 .lines()
6093 .any(|line| line.contains("FilterExec") && line.contains("session_id"));
6094 assert!(
6095 !predicate_postfiltered,
6096 "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
6097 );
6098 Ok(())
6099 }
6100
6101 #[tokio::test]
6102 async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
6103 let temp = TempDir::new()?;
6104 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
6105
6106 store.write_embeddings(&embedded(&keys[..255])).await?;
6109 store
6110 .optimize_indices_with_vector_threshold(256)
6111 .await?
6112 .into_result()?;
6113 assert!(
6114 !store
6115 .handle
6116 .messages_index_names()
6117 .await?
6118 .iter()
6119 .any(|name| name == MESSAGES_VECTOR_INDEX),
6120 "IVF_SQ must not exist below the activation threshold",
6121 );
6122
6123 store.write_embeddings(&embedded(&keys[255..256])).await?;
6126 store
6127 .optimize_indices_with_vector_threshold(256)
6128 .await?
6129 .into_result()?;
6130 assert!(
6131 store
6132 .handle
6133 .messages_index_names()
6134 .await?
6135 .iter()
6136 .any(|name| name == MESSAGES_VECTOR_INDEX),
6137 "optimize must create the IVF_SQ once the threshold is crossed",
6138 );
6139
6140 let hits = store
6143 .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
6144 .await?;
6145 assert!(
6146 hits.iter().any(|hit| hit.key == keys[0]),
6147 "an embedded row is retrievable via the index",
6148 );
6149 Ok(())
6150 }
6151
6152 #[tokio::test]
6153 async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
6154 {
6155 let temp = TempDir::new()?;
6156 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
6157 let old_rows = embedded(&keys);
6158 let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
6159 store
6160 .handle
6161 .merge_update(Table::Messages, old_batch, old_rows.len())
6162 .await?;
6163 store
6164 .optimize_indices_with_vector_threshold(256)
6165 .await?
6166 .into_result()?;
6167 assert!(
6168 store
6169 .handle
6170 .messages_index_names()
6171 .await?
6172 .iter()
6173 .any(|name| name == MESSAGES_VECTOR_INDEX),
6174 "IVF_SQ must exist before a model swap",
6175 );
6176 assert_eq!(store.stale_embedding_count().await?, keys.len());
6177
6178 store.drop_vector_index().await?;
6179 let mut pending = Vec::new();
6180 let stream = store.pending_or_stale_messages();
6181 tokio::pin!(stream);
6182 while let Some(row) = stream.next().await {
6183 pending.push(row?);
6184 }
6185 assert_eq!(
6186 pending.len(),
6187 keys.len(),
6188 "force stream should see stale rows"
6189 );
6190 store.write_embeddings(&embedded(&keys)).await?;
6191 assert_eq!(store.stale_embedding_count().await?, 0);
6192 store
6193 .optimize_indices_with_vector_threshold(256)
6194 .await?
6195 .into_result()?;
6196 assert!(
6197 store
6198 .handle
6199 .messages_index_names()
6200 .await?
6201 .iter()
6202 .any(|name| name == MESSAGES_VECTOR_INDEX),
6203 "optimize must rebuild IVF_SQ after force re-embed",
6204 );
6205
6206 let stream = store.pending_or_stale_messages();
6207 tokio::pin!(stream);
6208 assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
6209 Ok(())
6210 }
6211
6212 #[tokio::test]
6213 async fn session_last_message_ids_come_from_durable_messages() -> anyhow::Result<()> {
6214 let temp = TempDir::new()?;
6215 let store = Store::open_local(temp.path()).await?;
6216 let session = synthetic_session("oracle");
6217 store
6218 .upsert_sessions(std::slice::from_ref(&session))
6219 .await?;
6220 let timestamp =
6221 chrono::DateTime::from_timestamp(1_700_000_000, 0).expect("valid timestamp");
6222 let message_a = Message::User {
6223 id: "oracle-a".to_owned(),
6224 session_id: session.id.clone(),
6225 timestamp,
6226 options: ProviderOptions::new(),
6227 };
6228 let message_b = Message::User {
6229 id: "oracle-b".to_owned(),
6230 session_id: session.id.clone(),
6231 timestamp,
6232 options: ProviderOptions::new(),
6233 };
6234 store
6235 .upsert_messages(
6236 &session,
6237 &[
6238 MessageWrite {
6239 message: &message_a,
6240 parts: &[],
6241 search_text: Some("a"),
6242 },
6243 MessageWrite {
6244 message: &message_b,
6245 parts: &[],
6246 search_text: Some("b"),
6247 },
6248 ],
6249 )
6250 .await?;
6251
6252 let empty_session = synthetic_session("session-row-only");
6253 store.upsert_sessions(&[empty_session]).await?;
6254
6255 let orphan = synthetic_session("messages-no-row");
6259 let orphan_message = Message::User {
6260 id: "orphan-a".to_owned(),
6261 session_id: orphan.id.clone(),
6262 timestamp,
6263 options: ProviderOptions::new(),
6264 };
6265 store
6266 .upsert_messages(
6267 &orphan,
6268 &[MessageWrite {
6269 message: &orphan_message,
6270 parts: &[],
6271 search_text: Some("a"),
6272 }],
6273 )
6274 .await?;
6275
6276 let map = store.session_last_message_ids().await?;
6277 assert_eq!(map.get("oracle").map(String::as_str), Some("oracle-b"));
6278 assert!(
6279 !map.contains_key("session-row-only"),
6280 "a session row without durable messages must not produce a freshness key",
6281 );
6282 assert!(
6283 !map.contains_key("messages-no-row"),
6284 "messages without a durable session row must not produce a freshness key",
6285 );
6286 Ok(())
6287 }
6288
6289 #[tokio::test]
6290 async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
6291 let temp = TempDir::new()?;
6292 let (store, keys) = store_with_messages(&temp, 10).await?;
6293
6294 let before = store.embedding_progress().await?;
6295 assert_eq!(before.embedded, 0);
6296 assert_eq!(before.total, 10);
6297 assert_eq!(before.backlog, 10);
6298 assert_eq!(before.model, crate::embed::model_id());
6299
6300 store.write_embeddings(&embedded(&keys[..4])).await?;
6301 let partial = store.embedding_progress().await?;
6302 assert_eq!(partial.embedded, 4);
6303 assert_eq!(partial.total, 10);
6304 assert_eq!(partial.backlog, 6);
6305
6306 store.write_embeddings(&embedded(&keys[4..])).await?;
6307 let full = store.embedding_progress().await?;
6308 assert_eq!(full.embedded, 10);
6309 assert_eq!(full.total, 10);
6310 assert_eq!(full.backlog, 0);
6313 assert_eq!(full.backlog, store.embed_backlog_count().await?);
6314 Ok(())
6315 }
6316
6317 #[tokio::test]
6318 async fn ensure_rowmap_layers_a_delta_on_new_ingest() -> anyhow::Result<()> {
6319 let temp = TempDir::new()?;
6320 let (store, _keys) = store_with_messages(&temp, 6).await?;
6321 let cache = temp.path().join("cache");
6322
6323 store.ensure_rowmap(&cache).await?;
6324 assert_eq!(
6325 store.rowmap_delta_count(),
6326 Some(0),
6327 "first build is a lone base"
6328 );
6329
6330 ingest_events(
6332 &store,
6333 vec![
6334 IngestEvent::Session(Session {
6335 id: "session-new".to_owned(),
6336 parent_session_id: None,
6337 parent_message_id: None,
6338 source_agent: "claude-code".to_owned(),
6339 created_at: Utc::now(),
6340 project: Extracted::from_test_value("/proj/new".to_owned()),
6341 options: ProviderOptions::new(),
6342 }),
6343 IngestEvent::Message(Message::User {
6344 id: "m-new".to_owned(),
6345 session_id: "session-new".to_owned(),
6346 timestamp: Utc::now(),
6347 options: ProviderOptions::new(),
6348 }),
6349 IngestEvent::Part(Part {
6350 session_id: "session-new".to_owned(),
6351 id: "m-new-part".to_owned(),
6352 message_id: "m-new".to_owned(),
6353 ordinal: 0,
6354 provenance: crate::wire::Provenance::Conversational,
6355 options: ProviderOptions::new(),
6356 kind: PartKind::Text {
6357 text: Some(Extracted::from_test_value("brand new message".to_owned())),
6358 },
6359 }),
6360 ],
6361 )
6362 .await?;
6363
6364 store.ensure_rowmap(&cache).await?;
6367 assert_eq!(
6368 store.rowmap_delta_count(),
6369 Some(1),
6370 "new ingest layered a delta"
6371 );
6372
6373 let counts = store
6375 .session_message_counts(&["session-new".to_owned()])
6376 .await?;
6377 assert_eq!(counts.get("session-new").copied(), Some(1));
6378 Ok(())
6379 }
6380
6381 #[tokio::test]
6388 async fn ensure_rowmap_rebuilds_when_base_manifest_reclaimed() -> anyhow::Result<()> {
6389 let temp = TempDir::new()?;
6390 let (store, _keys) = store_with_messages(&temp, 6).await?;
6391 let cache = temp.path().join("cache");
6392
6393 store.ensure_rowmap(&cache).await?;
6396 assert_eq!(store.rowmap_delta_count(), Some(0), "first build is a base");
6397 let base_version = store.messages_version().await?;
6398 let versions_dir = temp.path().join("messages.lance").join("_versions");
6399 let base_manifests: Vec<_> = std::fs::read_dir(&versions_dir)?
6400 .filter_map(|entry| entry.ok().map(|entry| entry.path()))
6401 .filter(|path| path.extension().is_some_and(|ext| ext == "manifest"))
6402 .collect();
6403 assert!(
6404 !base_manifests.is_empty(),
6405 "the base version has a manifest"
6406 );
6407
6408 ingest_events(
6411 &store,
6412 vec![
6413 IngestEvent::Session(Session {
6414 id: "session-after".to_owned(),
6415 parent_session_id: None,
6416 parent_message_id: None,
6417 source_agent: "claude-code".to_owned(),
6418 created_at: Utc::now(),
6419 project: Extracted::from_test_value("/proj/after".to_owned()),
6420 options: ProviderOptions::new(),
6421 }),
6422 IngestEvent::Message(Message::User {
6423 id: "m-after".to_owned(),
6424 session_id: "session-after".to_owned(),
6425 timestamp: Utc::now(),
6426 options: ProviderOptions::new(),
6427 }),
6428 IngestEvent::Part(Part {
6429 session_id: "session-after".to_owned(),
6430 id: "m-after-part".to_owned(),
6431 message_id: "m-after".to_owned(),
6432 ordinal: 0,
6433 provenance: crate::wire::Provenance::Conversational,
6434 options: ProviderOptions::new(),
6435 kind: PartKind::Text {
6436 text: Some(Extracted::from_test_value("after the base".to_owned())),
6437 },
6438 }),
6439 ],
6440 )
6441 .await?;
6442 assert!(
6443 store.messages_version().await? > base_version,
6444 "the new ingest advanced the dataset past the chain's base"
6445 );
6446
6447 for manifest in &base_manifests {
6450 std::fs::remove_file(manifest)?;
6451 }
6452
6453 let reopened = Store::open_local(temp.path()).await?;
6456 reopened.ensure_rowmap(&cache).await?;
6457 assert!(
6458 reopened.rowmap_snapshot().is_some(),
6459 "map rebuilt after the base manifest was reclaimed"
6460 );
6461 assert_eq!(
6462 reopened.rowmap_delta_count(),
6463 Some(0),
6464 "a reclaimed base forces a fresh full-scan base, not a stuck chain"
6465 );
6466
6467 let counts = reopened
6469 .session_message_counts(&["session-after".to_owned()])
6470 .await?;
6471 assert_eq!(counts.get("session-after").copied(), Some(1));
6472 Ok(())
6473 }
6474
6475 #[tokio::test]
6482 async fn ensure_rowmap_deltas_across_embedding_fragment_rewrite() -> anyhow::Result<()> {
6483 let temp = TempDir::new()?;
6484 let (store, keys) = store_with_messages(&temp, 6).await?;
6485 let cache = temp.path().join("cache");
6486 store.ensure_rowmap(&cache).await?;
6487 assert_eq!(store.rowmap_delta_count(), Some(0), "first build is a base");
6488
6489 store.write_embeddings(&embedded(&keys)).await?;
6492
6493 ingest_events(
6495 &store,
6496 vec![
6497 IngestEvent::Session(Session {
6498 id: "session-after".to_owned(),
6499 parent_session_id: None,
6500 parent_message_id: None,
6501 source_agent: "claude-code".to_owned(),
6502 created_at: Utc::now(),
6503 project: Extracted::from_test_value("/proj/after".to_owned()),
6504 options: ProviderOptions::new(),
6505 }),
6506 IngestEvent::Message(Message::User {
6507 id: "m-after".to_owned(),
6508 session_id: "session-after".to_owned(),
6509 timestamp: Utc::now(),
6510 options: ProviderOptions::new(),
6511 }),
6512 IngestEvent::Part(Part {
6513 session_id: "session-after".to_owned(),
6514 id: "m-after-part".to_owned(),
6515 message_id: "m-after".to_owned(),
6516 ordinal: 0,
6517 provenance: crate::wire::Provenance::Conversational,
6518 options: ProviderOptions::new(),
6519 kind: PartKind::Text {
6520 text: Some(Extracted::from_test_value("after embedding".to_owned())),
6521 },
6522 }),
6523 ],
6524 )
6525 .await?;
6526
6527 store.ensure_rowmap(&cache).await?;
6530 assert_eq!(
6531 store.rowmap_delta_count(),
6532 Some(1),
6533 "fragment rewrite + append must layer a delta, not full-rebuild"
6534 );
6535
6536 let counts = store
6539 .session_message_counts(&["session-after".to_owned(), "session-0".to_owned()])
6540 .await?;
6541 assert_eq!(counts.get("session-after").copied(), Some(1));
6542 assert_eq!(
6543 counts.get("session-0").copied(),
6544 Some(1),
6545 "a base row survived the rewrite without being double-counted"
6546 );
6547 Ok(())
6548 }
6549
6550 #[tokio::test]
6551 async fn rowmap_chain_compacts_and_stays_bounded() -> anyhow::Result<()> {
6552 let temp = TempDir::new()?;
6555 let (store, _keys) = store_with_messages(&temp, 4).await?;
6556 let cache = temp.path().join("cache");
6557 store.ensure_rowmap(&cache).await?;
6558
6559 let mut reached_cap = false;
6560 let mut compacted = false;
6561 for i in 0..(Store::MAX_ROWMAP_DELTAS + 2) {
6562 let session = format!("session-x{i}");
6563 ingest_events(
6564 &store,
6565 vec![
6566 IngestEvent::Session(Session {
6567 id: session.clone(),
6568 parent_session_id: None,
6569 parent_message_id: None,
6570 source_agent: "claude-code".to_owned(),
6571 created_at: Utc::now(),
6572 project: Extracted::from_test_value("/proj/x".to_owned()),
6573 options: ProviderOptions::new(),
6574 }),
6575 IngestEvent::Message(Message::User {
6576 id: format!("mx{i}"),
6577 session_id: session.clone(),
6578 timestamp: Utc::now(),
6579 options: ProviderOptions::new(),
6580 }),
6581 IngestEvent::Part(Part {
6582 session_id: session.clone(),
6583 id: format!("mx{i}-part"),
6584 message_id: format!("mx{i}"),
6585 ordinal: 0,
6586 provenance: crate::wire::Provenance::Conversational,
6587 options: ProviderOptions::new(),
6588 kind: PartKind::Text {
6589 text: Some(Extracted::from_test_value(format!("msg {i}"))),
6590 },
6591 }),
6592 ],
6593 )
6594 .await?;
6595 store.ensure_rowmap(&cache).await?;
6596 let deltas = store.rowmap_delta_count().unwrap();
6597 assert!(
6598 deltas <= Store::MAX_ROWMAP_DELTAS,
6599 "delta count {deltas} exceeded the cap",
6600 );
6601 if deltas == Store::MAX_ROWMAP_DELTAS {
6602 reached_cap = true;
6603 }
6604 if reached_cap && deltas < Store::MAX_ROWMAP_DELTAS {
6605 compacted = true;
6606 }
6607 }
6608 assert!(reached_cap, "deltas accumulated to the cap (append path)");
6609 assert!(compacted, "the chain compacted back into a base");
6610
6611 let mut rmm = 0;
6613 for entry in std::fs::read_dir(&cache)? {
6614 let name = entry?.file_name().into_string().unwrap_or_default();
6615 assert!(!name.contains(".tmp-"), "leaked build temp: {name}");
6616 if name.ends_with(".rmm") {
6617 rmm += 1;
6618 }
6619 }
6620 assert!(
6621 rmm <= Store::MAX_ROWMAP_DELTAS + 1,
6622 "files unbounded: {rmm}"
6623 );
6624 Ok(())
6625 }
6626
6627 #[tokio::test]
6628 async fn embed_backlog_count_tracks_eligible_unembedded_rows() -> anyhow::Result<()> {
6629 let temp = TempDir::new()?;
6630 let (store, keys) = store_with_messages(&temp, 10).await?;
6631
6632 assert_eq!(store.embed_backlog_count().await?, 10);
6635
6636 store.write_embeddings(&embedded(&keys[..4])).await?;
6637 assert_eq!(store.embed_backlog_count().await?, 6);
6638
6639 store.write_embeddings(&embedded(&keys[4..])).await?;
6640 assert_eq!(store.embed_backlog_count().await?, 0);
6641 Ok(())
6642 }
6643
6644 #[tokio::test]
6645 async fn session_message_counts_returns_per_session_counts_with_zeros_for_unknown_sessions()
6646 -> anyhow::Result<()> {
6647 let temp = TempDir::new()?;
6650 let (store, _keys) = store_with_messages(&temp, 32).await?;
6651
6652 let mut requested: Vec<String> = (0..8).map(|s| format!("session-{s}")).collect();
6653 requested.push("session-unknown-a".to_owned());
6654 requested.push("session-unknown-b".to_owned());
6655 let counts = store.session_message_counts(&requested).await?;
6656
6657 assert_eq!(counts.len(), requested.len());
6660 for s in 0..8 {
6661 assert_eq!(
6662 counts.get(&format!("session-{s}")).copied(),
6663 Some(4),
6664 "session-{s} should have 4 messages",
6665 );
6666 }
6667 assert_eq!(counts.get("session-unknown-a").copied(), Some(0));
6668 assert_eq!(counts.get("session-unknown-b").copied(), Some(0));
6669
6670 let empty = store.session_message_counts(&[]).await?;
6672 assert!(empty.is_empty());
6673 Ok(())
6674 }
6675}