1use std::{
5 collections::{BTreeMap, HashMap, HashSet},
6 path::Path,
7 sync::Arc,
8};
9
10use anyhow::{Context, Result};
11use arc_swap::ArcSwapOption;
12use async_stream::try_stream;
13use chrono::{DateTime, TimeZone, Utc};
14use lance::Dataset;
15use lance::dataset::{AutoCleanupParams, ProjectionRequest, WriteMode, WriteParams};
16use lance::deps::arrow_array::{
17 Array, FixedSizeListArray, Float16Array, Float32Array, Int32Array, LargeBinaryArray,
18 LargeStringArray, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
19 UInt64Array, new_null_array,
20};
21use lance::deps::arrow_schema::{DataType, Field, Schema, TimeUnit};
22use lance::deps::datafusion::physical_plan::SendableRecordBatchStream;
23use lance::index::DatasetIndexExt;
24use lance_file::version::LanceFileVersion;
25use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery};
26use serde::{Deserialize, Serialize, de::DeserializeOwned};
27use serde_json::Value;
28use tokio_stream::{Stream, StreamExt};
29
30use crate::{
31 config, embed,
32 rowmap::{RowMetaEntry, RowMetaMap, RowMetaSet, discover_chain},
33 substrate::{
34 Handle, IndexIntent, IndexParamsKind, IndexStatus, IndexTrigger, MaintenancePolicy,
35 OptimizeProgressFn, PhaseOutcome, Predicate, ScalarValue, ScanOpts, Table,
36 TableOptimizeOutcome, TableSizes, VECTOR_INDEX_ACTIVATION_ROWS,
37 },
38 wire::{FileData, Message, Part, PartKind, Role, SUMMARY_PART_TYPES, Session, SessionFrom},
39};
40use url::Url;
41
/// Store that fronts a substrate [`Handle`] and carries an optional row-metadata set.
#[derive(Debug)]
pub struct Store {
    /// Substrate handle; the table reads and writes visible in this file all go through it.
    handle: Handle,
    /// Atomically swappable, optional `RowMetaSet`. The constructors leave it empty.
    // NOTE(review): how and when this gets populated is not visible here — confirm.
    rowmap: ArcSwapOption<RowMetaSet>,
}
52
/// Row counts for the three store tables, one field per table.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveCounts {
    /// Rows counted for the sessions table.
    pub sessions: usize,
    /// Rows counted for the messages table.
    pub messages: usize,
    /// Rows counted for the parts table.
    pub parts: usize,
}
59
/// Dataset version ids for the three store tables.
///
/// The export path fills each field from `Dataset::version_id()`, read before the
/// table is scanned.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveVersions {
    /// Version id of the sessions dataset.
    pub sessions: u64,
    /// Version id of the messages dataset.
    pub messages: u64,
    /// Version id of the parts dataset.
    pub parts: u64,
}
66
/// Summary returned by `Store::export_clean_lance_datasets`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveExport {
    /// Rows read from each source table during the export.
    pub rows: LanceArchiveCounts,
    /// Version id of each source dataset at the time it was exported.
    pub source_versions: LanceArchiveVersions,
}
72
/// Summary returned by the import and copy paths of `Store`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LanceArchiveImport {
    /// Rows read from the source, per table.
    pub rows: LanceArchiveCounts,
    /// Rows reported as written, per table. On the merge path this is the sum of
    /// inserted and updated rows; on the append path it is the appended row count.
    pub inserted: LanceArchiveCounts,
}
78
/// Copy plan for a single table, expressed as two lists of session ids.
#[derive(Debug, Clone, Default)]
pub struct TablePlan {
    /// Session ids whose rows are copied with a plain append.
    pub append: Vec<String>,
    /// Session ids whose rows are copied through a merge-insert.
    pub merge: Vec<String>,
}

impl TablePlan {
    /// Returns `true` when neither list holds any id.
    pub fn is_empty(&self) -> bool {
        [&self.append, &self.merge]
            .iter()
            .all(|ids| ids.is_empty())
    }
}
101
102#[derive(Debug, Clone, Default)]
108pub struct DeltaPlan {
109 pub sessions: TablePlan,
110 pub messages: TablePlan,
111 pub parts: TablePlan,
112 pub source_sessions: usize,
113}
114
115impl DeltaPlan {
116 pub fn is_empty(&self) -> bool {
117 self.sessions.is_empty() && self.messages.is_empty() && self.parts.is_empty()
118 }
119
120 pub fn new_sessions(&self) -> usize {
124 self.sessions.append.len()
125 }
126
127 pub fn total(&self) -> usize {
130 let mut seen = std::collections::HashSet::new();
131 for plan in [&self.sessions, &self.messages, &self.parts] {
132 seen.extend(plan.append.iter());
133 seen.extend(plan.merge.iter());
134 }
135 seen.len()
136 }
137}
138
139#[derive(Debug, Clone, Default)]
140pub struct IndexIntents {
141 pub sessions: Vec<IndexIntent>,
142 pub messages: Vec<IndexIntent>,
143 pub parts: Vec<IndexIntent>,
144}
145
146impl IndexIntents {
147 fn all(&self) -> [(Table, &[IndexIntent]); 3] {
148 [
149 (Table::Sessions, &self.sessions),
150 (Table::Messages, &self.messages),
151 (Table::Parts, &self.parts),
152 ]
153 }
154}
155
/// A message identified by `(session_id, id)` together with its search text.
// NOTE(review): the name suggests a message still waiting to be embedded; no use
// site is visible in this part of the file — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub id: String,
    /// Search text associated with the message.
    pub search_text: String,
}
165
/// A message identified by `(session_id, id)` together with an `f32` vector.
// NOTE(review): presumably the embedding computed for the message — confirm at the use site.
#[derive(Debug, Clone, PartialEq)]
pub struct EmbeddedMessage {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub id: String,
    /// Vector attached to the message.
    pub vector: Vec<f32>,
}
174
/// Metadata describing one message.
// NOTE(review): field meanings below are read off the field names; the producer of
// this struct is not visible here — confirm.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageMeta {
    /// Id of the message.
    pub message_id: String,
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Role of the message, kept as a string.
    pub role: String,
    /// Project the message is associated with.
    pub project: String,
    /// Source agent the message is associated with.
    pub source_agent: String,
    /// Message timestamp in UTC.
    pub timestamp: DateTime<Utc>,
    /// Search text associated with the message.
    pub search_text: String,
}
186
/// Composite key of a message: session id plus message id.
///
/// The derived ordering compares `session_id` first and `message_id` second, so the
/// field order is significant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MessageKey {
    /// Id of the session the message belongs to.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
}
192
/// One search result: the key of the matching message and its score.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchHit {
    /// Optional row id of the hit.
    // NOTE(review): presumably the Lance row id, absent when the scan did not expose one — confirm.
    pub rowid: Option<u64>,
    /// Key of the message that matched.
    pub key: MessageKey,
    /// Score of the hit.
    // NOTE(review): whether higher or lower is better is not visible here — confirm.
    pub score: f32,
}
203
/// Outcome of upserting a single row.
// NOTE(review): variant meanings are inferred from their names — confirm at the use site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// The row was newly inserted.
    Inserted,
    /// A row with the same key already existed.
    Matched,
}
209
210#[derive(Debug, Default)]
215pub struct OptimizeOutcome {
216 pub tables: Vec<TableOptimizeOutcome>,
217}
218
219impl OptimizeOutcome {
220 pub fn any_indices_failed(&self) -> bool {
223 self.tables.iter().any(|t| t.indices.is_failed())
224 }
225
226 pub fn into_result(self) -> Result<Self> {
230 for table in &self.tables {
231 if let PhaseOutcome::Failed(error) = &table.indices {
232 anyhow::bail!(
233 "indices phase failed on {}: {error:#}",
234 table.table.as_str()
235 );
236 }
237 if let PhaseOutcome::Failed(error) = &table.compaction {
238 anyhow::bail!(
239 "compaction phase failed on {}: {error:#}",
240 table.table.as_str()
241 );
242 }
243 }
244 Ok(self)
245 }
246}
247
/// Row totals for the three store tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowTotals {
    /// Total rows in the sessions table.
    pub sessions: u64,
    /// Total rows in the messages table.
    pub messages: u64,
    /// Total rows in the parts table.
    pub parts: u64,
}
254
/// Snapshot of embedding progress.
// NOTE(review): the exact relation between `embedded`, `total` and `backlog` is not
// visible in this part of the file — confirm before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddingProgress {
    /// Count of embedded items.
    pub embedded: usize,
    /// Count of all items considered.
    pub total: usize,
    /// Count of items in the backlog.
    pub backlog: usize,
    /// Name of the embedding model, as a static string.
    pub model: &'static str,
}
270
/// Borrowed inputs for writing one message.
///
/// `Store::upsert_messages` reads `message` and `search_text`; it does not touch `parts`.
#[derive(Debug, Clone, Copy)]
pub struct MessageWrite<'a> {
    /// The message to write.
    pub message: &'a Message,
    /// Parts belonging to the message.
    pub parts: &'a [Part],
    /// Optional search text stored with the message row.
    pub search_text: Option<&'a str>,
}
277
278impl Store {
279 pub async fn open(location: &Url) -> Result<Self> {
285 Ok(Self {
286 handle: Handle::open(location).await?,
287 rowmap: ArcSwapOption::empty(),
288 })
289 }
290
291 pub fn lance_cache_bytes(&self) -> u64 {
294 self.handle.lance_cache_bytes()
295 }
296
297 pub async fn open_with_options(
303 location: &Url,
304 storage_options: std::collections::HashMap<String, String>,
305 caps: crate::substrate::RuntimeCaps,
306 ) -> Result<Self> {
307 Ok(Self {
308 handle: Handle::open_with_options(location, storage_options, caps).await?,
309 rowmap: ArcSwapOption::empty(),
310 })
311 }
312
313 pub async fn open_local(path: impl AsRef<std::path::Path>) -> Result<Self> {
318 let url = config::url_for_path(path)?;
319 Self::open_with_options(
320 &url,
321 std::collections::HashMap::new(),
322 crate::substrate::RuntimeCaps::default(),
323 )
324 .await
325 }
326
327 pub async fn export_clean_lance_datasets(&self, dest: &Path) -> Result<LanceArchiveExport> {
335 std::fs::create_dir_all(dest)
336 .with_context(|| format!("failed to create archive staging dir {}", dest.display()))?;
337 let (sessions, sessions_version) = self
338 .export_clean_table(Table::Sessions, &dest.join("sessions.lance"))
339 .await?;
340 let (messages, messages_version) = self
341 .export_clean_table(Table::Messages, &dest.join("messages.lance"))
342 .await?;
343 let (parts, parts_version) = self
344 .export_clean_table(Table::Parts, &dest.join("parts.lance"))
345 .await?;
346 Ok(LanceArchiveExport {
347 rows: LanceArchiveCounts {
348 sessions,
349 messages,
350 parts,
351 },
352 source_versions: LanceArchiveVersions {
353 sessions: sessions_version,
354 messages: messages_version,
355 parts: parts_version,
356 },
357 })
358 }
359
360 pub async fn import_clean_lance_datasets(&self, source: &Path) -> Result<LanceArchiveImport> {
361 let sessions_dataset =
362 open_archive_table(Table::Sessions, &source.join("sessions.lance")).await?;
363 let messages_dataset =
364 open_archive_table(Table::Messages, &source.join("messages.lance")).await?;
365 let parts_dataset = open_archive_table(Table::Parts, &source.join("parts.lance")).await?;
366 let (sessions, sessions_inserted) = self
367 .import_clean_table(Table::Sessions, sessions_dataset)
368 .await?;
369 let (messages, messages_inserted) = self
370 .import_clean_table(Table::Messages, messages_dataset)
371 .await?;
372 let (parts, parts_inserted) = self.import_clean_table(Table::Parts, parts_dataset).await?;
373 Ok(LanceArchiveImport {
374 rows: LanceArchiveCounts {
375 sessions,
376 messages,
377 parts,
378 },
379 inserted: LanceArchiveCounts {
380 sessions: sessions_inserted,
381 messages: messages_inserted,
382 parts: parts_inserted,
383 },
384 })
385 }
386
387 async fn export_clean_table(&self, table: Table, dest: &Path) -> Result<(usize, u64)> {
388 let dataset = self.handle.dataset(table).await?;
389 let source_version = dataset.version_id();
390 let schema = export_schema(table);
391 let mut scan = dataset.scan();
392 scan.blob_handling(lance::datatypes::BlobHandling::AllBinary);
397 let mut stream = scan
398 .try_into_stream()
399 .await
400 .with_context(|| format!("failed to scan {} for archive export", table.as_str()))?;
401 let dest_uri = dest
402 .to_str()
403 .with_context(|| format!("archive path is not UTF-8: {}", dest.display()))?;
404
405 let mut rows = 0usize;
406 let mut wrote = false;
407 while let Some(batch) = stream.next().await {
408 let batch = batch
409 .with_context(|| format!("failed to read {} archive batch", table.as_str()))?;
410 rows += batch.num_rows();
411 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
412 let mut params = write_params_for_create();
413 if wrote {
414 params.mode = WriteMode::Append;
415 }
416 Dataset::write(reader, dest_uri, Some(params))
417 .await
418 .with_context(|| format!("failed to write {} archive table", table.as_str()))?;
419 wrote = true;
420 }
421
422 if !wrote {
423 let batch = RecordBatch::new_empty(schema.clone());
424 let reader = RecordBatchIterator::new([Ok(batch)], schema);
425 Dataset::write(reader, dest_uri, Some(write_params_for_create()))
426 .await
427 .with_context(|| {
428 format!("failed to write empty {} archive table", table.as_str())
429 })?;
430 }
431 Ok((rows, source_version))
432 }
433
434 async fn import_clean_table(&self, table: Table, dataset: Dataset) -> Result<(usize, usize)> {
435 let _ = self.handle.dataset(table).await?;
439 self.merge_scanner(table, dataset.scan(), "archive import")
440 .await
441 }
442
443 async fn merge_scanner(
449 &self,
450 table: Table,
451 mut scanner: lance::dataset::scanner::Scanner,
452 context: &'static str,
453 ) -> Result<(usize, usize)> {
454 scanner.blob_handling(lance::datatypes::BlobHandling::AllBinary);
455 let mut stream = scanner
456 .try_into_stream()
457 .await
458 .with_context(|| format!("failed to scan {} for {context}", table.as_str()))?;
459 let mut rows = 0usize;
460 let mut inserted = 0usize;
461 while let Some(batch) = stream.next().await {
462 let batch = batch
463 .with_context(|| format!("failed to read {} {context} batch", table.as_str()))?;
464 let row_count = batch.num_rows();
465 rows += row_count;
466 let stats = self
467 .handle
468 .merge_insert_stats(table, batch, row_count)
469 .await
470 .with_context(|| format!("failed to merge {} during {context}", table.as_str()))?;
471 inserted += (stats.num_inserted_rows + stats.num_updated_rows) as usize;
472 }
473 Ok((rows, inserted))
474 }
475
476 pub async fn all_session_message_counts(&self) -> Result<HashMap<String, usize>> {
489 self.all_session_row_counts(Table::Messages).await
490 }
491
492 pub async fn all_session_part_counts(&self) -> Result<HashMap<String, usize>> {
493 self.all_session_row_counts(Table::Parts).await
494 }
495
496 async fn all_session_row_counts(&self, table: Table) -> Result<HashMap<String, usize>> {
501 let scanner = self
502 .handle
503 .scan(table, ScanOpts::project_only(&["session_id"]))
504 .await?;
505 let mut stream = scanner.try_into_stream().await?;
506 let mut out: HashMap<String, usize> = HashMap::new();
507 while let Some(batch) = stream.next().await {
508 let batch = batch?;
509 let session_ids = batch
510 .column_by_name("session_id")
511 .context("scan projection dropped the session_id column")?
512 .as_any()
513 .downcast_ref::<StringArray>()
514 .context("session_id column is not Utf8")?;
515 for row in 0..batch.num_rows() {
516 if session_ids.is_null(row) {
517 continue;
518 }
519 let session_id = session_ids.value(row);
520 if let Some(count) = out.get_mut(session_id) {
521 *count += 1;
522 } else {
523 out.insert(session_id.to_owned(), 1);
524 }
525 }
526 }
527 Ok(out)
528 }
529
530 pub async fn plan_incremental_from(&self, source: &Store) -> Result<DeltaPlan> {
539 let (
540 source_ids,
541 dest_ids,
542 source_msg_counts,
543 dest_msg_counts,
544 source_part_counts,
545 dest_part_counts,
546 ) = tokio::try_join!(
547 source.collect_ids(Table::Sessions),
548 self.collect_ids(Table::Sessions),
549 source.all_session_message_counts(),
550 self.all_session_message_counts(),
551 source.all_session_part_counts(),
552 self.all_session_part_counts(),
553 )?;
554 let source_sessions = source_ids.len();
555 let mut plan = DeltaPlan {
556 source_sessions,
557 ..DeltaPlan::default()
558 };
559 for id in &source_ids {
560 if !dest_ids.contains(id) {
563 plan.sessions.append.push(id.clone());
564 }
565 let source_msgs = source_msg_counts.get(id).copied().unwrap_or(0);
566 let dest_msgs = dest_msg_counts.get(id).copied().unwrap_or(0);
567 if dest_msgs == 0 {
568 if source_msgs > 0 {
569 plan.messages.append.push(id.clone());
570 }
571 } else if source_msgs > dest_msgs {
572 plan.messages.merge.push(id.clone());
573 }
574 let source_parts = source_part_counts.get(id).copied().unwrap_or(0);
575 let dest_parts = dest_part_counts.get(id).copied().unwrap_or(0);
576 if dest_parts == 0 {
577 if source_parts > 0 {
578 plan.parts.append.push(id.clone());
579 }
580 } else if source_parts > dest_parts {
581 plan.parts.merge.push(id.clone());
582 }
583 }
584 Ok(plan)
585 }
586
587 pub async fn copy_delta_from(
598 &self,
599 source: &Store,
600 plan: &DeltaPlan,
601 ) -> Result<LanceArchiveImport> {
602 let ((sessions, sessions_inserted), (messages, messages_inserted), (parts, parts_inserted)) =
606 tokio::try_join!(
607 self.copy_table(
608 source,
609 Table::Sessions,
610 "id",
611 &plan.sessions,
612 plan.source_sessions,
613 ),
614 self.copy_table(
615 source,
616 Table::Messages,
617 "session_id",
618 &plan.messages,
619 plan.source_sessions,
620 ),
621 self.copy_table(
622 source,
623 Table::Parts,
624 "session_id",
625 &plan.parts,
626 plan.source_sessions,
627 ),
628 )?;
629 Ok(LanceArchiveImport {
630 rows: LanceArchiveCounts {
631 sessions,
632 messages,
633 parts,
634 },
635 inserted: LanceArchiveCounts {
636 sessions: sessions_inserted,
637 messages: messages_inserted,
638 parts: parts_inserted,
639 },
640 })
641 }
642
643 async fn copy_table(
649 &self,
650 source: &Store,
651 table: Table,
652 key_column: &'static str,
653 table_plan: &TablePlan,
654 source_sessions: usize,
655 ) -> Result<(usize, usize)> {
656 let _ = self.handle.dataset(table).await?;
660
661 let appended = self
662 .append_sessions(
663 source,
664 table,
665 key_column,
666 &table_plan.append,
667 source_sessions,
668 )
669 .await?;
670
671 let mut merged_rows = 0usize;
675 let mut merged_inserted = 0usize;
676 for chunk in table_plan.merge.chunks(COPY_SESSION_IN_CHUNK) {
677 let predicate = in_predicate(key_column, chunk);
678 let scanner = source
679 .handle
680 .scan(
681 table,
682 ScanOpts {
683 predicate: Some(&predicate),
684 projection: None,
685 },
686 )
687 .await?;
688 let (r, i) = self.merge_scanner(table, scanner, "copy").await?;
689 merged_rows += r;
690 merged_inserted += i;
691 }
692
693 Ok((appended + merged_rows, appended + merged_inserted))
694 }
695
696 async fn append_sessions(
703 &self,
704 source: &Store,
705 table: Table,
706 key_column: &'static str,
707 session_ids: &[String],
708 source_sessions: usize,
709 ) -> Result<usize> {
710 if session_ids.is_empty() {
711 return Ok(0);
712 }
713 if session_ids.len() == source_sessions {
714 return self.append_scanner(source, table, None).await;
715 }
716 let mut rows = 0usize;
717 for chunk in session_ids.chunks(COPY_SESSION_IN_CHUNK) {
718 let predicate = in_predicate(key_column, chunk);
719 rows += self.append_scanner(source, table, Some(&predicate)).await?;
720 }
721 Ok(rows)
722 }
723
724 async fn append_scanner(
730 &self,
731 source: &Store,
732 table: Table,
733 predicate: Option<&Predicate>,
734 ) -> Result<usize> {
735 let make_source = || async {
736 let mut scanner = source
737 .handle
738 .scan(
739 table,
740 ScanOpts {
741 predicate,
742 projection: None,
743 },
744 )
745 .await?;
746 scanner.blob_handling(lance::datatypes::BlobHandling::AllBinary);
747 let stream: SendableRecordBatchStream = scanner
748 .try_into_stream()
749 .await
750 .with_context(|| format!("failed to scan {} for copy", table.as_str()))?
751 .into();
752 Ok(stream)
753 };
754 let stats = self.handle.append_stream(table, make_source).await?;
755 Ok(stats.rows as usize)
756 }
757
758 pub async fn append_absent_rows(
763 &self,
764 source: &Store,
765 table: Table,
766 filter_column: &'static str,
767 values: &[String],
768 ) -> Result<usize> {
769 if values.is_empty() {
770 return Ok(0);
771 }
772 let _ = self.handle.dataset(table).await?;
773 let mut rows = 0usize;
774 for chunk in values.chunks(COPY_SESSION_IN_CHUNK) {
775 let predicate = in_predicate(filter_column, chunk);
776 rows += self.append_scanner(source, table, Some(&predicate)).await?;
777 }
778 Ok(rows)
779 }
780
781 pub async fn upsert_sessions(&self, sessions: &[Session]) -> Result<()> {
786 if sessions.is_empty() {
787 return Ok(());
788 }
789 let batches = sessions_batches(sessions)?;
790 merge_insert_chunks(&self.handle, Table::Sessions, batches).await?;
791 Ok(())
792 }
793
    /// Writes a batch of completed substreams and reports an outcome per row.
    ///
    /// Steps, in order:
    /// 1. Substreams sharing a session id are coalesced into the first one seen. A later
    ///    substream that disagrees on `source_agent` or `project` is rejected with error
    ///    outcomes; otherwise its messages with new ids are added to the first one.
    /// 2. Existing session rows, message keys and part keys are loaded for the
    ///    remaining session ids.
    /// 3. Substreams whose stored session fails `ensure_immutable_match` are rejected.
    /// 4. Message and part rows are built, skipping keys that already exist in the store
    ///    and keys repeated within the batch.
    /// 5. Messages and parts are appended concurrently, then sessions are merge-inserted.
    /// 6. Success outcomes are produced for every written substream.
    ///
    /// # Returns
    /// The outcomes sorted by `index`, plus the batch counts.
    ///
    /// # Errors
    /// Fails when a scan, a row conversion or a write fails. Immutable-field conflicts
    /// are reported as outcomes, not as errors.
    async fn upsert_session_batch(
        &self,
        substreams: Vec<CompletedSubstream>,
    ) -> Result<(Vec<RowOutcome>, BatchCounts)> {
        if substreams.is_empty() {
            return Ok((Vec::new(), BatchCounts::default()));
        }

        let mut outcomes: Vec<RowOutcome> = Vec::with_capacity(substreams.len());
        let mut counts = BatchCounts::default();

        // Step 1: coalesce substreams by session id. `by_session_id` maps a session id
        // to the index of its first substream in `merged`.
        let mut merged: Vec<CompletedSubstream> = Vec::with_capacity(substreams.len());
        let mut by_session_id: std::collections::HashMap<String, usize> =
            std::collections::HashMap::with_capacity(substreams.len());
        for substream in substreams {
            if let Some(&existing_idx) = by_session_id.get(&substream.session.id) {
                let existing = &merged[existing_idx];
                if existing.session.source_agent != substream.session.source_agent
                    || existing.session.project != substream.session.project
                {
                    // A `source_agent` mismatch is reported in preference to a `project` one.
                    let reason = if existing.session.source_agent != substream.session.source_agent
                    {
                        IngestError::ImmutableField {
                            field: "source_agent",
                            session_id: substream.session.id.clone(),
                            stored: existing.session.source_agent.clone(),
                            attempted: substream.session.source_agent.clone(),
                        }
                    } else {
                        IngestError::ImmutableField {
                            field: "project",
                            session_id: substream.session.id.clone(),
                            stored: (*existing.session.project).clone(),
                            attempted: (*substream.session.project).clone(),
                        }
                    };
                    let field = match &reason {
                        IngestError::ImmutableField { field, .. } => Some(*field),
                    };
                    let reason_key = match field {
                        Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                        Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                        _ => DROP_REASON_UNCATEGORIZED,
                    };
                    outcomes.extend(error_outcomes_for_substream(
                        substream.session_index,
                        &substream.session,
                        &substream.messages,
                        reason.to_string(),
                        field,
                        reason_key,
                    ));
                    continue;
                }
                // Same session and compatible fields: keep the first substream's session
                // and add only the messages whose id it does not already hold.
                let existing = &mut merged[existing_idx];
                let mut seen: std::collections::HashSet<String> = existing
                    .messages
                    .iter()
                    .map(|m| m.message.id().to_owned())
                    .collect();
                for msg in substream.messages {
                    if seen.insert(msg.message.id().to_owned()) {
                        existing.messages.push(msg);
                    }
                }
                continue;
            }
            by_session_id.insert(substream.session.id.clone(), merged.len());
            merged.push(substream);
        }

        // Step 2: load what the store already holds for these session ids.
        let session_id_values: Vec<ScalarValue> = merged
            .iter()
            .map(|substream| ScalarValue::String(substream.session.id.clone()))
            .collect();
        let existing_sessions: std::collections::HashMap<String, Session> =
            if session_id_values.is_empty() {
                std::collections::HashMap::new()
            } else {
                // NOTE(review): the empty projection presumably selects every column,
                // which `session_from_batch` would need — confirm against `scan_batch`.
                let batch = self
                    .handle
                    .scan_batch(
                        Table::Sessions,
                        Some(&Predicate::In("id", session_id_values.clone())),
                        &[],
                    )
                    .await?;
                let mut map = std::collections::HashMap::with_capacity(batch.num_rows());
                for row in 0..batch.num_rows() {
                    let session = session_from_batch(&batch, row)?;
                    map.insert(session.id.clone(), session);
                }
                map
            };
        // Keys of existing messages: (session_id, message id).
        let existing_message_pks: HashSet<(String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Messages,
                    Some(&Predicate::In("session_id", session_id_values.clone())),
                    &["session_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "id", row)?.context("message id is null")?;
                set.insert((sid, mid));
            }
            set
        };
        // Keys of existing parts: (session_id, message_id, part id).
        let existing_part_pks: HashSet<(String, String, String)> = if session_id_values.is_empty() {
            HashSet::new()
        } else {
            let batch = self
                .handle
                .scan_batch(
                    Table::Parts,
                    Some(&Predicate::In("session_id", session_id_values)),
                    &["session_id", "message_id", "id"],
                )
                .await?;
            let mut set = HashSet::with_capacity(batch.num_rows());
            for row in 0..batch.num_rows() {
                let sid = string(&batch, "session_id", row)?.context("session_id is null")?;
                let mid = string(&batch, "message_id", row)?.context("message_id is null")?;
                let pid = string(&batch, "id", row)?.context("part id is null")?;
                set.insert((sid, mid, pid));
            }
            set
        };

        // Step 3: drop substreams that conflict with the stored session's immutable fields.
        let mut writeable: Vec<CompletedSubstream> = Vec::with_capacity(merged.len());
        for substream in merged {
            if let Some(existing) = existing_sessions.get(&substream.session.id)
                && let Err(failure) = ensure_immutable_match(existing, &substream.session)
            {
                let field = match &failure {
                    IngestError::ImmutableField { field, .. } => Some(*field),
                };
                let reason_key = match field {
                    Some("project") => DROP_REASON_IMMUTABLE_PROJECT,
                    Some("source_agent") => DROP_REASON_IMMUTABLE_SOURCE_AGENT,
                    _ => DROP_REASON_UNCATEGORIZED,
                };
                outcomes.extend(error_outcomes_for_substream(
                    substream.session_index,
                    &substream.session,
                    &substream.messages,
                    failure.to_string(),
                    field,
                    reason_key,
                ));
                continue;
            }
            writeable.push(substream);
        }

        if writeable.is_empty() {
            // Everything was rejected: report the error outcomes without writing.
            outcomes.sort_by_key(|outcome| outcome.index);
            return Ok((outcomes, counts));
        }

        // Step 4: build the rows to write.
        let sessions_owned: Vec<Session> = writeable
            .iter()
            .map(|substream| substream.session.clone())
            .collect();
        // Messages are appended rather than merged, so keys already stored and keys
        // repeated within this batch are filtered out here.
        let mut seen_messages: HashSet<(String, String)> = HashSet::new();
        let message_rows: Vec<MessageBatchRow<'_>> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().map(|buffered| MessageBatchRow {
                    message: &buffered.message,
                    source_agent: &substream.session.source_agent,
                    project: &substream.session.project,
                    search_text: buffered.search_text.as_deref(),
                })
            })
            .filter(|row| {
                let key = (
                    row.message.session_id().to_owned(),
                    row.message.id().to_owned(),
                );
                !existing_message_pks.contains(&key) && seen_messages.insert(key)
            })
            .collect();
        // Same filtering for parts, keyed by (session_id, message_id, part id).
        let mut seen_parts: HashSet<(String, String, String)> = HashSet::new();
        let part_rows: Vec<Part> = writeable
            .iter()
            .flat_map(|substream| {
                substream.messages.iter().flat_map(|buffered| {
                    buffered
                        .parts
                        .iter()
                        .map(|buffered_part| buffered_part.part.clone())
                })
            })
            .filter(|part| {
                let key = (
                    part.session_id.clone(),
                    part.message_id.clone(),
                    part.id.clone(),
                );
                !existing_part_pks.contains(&key) && seen_parts.insert(key)
            })
            .collect();

        let session_batches = sessions_batches(&sessions_owned)?;
        let message_batches = messages_batches(&message_rows)?;
        let part_batches = parts_batches(&part_rows)?;

        // Step 5: append messages and parts concurrently; the sessions merge-insert
        // runs only after both appends have succeeded.
        let (_messages_appended, _parts_appended) = tokio::try_join!(
            self.handle.append_batches(Table::Messages, message_batches),
            self.handle.append_batches(Table::Parts, part_batches),
        )?;
        let _sessions_inserted =
            merge_insert_chunks(&self.handle, Table::Sessions, session_batches).await?;

        // Step 6: success outcomes are derived from the pre-write snapshots of what
        // already existed.
        for substream in &writeable {
            outcomes.extend(success_outcomes_for_substream(
                substream.session_index,
                &substream.session,
                &substream.messages,
                &existing_sessions,
                &existing_message_pks,
                &existing_part_pks,
                &mut counts,
            ));
        }

        outcomes.sort_by_key(|outcome| outcome.index);
        Ok((outcomes, counts))
    }
1070
1071 pub async fn upsert_messages(
1072 &self,
1073 session: &Session,
1074 messages: &[MessageWrite<'_>],
1075 ) -> Result<()> {
1076 if messages.is_empty() {
1077 return Ok(());
1078 }
1079
1080 let rows = messages
1081 .iter()
1082 .map(|write| MessageBatchRow {
1083 message: write.message,
1084 source_agent: &session.source_agent,
1085 project: &session.project,
1086 search_text: write.search_text,
1087 })
1088 .collect::<Vec<_>>();
1089 let batches = messages_batches(&rows)?;
1090 merge_insert_chunks(&self.handle, Table::Messages, batches).await?;
1091 Ok(())
1092 }
1093
1094 pub async fn upsert_parts(&self, parts: &[Part]) -> Result<()> {
1095 if parts.is_empty() {
1096 return Ok(());
1097 }
1098 let batches = parts_batches(parts)?;
1099 merge_insert_chunks(&self.handle, Table::Parts, batches).await?;
1100 Ok(())
1101 }
1102
1103 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionWithMessages>> {
1104 let Some(session) = self.find_session(session_id).await? else {
1105 return Ok(None);
1106 };
1107 let messages = self.messages_for_session(session_id).await?;
1108 Ok(Some(SessionWithMessages { session, messages }))
1109 }
1110
1111 pub async fn session_ids(&self) -> Result<Vec<String>> {
1113 let batch = self
1114 .handle
1115 .scan_batch(Table::Sessions, None, &["id"])
1116 .await?;
1117 let mut ids = Vec::with_capacity(batch.num_rows());
1118 for row in 0..batch.num_rows() {
1119 if let Some(id) = string(&batch, "id", row)? {
1120 ids.push(id);
1121 }
1122 }
1123 Ok(ids)
1124 }
1125
1126 pub async fn child_sessions(&self, parent_session_id: &str) -> Result<Vec<Session>> {
1127 let batch = self
1128 .handle
1129 .scan_batch(
1130 Table::Sessions,
1131 Some(&Predicate::Eq(
1132 "parent_session_id",
1133 parent_session_id.into(),
1134 )),
1135 &[
1136 "id",
1137 "parent_session_id",
1138 "parent_message_id",
1139 "source_agent",
1140 "created_at",
1141 "project",
1142 "options",
1143 ],
1144 )
1145 .await?;
1146 let mut sessions = Vec::with_capacity(batch.num_rows());
1147 for row in 0..batch.num_rows() {
1148 sessions.push(session_from_batch(&batch, row)?);
1149 }
1150 sessions.sort_by(|left, right| left.id.cmp(&right.id));
1151 Ok(sessions)
1152 }
1153
1154 pub async fn session_last_message_ids(&self) -> Result<HashMap<String, String>> {
1167 let (session_ids, latest) = tokio::try_join!(self.collect_ids(Table::Sessions), async {
1168 let scanner = self
1169 .handle
1170 .scan(
1171 Table::Messages,
1172 ScanOpts::project_only(&["session_id", "id", "timestamp"]),
1173 )
1174 .await?;
1175 let mut stream = scanner.try_into_stream().await?;
1176 let mut latest: HashMap<String, (DateTime<Utc>, String)> = HashMap::new();
1177 while let Some(batch) = stream.next().await {
1178 let batch = batch?;
1179 let session_ids = batch
1180 .column_by_name("session_id")
1181 .context("scan projection dropped the session_id column")?
1182 .as_any()
1183 .downcast_ref::<StringArray>()
1184 .context("session_id column is not Utf8")?;
1185 for row in 0..batch.num_rows() {
1186 if session_ids.is_null(row) {
1187 continue;
1188 }
1189 let session_id = session_ids.value(row);
1190 let Some(id) = string(&batch, "id", row)? else {
1191 continue;
1192 };
1193 let timestamp = datetime(&batch, "timestamp", row)?;
1194 match latest.get_mut(session_id) {
1195 Some((stored_ts, stored_id))
1196 if timestamp > *stored_ts
1197 || (timestamp == *stored_ts
1198 && id.as_str() > stored_id.as_str()) =>
1199 {
1200 *stored_ts = timestamp;
1201 *stored_id = id;
1202 }
1203 None => {
1204 latest.insert(session_id.to_owned(), (timestamp, id));
1205 }
1206 _ => {}
1207 }
1208 }
1209 }
1210 Ok::<_, anyhow::Error>(latest)
1211 })?;
1212 Ok(latest
1213 .into_iter()
1214 .filter(|(session_id, _)| session_ids.contains(session_id))
1215 .map(|(session_id, (_, message_id))| (session_id, message_id))
1216 .collect())
1217 }
1218
1219 pub async fn session_view(
1228 &self,
1229 session_id: &str,
1230 params: SessionViewParams<'_>,
1231 ) -> Result<GetLookup<SessionPage>> {
1232 let Some(session) = self.find_session(session_id).await? else {
1233 return Ok(GetLookup::NotFound);
1234 };
1235 let mut rows: Vec<ScanRow> = self
1236 .scan_conversational_messages(session_id)
1237 .await?
1238 .into_iter()
1239 .map(|row| ScanRow {
1240 id: row.message_id,
1241 role: row.role,
1242 timestamp: row.timestamp,
1243 text: Some(row.text.into_inner()),
1244 content: None,
1245 })
1246 .collect();
1247 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
1248
1249 let size = |row: &ScanRow| row.text.as_deref().map_or(0, str::len);
1250 let total = rows.len();
1251 let (win_start, win_end) = match (params.after_message_id, params.before_message_id) {
1254 (Some(after), _) => {
1255 let pos = match rows.iter().position(|row| row.id == after) {
1256 Some(idx) => idx + 1,
1257 None => return Ok(GetLookup::UnknownAnchor),
1258 };
1259 let n = page_by(&rows[pos..], params.limit, params.budget_bytes, size);
1260 (pos, pos + n)
1261 }
1262 (None, Some(before)) => {
1263 let pos = match rows.iter().position(|row| row.id == before) {
1264 Some(idx) => idx,
1265 None => return Ok(GetLookup::UnknownAnchor),
1266 };
1267 let n = page_tail(&rows[..pos], params.limit, params.budget_bytes, size);
1268 (pos - n, pos)
1269 }
1270 (None, None) => match params.session_from {
1271 SessionFrom::Start => (0, page_by(&rows, params.limit, params.budget_bytes, size)),
1272 SessionFrom::End => {
1273 let n = page_tail(&rows, params.limit, params.budget_bytes, size);
1274 (total - n, total)
1275 }
1276 },
1277 };
1278 let emitted = &rows[win_start..win_end];
1279 let before_remaining = win_start;
1280 let after_remaining = total - win_end;
1281 let ids: Vec<String> = emitted.iter().map(|row| row.id.clone()).collect();
1282
1283 let mut parts_by_message = self.summary_parts_for_messages(session_id, &ids).await?;
1284 let messages = emitted
1285 .iter()
1286 .map(|row| RetrievedMessage {
1287 id: row.id.clone(),
1288 role: row.role,
1289 timestamp: row.timestamp,
1290 text: row.text.clone(),
1291 content: row.content.clone(),
1292 parts: parts_by_message
1293 .remove(&(session_id.to_owned(), row.id.clone()))
1294 .unwrap_or_default(),
1295 })
1296 .collect();
1297
1298 Ok(GetLookup::Found(SessionPage {
1299 session,
1300 messages,
1301 before_remaining,
1302 after_remaining,
1303 }))
1304 }
1305
1306 pub async fn message_view(
1312 &self,
1313 message_id: &str,
1314 params: MessageViewParams,
1315 ) -> Result<GetLookup<MessagePage>> {
1316 let Some(session_id) = self.session_id_for_message(message_id).await? else {
1317 return Ok(GetLookup::NotFound);
1318 };
1319 let Some(session) = self.find_session(&session_id).await? else {
1320 return Ok(GetLookup::NotFound);
1321 };
1322 let mut rows = self.scan_all_messages(&session_id).await?;
1323 rows.retain(|row| row.text.is_some() || row.id == message_id);
1328 rows.sort_by(|a, b| a.timestamp.cmp(&b.timestamp).then_with(|| a.id.cmp(&b.id)));
1329 let Some(target_pos) = rows.iter().position(|row| row.id == message_id) else {
1330 return Ok(GetLookup::NotFound);
1331 };
1332
1333 let start = target_pos.saturating_sub(params.context_before);
1334 let end = (target_pos + params.context_after + 1).min(rows.len());
1335 let window = &rows[start..end];
1336 let window_ids: Vec<String> = window.iter().map(|row| row.id.clone()).collect();
1337 let mut parts_by_message = self.parts_for_messages(&session_id, &window_ids).await?;
1340
1341 let all_parts = parts_by_message
1342 .remove(&(session_id.clone(), message_id.to_owned()))
1343 .unwrap_or_default();
1344 let part_count = page_by(&all_parts, 1000, params.budget_bytes, |part| {
1347 serde_json::to_string(part).map_or(0, |json| json.len())
1348 });
1349 let target_parts = all_parts[..part_count].to_vec();
1350 let target_parts_remaining = all_parts.len() - part_count;
1351
1352 let target_row = &rows[target_pos];
1353 let target = RetrievedMessage {
1354 id: target_row.id.clone(),
1355 role: target_row.role,
1356 timestamp: target_row.timestamp,
1357 text: target_row.text.clone(),
1358 content: target_row.content.clone(),
1359 parts: Vec::new(),
1361 };
1362 let siblings = window
1363 .iter()
1364 .enumerate()
1365 .filter(|(idx, _)| start + idx != target_pos)
1366 .map(|(_, row)| RetrievedMessage {
1367 id: row.id.clone(),
1368 role: row.role,
1369 timestamp: row.timestamp,
1370 text: row.text.clone(),
1371 content: row.content.clone(),
1372 parts: parts_by_message
1373 .get(&(session_id.clone(), row.id.clone()))
1374 .cloned()
1375 .unwrap_or_default(),
1376 })
1377 .collect();
1378
1379 Ok(GetLookup::Found(MessagePage {
1380 session,
1381 target,
1382 target_parts,
1383 target_parts_remaining,
1384 siblings,
1385 }))
1386 }
1387
1388 async fn scan_all_messages(&self, session_id: &str) -> Result<Vec<ScanRow>> {
1389 let batch = self
1390 .handle
1391 .scan_batch(
1392 Table::Messages,
1393 Some(&Predicate::Eq("session_id", session_id.into())),
1394 &["id", "timestamp", "role", "search_text", "content"],
1395 )
1396 .await?;
1397 let mut rows = Vec::with_capacity(batch.num_rows());
1398 for row in 0..batch.num_rows() {
1399 let id = string(&batch, "id", row)?.context("message id is null")?;
1400 let role =
1401 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1402 let timestamp = datetime(&batch, "timestamp", row)?;
1403 rows.push(ScanRow {
1404 id,
1405 role,
1406 timestamp,
1407 text: string(&batch, "search_text", row)?,
1408 content: string(&batch, "content", row)?,
1409 });
1410 }
1411 Ok(rows)
1412 }
1413
1414 pub async fn scan_conversational_messages(
1418 &self,
1419 session_id: &str,
1420 ) -> Result<Vec<ConversationalRow>> {
1421 let filter = Predicate::And(vec![
1422 Predicate::Eq("session_id", session_id.into()),
1423 Predicate::IsNotNull("search_text"),
1424 ]);
1425 let batch = self
1426 .handle
1427 .scan_batch(
1428 Table::Messages,
1429 Some(&filter),
1430 &["id", "timestamp", "role", "search_text"],
1431 )
1432 .await?;
1433
1434 let mut rows = Vec::with_capacity(batch.num_rows());
1435 for row in 0..batch.num_rows() {
1436 let message_id = string(&batch, "id", row)?.context("message id is null")?;
1437 let role =
1438 role_from_str(&string(&batch, "role", row)?.context("message role is null")?)?;
1439 let timestamp = datetime(&batch, "timestamp", row)?;
1440 let text_str = string(&batch, "search_text", row)?.context(
1441 "search_text null after IsNotNull pushdown - storage invariant violated",
1442 )?;
1443 rows.push(ConversationalRow {
1444 session_id: session_id.to_owned(),
1445 message_id,
1446 role,
1447 timestamp,
1448 text: SearchText(text_str),
1449 });
1450 }
1451 rows.sort_by(|a, b| {
1452 a.timestamp
1453 .cmp(&b.timestamp)
1454 .then_with(|| a.message_id.cmp(&b.message_id))
1455 });
1456 Ok(rows)
1457 }
1458
1459 pub async fn session_id_for_message(&self, message_id: &str) -> Result<Option<String>> {
1462 let batch = self
1463 .handle
1464 .scan_batch(
1465 Table::Messages,
1466 Some(&Predicate::Eq("id", message_id.into())),
1467 &["session_id"],
1468 )
1469 .await?;
1470 if batch.num_rows() == 0 {
1471 return Ok(None);
1472 }
1473 string(&batch, "session_id", 0)
1474 }
1475
1476 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1477 self.handle.row_counts().await
1478 }
1479
1480 pub async fn collect_ids(&self, table: Table) -> Result<std::collections::HashSet<String>> {
1483 self.handle.collect_ids(table).await
1484 }
1485
1486 pub async fn id_diff_against(
1491 &self,
1492 table: Table,
1493 present: &std::collections::HashSet<String>,
1494 ) -> Result<(usize, usize)> {
1495 let scanner = self
1496 .handle
1497 .scan(table, ScanOpts::project_only(&["id"]))
1498 .await?;
1499 let mut stream = scanner.try_into_stream().await?;
1500 let (mut rows, mut absent) = (0usize, 0usize);
1501 while let Some(batch) = stream.next().await {
1502 let batch = batch?;
1503 let ids = batch
1504 .column_by_name("id")
1505 .context("scan projection dropped the id column")?
1506 .as_any()
1507 .downcast_ref::<StringArray>()
1508 .context("id column is not Utf8")?;
1509 for id in ids.iter().flatten() {
1510 rows += 1;
1511 if !present.contains(id) {
1512 absent += 1;
1513 }
1514 }
1515 }
1516 Ok((rows, absent))
1517 }
1518
1519 pub async fn dataset(&self, table: Table) -> Result<Arc<Dataset>> {
1523 Ok(Arc::new(self.handle.dataset(table).await?))
1524 }
1525
1526 pub async fn prewarm(&self, cache_dir: &Path) -> Result<()> {
1537 let messages = self.dataset(Table::Messages).await?;
1538 if let Err(error) = messages.prewarm_index(MESSAGES_VECTOR_INDEX).await {
1539 tracing::debug!(%error, "vector index prewarm skipped");
1540 }
1541 if let Err(error) = self.ensure_rowmap(cache_dir).await {
1544 tracing::warn!(%error, "rowmap build skipped; arms fall back to data-take resolution");
1545 }
1546 if let Err(error) = self
1549 .fts_search("pond", 1, &Predicate::And(Vec::new()))
1550 .await
1551 {
1552 tracing::debug!(%error, "fts index prewarm skipped");
1553 }
1554 Ok(())
1555 }
1556
1557 fn store_key(&self) -> String {
1560 let digest = blake3::hash(self.handle.location().as_str().as_bytes());
1561 digest.to_hex()[..16].to_owned()
1562 }
1563
/// Upper bound on the number of delta files in a rowmap chain. While the
/// existing chain has fewer deltas than this, `extend_rowmap_coordinated`
/// appends another delta; otherwise it merges everything into a new base.
const MAX_ROWMAP_DELTAS: usize = 16;
1566
/// Columns projected by `collect_row_metas` and `collect_row_metas_delta`
/// when scanning the messages table to build rowmap entries.
const ROW_META_COLUMNS: [&str; 7] = [
    "session_id",
    "id",
    "role",
    "project",
    "source_agent",
    "timestamp",
    "search_text",
];
1580
1581 pub async fn ensure_rowmap(&self, cache_dir: &Path) -> Result<()> {
1588 let version = self.messages_version().await?;
1589 if let Some(current) = self.rowmap.load_full()
1590 && current.version() == version
1591 {
1592 return Ok(());
1593 }
1594 std::fs::create_dir_all(cache_dir)
1595 .with_context(|| format!("create cache dir {}", cache_dir.display()))?;
1596 let store_key = self.store_key();
1597
1598 if let Some(chain) = discover_chain(cache_dir, &store_key)
1601 && chain.version() == version
1602 && let Ok(set) = RowMetaSet::open(&chain)
1603 {
1604 self.rowmap.store(Some(Arc::new(set)));
1605 Self::sweep_stale_rowmaps(cache_dir, &store_key, chain.base_version);
1606 return Ok(());
1607 }
1608 if let Some(set) = self
1609 .extend_rowmap_coordinated(cache_dir, &store_key, version)
1610 .await?
1611 {
1612 self.rowmap.store(Some(Arc::new(set)));
1613 }
1614 Ok(())
1615 }
1616
1617 async fn extend_rowmap_coordinated(
1623 &self,
1624 cache_dir: &Path,
1625 store_key: &str,
1626 version: u64,
1627 ) -> Result<Option<RowMetaSet>> {
1628 let lock_path = cache_dir.join(format!("rowmetamap-{store_key}.lock"));
1629 let lock = std::fs::File::create(&lock_path)
1630 .with_context(|| format!("create rowmap build lock {}", lock_path.display()))?;
1631 match lock.try_lock() {
1632 Ok(()) => {}
1633 Err(std::fs::TryLockError::WouldBlock) => return Ok(None),
1634 Err(std::fs::TryLockError::Error(error)) => {
1635 return Err(error).context("lock rowmap build");
1636 }
1637 }
1638
1639 if let Some(chain) = discover_chain(cache_dir, store_key)
1643 && chain.version() == version
1644 && let Ok(set) = RowMetaSet::open(&chain)
1645 {
1646 return Ok(Some(set));
1647 }
1648
1649 Self::sweep_orphan_temps(cache_dir, store_key);
1652
1653 let mut chain = discover_chain(cache_dir, store_key);
1658 if let Some(existing) = &chain
1659 && let Err(error) = RowMetaSet::open(existing)
1660 {
1661 tracing::warn!(%error, store = store_key, "rowmap unreadable; purging and rebuilding");
1662 Self::purge_rowmaps(cache_dir, store_key);
1663 chain = None;
1664 }
1665 let delta = match &chain {
1667 Some(existing) => self.collect_row_metas_delta(existing.version()).await?,
1668 None => None,
1669 };
1670
1671 let base_version = match (&chain, delta) {
1672 (Some(existing), Some(entries)) if existing.deltas.len() < Self::MAX_ROWMAP_DELTAS => {
1674 let path = RowMetaMap::delta_path(cache_dir, store_key, version);
1675 RowMetaMap::build(&path, version, entries)?;
1676 existing.base_version
1677 }
1678 (Some(existing), Some(entries)) => {
1682 let mut merged = RowMetaSet::open(existing)?.merged_entries();
1683 merged.extend(entries);
1684 let path = RowMetaMap::path_for(cache_dir, store_key, version);
1685 RowMetaMap::build(&path, version, merged)?;
1686 version
1687 }
1688 _ => {
1690 let entries = self.collect_row_metas().await?;
1691 let path = RowMetaMap::path_for(cache_dir, store_key, version);
1692 RowMetaMap::build(&path, version, entries)?;
1693 version
1694 }
1695 };
1696
1697 let chain =
1698 discover_chain(cache_dir, store_key).context("rowmap chain missing after build")?;
1699 let set = RowMetaSet::open(&chain)?;
1700 Self::sweep_stale_rowmaps(cache_dir, store_key, base_version);
1701 Ok(Some(set))
1702 }
1703
/// Deletes this store's rowmap files whose version is older than `keep`.
///
/// A file is considered when it is named
/// `rowmetamap-{store_key}-v<N>.rmm` or `rowmetamap-{store_key}-d<N>.rmm`;
/// it is removed when `N < keep`. Unreadable directories, non-UTF-8 names,
/// unparsable versions and failed removals are all silently ignored.
fn sweep_stale_rowmaps(cache_dir: &Path, store_key: &str, keep: u64) {
    let Ok(listing) = std::fs::read_dir(cache_dir) else {
        return;
    };
    let prefix = format!("rowmetamap-{store_key}-");
    for entry in listing.flatten() {
        let file_name = entry.file_name();
        let parsed = file_name
            .to_str()
            .and_then(|name| name.strip_prefix(prefix.as_str()))
            .and_then(|tail| tail.strip_suffix(".rmm"))
            // `v` marks a base file, `d` a delta file.
            .and_then(|tag| tag.strip_prefix('v').or_else(|| tag.strip_prefix('d')))
            .and_then(|digits| digits.parse::<u64>().ok());
        if parsed.is_some_and(|found| found < keep) {
            let _ = std::fs::remove_file(entry.path());
        }
    }
}
1733
/// Deletes every `.rmm` file of this store in `cache_dir`, regardless of
/// version. A file belongs to the store when its name starts with
/// `rowmetamap-{store_key}-`. Directory and removal errors are ignored.
fn purge_rowmaps(cache_dir: &Path, store_key: &str) {
    let Ok(listing) = std::fs::read_dir(cache_dir) else {
        return;
    };
    let prefix = format!("rowmetamap-{store_key}-");
    listing
        .flatten()
        .filter(|entry| {
            entry
                .file_name()
                .to_str()
                .is_some_and(|name| name.starts_with(prefix.as_str()) && name.ends_with(".rmm"))
        })
        .for_each(|entry| {
            let _ = std::fs::remove_file(entry.path());
        });
}
1752
/// Deletes leftover temporary rowmap files of this store: any entry whose
/// name starts with `rowmetamap-{store_key}-` and contains `.tmp-`.
/// Directory and removal errors are ignored.
fn sweep_orphan_temps(cache_dir: &Path, store_key: &str) {
    let Ok(listing) = std::fs::read_dir(cache_dir) else {
        return;
    };
    let prefix = format!("rowmetamap-{store_key}-");
    listing
        .flatten()
        .filter(|entry| {
            entry
                .file_name()
                .to_str()
                .is_some_and(|name| name.starts_with(prefix.as_str()) && name.contains(".tmp-"))
        })
        .for_each(|entry| {
            let _ = std::fs::remove_file(entry.path());
        });
}
1769
/// Test-only: number of deltas in the loaded rowmap, or `None` when no
/// rowmap is loaded.
#[cfg(test)]
pub(crate) fn rowmap_delta_count(&self) -> Option<usize> {
    let loaded = self.rowmap.load_full()?;
    Some(loaded.delta_count())
}
1774
1775 pub fn rowmap_snapshot(&self) -> Option<Arc<RowMetaSet>> {
1779 self.rowmap.load_full()
1780 }
1781
1782 async fn resolve_rowid_hits(
1786 &self,
1787 map: &RowMetaSet,
1788 hits: Vec<(u64, f32)>,
1789 ) -> Result<Vec<SearchHit>> {
1790 let mut resolved = Vec::with_capacity(hits.len());
1791 let mut misses: Vec<(u64, f32)> = Vec::new();
1792 for (rowid, score) in hits {
1793 match map.lookup(rowid) {
1794 Some((session_id, message_id)) => resolved.push(SearchHit {
1795 rowid: Some(rowid),
1796 key: MessageKey {
1797 session_id: session_id.to_owned(),
1798 message_id: message_id.to_owned(),
1799 },
1800 score,
1801 }),
1802 None => misses.push((rowid, score)),
1803 }
1804 }
1805 if !misses.is_empty() {
1808 let rowids: Vec<u64> = misses.iter().map(|(rowid, _)| *rowid).collect();
1809 let keys = self.message_keys_by_rowids(&rowids).await?;
1810 for ((rowid, score), key) in misses.into_iter().zip(keys) {
1811 resolved.push(SearchHit {
1812 rowid: Some(rowid),
1813 key,
1814 score,
1815 });
1816 }
1817 }
1818 Ok(resolved)
1819 }
1820
1821 async fn message_keys_by_rowids(&self, rowids: &[u64]) -> Result<Vec<MessageKey>> {
1824 let dataset = self.handle.dataset(Table::Messages).await?;
1825 let projection = ProjectionRequest::from_columns(["session_id", "id"], dataset.schema());
1826 let batch = dataset.take_rows(rowids, projection).await?;
1827 let mut keys = Vec::with_capacity(batch.num_rows());
1828 for row in 0..batch.num_rows() {
1829 keys.push(MessageKey {
1830 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
1831 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
1832 });
1833 }
1834 Ok(keys)
1835 }
1836
1837 pub async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1839 self.handle.export_write(name, bytes).await
1840 }
1841
1842 pub async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1844 self.handle.export_read(name).await
1845 }
1846
1847 pub fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1849 self.handle.export_local_path(name)
1850 }
1851
1852 pub async fn adapter_names(&self, include_subagents: bool) -> Result<Vec<String>> {
1858 let scanner = self
1859 .handle
1860 .scan(Table::Sessions, ScanOpts::project_only(&["source_agent"]))
1861 .await?;
1862 let mut stream = scanner.try_into_stream().await?;
1863 let mut names: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
1864 while let Some(batch) = stream.next().await {
1865 let batch = batch?;
1866 for row in 0..batch.num_rows() {
1867 let agent = string(&batch, "source_agent", row)?.unwrap_or_default();
1868 if !include_subagents && agent.contains('/') {
1869 continue;
1870 }
1871 names.insert(agent);
1872 }
1873 }
1874 Ok(names.into_iter().collect())
1875 }
1876
1877 pub async fn write_embeddings(&self, rows: &[EmbeddedMessage]) -> Result<()> {
1882 if rows.is_empty() {
1883 return Ok(());
1884 }
1885 let batch = embedding_update_batch(rows)?;
1886 self.handle
1887 .merge_update(Table::Messages, batch, rows.len())
1888 .await?;
1889 Ok(())
1890 }
1891
1892 pub fn pending_embedding_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1895 try_stream! {
1896 let filter = Predicate::And(vec![
1897 Predicate::IsNull("vector"),
1898 Predicate::IsNotNull("search_text"),
1899 ]);
1900 let projection: &[&str] = &["session_id", "id", "search_text"];
1901 let scanner = self
1902 .handle
1903 .scan(
1904 Table::Messages,
1905 ScanOpts::with_predicate_and_projection(&filter, projection),
1906 )
1907 .await?;
1908 let mut batches = scanner
1909 .try_into_stream()
1910 .await
1911 .context("failed to open messages stream")?;
1912 while let Some(batch) = batches.next().await {
1913 let batch = batch?;
1914 for row in 0..batch.num_rows() {
1915 yield PendingMessage {
1916 session_id: string(&batch, "session_id", row)?
1917 .context("session_id is null")?,
1918 id: string(&batch, "id", row)?.context("message id is null")?,
1919 search_text: string(&batch, "search_text", row)?
1920 .context("search_text is null")?,
1921 };
1922 }
1923 }
1924 }
1925 }
1926
1927 pub fn pending_or_stale_messages(&self) -> impl Stream<Item = Result<PendingMessage>> + '_ {
1932 try_stream! {
1933 let filter = Predicate::And(vec![
1934 Predicate::IsNotNull("search_text"),
1935 Predicate::Or(vec![
1936 Predicate::IsNull("vector"),
1937 Predicate::Ne("embedding_model", embed::model_id().into()),
1938 ]),
1939 ]);
1940 let projection: &[&str] = &["session_id", "id", "search_text"];
1941 let scanner = self
1942 .handle
1943 .scan(
1944 Table::Messages,
1945 ScanOpts::with_predicate_and_projection(&filter, projection),
1946 )
1947 .await?;
1948 let mut batches = scanner
1949 .try_into_stream()
1950 .await
1951 .context("failed to open pending-or-stale messages stream")?;
1952 while let Some(batch) = batches.next().await {
1953 let batch = batch?;
1954 for row in 0..batch.num_rows() {
1955 yield PendingMessage {
1956 session_id: string(&batch, "session_id", row)?
1957 .context("session_id is null")?,
1958 id: string(&batch, "id", row)?.context("message id is null")?,
1959 search_text: string(&batch, "search_text", row)?
1960 .context("search_text is null")?,
1961 };
1962 }
1963 }
1964 }
1965 }
1966
1967 pub async fn fts_search(
1972 &self,
1973 query: &str,
1974 limit: usize,
1975 filter: &Predicate,
1976 ) -> Result<Vec<SearchHit>> {
1977 let mut hits = if let Some(map) = self.rowmap.load_full() {
1978 let rowid_hits = self.fts_search_rowids(query, limit, filter).await?;
1979 self.resolve_rowid_hits(&map, rowid_hits).await?
1980 } else {
1981 self.fts_search_keys(query, limit, filter).await?
1982 };
1983 hits.sort_by(|left, right| {
1989 right
1990 .score
1991 .partial_cmp(&left.score)
1992 .unwrap_or(std::cmp::Ordering::Equal)
1993 .then_with(|| left.key.session_id.cmp(&right.key.session_id))
1994 .then_with(|| left.key.message_id.cmp(&right.key.message_id))
1995 });
1996 Ok(hits)
1997 }
1998
1999 async fn fts_scanner(
2002 &self,
2003 query: &str,
2004 limit: usize,
2005 filter: &Predicate,
2006 ) -> Result<lance::dataset::scanner::Scanner> {
2007 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
2008 scanner.full_text_search(
2009 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
2010 )?;
2011 if self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2012 scanner.fast_search();
2013 }
2014 scanner.disable_scoring_autoprojection();
2020 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
2021 Ok(scanner)
2022 }
2023
2024 async fn fts_search_keys(
2027 &self,
2028 query: &str,
2029 limit: usize,
2030 filter: &Predicate,
2031 ) -> Result<Vec<SearchHit>> {
2032 let mut scanner = self.fts_scanner(query, limit, filter).await?;
2033 scanner.project(&["session_id", "id", "_score"])?;
2034 let batch = scanner.try_into_batch().await?;
2035 let mut hits = Vec::with_capacity(batch.num_rows());
2036 for row in 0..batch.num_rows() {
2037 let key = MessageKey {
2038 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
2039 message_id: string(&batch, "id", row)?.context("fts hit id is null")?,
2040 };
2041 hits.push(SearchHit {
2042 rowid: None,
2043 key,
2044 score: float32(&batch, "_score", row)?,
2045 });
2046 }
2047 Ok(hits)
2048 }
2049
2050 pub async fn messages_version(&self) -> Result<u64> {
2053 Ok(self
2054 .handle
2055 .dataset(Table::Messages)
2056 .await?
2057 .version()
2058 .version)
2059 }
2060
2061 pub async fn collect_row_metas(&self) -> Result<Vec<RowMetaEntry>> {
2065 let mut scanner = self.handle.scanner(Table::Messages, None).await?;
2066 scanner.with_row_id();
2067 scanner.project(&Self::ROW_META_COLUMNS)?;
2068 let mut stream = scanner.try_into_stream().await?;
2069 let mut out = Vec::new();
2070 while let Some(batch) = stream.next().await {
2071 let batch = batch?;
2072 let rowids = uint64(&batch, "_rowid")?;
2073 for row in 0..batch.num_rows() {
2074 out.push(row_meta_entry(&batch, rowids.value(row), row)?);
2075 }
2076 }
2077 Ok(out)
2078 }
2079
2080 async fn collect_row_metas_delta(
2085 &self,
2086 base_version: u64,
2087 ) -> Result<Option<Vec<RowMetaEntry>>> {
2088 let dataset = self.handle.dataset(Table::Messages).await?;
2089 let old = dataset.checkout_version(base_version).await?;
2090 let old_ids: HashSet<u64> = old.get_fragments().iter().map(|f| f.id() as u64).collect();
2091 let current = dataset.get_fragments();
2092 let current_ids: HashSet<u64> = current.iter().map(|f| f.id() as u64).collect();
2093 if !old_ids.is_subset(¤t_ids) {
2094 return Ok(None);
2095 }
2096 let new_fragments: Vec<_> = current
2097 .iter()
2098 .filter(|fragment| !old_ids.contains(&(fragment.id() as u64)))
2099 .map(|fragment| fragment.metadata().clone())
2100 .collect();
2101 if new_fragments.is_empty() {
2102 return Ok(Some(Vec::new()));
2103 }
2104 let mut scanner = dataset.scan();
2105 scanner.with_fragments(new_fragments);
2106 scanner.with_row_id();
2107 scanner.project(&Self::ROW_META_COLUMNS)?;
2108 let mut stream = scanner.try_into_stream().await?;
2109 let mut out = Vec::new();
2110 while let Some(batch) = stream.next().await {
2111 let batch = batch?;
2112 let rowids = uint64(&batch, "_rowid")?;
2113 for row in 0..batch.num_rows() {
2114 out.push(row_meta_entry(&batch, rowids.value(row), row)?);
2115 }
2116 }
2117 Ok(Some(out))
2118 }
2119
2120 async fn fts_search_rowids(
2123 &self,
2124 query: &str,
2125 limit: usize,
2126 filter: &Predicate,
2127 ) -> Result<Vec<(u64, f32)>> {
2128 let mut scanner = self.fts_scanner(query, limit, filter).await?;
2129 scanner.with_row_id();
2130 scanner.project(&["_score"])?;
2131 let batch = scanner.try_into_batch().await?;
2132 let rowids = uint64(&batch, "_rowid")?;
2133 let mut hits = Vec::with_capacity(batch.num_rows());
2134 for row in 0..batch.num_rows() {
2135 hits.push((rowids.value(row), float32(&batch, "_score", row)?));
2136 }
2137 Ok(hits)
2138 }
2139
2140 pub async fn searchable_in_scope(&self, filter: &Predicate) -> Result<usize> {
2147 if matches!(filter, Predicate::And(clauses) if clauses.is_empty())
2153 && let Some(count) = self.fts_num_docs().await?
2154 {
2155 return Ok(count);
2156 }
2157 let scope = Predicate::And(vec![Predicate::IsNotNull("search_text"), filter.clone()]);
2158 let dataset = self.handle.dataset(Table::Messages).await?;
2159 let count = dataset.count_rows(Some(scope.to_lance())).await?;
2160 Ok(count)
2161 }
2162
2163 async fn fts_num_docs(&self) -> Result<Option<usize>> {
2167 if !self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2168 return Ok(None);
2169 }
2170 let dataset = self.handle.dataset(Table::Messages).await?;
2171 let json = dataset.index_statistics(MESSAGES_FTS_INDEX).await?;
2172 let parsed: Value =
2173 serde_json::from_str(&json).context("failed to parse FTS index_statistics")?;
2174 let total: u64 = parsed["indices"]
2175 .as_array()
2176 .map(|segments| {
2177 segments
2178 .iter()
2179 .filter_map(|segment| segment["num_docs"].as_u64())
2180 .sum()
2181 })
2182 .unwrap_or(0);
2183 Ok(Some(usize::try_from(total).unwrap_or(usize::MAX)))
2184 }
2185
2186 pub async fn has_embeddings(&self) -> Result<bool> {
2191 let scope = Predicate::IsNotNull("vector");
2192 let mut scanner = self
2193 .handle
2194 .scan(
2195 Table::Messages,
2196 ScanOpts::with_predicate_and_projection(&scope, &["id"]),
2197 )
2198 .await?;
2199 scanner.limit(Some(1), None)?;
2200 let batch = scanner.try_into_batch().await?;
2201 Ok(batch.num_rows() > 0)
2202 }
2203
2204 pub async fn vector_search(
2212 &self,
2213 query: &[f32],
2214 limit: usize,
2215 filter: &Predicate,
2216 search: Option<&config::SearchConfig>,
2217 ) -> Result<Vec<SearchHit>> {
2218 let mut hits = if let Some(map) = self.rowmap.load_full() {
2219 let rowid_hits = self
2220 .vector_search_rowids(query, limit, filter, search)
2221 .await?;
2222 self.resolve_rowid_hits(&map, rowid_hits).await?
2223 } else {
2224 self.vector_search_keys(query, limit, filter, search)
2225 .await?
2226 };
2227 hits.sort_by(|left, right| {
2233 left.score
2234 .partial_cmp(&right.score)
2235 .unwrap_or(std::cmp::Ordering::Equal)
2236 .then_with(|| left.key.session_id.cmp(&right.key.session_id))
2237 .then_with(|| left.key.message_id.cmp(&right.key.message_id))
2238 });
2239 Ok(hits)
2240 }
2241
2242 async fn vector_scanner(
2244 &self,
2245 query: &[f32],
2246 limit: usize,
2247 filter: &Predicate,
2248 search: Option<&config::SearchConfig>,
2249 ) -> Result<lance::dataset::scanner::Scanner> {
2250 let scope = embedded_scope(filter);
2251 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
2252 let key = Float32Array::from(query.to_vec());
2253 scanner.nearest("vector", &key, limit)?;
2254 apply_vector_search_knobs(&mut scanner, search);
2255 if self
2256 .handle
2257 .messages_has_index(MESSAGES_VECTOR_INDEX)
2258 .await?
2259 {
2260 scanner.fast_search();
2261 }
2262 scanner.disable_scoring_autoprojection();
2263 Ok(scanner)
2264 }
2265
2266 async fn vector_search_rowids(
2269 &self,
2270 query: &[f32],
2271 limit: usize,
2272 filter: &Predicate,
2273 search: Option<&config::SearchConfig>,
2274 ) -> Result<Vec<(u64, f32)>> {
2275 let mut scanner = self.vector_scanner(query, limit, filter, search).await?;
2276 scanner.with_row_id();
2277 scanner.project(&["_distance"])?;
2278 let batch = scanner.try_into_batch().await?;
2279 let rowids = uint64(&batch, "_rowid")?;
2280 let mut hits = Vec::with_capacity(batch.num_rows());
2281 for row in 0..batch.num_rows() {
2282 hits.push((rowids.value(row), float32(&batch, "_distance", row)?));
2283 }
2284 Ok(hits)
2285 }
2286
2287 async fn vector_search_keys(
2290 &self,
2291 query: &[f32],
2292 limit: usize,
2293 filter: &Predicate,
2294 search: Option<&config::SearchConfig>,
2295 ) -> Result<Vec<SearchHit>> {
2296 let mut scanner = self.vector_scanner(query, limit, filter, search).await?;
2297 scanner.project(&["session_id", "id", "_distance"])?;
2298 let batch = scanner.try_into_batch().await?;
2299 let mut hits = Vec::with_capacity(batch.num_rows());
2300 for row in 0..batch.num_rows() {
2301 let key = MessageKey {
2302 session_id: string(&batch, "session_id", row)?.context("session_id is null")?,
2303 message_id: string(&batch, "id", row)?.context("message id is null")?,
2304 };
2305 hits.push(SearchHit {
2306 rowid: None,
2307 key,
2308 score: float32(&batch, "_distance", row)?,
2309 });
2310 }
2311 Ok(hits)
2312 }
2313
2314 pub async fn explain_vector_plan(
2317 &self,
2318 query: &[f32],
2319 limit: usize,
2320 filter: &Predicate,
2321 search: Option<&config::SearchConfig>,
2322 ) -> Result<String> {
2323 let scope = embedded_scope(filter);
2324 let mut scanner = self.handle.scanner(Table::Messages, Some(&scope)).await?;
2325 let key = Float32Array::from(query.to_vec());
2326 scanner.nearest("vector", &key, limit)?;
2327 apply_vector_search_knobs(&mut scanner, search);
2328 if self
2329 .handle
2330 .messages_has_index(MESSAGES_VECTOR_INDEX)
2331 .await?
2332 {
2333 scanner.fast_search();
2334 }
2335 scanner
2336 .explain_plan(true)
2337 .await
2338 .context("explain_plan failed")
2339 }
2340
2341 pub async fn explain_fts_plan(
2342 &self,
2343 query: &str,
2344 limit: usize,
2345 filter: &Predicate,
2346 ) -> Result<String> {
2347 let mut scanner = self.handle.scanner(Table::Messages, Some(filter)).await?;
2348 scanner.full_text_search(
2349 FullTextSearchQuery::new(query.to_owned()).with_column("search_text".to_owned())?,
2350 )?;
2351 if self.handle.messages_has_index(MESSAGES_FTS_INDEX).await? {
2352 scanner.fast_search();
2353 }
2354 scanner.project(&["session_id", "id"])?;
2355 scanner.limit(Some(i64::try_from(limit).unwrap_or(i64::MAX)), None)?;
2356 scanner
2357 .explain_plan(true)
2358 .await
2359 .context("explain_plan failed")
2360 }
2361
2362 pub async fn message_metas_by_rowids(&self, rowids: &[u64]) -> Result<Vec<MessageMeta>> {
2370 if rowids.is_empty() {
2371 return Ok(Vec::new());
2372 }
2373 let mut metas = Vec::with_capacity(rowids.len());
2374 let misses: Vec<u64> = if let Some(map) = self.rowmap.load_full() {
2375 let (hits, misses) = map.hydrate(rowids);
2376 metas.extend(hits.into_iter().map(|entry| MessageMeta {
2377 message_id: entry.message_id,
2378 session_id: entry.session_id,
2379 role: entry.role,
2380 project: entry.project,
2381 source_agent: entry.source_agent,
2382 timestamp:
2383 DateTime::from_timestamp_micros(entry.timestamp_micros).unwrap_or_default(),
2384 search_text: entry.search_text,
2385 }));
2386 misses
2387 } else {
2388 rowids.to_vec()
2389 };
2390 if !misses.is_empty() {
2391 metas.extend(self.message_metas_by_rowids_take(&misses).await?);
2392 }
2393 Ok(metas)
2394 }
2395
2396 async fn message_metas_by_rowids_take(&self, rowids: &[u64]) -> Result<Vec<MessageMeta>> {
2401 let dataset = self.handle.dataset(Table::Messages).await?;
2402 let projection = ProjectionRequest::from_columns(
2403 [
2404 "id",
2405 "session_id",
2406 "role",
2407 "project",
2408 "source_agent",
2409 "timestamp",
2410 "search_text",
2411 ],
2412 dataset.schema(),
2413 );
2414 let batch = dataset.take_rows(rowids, projection).await?;
2415 let mut metas = Vec::with_capacity(batch.num_rows());
2416 for row in 0..batch.num_rows() {
2417 metas.push(message_meta_from_batch(&batch, row)?);
2418 }
2419 Ok(metas)
2420 }
2421
2422 pub async fn message_metas_by_keys(&self, keys: &[MessageKey]) -> Result<Vec<MessageMeta>> {
2424 if keys.is_empty() {
2425 return Ok(Vec::new());
2426 }
2427 let wanted = keys.iter().cloned().collect::<HashSet<_>>();
2428 let session_ids = keys
2429 .iter()
2430 .map(|key| key.session_id.clone())
2431 .collect::<Vec<_>>();
2432 let message_ids = keys
2433 .iter()
2434 .map(|key| key.message_id.clone())
2435 .collect::<Vec<_>>();
2436 let predicate = Predicate::And(vec![
2437 in_predicate("session_id", &session_ids),
2438 in_predicate("id", &message_ids),
2439 ]);
2440 let batch = self
2441 .handle
2442 .scan_batch(
2443 Table::Messages,
2444 Some(&predicate),
2445 &[
2446 "id",
2447 "session_id",
2448 "role",
2449 "project",
2450 "source_agent",
2451 "timestamp",
2452 "search_text",
2453 ],
2454 )
2455 .await?;
2456 let mut metas = Vec::with_capacity(batch.num_rows());
2457 for row in 0..batch.num_rows() {
2458 let meta = message_meta_from_batch(&batch, row)?;
2461 if wanted.contains(&MessageKey {
2462 session_id: meta.session_id.clone(),
2463 message_id: meta.message_id.clone(),
2464 }) {
2465 metas.push(meta);
2466 }
2467 }
2468 Ok(metas)
2469 }
2470
2471 pub async fn session_message_counts(
2480 &self,
2481 session_ids: &[String],
2482 ) -> Result<BTreeMap<String, usize>> {
2483 if session_ids.is_empty() {
2484 return Ok(BTreeMap::new());
2485 }
2486 if let Some(map) = self.rowmap.load_full()
2493 && map.version() == self.messages_version().await?
2494 {
2495 return Ok(session_ids
2496 .iter()
2497 .map(|id| (id.clone(), map.lookup_count(id).unwrap_or(0)))
2498 .collect());
2499 }
2500 let predicate = in_predicate("session_id", session_ids);
2501 let scanner = self
2502 .handle
2503 .scan(
2504 Table::Messages,
2505 ScanOpts::with_predicate_and_projection(&predicate, &["session_id"]),
2506 )
2507 .await?;
2508 let mut stream = scanner
2509 .try_into_stream()
2510 .await
2511 .context("failed to open session_message_counts stream")?;
2512 let mut counts: BTreeMap<String, usize> =
2513 session_ids.iter().map(|id| (id.clone(), 0)).collect();
2514 while let Some(batch) = stream.next().await {
2515 let batch = batch.context("failed to read session_message_counts batch")?;
2516 let column = batch
2517 .column_by_name("session_id")
2518 .context("session_message_counts: session_id column missing")?
2519 .as_any()
2520 .downcast_ref::<StringArray>()
2521 .context("session_message_counts: session_id column is not Utf8")?;
2522 for value in column.iter().flatten() {
2523 if let Some(entry) = counts.get_mut(value) {
2524 *entry += 1;
2525 }
2526 }
2527 }
2528 Ok(counts)
2529 }
2530
2531 pub async fn unindexed_message_backlog(&self) -> Result<usize> {
2534 self.handle
2535 .unindexed_row_count(Table::Messages, MESSAGES_FTS_INDEX)
2536 .await
2537 }
2538
/// Returns the unindexed-row count the handle reports for the messages
/// table with respect to the `MESSAGES_VECTOR_INDEX` index.
pub async fn unindexed_vector_backlog(&self) -> Result<usize> {
    self.handle
        .unindexed_row_count(Table::Messages, MESSAGES_VECTOR_INDEX)
        .await
}
2549
2550 pub async fn embedding_progress(&self) -> Result<EmbeddingProgress> {
2554 let dataset = self.handle.dataset(Table::Messages).await?;
2555 let embedded = dataset
2561 .count_rows(Some(Predicate::IsNotNull("embedding_model").to_lance()))
2562 .await?;
2563 let backlog = self.embed_backlog_count().await?;
2570 Ok(EmbeddingProgress {
2571 embedded,
2572 total: embedded + backlog,
2573 backlog,
2574 model: embed::model_id(),
2575 })
2576 }
2577
2578 pub async fn embed_backlog_count(&self) -> Result<usize> {
2584 let dataset = self.handle.dataset(Table::Messages).await?;
2585 let filter = Predicate::And(vec![
2586 Predicate::IsNull("embedding_model"),
2587 Predicate::IsNotNull("search_text"),
2588 ]);
2589 Ok(dataset.count_rows(Some(filter.to_lance())).await?)
2590 }
2591
2592 pub async fn stale_embedding_count(&self) -> Result<usize> {
2596 let dataset = self.handle.dataset(Table::Messages).await?;
2597 dataset
2603 .count_rows(Some(
2604 Predicate::And(vec![
2605 Predicate::IsNotNull("embedding_model"),
2606 Predicate::Ne("embedding_model", embed::model_id().into()),
2607 ])
2608 .to_lance(),
2609 ))
2610 .await
2611 .map_err(Into::into)
2612 }
2613
2614 pub async fn optimize_indices(
2620 &self,
2621 progress: Option<OptimizeProgressFn>,
2622 maintenance: &MaintenancePolicy,
2623 ) -> Result<OptimizeOutcome> {
2624 let intents = pond_index_intents();
2625 let mut tables = Vec::with_capacity(3);
2626 for (table, intents) in intents.all() {
2627 let outcome = self
2628 .handle
2629 .optimize_table(table, intents, progress.as_ref(), maintenance)
2630 .await;
2631 tables.push(outcome);
2632 }
2633 Ok(OptimizeOutcome { tables })
2634 }
2635
2636 pub async fn build_indices_only(
2642 &self,
2643 progress: Option<OptimizeProgressFn>,
2644 ) -> Result<OptimizeOutcome> {
2645 let policy = pond_index_intents();
2646 let mut tables = Vec::with_capacity(3);
2647 for (table, intents) in policy.all() {
2648 let indices = self
2649 .handle
2650 .optimize_table_indices_only(table, intents, progress.as_ref())
2651 .await;
2652 tables.push(TableOptimizeOutcome {
2653 table,
2654 indices,
2655 compaction: PhaseOutcome::NotAttempted,
2656 });
2657 }
2658 Ok(OptimizeOutcome { tables })
2659 }
2660
/// Test-only variant of `optimize_indices`: uses the intents from
/// `pond_index_intents_with_vector_threshold(vector_threshold)`, the
/// `MaintenancePolicy::always_compact()` policy, and no progress callback.
#[cfg(test)]
async fn optimize_indices_with_vector_threshold(
    &self,
    vector_threshold: usize,
) -> Result<OptimizeOutcome> {
    let policy = pond_index_intents_with_vector_threshold(vector_threshold);
    let always_compact = MaintenancePolicy::always_compact();
    let mut tables = Vec::with_capacity(3);
    for (table, table_intents) in policy.all() {
        tables.push(
            self.handle
                .optimize_table(table, table_intents, None, &always_compact)
                .await,
        );
    }
    Ok(OptimizeOutcome { tables })
}
2678
2679 pub async fn cleanup_old_versions(&self, older_than: chrono::Duration) -> Result<()> {
2685 for (table, _) in pond_index_intents().all() {
2686 self.handle
2687 .cleanup_table_versions(table, older_than)
2688 .await?;
2689 }
2690 Ok(())
2691 }
2692
2693 pub async fn rebuild_indices(
2694 &self,
2695 intent_name: Option<&str>,
2696 progress: Option<OptimizeProgressFn>,
2697 ) -> Result<()> {
2698 let policy = pond_index_intents();
2699 let mut matched = false;
2700 for (table, intents) in policy.all() {
2701 for intent in intents {
2702 if intent_name.is_none_or(|name| name == intent.name) {
2703 matched = true;
2704 self.handle
2705 .rebuild_index(table, intent, progress.as_ref())
2706 .await?;
2707 }
2708 }
2709 }
2710 if let Some(name) = intent_name
2711 && !matched
2712 {
2713 anyhow::bail!("unknown index intent {name:?}");
2714 }
2715 Ok(())
2716 }
2717
2718 pub async fn drop_index_by_name(&self, name: &str) -> Result<()> {
2725 let Some(owner) = self.handle.find_index_owner(name).await? else {
2726 anyhow::bail!("no index named {name:?} found on any table");
2727 };
2728 self.handle.drop_index(owner, name).await
2729 }
2730
2731 pub async fn index_status(&self) -> Result<Vec<IndexStatus>> {
2732 let policy = pond_index_intents();
2733 let mut statuses = Vec::new();
2734 for (table, intents) in policy.all() {
2735 statuses.extend(self.handle.index_status(table, intents).await?);
2736 }
2737 Ok(statuses)
2738 }
2739
2740 pub async fn drop_vector_index(&self) -> Result<()> {
2744 match self
2745 .handle
2746 .drop_index(Table::Messages, MESSAGES_VECTOR_INDEX)
2747 .await
2748 {
2749 Ok(()) => Ok(()),
2750 Err(error) => {
2751 let msg = error.to_string();
2752 if msg.contains("not found") || msg.contains("does not exist") {
2753 Ok(())
2754 } else {
2755 Err(error)
2756 }
2757 }
2758 }
2759 }
2760
/// Returns the table sizes reported by the underlying handle.
pub async fn table_sizes(&self) -> Result<TableSizes> {
    self.handle.table_sizes().await
}
2766
/// Returns whatever the underlying handle reports from `initialized()`.
pub async fn initialized(&self) -> Result<bool> {
    self.handle.initialized().await
}
2770
2771 async fn find_session(&self, session_id: &str) -> Result<Option<Session>> {
2772 let batch = self
2773 .handle
2774 .scan_batch(
2775 Table::Sessions,
2776 Some(&Predicate::Eq("id", session_id.into())),
2777 &[],
2778 )
2779 .await?;
2780 if batch.num_rows() == 0 {
2781 Ok(None)
2782 } else {
2783 Ok(Some(session_from_batch(&batch, 0)?))
2784 }
2785 }
2786
2787 async fn messages_for_session(&self, session_id: &str) -> Result<Vec<MessageWithParts>> {
2788 let batch = self
2789 .handle
2790 .scan_batch(
2791 Table::Messages,
2792 Some(&Predicate::Eq("session_id", session_id.into())),
2793 &[
2794 "session_id",
2795 "id",
2796 "timestamp",
2797 "role",
2798 "content",
2799 "options",
2800 ],
2801 )
2802 .await?;
2803 let mut messages = Vec::with_capacity(batch.num_rows());
2804 for row in 0..batch.num_rows() {
2805 messages.push(message_from_batch(&batch, row)?);
2806 }
2807 messages.sort_by(|left, right| {
2808 left.timestamp()
2809 .cmp(&right.timestamp())
2810 .then_with(|| left.id().cmp(right.id()))
2811 });
2812
2813 let message_ids = messages
2814 .iter()
2815 .map(|message| message.id().to_owned())
2816 .collect::<Vec<_>>();
2817 let mut parts_by_message = self.parts_for_messages(session_id, &message_ids).await?;
2818
2819 Ok(messages
2820 .into_iter()
2821 .map(|message| {
2822 let key = (message.session_id().to_owned(), message.id().to_owned());
2823 let parts = parts_by_message.remove(&key).unwrap_or_default();
2824 MessageWithParts { message, parts }
2825 })
2826 .collect())
2827 }
2828
/// Loads all parts of the given messages of `session_id`, grouped by
/// `(session_id, message_id)`. Delegates to `scan_parts` with no part-type
/// filter.
pub async fn parts_for_messages(
    &self,
    session_id: &str,
    message_ids: &[String],
) -> Result<BTreeMap<(String, String), Vec<Part>>> {
    self.scan_parts(session_id, message_ids, None).await
}
2839
/// Like `parts_for_messages`, but restricts the scan to parts whose `type`
/// is one of `SUMMARY_PART_TYPES`.
pub async fn summary_parts_for_messages(
    &self,
    session_id: &str,
    message_ids: &[String],
) -> Result<BTreeMap<(String, String), Vec<Part>>> {
    self.scan_parts(session_id, message_ids, Some(SUMMARY_PART_TYPES))
        .await
}
2852
2853 async fn scan_parts(
2854 &self,
2855 session_id: &str,
2856 message_ids: &[String],
2857 part_types: Option<&[&str]>,
2858 ) -> Result<BTreeMap<(String, String), Vec<Part>>> {
2859 if message_ids.is_empty() {
2860 return Ok(BTreeMap::new());
2861 }
2862 let mut clauses = vec![
2863 Predicate::Eq("session_id", session_id.into()),
2864 in_predicate("message_id", message_ids),
2865 ];
2866 if let Some(types) = part_types {
2867 clauses.push(Predicate::In(
2868 "type",
2869 types.iter().map(|&t| t.into()).collect(),
2870 ));
2871 }
2872 let predicate = Predicate::And(clauses);
2873 let dataset = std::sync::Arc::new(self.handle.dataset(Table::Parts).await?);
2874 let mut scanner = self
2875 .handle
2876 .scan(
2877 Table::Parts,
2878 ScanOpts::with_predicate_and_projection(
2879 &predicate,
2880 &[
2881 "session_id",
2882 "message_id",
2883 "id",
2884 "ordinal",
2885 "type",
2886 "provenance",
2887 "variant_data",
2888 "options",
2889 ],
2890 ),
2891 )
2892 .await?;
2893 scanner.with_row_address();
2894 let batch = scanner.try_into_batch().await.context("scan failed")?;
2895 let row_addresses = uint64(&batch, "_rowaddr")?;
2896 let mut file_payloads = BTreeMap::<usize, FileData>::new();
2897 let mut file_rows = Vec::<(usize, u64, Vec<u8>)>::new();
2898 for row in 0..batch.num_rows() {
2899 if string(&batch, "type", row)?.as_deref() == Some("file") {
2900 let variant_data =
2901 json_column(&batch, "variant_data", row)?.context("variant_data is null")?;
2902 file_rows.push((row, row_addresses.value(row), variant_data));
2903 }
2904 }
2905 if !file_rows.is_empty() {
2906 let addresses = file_rows
2907 .iter()
2908 .map(|(_, address, _)| *address)
2909 .collect::<Vec<_>>();
2910 let blobs = dataset.take_blobs_by_addresses(&addresses, "data").await?;
2911 for ((row, _, variant_data), blob) in file_rows.into_iter().zip(blobs) {
2912 let payload = file_data_from_blob(&variant_data, &blob.read().await?)?;
2916 file_payloads.insert(row, payload);
2917 }
2918 }
2919 let mut parts_by_message = BTreeMap::<(String, String), Vec<Part>>::new();
2920 for row in 0..batch.num_rows() {
2921 let part = part_from_batch(&batch, row, file_payloads.remove(&row))?;
2922 parts_by_message
2923 .entry((part.session_id.clone(), part.message_id.clone()))
2924 .or_default()
2925 .push(part);
2926 }
2927 for parts in parts_by_message.values_mut() {
2928 parts.sort_by_key(|part| part.ordinal);
2929 }
2930 Ok(parts_by_message)
2931 }
2932}
2933
/// One event of an ingest stream: a session, a message, or a part.
///
/// Serialized adjacently tagged: the variant name (snake_case) goes in
/// `kind` and the payload in `data`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum IngestEvent {
    /// A session row.
    Session(Session),
    /// A message row.
    Message(Message),
    /// A part row.
    Part(Part),
}
2941
/// Running totals for an ingest run.
///
/// The inserted/matched counters are fed by `add_batch` and `add_outcomes`;
/// the dropped counters and `drop_reasons` by `add_outcomes` and
/// `add_outcomes_errors_only`; `merge` sums every field of another summary
/// into this one.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct IngestSummary {
    /// Rows inserted, across sessions, messages and parts.
    pub inserted: usize,
    /// Rows matched (already present), across sessions, messages and parts.
    pub matched: usize,
    /// Sessions inserted.
    pub sessions_inserted: usize,
    /// Messages inserted, searchable or not.
    pub messages_inserted_total: usize,
    /// Messages inserted whose outcome was flagged `searchable`.
    pub messages_inserted_searchable: usize,
    /// Parts inserted.
    pub parts_inserted: usize,
    /// Sessions matched.
    pub sessions_matched: usize,
    /// Messages matched, searchable or not.
    pub messages_matched_total: usize,
    /// Messages matched whose outcome was flagged `searchable`.
    pub messages_matched_searchable: usize,
    /// Parts matched.
    pub parts_matched: usize,
    /// Error outcomes whose kind is anything other than `"session"`.
    pub dropped_events: usize,
    /// Error outcomes whose kind is `"session"`.
    pub dropped_sessions: usize,
    // NOTE(review): the five counters below are only summed by `merge` in
    // this part of the file; their writers are elsewhere, so the meanings
    // here are inferred from the names only.
    /// Presumably files skipped by the caller (confirm against the writer).
    pub skipped_files: usize,
    /// Presumably empty inputs skipped by the caller (confirm).
    pub skipped_empty: usize,
    /// Presumably inputs skipped as already up to date (confirm).
    pub skipped_fresh: usize,
    /// Presumably storage-level failures (confirm).
    pub storage_errors: usize,
    /// Presumably values truncated during ingest (confirm).
    pub truncated_values: usize,
    /// Error outcomes tallied by their `reason_key`
    /// (`DROP_REASON_UNCATEGORIZED` when the key is absent).
    pub drop_reasons: BTreeMap<&'static str, usize>,
}
3017
// Keys used in `IngestSummary::drop_reasons`; each is attached to an error
// outcome as `RowError::reason_key`.

/// A message id was repeated within one session substream.
pub const DROP_REASON_DUPLICATE_MESSAGE_ID: &str = "duplicate_message_id";
/// A `(message_id, part_id)` pair was repeated within one session substream.
pub const DROP_REASON_DUPLICATE_PART_KEY: &str = "duplicate_part_key";
/// A message arrived while no session was open.
pub const DROP_REASON_MESSAGE_BEFORE_SESSION: &str = "message_before_session";
/// A message referenced a session other than the open one.
pub const DROP_REASON_MESSAGE_SESSION_MISMATCH: &str = "message_session_mismatch";
/// A part arrived while no message was open.
pub const DROP_REASON_PART_BEFORE_MESSAGE: &str = "part_before_message";
/// A part referenced a session or message other than the open message's.
pub const DROP_REASON_PART_MESSAGE_MISMATCH: &str = "part_message_mismatch";
/// A session's `source_agent` was empty after trimming.
pub const DROP_REASON_EMPTY_SOURCE_AGENT: &str = "empty_source_agent";
/// A session had `parent_message_id` but no `parent_session_id`.
pub const DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION: &str = "parent_message_without_session";
/// NOTE(review): not used in this part of the file; presumably an attempt
/// to change a stored session's `project` (confirm against the caller).
pub const DROP_REASON_IMMUTABLE_PROJECT: &str = "immutable_project";
/// NOTE(review): not used in this part of the file; presumably an attempt
/// to change a stored session's `source_agent` (confirm against the caller).
pub const DROP_REASON_IMMUTABLE_SOURCE_AGENT: &str = "immutable_source_agent";
/// Fallback key for error outcomes that carry no `reason_key`.
pub const DROP_REASON_UNCATEGORIZED: &str = "uncategorized";
3034
/// Inserted/matched tallies for one upsert batch.
///
/// Filled in by `success_outcomes_for_substream` and folded into an
/// `IngestSummary` by `IngestSummary::add_batch`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BatchCounts {
    /// Sessions that were not already stored.
    pub sessions_inserted: usize,
    /// Sessions that were already stored.
    pub sessions_matched: usize,
    /// Messages that were not already stored.
    pub messages_inserted_total: usize,
    /// Subset of inserted messages that have search text.
    pub messages_inserted_searchable: usize,
    /// Messages that were already stored.
    pub messages_matched_total: usize,
    /// Subset of matched messages that have search text.
    pub messages_matched_searchable: usize,
    /// Parts that were not already stored.
    pub parts_inserted: usize,
    /// Parts that were already stored.
    pub parts_matched: usize,
}
3053
3054impl IngestSummary {
3055 pub fn accepted(&self) -> usize {
3056 self.inserted + self.matched
3057 }
3058
3059 pub fn add_batch(&mut self, counts: &BatchCounts) {
3063 self.sessions_inserted += counts.sessions_inserted;
3064 self.sessions_matched += counts.sessions_matched;
3065 self.messages_inserted_total += counts.messages_inserted_total;
3066 self.messages_inserted_searchable += counts.messages_inserted_searchable;
3067 self.messages_matched_total += counts.messages_matched_total;
3068 self.messages_matched_searchable += counts.messages_matched_searchable;
3069 self.parts_inserted += counts.parts_inserted;
3070 self.parts_matched += counts.parts_matched;
3071 self.inserted +=
3072 counts.sessions_inserted + counts.messages_inserted_total + counts.parts_inserted;
3073 self.matched +=
3074 counts.sessions_matched + counts.messages_matched_total + counts.parts_matched;
3075 }
3076
3077 pub fn merge(&mut self, other: &Self) {
3081 self.inserted += other.inserted;
3082 self.matched += other.matched;
3083 self.sessions_inserted += other.sessions_inserted;
3084 self.messages_inserted_total += other.messages_inserted_total;
3085 self.messages_inserted_searchable += other.messages_inserted_searchable;
3086 self.parts_inserted += other.parts_inserted;
3087 self.sessions_matched += other.sessions_matched;
3088 self.messages_matched_total += other.messages_matched_total;
3089 self.messages_matched_searchable += other.messages_matched_searchable;
3090 self.parts_matched += other.parts_matched;
3091 self.dropped_events += other.dropped_events;
3092 self.dropped_sessions += other.dropped_sessions;
3093 self.skipped_files += other.skipped_files;
3094 self.skipped_empty += other.skipped_empty;
3095 self.skipped_fresh += other.skipped_fresh;
3096 self.storage_errors += other.storage_errors;
3097 self.truncated_values += other.truncated_values;
3098 for (key, value) in &other.drop_reasons {
3099 *self.drop_reasons.entry(key).or_insert(0) += value;
3100 }
3101 }
3102
3103 pub fn add_outcomes_errors_only(&mut self, outcomes: &[RowOutcome]) {
3108 for outcome in outcomes {
3109 if !matches!(outcome.status, OutcomeStatus::Error) {
3110 continue;
3111 }
3112 if outcome.kind == "session" {
3113 self.dropped_sessions += 1;
3114 } else {
3115 self.dropped_events += 1;
3116 }
3117 let reason = outcome
3118 .error
3119 .as_ref()
3120 .and_then(|error| error.reason_key)
3121 .unwrap_or(DROP_REASON_UNCATEGORIZED);
3122 *self.drop_reasons.entry(reason).or_insert(0) += 1;
3123 }
3124 }
3125
3126 pub fn add_outcomes(&mut self, outcomes: &[RowOutcome]) {
3127 for outcome in outcomes {
3128 match outcome.status {
3129 OutcomeStatus::Inserted => {
3130 self.inserted += 1;
3131 match outcome.kind {
3132 "session" => self.sessions_inserted += 1,
3133 "message" => {
3134 self.messages_inserted_total += 1;
3135 if outcome.searchable {
3136 self.messages_inserted_searchable += 1;
3137 }
3138 }
3139 "part" => self.parts_inserted += 1,
3140 _ => {}
3141 }
3142 }
3143 OutcomeStatus::Matched => {
3144 self.matched += 1;
3145 match outcome.kind {
3146 "session" => self.sessions_matched += 1,
3147 "message" => {
3148 self.messages_matched_total += 1;
3149 if outcome.searchable {
3150 self.messages_matched_searchable += 1;
3151 }
3152 }
3153 "part" => self.parts_matched += 1,
3154 _ => {}
3155 }
3156 }
3157 OutcomeStatus::Error => {
3158 if outcome.kind == "session" {
3164 self.dropped_sessions += 1;
3165 } else {
3166 self.dropped_events += 1;
3167 }
3168 let reason = outcome
3169 .error
3170 .as_ref()
3171 .and_then(|e| e.reason_key)
3172 .unwrap_or(DROP_REASON_UNCATEGORIZED);
3173 *self.drop_reasons.entry(reason).or_insert(0) += 1;
3174 }
3175 }
3176 }
3177 }
3178}
3179
/// Result of processing one ingest event.
#[derive(Debug, Clone, PartialEq)]
pub struct RowOutcome {
    /// Index the caller supplied for the event.
    pub index: usize,
    /// Event kind: `"session"`, `"message"` or `"part"`.
    pub kind: &'static str,
    /// Primary key as JSON: the session id string for sessions, a
    /// `[session_id, message_id]` array for messages, and a
    /// `[session_id, message_id, part_id]` array for parts.
    pub pk: Value,
    /// Whether the row was inserted, matched, or rejected.
    pub status: OutcomeStatus,
    /// Error details; `None` on success outcomes.
    pub error: Option<RowError>,
    /// True for successful message outcomes that have search text; false
    /// for sessions, parts and errors.
    pub searchable: bool,
}
3197
/// Disposition of a single ingest row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutcomeStatus {
    /// The row was not previously stored.
    Inserted,
    /// The row's key was already stored.
    Matched,
    /// The row was rejected; see `RowOutcome::error`.
    Error,
}
3204
/// Details attached to a rejected ingest row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowError {
    /// Human-readable description of the rejection.
    pub message: String,
    /// Name of the offending field, when one can be singled out.
    pub field: Option<&'static str>,
    /// Short reason tag; in this part of the file only `"immutable"` is
    /// ever set (by `error_outcomes_for_substream`), otherwise `None`.
    pub reason: Option<&'static str>,
    /// One of the `DROP_REASON_*` keys, used to bucket
    /// `IngestSummary::drop_reasons`.
    pub reason_key: Option<&'static str>,
}
3218
/// The session currently open in an `IngestValidator`, with the index of
/// the event that delivered it.
#[derive(Debug)]
struct BufferedSession {
    /// Caller-supplied index of the session event.
    index: usize,
    /// The validated session.
    session: Session,
}
3227
/// A validated message buffered together with its parts.
#[derive(Debug)]
struct BufferedMessage {
    /// Caller-supplied index of the message event.
    index: usize,
    /// The message itself.
    message: Message,
    /// Parts accepted for this message; filled in when the message is flushed.
    parts: Vec<BufferedPart>,
    /// Output of `search_text` for the message and its parts; `None` until
    /// the message is flushed, and `None` afterwards when there is no text.
    search_text: Option<String>,
}
3235
/// A validated part with the index of the event that delivered it.
#[derive(Debug)]
struct BufferedPart {
    /// Caller-supplied index of the part event.
    index: usize,
    /// The part itself.
    part: Part,
}
3241
/// Validates an ordered stream of ingest events and buffers them into
/// per-session substreams until they are flushed to the store.
#[derive(Debug, Default)]
pub struct IngestValidator {
    /// Session of the substream currently being assembled, if any.
    session: Option<BufferedSession>,
    /// Message currently accepting parts, if any.
    current_message: Option<BufferedMessage>,
    /// Parts accepted for `current_message` so far.
    current_parts: Vec<BufferedPart>,
    /// Messages already flushed within the current substream.
    messages: Vec<BufferedMessage>,
    /// Message ids seen in the current substream (duplicate detection).
    seen_message_ids: HashSet<String>,
    /// `(message_id, part_id)` pairs seen in the current substream.
    seen_part_keys: HashSet<(String, String)>,
    /// Closed substreams waiting for `flush`.
    completed: Vec<CompletedSubstream>,
}
3276
/// A closed session substream: the session plus all of its buffered
/// messages, ready to be handed to the store.
#[derive(Debug)]
struct CompletedSubstream {
    /// Caller-supplied index of the session event.
    session_index: usize,
    /// The session.
    session: Session,
    /// The session's messages, each carrying its parts.
    messages: Vec<BufferedMessage>,
}
3284
3285fn ingest_host_stamp() -> Option<&'static Value> {
3290 static STAMP: std::sync::OnceLock<Option<Value>> = std::sync::OnceLock::new();
3291 STAMP
3292 .get_or_init(|| {
3293 let mut host = serde_json::Map::new();
3294 if let Ok(username) = whoami::username() {
3295 host.insert("username".to_owned(), username.into());
3296 }
3297 if let Ok(hostname) = whoami::hostname() {
3298 host.insert("hostname".to_owned(), hostname.into());
3299 }
3300 if let Ok(devicename) = whoami::devicename() {
3301 host.insert("device_name".to_owned(), devicename.into());
3302 }
3303 (!host.is_empty()).then(|| serde_json::json!({ "ingest": { "host": host } }))
3304 })
3305 .as_ref()
3306}
3307
3308impl IngestValidator {
3309 pub async fn push(
3315 &mut self,
3316 store: &Store,
3317 index: usize,
3318 event: IngestEvent,
3319 ) -> Result<Vec<RowOutcome>> {
3320 match event {
3321 IngestEvent::Session(session) => self.push_session(store, index, session).await,
3322 IngestEvent::Message(message) => Ok(self.push_message(index, message)),
3323 IngestEvent::Part(part) => Ok(self.push_part(index, part)),
3324 }
3325 }
3326
3327 pub async fn finish(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
3332 self.close_current_substream();
3333 self.flush(store).await
3334 }
3335
3336 pub async fn flush(&mut self, store: &Store) -> Result<(Vec<RowOutcome>, BatchCounts)> {
3343 if self.completed.is_empty() {
3344 return Ok((Vec::new(), BatchCounts::default()));
3345 }
3346 let completed = std::mem::take(&mut self.completed);
3347 store.upsert_session_batch(completed).await
3348 }
3349
3350 pub fn pending_substreams(&self) -> usize {
3353 self.completed.len()
3354 }
3355
3356 async fn push_session(
3357 &mut self,
3358 _store: &Store,
3359 index: usize,
3360 mut session: Session,
3361 ) -> Result<Vec<RowOutcome>> {
3362 self.close_current_substream();
3366
3367 let trimmed = session.source_agent.trim();
3372 if trimmed.is_empty() {
3373 return Ok(vec![RowOutcome {
3374 index,
3375 kind: "session",
3376 pk: Value::String(session.id.clone()),
3377 status: OutcomeStatus::Error,
3378 error: Some(RowError {
3379 message: format!("session {} has empty source_agent after trim", session.id),
3380 field: Some("source_agent"),
3381 reason: None,
3382 reason_key: Some(DROP_REASON_EMPTY_SOURCE_AGENT),
3383 }),
3384 searchable: false,
3385 }]);
3386 }
3387 if trimmed.len() != session.source_agent.len() {
3388 session.source_agent = trimmed.to_owned();
3389 }
3390
3391 if session.parent_message_id.is_some() && session.parent_session_id.is_none() {
3392 return Ok(vec![RowOutcome {
3393 index,
3394 kind: "session",
3395 pk: Value::String(session.id.clone()),
3396 status: OutcomeStatus::Error,
3397 error: Some(RowError {
3398 message: format!(
3399 "session {} has parent_message_id without parent_session_id",
3400 session.id,
3401 ),
3402 field: Some("parent_message_id"),
3403 reason: None,
3404 reason_key: Some(DROP_REASON_PARENT_MESSAGE_WITHOUT_SESSION),
3405 }),
3406 searchable: false,
3407 }]);
3408 }
3409
3410 self.seen_message_ids.clear();
3411 self.seen_part_keys.clear();
3412 self.session = Some(BufferedSession { index, session });
3413 Ok(Vec::new())
3414 }
3415
3416 fn close_current_substream(&mut self) {
3417 self.flush_current_message();
3418 let Some(BufferedSession {
3419 index: session_index,
3420 session,
3421 }) = self.session.take()
3422 else {
3423 return;
3424 };
3425 let messages = std::mem::take(&mut self.messages);
3426 self.seen_message_ids.clear();
3427 self.seen_part_keys.clear();
3428 self.completed.push(CompletedSubstream {
3429 session_index,
3430 session,
3431 messages,
3432 });
3433 }
3434
3435 fn push_message(&mut self, index: usize, mut message: Message) -> Vec<RowOutcome> {
3436 let pk = Value::Array(vec![
3437 Value::String(message.session_id().to_owned()),
3438 Value::String(message.id().to_owned()),
3439 ]);
3440 let Some(session) = &self.session else {
3441 return vec![error_outcome(
3442 index,
3443 "message",
3444 pk,
3445 "first event in a session stream must be Session",
3446 None,
3447 DROP_REASON_MESSAGE_BEFORE_SESSION,
3448 )];
3449 };
3450 if message.session_id() != session.session.id {
3451 let msg = format!(
3452 "message {} references session {}, expected {}",
3453 message.id(),
3454 message.session_id(),
3455 session.session.id
3456 );
3457 return vec![error_outcome(
3458 index,
3459 "message",
3460 pk,
3461 &msg,
3462 Some("session_id"),
3463 DROP_REASON_MESSAGE_SESSION_MISMATCH,
3464 )];
3465 }
3466 if !self.seen_message_ids.insert(message.id().to_owned()) {
3467 let msg = format!("duplicate message id {} in session substream", message.id());
3471 return vec![error_outcome(
3472 index,
3473 "message",
3474 pk,
3475 &msg,
3476 None,
3477 DROP_REASON_DUPLICATE_MESSAGE_ID,
3478 )];
3479 }
3480 match ingest_host_stamp() {
3485 Some(stamp) => {
3486 message
3487 .options_mut()
3488 .insert("pond".to_owned(), stamp.clone());
3489 }
3490 None => {
3491 message.options_mut().remove("pond");
3492 }
3493 }
3494 self.flush_current_message();
3495 self.current_message = Some(BufferedMessage {
3496 index,
3497 message,
3498 parts: Vec::new(),
3499 search_text: None,
3500 });
3501 Vec::new()
3502 }
3503
3504 fn push_part(&mut self, index: usize, part: Part) -> Vec<RowOutcome> {
3505 let pk = Value::Array(vec![
3506 Value::String(part.session_id.clone()),
3507 Value::String(part.message_id.clone()),
3508 Value::String(part.id.clone()),
3509 ]);
3510 let Some(current) = &self.current_message else {
3511 return vec![error_outcome(
3512 index,
3513 "part",
3514 pk,
3515 "part event appeared before a message",
3516 None,
3517 DROP_REASON_PART_BEFORE_MESSAGE,
3518 )];
3519 };
3520 if part.session_id != current.message.session_id() {
3521 let msg = format!(
3522 "part {} references session {}, expected {}",
3523 part.id,
3524 part.session_id,
3525 current.message.session_id()
3526 );
3527 return vec![error_outcome(
3528 index,
3529 "part",
3530 pk,
3531 &msg,
3532 Some("session_id"),
3533 DROP_REASON_PART_MESSAGE_MISMATCH,
3534 )];
3535 }
3536 if part.message_id != current.message.id() {
3537 let msg = format!(
3538 "part {} references message {}, expected {}",
3539 part.id,
3540 part.message_id,
3541 current.message.id()
3542 );
3543 return vec![error_outcome(
3544 index,
3545 "part",
3546 pk,
3547 &msg,
3548 Some("message_id"),
3549 DROP_REASON_PART_MESSAGE_MISMATCH,
3550 )];
3551 }
3552 let part_key = (part.message_id.clone(), part.id.clone());
3553 if !self.seen_part_keys.insert(part_key) {
3554 let msg = format!(
3555 "duplicate part id {} for message {} in session substream",
3556 part.id, part.message_id
3557 );
3558 return vec![error_outcome(
3559 index,
3560 "part",
3561 pk,
3562 &msg,
3563 None,
3564 DROP_REASON_DUPLICATE_PART_KEY,
3565 )];
3566 }
3567 self.current_parts.push(BufferedPart { index, part });
3568 Vec::new()
3569 }
3570
3571 fn flush_current_message(&mut self) {
3572 let Some(mut buffered) = self.current_message.take() else {
3573 return;
3574 };
3575 let parts = std::mem::take(&mut self.current_parts);
3576 let mut canonical_parts = Vec::with_capacity(parts.len());
3577 for part in &parts {
3578 canonical_parts.push(part.part.clone());
3579 }
3580 buffered.search_text = search_text(&buffered.message, &canonical_parts);
3581 buffered.parts = parts;
3582 self.messages.push(buffered);
3583 }
3584}
3585
3586fn error_outcome(
3587 index: usize,
3588 kind: &'static str,
3589 pk: Value,
3590 message: &str,
3591 field: Option<&'static str>,
3592 reason_key: &'static str,
3593) -> RowOutcome {
3594 RowOutcome {
3595 index,
3596 kind,
3597 pk,
3598 status: OutcomeStatus::Error,
3599 error: Some(RowError {
3600 message: message.to_owned(),
3601 field,
3602 reason: None,
3603 reason_key: Some(reason_key),
3604 }),
3605 searchable: false,
3606 }
3607}
3608
3609fn error_outcomes_for_substream(
3614 session_index: usize,
3615 session: &Session,
3616 _messages: &[BufferedMessage],
3617 message: impl Into<String>,
3618 field: Option<&'static str>,
3619 reason_key: &'static str,
3620) -> Vec<RowOutcome> {
3621 let reason = field.map(|_| "immutable");
3622 vec![RowOutcome {
3623 index: session_index,
3624 kind: "session",
3625 pk: Value::String(session.id.clone()),
3626 status: OutcomeStatus::Error,
3627 error: Some(RowError {
3628 message: message.into(),
3629 field,
3630 reason,
3631 reason_key: Some(reason_key),
3632 }),
3633 searchable: false,
3634 }]
3635}
3636
3637fn success_outcomes_for_substream(
3643 session_index: usize,
3644 session: &Session,
3645 messages: &[BufferedMessage],
3646 existing_sessions: &std::collections::HashMap<String, Session>,
3647 existing_message_pks: &HashSet<(String, String)>,
3648 existing_part_pks: &HashSet<(String, String, String)>,
3649 counts: &mut BatchCounts,
3650) -> Vec<RowOutcome> {
3651 let session_was_present = existing_sessions.contains_key(&session.id);
3652 let session_status = if session_was_present {
3653 counts.sessions_matched += 1;
3654 UpsertStatus::Matched
3655 } else {
3656 counts.sessions_inserted += 1;
3657 UpsertStatus::Inserted
3658 };
3659
3660 let mut outcomes = Vec::with_capacity(1 + messages.len());
3661 outcomes.push(success_outcome(
3662 session_index,
3663 "session",
3664 Value::String(session.id.clone()),
3665 session_status,
3666 false,
3667 ));
3668 for buffered in messages {
3669 let key = (
3670 buffered.message.session_id().to_owned(),
3671 buffered.message.id().to_owned(),
3672 );
3673 let searchable = buffered.search_text.is_some();
3674 let message_status = if existing_message_pks.contains(&key) {
3675 counts.messages_matched_total += 1;
3676 if searchable {
3677 counts.messages_matched_searchable += 1;
3678 }
3679 UpsertStatus::Matched
3680 } else {
3681 counts.messages_inserted_total += 1;
3682 if searchable {
3683 counts.messages_inserted_searchable += 1;
3684 }
3685 UpsertStatus::Inserted
3686 };
3687 let pk = Value::Array(vec![Value::String(key.0), Value::String(key.1)]);
3688 outcomes.push(success_outcome(
3689 buffered.index,
3690 "message",
3691 pk,
3692 message_status,
3693 searchable,
3694 ));
3695 for part in &buffered.parts {
3696 let part_key = (
3697 part.part.session_id.clone(),
3698 part.part.message_id.clone(),
3699 part.part.id.clone(),
3700 );
3701 let part_status = if existing_part_pks.contains(&part_key) {
3702 counts.parts_matched += 1;
3703 UpsertStatus::Matched
3704 } else {
3705 counts.parts_inserted += 1;
3706 UpsertStatus::Inserted
3707 };
3708 let part_pk = Value::Array(vec![
3709 Value::String(part_key.0),
3710 Value::String(part_key.1),
3711 Value::String(part_key.2),
3712 ]);
3713 outcomes.push(success_outcome(
3714 part.index,
3715 "part",
3716 part_pk,
3717 part_status,
3718 false,
3719 ));
3720 }
3721 }
3722 outcomes
3723}
3724
3725fn success_outcome(
3726 index: usize,
3727 kind: &'static str,
3728 pk: Value,
3729 status: UpsertStatus,
3730 searchable: bool,
3731) -> RowOutcome {
3732 let status = match status {
3733 UpsertStatus::Inserted => OutcomeStatus::Inserted,
3734 UpsertStatus::Matched => OutcomeStatus::Matched,
3735 };
3736 RowOutcome {
3737 index,
3738 kind,
3739 pk,
3740 status,
3741 error: None,
3742 searchable,
3743 }
3744}
3745
/// Typed ingest failures.
#[derive(Debug, Clone, PartialEq, Eq)]
enum IngestError {
    /// An incoming session tried to change a field that must stay equal to
    /// the stored value.
    ImmutableField {
        /// Name of the field (`"source_agent"` or `"project"` in
        /// `ensure_immutable_match`).
        field: &'static str,
        /// Id of the incoming session.
        session_id: String,
        /// Value currently stored.
        stored: String,
        /// Value the incoming session carried.
        attempted: String,
    },
}
3759
3760impl std::fmt::Display for IngestError {
3761 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3762 match self {
3763 Self::ImmutableField {
3764 field,
3765 session_id,
3766 stored,
3767 attempted,
3768 } => write!(
3769 formatter,
3770 "session {session_id} {field} is immutable: stored {stored:?}, attempted {attempted:?}",
3771 ),
3772 }
3773 }
3774}
3775
// Marker impl: `IngestError` relies on the default `Error` methods and on
// its `Debug` and `Display` implementations.
impl std::error::Error for IngestError {}
3777
3778fn ensure_immutable_match(
3782 existing: &Session,
3783 incoming: &Session,
3784) -> std::result::Result<(), IngestError> {
3785 if existing.source_agent != incoming.source_agent {
3786 return Err(IngestError::ImmutableField {
3787 field: "source_agent",
3788 session_id: incoming.id.clone(),
3789 stored: existing.source_agent.clone(),
3790 attempted: incoming.source_agent.clone(),
3791 });
3792 }
3793 if existing.project != incoming.project {
3794 return Err(IngestError::ImmutableField {
3795 field: "project",
3796 session_id: incoming.id.clone(),
3797 stored: (*existing.project).clone(),
3798 attempted: (*incoming.project).clone(),
3799 });
3800 }
3801 Ok(())
3802}
3803
3804pub fn search_text(message: &Message, parts: &[Part]) -> Option<String> {
3805 use crate::wire::Provenance;
3806 let mut chunks: Vec<String> = Vec::new();
3807 for part in parts {
3808 if part.provenance != Provenance::Conversational {
3811 continue;
3812 }
3813 match (message.role(), &part.kind) {
3814 (Role::User | Role::Assistant, PartKind::Text { text }) => {
3815 if let Some(text) = text {
3816 chunks.push(text.to_string());
3817 }
3818 }
3819 (
3820 Role::User | Role::Assistant,
3821 PartKind::File {
3822 media_type,
3823 file_name,
3824 data,
3825 },
3826 ) => {
3827 if let Some(file_name) = file_name {
3828 chunks.push(file_name.clone());
3829 }
3830 if let Some(media_type) = media_type {
3831 chunks.push(media_type.clone());
3832 }
3833 if let FileData::Url(uri) = data {
3834 chunks.push(uri.clone());
3835 }
3836 }
3837 (
3838 Role::System | Role::Tool,
3839 PartKind::Text { .. }
3840 | PartKind::Reasoning { .. }
3841 | PartKind::File { .. }
3842 | PartKind::ToolCall { .. }
3843 | PartKind::ToolResult { .. }
3844 | PartKind::ToolApprovalRequest { .. }
3845 | PartKind::ToolApprovalResponse { .. },
3846 )
3847 | (
3848 Role::User | Role::Assistant,
3849 PartKind::Reasoning { .. }
3850 | PartKind::ToolCall { .. }
3851 | PartKind::ToolResult { .. }
3852 | PartKind::ToolApprovalRequest { .. }
3853 | PartKind::ToolApprovalResponse { .. },
3854 ) => {}
3855 }
3856 }
3857
3858 let text = chunks
3859 .into_iter()
3860 .filter(|chunk| !chunk.trim().is_empty())
3861 .collect::<Vec<_>>()
3862 .join("\n");
3863 if text.is_empty() { None } else { Some(text) }
3864}
3865
/// Newtype around an owned search-text string. The field is private; read
/// it through `as_str`, `into_inner` or `AsRef<str>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchText(String);
3869
impl SearchText {
    /// Borrows the wrapped text.
    pub fn as_str(&self) -> &str {
        &self.0
    }

    /// Consumes the wrapper and returns the owned text.
    pub fn into_inner(self) -> String {
        self.0
    }
}
3879
/// Lets a `SearchText` be passed wherever `AsRef<str>` is accepted.
impl AsRef<str> for SearchText {
    fn as_ref(&self) -> &str {
        &self.0
    }
}
3885
/// A message together with its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageWithParts {
    /// The message.
    pub message: Message,
    /// The message's parts; empty when none were found.
    pub parts: Vec<Part>,
}
3891
/// A session together with its messages, each carrying its parts.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionWithMessages {
    /// The session.
    pub session: Session,
    /// The session's messages.
    pub messages: Vec<MessageWithParts>,
}
3897
/// Parameters for paging through a session's messages.
///
/// NOTE(review): the code that consumes these fields is outside this part
/// of the file; the field notes below are inferred from names and types.
#[derive(Debug, Clone)]
pub struct SessionViewParams<'a> {
    /// Presumably an anchor: page after this message id (confirm).
    pub after_message_id: Option<&'a str>,
    /// Presumably an anchor: page before this message id (confirm).
    pub before_message_id: Option<&'a str>,
    /// Presumably the maximum number of messages per page (confirm).
    pub limit: usize,
    /// Presumably the byte budget for a page (confirm).
    pub budget_bytes: usize,
    /// Presumably selects where paging starts when no anchor is given (confirm).
    pub session_from: SessionFrom,
}
3909
/// Parameters for viewing one message with surrounding context.
///
/// NOTE(review): the code that consumes these fields is outside this part
/// of the file; the field notes below are inferred from names and types.
#[derive(Debug, Clone)]
pub struct MessageViewParams {
    /// Presumably the number of preceding messages to include (confirm).
    pub context_before: usize,
    /// Presumably the number of following messages to include (confirm).
    pub context_after: usize,
    /// Presumably the byte budget for the response (confirm).
    pub budget_bytes: usize,
}
3918
/// Three-way result of a lookup.
///
/// NOTE(review): the producers are outside this part of the file; the
/// variant notes are inferred from the names.
#[derive(Debug, Clone, PartialEq)]
pub enum GetLookup<T> {
    /// Presumably: the requested item does not exist (confirm).
    NotFound,
    /// Presumably: the item exists but a supplied anchor id is unknown (confirm).
    UnknownAnchor,
    /// The lookup succeeded with the given value.
    Found(T),
}
3930
/// One page of a session: the session record plus a list of retrieved messages.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionPage {
    /// The session record.
    pub session: Session,
    /// Messages included in this page.
    pub messages: Vec<RetrievedMessage>,
    /// NOTE(review): presumably the count of messages before this page that were not returned.
    pub before_remaining: usize,
    /// NOTE(review): presumably the count of messages after this page that were not returned.
    pub after_remaining: usize,
}
3941
/// View of a single target message within its session.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePage {
    /// The session record.
    pub session: Session,
    /// The target message.
    pub target: RetrievedMessage,
    /// Parts reported for the target message.
    pub target_parts: Vec<Part>,
    /// NOTE(review): presumably the number of target parts left out of `target_parts`.
    pub target_parts_remaining: usize,
    /// Other messages returned with the target.
    /// NOTE(review): presumably surrounding context messages — confirm at the producer.
    pub siblings: Vec<RetrievedMessage>,
}
3953
/// A message as returned by the paging views ([`SessionPage`], [`MessagePage`]).
#[derive(Debug, Clone, PartialEq)]
pub struct RetrievedMessage {
    /// Message id.
    pub id: String,
    /// Message role.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Optional text. NOTE(review): presumably the stored `search_text` column — confirm.
    pub text: Option<String>,
    /// Optional content. NOTE(review): presumably the stored `content` column — confirm.
    pub content: Option<String>,
    /// Parts attached to this message.
    pub parts: Vec<Part>,
}
3963
/// Module-private row shape with the same scalar fields as [`RetrievedMessage`], minus `parts`.
#[derive(Debug, Clone)]
struct ScanRow {
    /// Message id.
    id: String,
    /// Message role.
    role: Role,
    /// Message timestamp (UTC).
    timestamp: DateTime<Utc>,
    /// Optional text. NOTE(review): presumably the `search_text` column — confirm.
    text: Option<String>,
    /// Optional content. NOTE(review): presumably the `content` column — confirm.
    content: Option<String>,
}
3972
/// A message row identified by session and message id, carrying a [`SearchText`].
#[derive(Debug, Clone)]
pub struct ConversationalRow {
    /// Id of the owning session.
    pub session_id: String,
    /// Id of the message.
    pub message_id: String,
    /// Message role.
    pub role: Role,
    /// Message timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Text of the row, wrapped in the [`SearchText`] newtype.
    pub text: SearchText,
}
3983
/// Counts how many leading `items` fit in one page.
///
/// `limit` is clamped to `1..=1000`. Items are taken from the front while the
/// running total of `size(item)` stays within `budget_bytes`; the first item is
/// always accepted, even when it alone exceeds the budget. Returns `0` only for
/// an empty slice.
fn page_by<T>(items: &[T], limit: usize, budget_bytes: usize, size: impl Fn(&T) -> usize) -> usize {
    let window = &items[..items.len().min(limit.clamp(1, 1000))];
    let mut used = 0usize;
    window
        .iter()
        .enumerate()
        .take_while(|&(taken, item)| {
            let total = used.saturating_add(size(item));
            // Only items after the first can be rejected by the budget.
            if taken > 0 && total > budget_bytes {
                return false;
            }
            used = total;
            true
        })
        .count()
}
4002
/// Counts how many trailing `items` fit in one page, walking from the end.
///
/// `limit` is clamped to `1..=1000`. Items are taken from the back while the
/// running total of `size(item)` stays within `budget_bytes`; the last item is
/// always accepted, even when it alone exceeds the budget. Returns `0` only for
/// an empty slice.
fn page_tail<T>(
    items: &[T],
    limit: usize,
    budget_bytes: usize,
    size: impl Fn(&T) -> usize,
) -> usize {
    let mut used = 0usize;
    items
        .iter()
        .rev()
        .take(limit.clamp(1, 1000))
        .enumerate()
        .take_while(|&(taken, item)| {
            let total = used.saturating_add(size(item));
            // Only items after the first accepted one can be rejected by the budget.
            if taken > 0 && total > budget_bytes {
                return false;
            }
            used = total;
            true
        })
        .count()
}
4029
4030fn role_from_str(value: &str) -> Result<Role> {
4031 match value {
4032 "system" => Ok(Role::System),
4033 "user" => Ok(Role::User),
4034 "assistant" => Ok(Role::Assistant),
4035 "tool" => Ok(Role::Tool),
4036 other => anyhow::bail!("unknown message role {other}"),
4037 }
4038}
4039
/// Scalar indices declared for the messages table, as `(column, index type, index name)`.
///
/// Each entry is turned into an [`IndexIntent`] by
/// [`pond_index_intents_with_vector_threshold`].
const MESSAGE_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "messages_session_id_btree",
    ),
    (
        "timestamp",
        BuiltinIndexType::ZoneMap,
        "messages_timestamp_zonemap",
    ),
    (
        "source_agent",
        BuiltinIndexType::Bitmap,
        "messages_source_agent_bitmap",
    ),
];

/// Scalar indices declared for the parts table, as `(column, index type, index name)`.
const PARTS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] = &[
    (
        "session_id",
        BuiltinIndexType::BTree,
        "parts_session_id_btree",
    ),
    (
        "message_id",
        BuiltinIndexType::BTree,
        "parts_message_id_btree",
    ),
];

/// Scalar indices declared for the sessions table, as `(column, index type, index name)`.
const SESSIONS_SCALAR_INDICES: &[(&str, BuiltinIndexType, &str)] =
    &[("id", BuiltinIndexType::BTree, "sessions_id_btree")];
4091
/// Chunk size of 512.
/// NOTE(review): presumably the maximum number of ids placed in one `IN`
/// predicate when copying sessions; the use site is outside this view — confirm.
const COPY_SESSION_IN_CHUNK: usize = 512;
4096
4097fn in_predicate(column: &'static str, values: &[String]) -> Predicate {
4098 Predicate::In(
4099 column,
4100 values.iter().cloned().map(ScalarValue::String).collect(),
4101 )
4102}
4103
/// Returns the predicate used to scope embedded rows.
///
/// Currently this is just a clone of `filter`; nothing is added to it here.
fn embedded_scope(filter: &Predicate) -> Predicate {
    filter.clone()
}
4119
/// `nprobes` value that [`apply_vector_search_knobs`] uses when the search
/// config is absent or does not set `nprobes`.
pub const DEFAULT_NPROBES: usize = 32;
4125
4126fn apply_vector_search_knobs(
4133 scanner: &mut lance::dataset::scanner::Scanner,
4134 search: Option<&config::SearchConfig>,
4135) {
4136 let nprobes = search
4137 .and_then(|cfg| cfg.nprobes)
4138 .unwrap_or(DEFAULT_NPROBES);
4139 scanner.nprobes(nprobes);
4140}
4141
/// Name string `"sessions"`. NOTE(review): presumably the sessions table name; use sites are outside this view.
pub(crate) const SESSIONS: &str = "sessions";

/// Name string `"messages"`. NOTE(review): presumably the messages table name.
pub(crate) const MESSAGES: &str = "messages";
/// Name string `"parts"`. NOTE(review): presumably the parts table name.
pub(crate) const PARTS: &str = "parts";

/// Name of the full-text index declared on the messages `search_text` column.
pub const MESSAGES_FTS_INDEX: &str = "messages_search_text_fts";

/// Name of the vector index declared on the messages `vector` column.
///
/// NOTE(review): the name ends in `ivfpq`, but the intent built in
/// `pond_index_intents_with_vector_threshold` uses `IndexParamsKind::IvfSqCosine`.
/// The string may be persisted as an index name, so it is left unchanged here.
pub const MESSAGES_VECTOR_INDEX: &str = "messages_vector_ivfpq";

/// Passed as `num_bits` of the `IvfSqCosine` vector index parameters.
const IVF_SQ_NUM_BITS: u16 = 8;
/// Passed as `max_iters` of the `IvfSqCosine` vector index parameters.
const IVF_SQ_MAX_ITERS: usize = 15;
4167
/// Returns the index intents for all three tables, using
/// `VECTOR_INDEX_ACTIVATION_ROWS` as the vector index threshold.
pub fn pond_index_intents() -> IndexIntents {
    pond_index_intents_with_vector_threshold(VECTOR_INDEX_ACTIVATION_ROWS)
}
4173
4174pub(crate) fn pond_index_intents_with_vector_threshold(vector_threshold: usize) -> IndexIntents {
4178 let mut messages = Vec::with_capacity(MESSAGE_SCALAR_INDICES.len() + 2);
4179 messages.push(IndexIntent {
4180 name: MESSAGES_FTS_INDEX,
4181 column: "search_text",
4182 trigger: IndexTrigger::OnAnyRows,
4183 params: IndexParamsKind::InvertedFtsWord,
4184 });
4185 for (column, kind, name) in MESSAGE_SCALAR_INDICES {
4186 messages.push(IndexIntent {
4187 name,
4188 column,
4189 trigger: IndexTrigger::OnAnyRows,
4190 params: IndexParamsKind::Scalar(kind.clone()),
4191 });
4192 }
4193 messages.push(IndexIntent {
4194 name: MESSAGES_VECTOR_INDEX,
4195 column: "vector",
4196 trigger: IndexTrigger::OnNonNullCount {
4197 column: "vector",
4198 threshold: vector_threshold,
4199 },
4200 params: IndexParamsKind::IvfSqCosine {
4201 num_bits: IVF_SQ_NUM_BITS,
4202 max_iters: IVF_SQ_MAX_ITERS,
4203 },
4204 });
4205 let parts = PARTS_SCALAR_INDICES
4206 .iter()
4207 .map(|(column, kind, name)| IndexIntent {
4208 name,
4209 column,
4210 trigger: IndexTrigger::OnAnyRows,
4211 params: IndexParamsKind::Scalar(kind.clone()),
4212 })
4213 .collect();
4214 let sessions = SESSIONS_SCALAR_INDICES
4215 .iter()
4216 .map(|(column, kind, name)| IndexIntent {
4217 name,
4218 column,
4219 trigger: IndexTrigger::OnAnyRows,
4220 params: IndexParamsKind::Scalar(kind.clone()),
4221 })
4222 .collect();
4223 IndexIntents {
4224 sessions,
4225 messages,
4226 parts,
4227 }
4228}
4229
/// Embedding width reported by [`embedding_dim`] until [`init_embedding_dim`] is called.
pub const DEFAULT_EMBEDDING_DIM: usize = 384;

/// Process-wide embedding width override; written at most once.
static EMBEDDING_DIM_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Returns the embedding width in effect: the value installed by
/// [`init_embedding_dim`] if there is one, otherwise [`DEFAULT_EMBEDDING_DIM`].
pub fn embedding_dim() -> usize {
    match EMBEDDING_DIM_RUNTIME.get() {
        Some(configured) => *configured,
        None => DEFAULT_EMBEDDING_DIM,
    }
}

/// Installs `dim` as the process-wide embedding width.
///
/// Only the first call has any effect; later calls are silently ignored, even
/// when they pass a different value.
pub fn init_embedding_dim(dim: usize) {
    // `set` fails once a value is present; that failure is deliberately discarded.
    let _ = EMBEDDING_DIM_RUNTIME.set(dim);
}
4255
4256pub(crate) fn write_params_for_create() -> WriteParams {
4263 WriteParams {
4264 data_storage_version: Some(LanceFileVersion::V2_1),
4265 enable_v2_manifest_paths: true,
4266 enable_stable_row_ids: true,
4267 auto_cleanup: Some(AutoCleanupParams {
4268 interval: 20,
4269 older_than: chrono::TimeDelta::days(1),
4270 }),
4271 skip_auto_cleanup: true,
4272 ..WriteParams::default()
4273 }
4274}
4275
4276fn export_schema(table: Table) -> Arc<Schema> {
4277 match table {
4278 Table::Sessions => session_schema(),
4279 Table::Messages => message_schema(),
4280 Table::Parts => part_schema(),
4281 }
4282}
4283
4284fn ensure_schema_matches_archive(dataset: &Dataset, table: Table) -> Result<()> {
4285 let expected = export_schema(table);
4286 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
4287 let actual_names: Vec<_> = actual.fields().iter().map(|field| field.name()).collect();
4288 let expected_names: Vec<_> = expected.fields().iter().map(|field| field.name()).collect();
4289 if actual_names != expected_names {
4290 anyhow::bail!(
4291 "{} archive table has columns {actual_names:?} but this pond build expects {expected_names:?}",
4292 table.as_str(),
4293 );
4294 }
4295 Ok(())
4296}
4297
4298async fn open_archive_table(table: Table, source: &Path) -> Result<Dataset> {
4299 let source_uri = source
4300 .to_str()
4301 .with_context(|| format!("archive path is not UTF-8: {}", source.display()))?;
4302 let dataset = Dataset::open(source_uri)
4303 .await
4304 .with_context(|| format!("failed to open {} archive table", table.as_str()))?;
4305 ensure_schema_matches_archive(&dataset, table)?;
4306 Ok(dataset)
4307}
4308
4309pub(crate) fn session_schema() -> Arc<Schema> {
4310 Arc::new(Schema::new(vec![
4311 primary_field("id", DataType::Utf8, false),
4312 Field::new("parent_session_id", DataType::Utf8, true),
4313 Field::new("parent_message_id", DataType::Utf8, true),
4314 Field::new("source_agent", DataType::Utf8, false),
4315 Field::new(
4316 "created_at",
4317 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
4318 false,
4319 ),
4320 Field::new("project", DataType::Utf8, false),
4321 json_field("options", false),
4322 ]))
4323}
4324
4325pub(crate) fn message_schema() -> Arc<Schema> {
4326 Arc::new(Schema::new(vec![
4327 primary_field("session_id", DataType::Utf8, false),
4328 primary_field("id", DataType::Utf8, false),
4329 Field::new(
4330 "timestamp",
4331 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
4332 false,
4333 ),
4334 Field::new("role", DataType::Utf8, false),
4335 Field::new("source_agent", DataType::Utf8, false),
4336 Field::new("project", DataType::Utf8, false),
4337 Field::new("content", DataType::Utf8, true),
4338 Field::new("search_text", DataType::Utf8, true),
4339 Field::new("vector", embedding_vector_type(), true),
4342 Field::new("embedding_model", DataType::Utf8, true),
4343 json_field("options", false),
4344 ]))
4345}
4346
4347pub(crate) fn part_schema() -> Arc<Schema> {
4348 Arc::new(Schema::new(vec![
4349 primary_field("session_id", DataType::Utf8, false),
4350 primary_field("message_id", DataType::Utf8, false),
4351 primary_field("id", DataType::Utf8, false),
4352 Field::new("ordinal", DataType::Int32, false),
4353 Field::new("type", DataType::Utf8, false),
4354 Field::new("provenance", DataType::Utf8, false),
4357 json_field("variant_data", false),
4358 legacy_blob_field("data", true),
4359 json_field("options", false),
4360 ]))
4361}
4362
4363pub(crate) fn empty_batch(schema: Arc<Schema>) -> Result<RecordBatch> {
4364 let arrays = schema
4365 .fields()
4366 .iter()
4367 .map(|field| lance::deps::arrow_array::new_empty_array(field.data_type()))
4368 .collect();
4369 RecordBatch::try_new(schema, arrays).context("failed to build empty Lance batch")
4370}
4371
4372pub(crate) fn empty_reader(
4373 schema: Arc<Schema>,
4374) -> Result<
4375 RecordBatchIterator<
4376 std::vec::IntoIter<Result<RecordBatch, lance::deps::arrow_schema::ArrowError>>,
4377 >,
4378> {
4379 let batch = empty_batch(schema.clone())?;
4380 Ok(RecordBatchIterator::new(
4381 vec![Ok(batch)].into_iter(),
4382 schema,
4383 ))
4384}
4385
/// Borrowed inputs for one row of a messages batch (see `messages_batches`).
pub(crate) struct MessageBatchRow<'a> {
    /// The message supplying id, session id, timestamp, role, content, and options.
    pub message: &'a Message,
    /// Value written to the `source_agent` column.
    pub source_agent: &'a str,
    /// Value written to the `project` column.
    pub project: &'a str,
    /// Value written to the nullable `search_text` column.
    pub search_text: Option<&'a str>,
}
4392
4393fn embedding_vector_type() -> DataType {
4399 DataType::FixedSizeList(
4400 Arc::new(Field::new("item", DataType::Float16, true)),
4401 embedding_dim() as i32,
4402 )
4403}
4404
4405fn embedding_update_schema() -> Arc<Schema> {
4409 Arc::new(Schema::new(vec![
4410 primary_field("session_id", DataType::Utf8, false),
4411 primary_field("id", DataType::Utf8, false),
4412 Field::new("vector", embedding_vector_type(), true),
4413 Field::new("embedding_model", DataType::Utf8, true),
4414 ]))
4415}
4416
4417pub(crate) fn embedding_update_batch(rows: &[EmbeddedMessage]) -> Result<RecordBatch> {
4420 let dim = embedding_dim();
4421 let mut flat = Vec::with_capacity(rows.len() * dim);
4422 for row in rows {
4423 if row.vector.len() != dim {
4424 anyhow::bail!(
4425 "embedding for message {} has dim {}, expected {dim}",
4426 row.id,
4427 row.vector.len(),
4428 );
4429 }
4430 flat.extend(row.vector.iter().map(|value| half::f16::from_f32(*value)));
4431 }
4432 let values = Float16Array::from(flat);
4433 let item_field = Arc::new(Field::new("item", DataType::Float16, true));
4434 let vectors = FixedSizeListArray::try_new(item_field, dim as i32, Arc::new(values), None)
4435 .context("failed to build embedding vector column")?;
4436
4437 RecordBatch::try_new(
4438 embedding_update_schema(),
4439 vec![
4440 Arc::new(StringArray::from(
4441 rows.iter()
4442 .map(|row| row.session_id.as_str())
4443 .collect::<Vec<_>>(),
4444 )),
4445 Arc::new(StringArray::from(
4446 rows.iter().map(|row| row.id.as_str()).collect::<Vec<_>>(),
4447 )),
4448 Arc::new(vectors),
4449 Arc::new(StringArray::from(vec![embed::model_id(); rows.len()])),
4450 ],
4451 )
4452 .context("failed to build embedding update batch")
4453}
4454
/// Byte budget (1 GiB) used both as the per-chunk budget in [`chunk_ranges`]
/// and as the per-cell ceiling in `guard_cell`.
const COLUMN_BYTE_BUDGET: usize = 1 << 30;

/// Splits rows into consecutive index ranges whose summed byte counts stay
/// within [`COLUMN_BYTE_BUDGET`].
///
/// `cells[i]` is the byte count attributed to row `i`. A new range starts
/// whenever adding the next row would push the running total over the budget.
/// Every range holds at least one row, so a single row larger than the budget
/// gets a range of its own. Returns an empty vector for empty input.
///
/// The running total saturates instead of overflowing, so arbitrarily large
/// byte counts cannot panic or wrap.
fn chunk_ranges(cells: &[usize]) -> Vec<std::ops::Range<usize>> {
    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut running = 0usize;
    for (index, &row) in cells.iter().enumerate() {
        let projected = running.saturating_add(row);
        if projected > COLUMN_BYTE_BUDGET && index > start {
            // Close the current range; `row` opens the next one.
            chunks.push(start..index);
            start = index;
            running = row;
        } else {
            running = projected;
        }
    }
    if start < cells.len() {
        chunks.push(start..cells.len());
    }
    chunks
}
4482
4483fn guard_cell(table: &str, pk: &str, bytes: usize) -> Result<()> {
4484 if bytes >= COLUMN_BYTE_BUDGET {
4485 anyhow::bail!(
4486 "{table} row {pk}: a {bytes}-byte text cell meets the per-cell ceiling and would \
4487 overflow Arrow's i32 offset buffer"
4488 );
4489 }
4490 Ok(())
4491}
4492
4493async fn merge_insert_chunks(
4494 handle: &Handle,
4495 table: Table,
4496 batches: Vec<RecordBatch>,
4497) -> Result<u64> {
4498 let mut inserted = 0u64;
4499 for batch in batches {
4500 let rows = batch.num_rows();
4501 inserted += handle.merge_insert(table, batch, rows).await?;
4502 }
4503 Ok(inserted)
4504}
4505
4506pub(crate) fn sessions_batches(sessions: &[Session]) -> Result<Vec<RecordBatch>> {
4507 let options = sessions
4508 .iter()
4509 .map(|session| json_bytes(&session.options))
4510 .collect::<Result<Vec<_>>>()?;
4511 let mut cells = Vec::with_capacity(sessions.len());
4512 for (session, encoded) in sessions.iter().zip(&options) {
4513 let columns = [
4514 session.id.len(),
4515 session.parent_session_id.as_deref().map_or(0, str::len),
4516 session.parent_message_id.as_deref().map_or(0, str::len),
4517 session.source_agent.len(),
4518 session.project.as_str().len(),
4519 encoded.len(),
4520 ];
4521 for bytes in columns {
4522 guard_cell("sessions", &session.id, bytes)?;
4523 }
4524 cells.push(columns.iter().sum());
4525 }
4526 chunk_ranges(&cells)
4527 .into_iter()
4528 .map(|range| sessions_chunk(&sessions[range.clone()], &options[range]))
4529 .collect()
4530}
4531
4532fn sessions_chunk(sessions: &[Session], options: &[Vec<u8>]) -> Result<RecordBatch> {
4533 let schema = session_schema();
4534 RecordBatch::try_new(
4535 schema.clone(),
4536 vec![
4537 Arc::new(StringArray::from(
4538 sessions
4539 .iter()
4540 .map(|session| session.id.as_str())
4541 .collect::<Vec<_>>(),
4542 )),
4543 Arc::new(StringArray::from(
4544 sessions
4545 .iter()
4546 .map(|session| session.parent_session_id.as_deref())
4547 .collect::<Vec<_>>(),
4548 )),
4549 Arc::new(StringArray::from(
4550 sessions
4551 .iter()
4552 .map(|session| session.parent_message_id.as_deref())
4553 .collect::<Vec<_>>(),
4554 )),
4555 Arc::new(StringArray::from(
4556 sessions
4557 .iter()
4558 .map(|session| session.source_agent.as_str())
4559 .collect::<Vec<_>>(),
4560 )),
4561 Arc::new(
4562 TimestampMicrosecondArray::from(
4563 sessions
4564 .iter()
4565 .map(|session| micros(session.created_at))
4566 .collect::<Vec<_>>(),
4567 )
4568 .with_timezone("UTC"),
4569 ),
4570 Arc::new(StringArray::from(
4571 sessions
4572 .iter()
4573 .map(|session| session.project.as_str())
4574 .collect::<Vec<_>>(),
4575 )),
4576 Arc::new(LargeBinaryArray::from_iter_values(
4577 options.iter().map(Vec::as_slice),
4578 )),
4579 ],
4580 )
4581 .context("failed to build session batch")
4582}
4583
4584pub(crate) fn messages_batches(rows: &[MessageBatchRow<'_>]) -> Result<Vec<RecordBatch>> {
4585 let options = rows
4586 .iter()
4587 .map(|row| json_bytes(row.message.options()))
4588 .collect::<Result<Vec<_>>>()?;
4589 let mut cells = Vec::with_capacity(rows.len());
4590 for (row, encoded) in rows.iter().zip(&options) {
4591 let columns = [
4592 row.message.session_id().len(),
4593 row.message.id().len(),
4594 row.message.role().as_str().len(),
4595 row.source_agent.len(),
4596 row.project.len(),
4597 row.message.system_content().map_or(0, str::len),
4598 row.search_text.map_or(0, str::len),
4599 encoded.len(),
4600 ];
4601 for bytes in columns {
4602 guard_cell("messages", row.message.id(), bytes)?;
4603 }
4604 cells.push(columns.iter().sum());
4605 }
4606 chunk_ranges(&cells)
4607 .into_iter()
4608 .map(|range| messages_chunk(&rows[range.clone()], &options[range]))
4609 .collect()
4610}
4611
4612fn messages_chunk(rows: &[MessageBatchRow<'_>], options: &[Vec<u8>]) -> Result<RecordBatch> {
4613 let schema = message_schema();
4614 RecordBatch::try_new(
4615 schema.clone(),
4616 vec![
4617 Arc::new(StringArray::from(
4618 rows.iter()
4619 .map(|row| row.message.session_id())
4620 .collect::<Vec<_>>(),
4621 )),
4622 Arc::new(StringArray::from(
4623 rows.iter().map(|row| row.message.id()).collect::<Vec<_>>(),
4624 )),
4625 Arc::new(
4626 TimestampMicrosecondArray::from(
4627 rows.iter()
4628 .map(|row| micros(row.message.timestamp()))
4629 .collect::<Vec<_>>(),
4630 )
4631 .with_timezone("UTC"),
4632 ),
4633 Arc::new(StringArray::from(
4634 rows.iter()
4635 .map(|row| row.message.role().as_str())
4636 .collect::<Vec<_>>(),
4637 )),
4638 Arc::new(StringArray::from(
4639 rows.iter().map(|row| row.source_agent).collect::<Vec<_>>(),
4640 )),
4641 Arc::new(StringArray::from(
4642 rows.iter().map(|row| row.project).collect::<Vec<_>>(),
4643 )),
4644 Arc::new(StringArray::from(
4645 rows.iter()
4646 .map(|row| row.message.system_content())
4647 .collect::<Vec<_>>(),
4648 )),
4649 Arc::new(StringArray::from(
4650 rows.iter().map(|row| row.search_text).collect::<Vec<_>>(),
4651 )),
4652 new_null_array(&embedding_vector_type(), rows.len()),
4656 new_null_array(&DataType::Utf8, rows.len()),
4657 Arc::new(LargeBinaryArray::from_iter_values(
4658 options.iter().map(Vec::as_slice),
4659 )),
4660 ],
4661 )
4662 .context("failed to build message batch")
4663}
4664
4665pub(crate) fn parts_batches(parts: &[Part]) -> Result<Vec<RecordBatch>> {
4666 let variant_data = parts
4667 .iter()
4668 .map(|part| part_variant_json(&part.kind))
4669 .collect::<Result<Vec<_>>>()?;
4670 let options = parts
4671 .iter()
4672 .map(|part| json_bytes(&part.options))
4673 .collect::<Result<Vec<_>>>()?;
4674 let mut cells = Vec::with_capacity(parts.len());
4675 for ((part, variant), encoded) in parts.iter().zip(&variant_data).zip(&options) {
4678 let columns = [
4679 part.session_id.len(),
4680 part.message_id.len(),
4681 part.id.len(),
4682 part.kind.type_name().len(),
4683 part.provenance.as_str().len(),
4684 variant.len(),
4685 encoded.len(),
4686 ];
4687 for bytes in columns {
4688 guard_cell("parts", &part.id, bytes)?;
4689 }
4690 cells.push(columns.iter().sum());
4691 }
4692 chunk_ranges(&cells)
4693 .into_iter()
4694 .map(|range| {
4695 parts_chunk(
4696 &parts[range.clone()],
4697 &variant_data[range.clone()],
4698 &options[range],
4699 )
4700 })
4701 .collect()
4702}
4703
4704fn parts_chunk(
4705 parts: &[Part],
4706 variant_data: &[Vec<u8>],
4707 options: &[Vec<u8>],
4708) -> Result<RecordBatch> {
4709 let schema = part_schema();
4710 let blob_payloads: Vec<Option<&[u8]>> = parts
4714 .iter()
4715 .map(|part| match &part.kind {
4716 PartKind::File { data, .. } => Some(match data {
4717 FileData::String(value) => value.as_bytes(),
4718 FileData::Bytes(value) => value.as_slice(),
4719 FileData::Url(value) => value.as_bytes(),
4720 }),
4721 PartKind::Text { .. }
4722 | PartKind::Reasoning { .. }
4723 | PartKind::ToolCall { .. }
4724 | PartKind::ToolResult { .. }
4725 | PartKind::ToolApprovalRequest { .. }
4726 | PartKind::ToolApprovalResponse { .. } => None,
4727 })
4728 .collect();
4729 let blob_array = LargeBinaryArray::from_iter(blob_payloads);
4730
4731 RecordBatch::try_new(
4732 schema.clone(),
4733 vec![
4734 Arc::new(StringArray::from(
4735 parts
4736 .iter()
4737 .map(|part| part.session_id.as_str())
4738 .collect::<Vec<_>>(),
4739 )),
4740 Arc::new(StringArray::from(
4741 parts
4742 .iter()
4743 .map(|part| part.message_id.as_str())
4744 .collect::<Vec<_>>(),
4745 )),
4746 Arc::new(StringArray::from(
4747 parts
4748 .iter()
4749 .map(|part| part.id.as_str())
4750 .collect::<Vec<_>>(),
4751 )),
4752 Arc::new(Int32Array::from(
4753 parts.iter().map(|part| part.ordinal).collect::<Vec<_>>(),
4754 )),
4755 Arc::new(StringArray::from(
4756 parts
4757 .iter()
4758 .map(|part| part.kind.type_name())
4759 .collect::<Vec<_>>(),
4760 )),
4761 Arc::new(StringArray::from(
4762 parts
4763 .iter()
4764 .map(|part| part.provenance.as_str())
4765 .collect::<Vec<_>>(),
4766 )),
4767 Arc::new(LargeBinaryArray::from_iter_values(
4768 variant_data.iter().map(Vec::as_slice),
4769 )),
4770 Arc::new(blob_array),
4771 Arc::new(LargeBinaryArray::from_iter_values(
4772 options.iter().map(Vec::as_slice),
4773 )),
4774 ],
4775 )
4776 .context("failed to build parts batch")
4777}
4778
4779pub(crate) fn session_from_batch(batch: &RecordBatch, row: usize) -> Result<Session> {
4780 Ok(Session {
4781 id: string(batch, "id", row)?.context("session id is null")?,
4782 parent_session_id: string(batch, "parent_session_id", row)?,
4783 parent_message_id: string(batch, "parent_message_id", row)?,
4784 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4785 created_at: datetime(batch, "created_at", row)?,
4786 project: crate::adapter::Extracted::from_stored(
4787 string(batch, "project", row)?.context("project is null")?,
4788 ),
4789 options: json_parse(&json_column(batch, "options", row)?.context("options is null")?)?,
4790 })
4791}
4792
/// Wrapper around an optional shared [`RowMetaSet`]; implements
/// `crate::adapter::SkipOracle`. A `None` inner value is treated as empty.
pub struct RowmapOracle(pub Option<Arc<RowMetaSet>>);
4800
4801impl crate::adapter::SkipOracle for RowmapOracle {
4802 fn session_max_ts(&self, session_id: &str) -> Option<i64> {
4803 self.0.as_ref()?.lookup_max_ts(session_id)
4804 }
4805
4806 fn is_empty(&self) -> bool {
4807 self.0.as_ref().is_none_or(|set| set.is_empty())
4808 }
4809}
4810
4811fn row_meta_entry(batch: &RecordBatch, row_id: u64, row: usize) -> Result<RowMetaEntry> {
4812 Ok(RowMetaEntry {
4813 row_id,
4814 session_id: string(batch, "session_id", row)?.context("session_id is null")?,
4815 message_id: string(batch, "id", row)?.context("message id is null")?,
4816 role: string(batch, "role", row)?.context("role is null")?,
4817 project: string(batch, "project", row)?.context("project is null")?,
4818 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4819 timestamp_micros: datetime(batch, "timestamp", row)?.timestamp_micros(),
4820 search_text: string(batch, "search_text", row)?.unwrap_or_default(),
4821 })
4822}
4823
4824pub(crate) fn message_meta_from_batch(batch: &RecordBatch, row: usize) -> Result<MessageMeta> {
4825 Ok(MessageMeta {
4826 message_id: string(batch, "id", row)?.context("id is null")?,
4827 session_id: string(batch, "session_id", row)?.context("session_id is null")?,
4828 role: string(batch, "role", row)?.context("role is null")?,
4829 project: string(batch, "project", row)?.context("project is null")?,
4830 source_agent: string(batch, "source_agent", row)?.context("source_agent is null")?,
4831 timestamp: datetime(batch, "timestamp", row)?,
4832 search_text: string(batch, "search_text", row)?.unwrap_or_default(),
4833 })
4834}
4835
4836pub(crate) fn message_from_batch(batch: &RecordBatch, row: usize) -> Result<Message> {
4837 let id = string(batch, "id", row)?.context("message id is null")?;
4838 let session_id = string(batch, "session_id", row)?.context("message session_id is null")?;
4839 let timestamp = datetime(batch, "timestamp", row)?;
4840 let options =
4841 json_parse(&json_column(batch, "options", row)?.context("message options is null")?)?;
4842
4843 match string(batch, "role", row)?
4844 .context("message role is null")?
4845 .as_str()
4846 {
4847 "system" => Ok(Message::System {
4848 id,
4849 session_id,
4850 timestamp,
4851 content: string(batch, "content", row)?.map(crate::adapter::Extracted::from_stored),
4858 options,
4859 }),
4860 "user" => Ok(Message::User {
4861 id,
4862 session_id,
4863 timestamp,
4864 options,
4865 }),
4866 "assistant" => Ok(Message::Assistant {
4867 id,
4868 session_id,
4869 timestamp,
4870 options,
4871 }),
4872 "tool" => Ok(Message::Tool {
4873 id,
4874 session_id,
4875 timestamp,
4876 options,
4877 }),
4878 other => anyhow::bail!("unknown message role {other}"),
4879 }
4880}
4881
4882pub(crate) fn part_from_batch(
4883 batch: &RecordBatch,
4884 row: usize,
4885 file_data: Option<FileData>,
4886) -> Result<Part> {
4887 let type_name = string(batch, "type", row)?.context("part type is null")?;
4888 let variant_data = json_column(batch, "variant_data", row)?.context("variant_data is null")?;
4889 let provenance = string(batch, "provenance", row)?.context("part provenance is null")?;
4890 Ok(Part {
4891 session_id: string(batch, "session_id", row)?.context("part session_id is null")?,
4892 message_id: string(batch, "message_id", row)?.context("part message_id is null")?,
4893 id: string(batch, "id", row)?.context("part id is null")?,
4894 ordinal: int32(batch, "ordinal", row)?,
4895 provenance: provenance_from_str(&provenance)?,
4896 options: json_parse(&json_column(batch, "options", row)?.context("part options is null")?)?,
4897 kind: part_kind_from_json(&type_name, &variant_data, file_data)?,
4898 })
4899}
4900
4901fn provenance_from_str(value: &str) -> Result<crate::wire::Provenance> {
4902 match value {
4903 "conversational" => Ok(crate::wire::Provenance::Conversational),
4904 "injected" => Ok(crate::wire::Provenance::Injected),
4905 other => anyhow::bail!("unknown part provenance {other}"),
4906 }
4907}
4908
4909fn file_data_from_blob(variant_data: &[u8], bytes: &[u8]) -> Result<FileData> {
4910 let kind = file_data_kind(variant_data)?;
4911 match kind.as_str() {
4912 "string" => {
4913 let text = std::str::from_utf8(bytes)
4914 .context("file string payload is not UTF-8")?
4915 .to_owned();
4916 Ok(FileData::String(text))
4917 }
4918 "bytes" => Ok(FileData::Bytes(bytes.to_vec())),
4919 "url" => Ok(FileData::Url(
4920 std::str::from_utf8(bytes)
4921 .context("file URL payload is not UTF-8")?
4922 .to_owned(),
4923 )),
4924 other => anyhow::bail!("unknown file data_kind {other}"),
4925 }
4926}
4927
4928fn file_data_kind(variant_data: &[u8]) -> Result<String> {
4929 let value = json_parse::<Value>(variant_data)?;
4930 value
4931 .get("data_kind")
4932 .and_then(Value::as_str)
4933 .map(str::to_owned)
4934 .context("file part variant_data missing data_kind")
4935}
4936
4937fn uint64<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
4938 batch
4939 .column_by_name(name)
4940 .with_context(|| format!("missing column {name}"))?
4941 .as_any()
4942 .downcast_ref::<UInt64Array>()
4943 .with_context(|| format!("column {name} is not UInt64"))
4944}
4945
4946pub(crate) fn string(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<String>> {
4947 let array = batch
4948 .column_by_name(name)
4949 .with_context(|| format!("missing column {name}"))?
4950 .as_any()
4951 .downcast_ref::<StringArray>()
4952 .with_context(|| format!("column {name} is not Utf8"))?;
4953 if array.is_null(row) {
4954 Ok(None)
4955 } else {
4956 Ok(Some(array.value(row).to_owned()))
4957 }
4958}
4959
4960fn json_column(batch: &RecordBatch, name: &str, row: usize) -> Result<Option<Vec<u8>>> {
4961 let column = batch
4965 .column_by_name(name)
4966 .with_context(|| format!("missing column {name}"))?;
4967 if let Some(array) = column.as_any().downcast_ref::<LargeBinaryArray>() {
4968 return if array.is_null(row) {
4969 Ok(None)
4970 } else {
4971 Ok(Some(
4972 lance_arrow::json::decode_json(array.value(row)).into_bytes(),
4973 ))
4974 };
4975 }
4976 if let Some(array) = column.as_any().downcast_ref::<StringArray>() {
4977 return if array.is_null(row) {
4978 Ok(None)
4979 } else {
4980 Ok(Some(array.value(row).as_bytes().to_vec()))
4981 };
4982 }
4983 if let Some(array) = column.as_any().downcast_ref::<LargeStringArray>() {
4984 return if array.is_null(row) {
4985 Ok(None)
4986 } else {
4987 Ok(Some(array.value(row).as_bytes().to_vec()))
4988 };
4989 }
4990 anyhow::bail!("column {name} is not a JSON-compatible array")
4991}
4992
4993fn int32(batch: &RecordBatch, name: &str, row: usize) -> Result<i32> {
4994 let array = batch
4995 .column_by_name(name)
4996 .with_context(|| format!("missing column {name}"))?
4997 .as_any()
4998 .downcast_ref::<Int32Array>()
4999 .with_context(|| format!("column {name} is not Int32"))?;
5000 Ok(array.value(row))
5001}
5002
5003pub(crate) fn float32(batch: &RecordBatch, name: &str, row: usize) -> Result<f32> {
5004 let array = batch
5005 .column_by_name(name)
5006 .with_context(|| format!("missing column {name}"))?
5007 .as_any()
5008 .downcast_ref::<Float32Array>()
5009 .with_context(|| format!("column {name} is not Float32"))?;
5010 Ok(array.value(row))
5011}
5012
5013pub(crate) fn datetime(batch: &RecordBatch, name: &str, row: usize) -> Result<DateTime<Utc>> {
5014 let array = batch
5015 .column_by_name(name)
5016 .with_context(|| format!("missing column {name}"))?
5017 .as_any()
5018 .downcast_ref::<TimestampMicrosecondArray>()
5019 .with_context(|| format!("column {name} is not timestamp_micros"))?;
5020 Utc.timestamp_micros(array.value(row))
5021 .single()
5022 .context("timestamp is out of range")
5023}
5024
5025fn primary_field(name: &str, data_type: DataType, nullable: bool) -> Field {
5026 Field::new(name, data_type, nullable).with_metadata(
5027 [(
5028 "lance-schema:unenforced-primary-key".to_owned(),
5029 "true".to_owned(),
5030 )]
5031 .into(),
5032 )
5033}
5034
5035fn legacy_blob_field(name: &str, nullable: bool) -> Field {
5045 Field::new(name, DataType::LargeBinary, nullable).with_metadata(
5046 [(lance_arrow::BLOB_META_KEY.to_owned(), "true".to_owned())]
5047 .into_iter()
5048 .collect(),
5049 )
5050}
5051
/// Thin wrapper over `lance_arrow::json::json_field`, used by the table
/// schemas for their JSON columns.
fn json_field(name: &str, nullable: bool) -> Field {
    lance_arrow::json::json_field(name, nullable)
}
5055
/// Converts a UTC timestamp to the `i64` microsecond value stored in the
/// timestamp columns.
fn micros(timestamp: DateTime<Utc>) -> i64 {
    timestamp.timestamp_micros()
}
5059
5060fn json_bytes<T: Serialize>(value: &T) -> Result<Vec<u8>> {
5061 let text = serde_json::to_string(value).context("failed to serialize JSON field")?;
5069 lance_arrow::json::encode_json(&text)
5070 .map_err(|err| anyhow::anyhow!("failed to encode JSON field as JSONB: {err}"))
5071}
5072
/// Deserializes JSON text bytes into `T`.
///
/// # Errors
/// Fails with `failed to parse JSON field` when the bytes are not valid JSON for `T`.
fn json_parse<T: DeserializeOwned>(value: &[u8]) -> Result<T> {
    serde_json::from_slice(value).context("failed to parse JSON field")
}
5076
5077fn part_variant_json(kind: &PartKind) -> Result<Vec<u8>> {
5078 if let PartKind::File {
5079 media_type,
5080 file_name,
5081 data,
5082 } = kind
5083 {
5084 let data_kind = match data {
5085 FileData::String(_) => "string",
5086 FileData::Bytes(_) => "bytes",
5087 FileData::Url(_) => "url",
5088 };
5089 return json_bytes(&serde_json::json!({
5090 "media_type": media_type,
5091 "file_name": file_name,
5092 "data_kind": data_kind,
5093 }));
5094 }
5095 let value = serde_json::to_value(kind)?;
5096 let mut object = value
5097 .as_object()
5098 .cloned()
5099 .context("part variant did not serialize to an object")?;
5100 object.remove("type");
5101 json_bytes(&object)
5102}
5103
5104fn part_kind_from_json(
5105 type_name: &str,
5106 variant_data: &[u8],
5107 file_data: Option<FileData>,
5108) -> Result<PartKind> {
5109 let mut value = json_parse::<Value>(variant_data)?;
5110 let object = value
5111 .as_object_mut()
5112 .context("part variant data is not an object")?;
5113 object.insert("type".to_owned(), Value::String(type_name.to_owned()));
5114 if let Some(data) = file_data {
5115 object.remove("data_kind");
5116 object.insert("data".to_owned(), serde_json::to_value(data)?);
5117 }
5118 serde_json::from_value(value).context("failed to parse part kind")
5119}
5120
5121#[cfg(test)]
5122mod tests {
5123 #![allow(clippy::expect_used, clippy::unwrap_used)]
5124
5125 use super::*;
5126 use crate::{
5127 adapter::Extracted,
5128 handlers::ingest_events,
5129 wire::{FileData, Message, Part, PartKind, ProviderOptions, Session},
5130 };
5131 use chrono::Utc;
5132 use serde_json::json;
5133 use tempfile::TempDir;
5134
5135 fn synthetic_session(id: &str) -> Session {
5136 Session {
5137 id: id.to_owned(),
5138 parent_session_id: None,
5139 parent_message_id: None,
5140 source_agent: "claude-code".to_owned(),
5141 created_at: Utc::now(),
5142 project: crate::adapter::Extracted::from_test_value("/tmp/pond".to_owned()),
5143 options: ProviderOptions::new(),
5144 }
5145 }
5146
/// `search_text` returns the text of a conversational part and `None` when the
/// only part is injected.
#[test]
fn search_text_excludes_injected_parts() {
    use crate::wire::Provenance;

    // One text part on message `m1`, differing only in id, body and provenance.
    let make_part = |part_id: &str, body: &str, provenance: Provenance| Part {
        session_id: "s1".to_owned(),
        id: part_id.to_owned(),
        message_id: "m1".to_owned(),
        ordinal: 0,
        provenance,
        options: ProviderOptions::new(),
        kind: PartKind::Text {
            text: Some(Extracted::from_test_value(body.to_owned())),
        },
    };
    let message = Message::User {
        id: "m1".to_owned(),
        session_id: "s1".to_owned(),
        timestamp: Utc::now(),
        options: ProviderOptions::new(),
    };

    let conversational_parts = [make_part(
        "p1",
        "real human prompt",
        Provenance::Conversational,
    )];
    let conversational = search_text(&message, &conversational_parts);
    assert_eq!(conversational.as_deref(), Some("real human prompt"));

    let injected_parts = [make_part(
        "p2",
        "<task-notification>...</task-notification>",
        Provenance::Injected,
    )];
    let injected = search_text(&message, &injected_parts);
    assert!(
        injected.is_none(),
        "a message whose only part is injected has null search_text"
    );
}
5193
/// Exercises `chunk_ranges` on empty input, on sizes that fit one chunk, on
/// sizes where two neighbours exceed `COLUMN_BYTE_BUDGET`, and on a single
/// entry that exceeds the budget by itself.
#[test]
fn chunk_ranges_splits_on_byte_budget() {
    let nothing = chunk_ranges(&[]);
    assert!(nothing.is_empty());

    let small = chunk_ranges(&[10, 10, 10]);
    assert_eq!(small, vec![0..3]);

    // Any two of these together are over budget, so each lands alone.
    let two_thirds = COLUMN_BYTE_BUDGET * 2 / 3;
    let crowded = chunk_ranges(&[two_thirds; 3]);
    assert_eq!(crowded, vec![0..1, 1..2, 2..3]);

    // The oversized middle entry is isolated from both neighbours.
    let oversized = chunk_ranges(&[10, COLUMN_BYTE_BUDGET + 1, 10]);
    assert_eq!(oversized, vec![0..1, 1..2, 2..3]);
}
5211
5212 #[tokio::test]
5213 async fn ordering_violation_drops_only_the_offending_event() -> anyhow::Result<()> {
5214 let temp = TempDir::new()?;
5219 let store = Store::open_local(temp.path()).await?;
5220 let session = synthetic_session("ordering");
5221 let orphan_part = Part {
5222 session_id: session.id.clone(),
5223 id: "orphan-part".to_owned(),
5224 message_id: "missing-message".to_owned(),
5225 ordinal: 0,
5226 provenance: crate::wire::Provenance::Conversational,
5227 options: ProviderOptions::new(),
5228 kind: PartKind::Text {
5229 text: Some(Extracted::from_test_value("orphan".to_owned())),
5230 },
5231 };
5232 let valid_message = Message::User {
5233 id: "valid-message".to_owned(),
5234 session_id: session.id.clone(),
5235 timestamp: Utc::now(),
5236 options: ProviderOptions::new(),
5237 };
5238 let valid_part = Part {
5239 session_id: session.id.clone(),
5240 id: "valid-part".to_owned(),
5241 message_id: valid_message.id().to_owned(),
5242 ordinal: 0,
5243 provenance: crate::wire::Provenance::Conversational,
5244 options: ProviderOptions::new(),
5245 kind: PartKind::Text {
5246 text: Some(Extracted::from_test_value("kept".to_owned())),
5247 },
5248 };
5249
5250 let mut validator = IngestValidator::default();
5251 validator
5252 .push(&store, 0, IngestEvent::Session(session.clone()))
5253 .await?;
5254 let part_outcomes = validator
5255 .push(&store, 1, IngestEvent::Part(orphan_part))
5256 .await?;
5257 assert_eq!(part_outcomes.len(), 1);
5258 assert_eq!(part_outcomes[0].kind, "part");
5259 assert_eq!(part_outcomes[0].status, OutcomeStatus::Error);
5260 assert!(
5261 part_outcomes[0]
5262 .error
5263 .as_ref()
5264 .map(|e| e.message.contains("part event appeared before a message"))
5265 .unwrap_or(false),
5266 "error message must explain the ordering violation: {part_outcomes:?}"
5267 );
5268 validator
5269 .push(&store, 2, IngestEvent::Message(valid_message))
5270 .await?;
5271 validator
5272 .push(&store, 3, IngestEvent::Part(valid_part))
5273 .await?;
5274 validator.finish(&store).await?;
5275
5276 let (sessions, messages, parts) = store.row_counts().await?;
5277 assert_eq!(sessions, 1, "session committed despite the orphan part");
5278 assert_eq!(messages, 1, "valid message committed");
5279 assert_eq!(parts, 1, "valid part committed; the orphan was dropped");
5280
5281 Ok(())
5282 }
5283
5284 #[tokio::test]
5285 async fn resident_meta_map_hydration_matches_take_rows_fallback() -> anyhow::Result<()> {
5286 let temp = TempDir::new()?;
5290 let store = Store::open_local(temp.path()).await?;
5291 let session = synthetic_session("hydration-parity");
5292
5293 let messages = [
5294 (
5295 "m1",
5296 "the auth refactor landed cleanly",
5297 1_700_000_000_123_456_i64,
5298 ),
5299 (
5300 "m2",
5301 "balance handler now retries on rpc timeout",
5302 1_700_000_050_654_321,
5303 ),
5304 ];
5305 let mut validator = IngestValidator::default();
5306 validator
5307 .push(&store, 0, IngestEvent::Session(session.clone()))
5308 .await?;
5309 let mut seq = 1;
5310 for (mid, text, micros) in messages {
5311 let message = Message::User {
5312 id: mid.to_owned(),
5313 session_id: session.id.clone(),
5314 timestamp: DateTime::from_timestamp_micros(micros).unwrap(),
5315 options: ProviderOptions::new(),
5316 };
5317 validator
5318 .push(&store, seq, IngestEvent::Message(message))
5319 .await?;
5320 seq += 1;
5321 let part = Part {
5322 session_id: session.id.clone(),
5323 id: format!("{mid}-p0"),
5324 message_id: mid.to_owned(),
5325 ordinal: 0,
5326 provenance: crate::wire::Provenance::Conversational,
5327 options: ProviderOptions::new(),
5328 kind: PartKind::Text {
5329 text: Some(Extracted::from_test_value(text.to_owned())),
5330 },
5331 };
5332 validator.push(&store, seq, IngestEvent::Part(part)).await?;
5333 seq += 1;
5334 }
5335 validator.finish(&store).await?;
5336
5337 let rowids: Vec<u64> = store
5338 .collect_row_metas()
5339 .await?
5340 .into_iter()
5341 .map(|entry| entry.row_id)
5342 .collect();
5343 assert_eq!(rowids.len(), 2);
5344
5345 let sort_by_id = |mut metas: Vec<MessageMeta>| {
5346 metas.sort_by(|left, right| left.message_id.cmp(&right.message_id));
5347 metas
5348 };
5349
5350 let fallback = sort_by_id(store.message_metas_by_rowids(&rowids).await?);
5351
5352 store.ensure_rowmap(&temp.path().join("cache")).await?;
5355 let resident = sort_by_id(store.message_metas_by_rowids(&rowids).await?);
5356
5357 assert_eq!(
5358 resident, fallback,
5359 "resident-map hydration must match the take_rows fallback"
5360 );
5361 assert_eq!(
5362 resident[0].timestamp.timestamp_micros(),
5363 1_700_000_000_123_456
5364 );
5365 Ok(())
5366 }
5367
5368 #[tokio::test]
5369 async fn initialized_flips_only_after_first_ingest() -> anyhow::Result<()> {
5370 let temp = TempDir::new()?;
5375 let store = Store::open_local(temp.path()).await?;
5376 assert!(
5377 !store.initialized().await?,
5378 "fresh store has no parts table"
5379 );
5380
5381 let session = synthetic_session("initialized-probe");
5382 let message = Message::User {
5383 id: "message-1".to_owned(),
5384 session_id: session.id.clone(),
5385 timestamp: Utc::now(),
5386 options: ProviderOptions::new(),
5387 };
5388 let part = Part {
5389 session_id: session.id.clone(),
5390 id: "part-1".to_owned(),
5391 message_id: message.id().to_owned(),
5392 ordinal: 0,
5393 provenance: crate::wire::Provenance::Conversational,
5394 options: ProviderOptions::new(),
5395 kind: PartKind::Text {
5396 text: Some(Extracted::from_test_value("hello".to_owned())),
5397 },
5398 };
5399 let mut validator = IngestValidator::default();
5400 validator
5401 .push(&store, 0, IngestEvent::Session(session))
5402 .await?;
5403 validator
5404 .push(&store, 1, IngestEvent::Message(message))
5405 .await?;
5406 validator.push(&store, 2, IngestEvent::Part(part)).await?;
5407 validator.finish(&store).await?;
5408
5409 assert!(store.initialized().await?, "ingest creates the parts table");
5410 Ok(())
5411 }
5412
5413 #[tokio::test]
5414 async fn duplicate_message_id_drops_the_second_keeps_the_first() -> anyhow::Result<()> {
5415 let temp = TempDir::new()?;
5419 let store = Store::open_local(temp.path()).await?;
5420 let session = synthetic_session("duplicate-message");
5421 let first = Message::User {
5422 id: "message-1".to_owned(),
5423 session_id: session.id.clone(),
5424 timestamp: Utc::now(),
5425 options: ProviderOptions::new(),
5426 };
5427 let second = Message::Assistant {
5428 id: "message-1".to_owned(),
5429 session_id: session.id.clone(),
5430 timestamp: Utc::now(),
5431 options: ProviderOptions::new(),
5432 };
5433
5434 let mut validator = IngestValidator::default();
5435 validator
5436 .push(&store, 0, IngestEvent::Session(session.clone()))
5437 .await?;
5438 validator
5439 .push(&store, 1, IngestEvent::Message(first))
5440 .await?;
5441 let dup_outcomes = validator
5442 .push(&store, 2, IngestEvent::Message(second))
5443 .await?;
5444 assert_eq!(dup_outcomes.len(), 1);
5445 assert_eq!(dup_outcomes[0].status, OutcomeStatus::Error);
5446 assert!(
5447 dup_outcomes[0]
5448 .error
5449 .as_ref()
5450 .map(|e| e.message.contains("duplicate message id message-1"))
5451 .unwrap_or(false),
5452 "duplicate-id rejection must name the offending id: {dup_outcomes:?}"
5453 );
5454
5455 validator.finish(&store).await?;
5456 let (sessions, messages, _) = store.row_counts().await?;
5457 assert_eq!(sessions, 1, "session committed");
5458 assert_eq!(messages, 1, "only the first message committed");
5459
5460 Ok(())
5461 }
5462
5463 #[tokio::test]
5464 async fn ingest_stamps_host_provenance_on_messages_and_strips_spoofed_pond_key()
5465 -> anyhow::Result<()> {
5466 let temp = TempDir::new()?;
5470 let store = Store::open_local(temp.path()).await?;
5471 let session = synthetic_session("host-provenance");
5472 let mut spoofed = ProviderOptions::new();
5473 spoofed.insert("pond".to_owned(), json!({"ingest": {"host": "spoofed"}}));
5474 let message = Message::User {
5475 id: "message-1".to_owned(),
5476 session_id: session.id.clone(),
5477 timestamp: Utc::now(),
5478 options: spoofed,
5479 };
5480 let part = Part {
5481 session_id: session.id.clone(),
5482 id: "part-1".to_owned(),
5483 message_id: "message-1".to_owned(),
5484 ordinal: 0,
5485 provenance: crate::wire::Provenance::Conversational,
5486 options: ProviderOptions::new(),
5487 kind: PartKind::Text {
5488 text: Some(Extracted::from_test_value("hello".to_owned())),
5489 },
5490 };
5491
5492 let mut validator = IngestValidator::default();
5493 validator
5494 .push(&store, 0, IngestEvent::Session(session.clone()))
5495 .await?;
5496 validator
5497 .push(&store, 1, IngestEvent::Message(message))
5498 .await?;
5499 validator.push(&store, 2, IngestEvent::Part(part)).await?;
5500 validator.finish(&store).await?;
5501
5502 let stored = store
5503 .get_session(&session.id)
5504 .await?
5505 .expect("ingested session is readable");
5506 assert!(
5507 !stored.session.options.contains_key("pond"),
5508 "session rows are not stamped (attribution derives from messages)"
5509 );
5510 let stored_message = &stored.messages[0].message;
5511 match ingest_host_stamp() {
5512 Some(stamp) => {
5513 assert_eq!(
5514 stored_message.options().get("pond"),
5515 Some(stamp),
5516 "stored message carries the real stamp, never the spoof"
5517 );
5518 let host = stamp
5519 .pointer("/ingest/host")
5520 .and_then(Value::as_object)
5521 .expect("stamp shape is {ingest: {host: {..}}}");
5522 assert!(!host.is_empty(), "an all-empty stamp must be None instead");
5523 assert!(
5524 host.values()
5525 .all(|v| v.as_str().is_some_and(|s| !s.is_empty())),
5526 "stamp fields are omitted when unavailable, never empty: {host:?}"
5527 );
5528 }
5529 None => assert!(
5530 stored_message.options().get("pond").is_none(),
5531 "with no resolvable stamp the spoofed key is still stripped"
5532 ),
5533 }
5534 assert!(
5535 !stored.messages[0].parts[0].options.contains_key("pond"),
5536 "part rows are not stamped (covered by their message's stamp)"
5537 );
5538
5539 Ok(())
5540 }
5541
5542 #[tokio::test(flavor = "multi_thread")]
5550 async fn optimize_indices_compacts_parts_with_blob_column() -> anyhow::Result<()> {
5551 use crate::wire::{FileData, PartKind, Provenance};
5552 let temp = TempDir::new()?;
5553 let store = Store::open_local(temp.path()).await?;
5554
5555 let session = synthetic_session("compact-blob");
5556 store
5557 .upsert_sessions(std::slice::from_ref(&session))
5558 .await?;
5559
5560 let make_part = |idx: usize, kind: PartKind| Part {
5561 session_id: session.id.clone(),
5562 message_id: format!("msg-{idx}"),
5563 id: format!("part-{idx}"),
5564 ordinal: 0,
5565 provenance: Provenance::Conversational,
5566 options: ProviderOptions::new(),
5567 kind,
5568 };
5569
5570 let batch_a = vec![
5571 make_part(
5572 0,
5573 PartKind::File {
5574 media_type: Some("text/plain".to_owned()),
5575 file_name: Some("a.txt".to_owned()),
5576 data: FileData::Bytes(b"alpha".to_vec()),
5577 },
5578 ),
5579 make_part(
5580 1,
5581 PartKind::File {
5582 media_type: Some("text/plain".to_owned()),
5583 file_name: Some("b.txt".to_owned()),
5584 data: FileData::String("beta".to_owned()),
5585 },
5586 ),
5587 ];
5588 store.upsert_parts(&batch_a).await?;
5589
5590 let batch_b = vec![
5591 make_part(
5592 2,
5593 PartKind::File {
5594 media_type: Some("application/octet-stream".to_owned()),
5595 file_name: None,
5596 data: FileData::Url("https://example.com/file".to_owned()),
5597 },
5598 ),
5599 make_part(
5600 3,
5601 PartKind::File {
5602 media_type: Some("image/png".to_owned()),
5603 file_name: Some("c.png".to_owned()),
5604 data: FileData::Bytes(vec![0x89, 0x50, 0x4e, 0x47]),
5605 },
5606 ),
5607 ];
5608 store.upsert_parts(&batch_b).await?;
5609
5610 store
5611 .optimize_indices(None, &MaintenancePolicy::always_compact())
5612 .await?
5613 .into_result()?;
5614
5615 Ok(())
5616 }
5617
5618 #[tokio::test]
5619 async fn file_part_blob_v2_round_trips_through_get() -> anyhow::Result<()> {
5620 let temp = TempDir::new()?;
5621 let store = Store::open_local(temp.path()).await?;
5622 let session = synthetic_session("blob");
5623 let message = Message::User {
5624 id: "message-1".to_owned(),
5625 session_id: session.id.clone(),
5626 timestamp: Utc::now(),
5627 options: ProviderOptions::new(),
5628 };
5629 let part = Part {
5630 session_id: session.id.clone(),
5631 id: "part-1".to_owned(),
5632 message_id: message.id().to_owned(),
5633 ordinal: 0,
5634 provenance: crate::wire::Provenance::Conversational,
5635 options: ProviderOptions::new(),
5636 kind: PartKind::File {
5637 media_type: Some("text/plain".to_owned()),
5638 file_name: Some("payload.txt".to_owned()),
5639 data: FileData::Bytes(b"pond".to_vec()),
5640 },
5641 };
5642
5643 let mut validator = IngestValidator::default();
5644 validator
5645 .push(&store, 0, IngestEvent::Session(session.clone()))
5646 .await?;
5647 validator
5648 .push(&store, 1, IngestEvent::Message(message.clone()))
5649 .await?;
5650 validator
5651 .push(&store, 2, IngestEvent::Part(part.clone()))
5652 .await?;
5653 validator.finish(&store).await?;
5654
5655 let stored = store
5656 .get_session(&session.id)
5657 .await?
5658 .expect("session should exist");
5659 let stored_part = &stored.messages[0].parts[0];
5660 assert_eq!(stored_part, &part);
5661
5662 Ok(())
5663 }
5664
5665 fn base_session() -> Session {
5676 Session {
5677 id: "01HXY00000000001".to_owned(),
5678 parent_session_id: None,
5679 parent_message_id: None,
5680 source_agent: "claude-code".to_owned(),
5681 created_at: Utc::now(),
5682 project: crate::adapter::Extracted::from_test_value("/home/me/proj".to_owned()),
5683 options: ProviderOptions::new(),
5684 }
5685 }
5686
5687 fn count_status(outcomes: &[RowOutcome], target: OutcomeStatus) -> usize {
5688 outcomes
5689 .iter()
5690 .filter(|outcome| outcome.status == target)
5691 .count()
5692 }
5693
5694 #[tokio::test(flavor = "multi_thread")]
5695 async fn re_ingesting_a_session_with_unchanged_immutable_fields_is_idempotent()
5696 -> anyhow::Result<()> {
5697 let temp = TempDir::new()?;
5698 let store = Store::open_local(temp.path()).await?;
5699
5700 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5701 assert_eq!(count_status(&first, OutcomeStatus::Inserted), 1);
5702
5703 let mut again = base_session();
5704 again.options.insert("title".to_owned(), json!("renamed"));
5705 let second = ingest_events(&store, vec![IngestEvent::Session(again)]).await?;
5706 assert_eq!(
5707 count_status(&second, OutcomeStatus::Error),
5708 0,
5709 "options is mutable; the re-ingest must not surface an error: {second:?}",
5710 );
5711 assert_eq!(
5712 count_status(&second, OutcomeStatus::Matched),
5713 1,
5714 "unchanged immutable fields must match-insert via merge_insert",
5715 );
5716
5717 Ok(())
5718 }
5719
5720 #[tokio::test(flavor = "multi_thread")]
5721 async fn re_ingesting_with_changed_source_agent_is_rejected() -> anyhow::Result<()> {
5722 let temp = TempDir::new()?;
5723 let store = Store::open_local(temp.path()).await?;
5724
5725 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5726 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
5727
5728 let mut tampered = base_session();
5729 tampered.source_agent = "codex-cli".to_owned();
5730 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
5731 assert_eq!(count_status(&second, OutcomeStatus::Error), 1);
5732 let err_row = second
5733 .iter()
5734 .find(|outcome| outcome.status == OutcomeStatus::Error)
5735 .expect("error outcome present");
5736 let err = err_row.error.as_ref().expect("error body present");
5737 assert_eq!(err.field, Some("source_agent"));
5738 assert_eq!(err.reason, Some("immutable"));
5739
5740 let stored = store
5742 .get_session(&base_session().id)
5743 .await?
5744 .expect("session row survives the rejected re-ingest");
5745 assert_eq!(stored.session.source_agent, "claude-code");
5746
5747 Ok(())
5748 }
5749
5750 #[tokio::test(flavor = "multi_thread")]
5751 async fn re_ingesting_with_changed_project_is_rejected() -> anyhow::Result<()> {
5752 let temp = TempDir::new()?;
5753 let store = Store::open_local(temp.path()).await?;
5754
5755 let first = ingest_events(&store, vec![IngestEvent::Session(base_session())]).await?;
5756 assert_eq!(count_status(&first, OutcomeStatus::Error), 0);
5757
5758 let mut tampered = base_session();
5759 tampered.project = crate::adapter::Extracted::from_test_value("/somewhere/else".to_owned());
5760 let second = ingest_events(&store, vec![IngestEvent::Session(tampered)]).await?;
5761 let err_row = second
5762 .iter()
5763 .find(|outcome| outcome.status == OutcomeStatus::Error)
5764 .expect("project change must surface an error outcome");
5765 assert_eq!(err_row.error.as_ref().unwrap().field, Some("project"));
5766
5767 let stored = store
5768 .get_session(&base_session().id)
5769 .await?
5770 .expect("session row survives");
5771 assert_eq!(
5772 stored.session.project.as_str(),
5773 "/home/me/proj",
5774 "stored project must remain the original",
5775 );
5776
5777 Ok(())
5778 }
5779
5780 #[tokio::test(flavor = "multi_thread")]
5781 async fn batched_flush_attributes_new_messages_on_existing_session() -> anyhow::Result<()> {
5782 use crate::wire::Provenance;
5790 let temp = TempDir::new()?;
5791 let store = Store::open_local(temp.path()).await?;
5792 let session = base_session();
5793
5794 let text_part = |part_id: &str, message_id: &str, body: &str| Part {
5795 session_id: session.id.clone(),
5796 id: part_id.to_owned(),
5797 message_id: message_id.to_owned(),
5798 ordinal: 0,
5799 provenance: Provenance::Conversational,
5800 options: ProviderOptions::new(),
5801 kind: PartKind::Text {
5802 text: Some(Extracted::from_test_value(body.to_owned())),
5803 },
5804 };
5805 let user_message = |id: &str| Message::User {
5806 id: id.to_owned(),
5807 session_id: session.id.clone(),
5808 timestamp: Utc::now(),
5809 options: ProviderOptions::new(),
5810 };
5811
5812 let mut validator = IngestValidator::default();
5814 validator
5815 .push(&store, 0, IngestEvent::Session(session.clone()))
5816 .await?;
5817 validator
5818 .push(&store, 1, IngestEvent::Message(user_message("m1")))
5819 .await?;
5820 validator
5821 .push(&store, 2, IngestEvent::Part(text_part("p1", "m1", "alpha")))
5822 .await?;
5823 validator
5824 .push(&store, 3, IngestEvent::Message(user_message("m2")))
5825 .await?;
5826 validator
5827 .push(&store, 4, IngestEvent::Part(text_part("p2", "m2", "beta")))
5828 .await?;
5829 let (_first_outcomes, first_counts) = validator.finish(&store).await?;
5830 assert_eq!(first_counts.sessions_inserted, 1);
5831 assert_eq!(first_counts.messages_inserted_total, 2);
5832 assert_eq!(first_counts.messages_inserted_searchable, 2);
5833
5834 let mut validator = IngestValidator::default();
5836 validator
5837 .push(&store, 0, IngestEvent::Session(session.clone()))
5838 .await?;
5839 for (idx, mid) in ["m3", "m4", "m5"].iter().enumerate() {
5840 let pid = format!("p{}", idx + 3);
5841 validator
5842 .push(&store, idx * 2 + 1, IngestEvent::Message(user_message(mid)))
5843 .await?;
5844 validator
5845 .push(
5846 &store,
5847 idx * 2 + 2,
5848 IngestEvent::Part(text_part(&pid, mid, "gamma")),
5849 )
5850 .await?;
5851 }
5852 let (second_outcomes, second_counts) = validator.finish(&store).await?;
5853
5854 assert_eq!(
5855 second_counts.sessions_inserted, 0,
5856 "existing session row must report as Matched, not Inserted",
5857 );
5858 assert_eq!(second_counts.sessions_matched, 1);
5859 assert_eq!(
5860 second_counts.messages_inserted_total, 3,
5861 "the three NEW messages must register as Inserted in BatchCounts",
5862 );
5863 assert_eq!(
5864 second_counts.messages_inserted_searchable, 3,
5865 "all three new messages carry conversational text -> searchable",
5866 );
5867 assert_eq!(second_counts.messages_matched_total, 0);
5868 assert_eq!(second_counts.parts_inserted, 3);
5869 assert_eq!(second_counts.parts_matched, 0);
5870
5871 let session_outcome = second_outcomes
5874 .iter()
5875 .find(|outcome| outcome.kind == "session")
5876 .expect("session-row outcome present");
5877 assert_eq!(session_outcome.status, OutcomeStatus::Matched);
5878 for outcome in &second_outcomes {
5879 if outcome.kind == "message" || outcome.kind == "part" {
5880 assert_eq!(
5881 outcome.status,
5882 OutcomeStatus::Inserted,
5883 "new row must be Inserted, got: {outcome:?}",
5884 );
5885 }
5886 }
5887 Ok(())
5888 }
5889
5890 async fn store_with_messages(
5894 temp: &TempDir,
5895 count: usize,
5896 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
5897 store_with_messages_at_threshold(temp, count, VECTOR_INDEX_ACTIVATION_ROWS).await
5898 }
5899
5900 async fn store_with_messages_at_threshold(
5903 temp: &TempDir,
5904 count: usize,
5905 _vector_threshold: usize,
5906 ) -> anyhow::Result<(Store, Vec<MessageKey>)> {
5907 let store = Store::open_local(temp.path()).await?;
5908 let sessions = 8.min(count.max(1));
5909 let mut events = Vec::new();
5910 for s in 0..sessions {
5911 events.push(IngestEvent::Session(Session {
5912 id: format!("session-{s}"),
5913 parent_session_id: None,
5914 parent_message_id: None,
5915 source_agent: "claude-code".to_owned(),
5916 created_at: Utc::now(),
5917 project: Extracted::from_test_value(format!("/proj/{}", s % 4)),
5918 options: ProviderOptions::new(),
5919 }));
5920 for i in (s..count).step_by(sessions) {
5921 let message_id = format!("msg-{i}");
5922 events.push(IngestEvent::Message(Message::User {
5923 id: message_id.clone(),
5924 session_id: format!("session-{s}"),
5925 timestamp: Utc::now(),
5926 options: ProviderOptions::new(),
5927 }));
5928 events.push(IngestEvent::Part(Part {
5929 session_id: format!("session-{s}"),
5930 id: format!("{message_id}-part"),
5931 message_id,
5932 ordinal: 0,
5933 provenance: crate::wire::Provenance::Conversational,
5934 options: ProviderOptions::new(),
5935 kind: PartKind::Text {
5936 text: Some(Extracted::from_test_value(format!("synthetic message {i}"))),
5937 },
5938 }));
5939 }
5940 }
5941 ingest_events(&store, events).await?;
5942 let keys = (0..count)
5943 .map(|i| MessageKey {
5944 session_id: format!("session-{}", i % sessions),
5945 message_id: format!("msg-{i}"),
5946 })
5947 .collect();
5948 Ok((store, keys))
5949 }
5950
5951 fn synthetic_vector(seed: usize) -> Vec<f32> {
5953 let mut state = (seed as u64)
5954 .wrapping_mul(0x9E37_79B9_7F4A_7C15)
5955 .wrapping_add(1);
5956 (0..embedding_dim())
5957 .map(|_| {
5958 state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
5959 #[allow(clippy::cast_precision_loss)]
5960 let unit = (state >> 33) as f32 / (1u64 << 31) as f32;
5961 unit - 1.0
5962 })
5963 .collect()
5964 }
5965
5966 fn embedded(keys: &[MessageKey]) -> Vec<EmbeddedMessage> {
5968 keys.iter()
5969 .enumerate()
5970 .map(|(seed, key)| EmbeddedMessage {
5971 session_id: key.session_id.clone(),
5972 id: key.message_id.clone(),
5973 vector: synthetic_vector(seed),
5974 })
5975 .collect()
5976 }
5977
5978 fn embedding_update_batch_with_model(
5979 rows: &[EmbeddedMessage],
5980 model: &str,
5981 ) -> Result<RecordBatch> {
5982 let mut batch = embedding_update_batch(rows)?;
5983 let columns = batch
5984 .columns()
5985 .iter()
5986 .take(3)
5987 .cloned()
5988 .chain(std::iter::once(
5989 Arc::new(StringArray::from(vec![model; rows.len()])) as _,
5990 ))
5991 .collect::<Vec<_>>();
5992 batch = RecordBatch::try_new(batch.schema(), columns)?;
5993 Ok(batch)
5994 }
5995
5996 #[tokio::test]
5997 async fn filtered_vector_scan_pushes_scalar_predicate_into_the_index() -> anyhow::Result<()> {
5998 let temp = TempDir::new()?;
5999 let (store, keys) = store_with_messages(&temp, 4).await?;
6003 store.write_embeddings(&embedded(&keys)).await?;
6004 store
6005 .optimize_indices(None, &MaintenancePolicy::always_compact())
6006 .await?
6007 .into_result()?;
6008
6009 let query = vec![0.01_f32; embedding_dim()];
6010 let plan = store
6011 .explain_vector_plan(
6012 &query,
6013 10,
6014 &Predicate::Eq("session_id", "session-3".into()),
6015 None,
6016 )
6017 .await?;
6018
6019 assert!(
6024 plan.contains("ScalarIndexQuery"),
6025 "expected a ScalarIndexQuery node in the plan:\n{plan}",
6026 );
6027 let predicate_postfiltered = plan
6028 .lines()
6029 .any(|line| line.contains("FilterExec") && line.contains("session_id"));
6030 assert!(
6031 !predicate_postfiltered,
6032 "the scalar predicate must not fall back to a FilterExec postfilter:\n{plan}",
6033 );
6034 Ok(())
6035 }
6036
6037 #[tokio::test]
6038 async fn vector_index_activates_when_threshold_is_crossed() -> anyhow::Result<()> {
6039 let temp = TempDir::new()?;
6040 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
6041
6042 store.write_embeddings(&embedded(&keys[..255])).await?;
6045 store
6046 .optimize_indices_with_vector_threshold(256)
6047 .await?
6048 .into_result()?;
6049 assert!(
6050 !store
6051 .handle
6052 .messages_index_names()
6053 .await?
6054 .iter()
6055 .any(|name| name == MESSAGES_VECTOR_INDEX),
6056 "IVF_SQ must not exist below the activation threshold",
6057 );
6058
6059 store.write_embeddings(&embedded(&keys[255..256])).await?;
6062 store
6063 .optimize_indices_with_vector_threshold(256)
6064 .await?
6065 .into_result()?;
6066 assert!(
6067 store
6068 .handle
6069 .messages_index_names()
6070 .await?
6071 .iter()
6072 .any(|name| name == MESSAGES_VECTOR_INDEX),
6073 "optimize must create the IVF_SQ once the threshold is crossed",
6074 );
6075
6076 let hits = store
6079 .vector_search(&synthetic_vector(0), 10, &Predicate::And(Vec::new()), None)
6080 .await?;
6081 assert!(
6082 hits.iter().any(|hit| hit.key == keys[0]),
6083 "an embedded row is retrievable via the index",
6084 );
6085 Ok(())
6086 }
6087
6088 #[tokio::test]
6089 async fn model_swap_force_re_embeds_only_stale_rows_and_rebuilds_ivf_pq() -> anyhow::Result<()>
6090 {
6091 let temp = TempDir::new()?;
6092 let (store, keys) = store_with_messages_at_threshold(&temp, 300, 256).await?;
6093 let old_rows = embedded(&keys);
6094 let old_batch = embedding_update_batch_with_model(&old_rows, "old-model")?;
6095 store
6096 .handle
6097 .merge_update(Table::Messages, old_batch, old_rows.len())
6098 .await?;
6099 store
6100 .optimize_indices_with_vector_threshold(256)
6101 .await?
6102 .into_result()?;
6103 assert!(
6104 store
6105 .handle
6106 .messages_index_names()
6107 .await?
6108 .iter()
6109 .any(|name| name == MESSAGES_VECTOR_INDEX),
6110 "IVF_SQ must exist before a model swap",
6111 );
6112 assert_eq!(store.stale_embedding_count().await?, keys.len());
6113
6114 store.drop_vector_index().await?;
6115 let mut pending = Vec::new();
6116 let stream = store.pending_or_stale_messages();
6117 tokio::pin!(stream);
6118 while let Some(row) = stream.next().await {
6119 pending.push(row?);
6120 }
6121 assert_eq!(
6122 pending.len(),
6123 keys.len(),
6124 "force stream should see stale rows"
6125 );
6126 store.write_embeddings(&embedded(&keys)).await?;
6127 assert_eq!(store.stale_embedding_count().await?, 0);
6128 store
6129 .optimize_indices_with_vector_threshold(256)
6130 .await?
6131 .into_result()?;
6132 assert!(
6133 store
6134 .handle
6135 .messages_index_names()
6136 .await?
6137 .iter()
6138 .any(|name| name == MESSAGES_VECTOR_INDEX),
6139 "optimize must rebuild IVF_SQ after force re-embed",
6140 );
6141
6142 let stream = store.pending_or_stale_messages();
6143 tokio::pin!(stream);
6144 assert!(stream.next().await.is_none(), "up-to-date rows are skipped");
6145 Ok(())
6146 }
6147
6148 #[tokio::test]
6149 async fn session_last_message_ids_come_from_durable_messages() -> anyhow::Result<()> {
6150 let temp = TempDir::new()?;
6151 let store = Store::open_local(temp.path()).await?;
6152 let session = synthetic_session("oracle");
6153 store
6154 .upsert_sessions(std::slice::from_ref(&session))
6155 .await?;
6156 let timestamp =
6157 chrono::DateTime::from_timestamp(1_700_000_000, 0).expect("valid timestamp");
6158 let message_a = Message::User {
6159 id: "oracle-a".to_owned(),
6160 session_id: session.id.clone(),
6161 timestamp,
6162 options: ProviderOptions::new(),
6163 };
6164 let message_b = Message::User {
6165 id: "oracle-b".to_owned(),
6166 session_id: session.id.clone(),
6167 timestamp,
6168 options: ProviderOptions::new(),
6169 };
6170 store
6171 .upsert_messages(
6172 &session,
6173 &[
6174 MessageWrite {
6175 message: &message_a,
6176 parts: &[],
6177 search_text: Some("a"),
6178 },
6179 MessageWrite {
6180 message: &message_b,
6181 parts: &[],
6182 search_text: Some("b"),
6183 },
6184 ],
6185 )
6186 .await?;
6187
6188 let empty_session = synthetic_session("session-row-only");
6189 store.upsert_sessions(&[empty_session]).await?;
6190
6191 let orphan = synthetic_session("messages-no-row");
6195 let orphan_message = Message::User {
6196 id: "orphan-a".to_owned(),
6197 session_id: orphan.id.clone(),
6198 timestamp,
6199 options: ProviderOptions::new(),
6200 };
6201 store
6202 .upsert_messages(
6203 &orphan,
6204 &[MessageWrite {
6205 message: &orphan_message,
6206 parts: &[],
6207 search_text: Some("a"),
6208 }],
6209 )
6210 .await?;
6211
6212 let map = store.session_last_message_ids().await?;
6213 assert_eq!(map.get("oracle").map(String::as_str), Some("oracle-b"));
6214 assert!(
6215 !map.contains_key("session-row-only"),
6216 "a session row without durable messages must not produce a freshness key",
6217 );
6218 assert!(
6219 !map.contains_key("messages-no-row"),
6220 "messages without a durable session row must not produce a freshness key",
6221 );
6222 Ok(())
6223 }
6224
6225 #[tokio::test]
6226 async fn embedding_progress_counts_embedded_and_eligible_rows() -> anyhow::Result<()> {
6227 let temp = TempDir::new()?;
6228 let (store, keys) = store_with_messages(&temp, 10).await?;
6229
6230 let before = store.embedding_progress().await?;
6231 assert_eq!(before.embedded, 0);
6232 assert_eq!(before.total, 10);
6233 assert_eq!(before.backlog, 10);
6234 assert_eq!(before.model, crate::embed::model_id());
6235
6236 store.write_embeddings(&embedded(&keys[..4])).await?;
6237 let partial = store.embedding_progress().await?;
6238 assert_eq!(partial.embedded, 4);
6239 assert_eq!(partial.total, 10);
6240 assert_eq!(partial.backlog, 6);
6241
6242 store.write_embeddings(&embedded(&keys[4..])).await?;
6243 let full = store.embedding_progress().await?;
6244 assert_eq!(full.embedded, 10);
6245 assert_eq!(full.total, 10);
6246 assert_eq!(full.backlog, 0);
6249 assert_eq!(full.backlog, store.embed_backlog_count().await?);
6250 Ok(())
6251 }
6252
6253 #[tokio::test]
6254 async fn ensure_rowmap_layers_a_delta_on_new_ingest() -> anyhow::Result<()> {
6255 let temp = TempDir::new()?;
6256 let (store, _keys) = store_with_messages(&temp, 6).await?;
6257 let cache = temp.path().join("cache");
6258
6259 store.ensure_rowmap(&cache).await?;
6260 assert_eq!(
6261 store.rowmap_delta_count(),
6262 Some(0),
6263 "first build is a lone base"
6264 );
6265
6266 ingest_events(
6268 &store,
6269 vec![
6270 IngestEvent::Session(Session {
6271 id: "session-new".to_owned(),
6272 parent_session_id: None,
6273 parent_message_id: None,
6274 source_agent: "claude-code".to_owned(),
6275 created_at: Utc::now(),
6276 project: Extracted::from_test_value("/proj/new".to_owned()),
6277 options: ProviderOptions::new(),
6278 }),
6279 IngestEvent::Message(Message::User {
6280 id: "m-new".to_owned(),
6281 session_id: "session-new".to_owned(),
6282 timestamp: Utc::now(),
6283 options: ProviderOptions::new(),
6284 }),
6285 IngestEvent::Part(Part {
6286 session_id: "session-new".to_owned(),
6287 id: "m-new-part".to_owned(),
6288 message_id: "m-new".to_owned(),
6289 ordinal: 0,
6290 provenance: crate::wire::Provenance::Conversational,
6291 options: ProviderOptions::new(),
6292 kind: PartKind::Text {
6293 text: Some(Extracted::from_test_value("brand new message".to_owned())),
6294 },
6295 }),
6296 ],
6297 )
6298 .await?;
6299
6300 store.ensure_rowmap(&cache).await?;
6303 assert_eq!(
6304 store.rowmap_delta_count(),
6305 Some(1),
6306 "new ingest layered a delta"
6307 );
6308
6309 let counts = store
6311 .session_message_counts(&["session-new".to_owned()])
6312 .await?;
6313 assert_eq!(counts.get("session-new").copied(), Some(1));
6314 Ok(())
6315 }
6316
6317 #[tokio::test]
6318 async fn rowmap_chain_compacts_and_stays_bounded() -> anyhow::Result<()> {
6319 let temp = TempDir::new()?;
6322 let (store, _keys) = store_with_messages(&temp, 4).await?;
6323 let cache = temp.path().join("cache");
6324 store.ensure_rowmap(&cache).await?;
6325
6326 let mut reached_cap = false;
6327 let mut compacted = false;
6328 for i in 0..(Store::MAX_ROWMAP_DELTAS + 2) {
6329 let session = format!("session-x{i}");
6330 ingest_events(
6331 &store,
6332 vec![
6333 IngestEvent::Session(Session {
6334 id: session.clone(),
6335 parent_session_id: None,
6336 parent_message_id: None,
6337 source_agent: "claude-code".to_owned(),
6338 created_at: Utc::now(),
6339 project: Extracted::from_test_value("/proj/x".to_owned()),
6340 options: ProviderOptions::new(),
6341 }),
6342 IngestEvent::Message(Message::User {
6343 id: format!("mx{i}"),
6344 session_id: session.clone(),
6345 timestamp: Utc::now(),
6346 options: ProviderOptions::new(),
6347 }),
6348 IngestEvent::Part(Part {
6349 session_id: session.clone(),
6350 id: format!("mx{i}-part"),
6351 message_id: format!("mx{i}"),
6352 ordinal: 0,
6353 provenance: crate::wire::Provenance::Conversational,
6354 options: ProviderOptions::new(),
6355 kind: PartKind::Text {
6356 text: Some(Extracted::from_test_value(format!("msg {i}"))),
6357 },
6358 }),
6359 ],
6360 )
6361 .await?;
6362 store.ensure_rowmap(&cache).await?;
6363 let deltas = store.rowmap_delta_count().unwrap();
6364 assert!(
6365 deltas <= Store::MAX_ROWMAP_DELTAS,
6366 "delta count {deltas} exceeded the cap",
6367 );
6368 if deltas == Store::MAX_ROWMAP_DELTAS {
6369 reached_cap = true;
6370 }
6371 if reached_cap && deltas < Store::MAX_ROWMAP_DELTAS {
6372 compacted = true;
6373 }
6374 }
6375 assert!(reached_cap, "deltas accumulated to the cap (append path)");
6376 assert!(compacted, "the chain compacted back into a base");
6377
6378 let mut rmm = 0;
6380 for entry in std::fs::read_dir(&cache)? {
6381 let name = entry?.file_name().into_string().unwrap_or_default();
6382 assert!(!name.contains(".tmp-"), "leaked build temp: {name}");
6383 if name.ends_with(".rmm") {
6384 rmm += 1;
6385 }
6386 }
6387 assert!(
6388 rmm <= Store::MAX_ROWMAP_DELTAS + 1,
6389 "files unbounded: {rmm}"
6390 );
6391 Ok(())
6392 }
6393
6394 #[tokio::test]
6395 async fn embed_backlog_count_tracks_eligible_unembedded_rows() -> anyhow::Result<()> {
6396 let temp = TempDir::new()?;
6397 let (store, keys) = store_with_messages(&temp, 10).await?;
6398
6399 assert_eq!(store.embed_backlog_count().await?, 10);
6402
6403 store.write_embeddings(&embedded(&keys[..4])).await?;
6404 assert_eq!(store.embed_backlog_count().await?, 6);
6405
6406 store.write_embeddings(&embedded(&keys[4..])).await?;
6407 assert_eq!(store.embed_backlog_count().await?, 0);
6408 Ok(())
6409 }
6410
6411 #[tokio::test]
6412 async fn session_message_counts_returns_per_session_counts_with_zeros_for_unknown_sessions()
6413 -> anyhow::Result<()> {
6414 let temp = TempDir::new()?;
6417 let (store, _keys) = store_with_messages(&temp, 32).await?;
6418
6419 let mut requested: Vec<String> = (0..8).map(|s| format!("session-{s}")).collect();
6420 requested.push("session-unknown-a".to_owned());
6421 requested.push("session-unknown-b".to_owned());
6422 let counts = store.session_message_counts(&requested).await?;
6423
6424 assert_eq!(counts.len(), requested.len());
6427 for s in 0..8 {
6428 assert_eq!(
6429 counts.get(&format!("session-{s}")).copied(),
6430 Some(4),
6431 "session-{s} should have 4 messages",
6432 );
6433 }
6434 assert_eq!(counts.get("session-unknown-a").copied(), Some(0));
6435 assert_eq!(counts.get("session-unknown-b").copied(), Some(0));
6436
6437 let empty = store.session_message_counts(&[]).await?;
6439 assert!(empty.is_empty());
6440 Ok(())
6441 }
6442}