1use std::cmp::Ordering;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4use std::time::Duration;
5
6use arrow_array::builder::{
7 FixedSizeListBuilder, Float32Builder, Int32Builder, LargeBinaryBuilder, LargeStringBuilder,
8 ListBuilder, StringBuilder, StringDictionaryBuilder, StructBuilder,
9 TimestampMicrosecondBuilder,
10};
11use arrow_array::types::Int8Type;
12use arrow_array::{
13 Array, ArrayRef, DictionaryArray, FixedSizeListArray, Float32Array, Int32Array,
14 LargeBinaryArray, LargeStringArray, ListArray, RecordBatch, RecordBatchIterator, StringArray,
15 StructArray, TimestampMicrosecondArray,
16};
17use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, TimeUnit};
18use chrono::{DateTime, Timelike, Utc};
19use futures::TryStreamExt;
20use lance::dataset::mem_wal::{
21 DatasetMemWalExt, LsmScanner, ShardManifestStore, ShardSnapshot, ShardWriterConfig,
22};
23use lance::dataset::optimize::{compact_files, CompactionMetrics, CompactionOptions};
24use lance::dataset::NewColumnTransform;
25use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams};
26use lance::index::DatasetIndexExt;
27use lance::io::{ObjectStoreParams, StorageOptionsAccessor};
28use lance::{Error as LanceError, Result as LanceResult};
29use lance_index::mem_wal::MEM_WAL_INDEX_NAME;
30use lance_index::scalar::ScalarIndexParams;
31use lance_index::IndexType;
32use tokio::sync::Mutex;
33use tokio::task::JoinHandle;
34use tracing::{error, info, warn};
35use uuid::Uuid;
36
37use crate::record::{
38 ContextRecord, LifecycleQueryOptions, RecordFilters, RecordPatch, Relationship, RetrieveResult,
39 SearchResult, StateMetadata, UpdateResult, UpsertResult, LIFECYCLE_ACTIVE,
40};
41use crate::serde::CONTENT_TYPE_TOMBSTONE;
42
/// Embedding width used when a new context is created without an explicit dimension.
const DEFAULT_EMBEDDING_DIM: i32 = 1_536;
/// Number of results returned by vector search when the caller passes no limit.
const DEFAULT_SEARCH_LIMIT: usize = 10;
/// Batch size for manifest scans. NOTE(review): not referenced in this chunk — confirm usage.
const DEFAULT_MANIFEST_SCAN_BATCH_SIZE: usize = 16;
/// Reciprocal-rank-fusion constant `k`; presumably used as `1 / (k + rank)` in hybrid
/// retrieval — TODO confirm against the retrieve implementation.
const RRF_K: f32 = 60.0;
/// Name of the scalar index kept on the `id` column (see `IdIndexType`).
const ID_INDEX_NAME: &str = "id_idx";
/// Column that stores each record's list of relationship structs.
const RELATIONSHIPS_COLUMN: &str = "relationships";
/// Schema metadata key under which the context's distance metric is presumably persisted.
const DISTANCE_METRIC_METADATA_KEY: &str = "lance-context:distance_metric";
53
/// Settings for background compaction of the context dataset.
///
/// Several fields share names with lance's `CompactionOptions`; presumably they are forwarded
/// to it when compaction runs. NOTE(review): the forwarding code is not in this chunk.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Whether background compaction is turned on (default `false`).
    pub enabled: bool,
    /// Fragment count that presumably must be reached before compacting (default 5).
    pub min_fragments: usize,
    /// Presumably forwarded as `CompactionOptions::target_rows_per_fragment` (default 1,000,000).
    pub target_rows_per_fragment: usize,
    /// Presumably forwarded as `CompactionOptions::max_rows_per_group` (default 1024).
    pub max_rows_per_group: usize,
    /// Presumably forwarded as `CompactionOptions::materialize_deletions` (default `true`).
    pub materialize_deletions: bool,
    /// Presumably forwarded as `CompactionOptions::materialize_deletions_threshold`
    /// (default 0.1).
    pub materialize_deletions_threshold: f32,
    /// Optional worker-thread count; `None` by default.
    pub num_threads: Option<usize>,
    /// Seconds between background compaction checks (default 300).
    pub check_interval_secs: u64,
    /// Hour windows as `(start, end)` pairs; empty by default.
    /// TODO confirm whether these allow or block compaction, and in which time zone.
    pub quiet_hours: Vec<(u8, u8)>,
}

impl Default for CompactionConfig {
    /// Compaction disabled, with conservative thresholds ready for when it is switched on.
    fn default() -> Self {
        CompactionConfig {
            enabled: false,
            min_fragments: 5,
            target_rows_per_fragment: 1_000_000,
            max_rows_per_group: 1_024,
            materialize_deletions: true,
            materialize_deletions_threshold: 0.1,
            num_threads: None,
            check_interval_secs: 300,
            quiet_hours: Vec::new(),
        }
    }
}
92
/// Kind of scalar index maintained on the `id` column.
///
/// Index creation happens outside this chunk. The only effect visible here is that a
/// `ZoneMap` id index is left out of the MemWAL's maintained indexes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IdIndexType {
    /// No id index (the default).
    #[default]
    None,
    /// Zone-map index over `id`.
    ZoneMap,
    /// B-tree index over `id`.
    BTree,
}
104
/// Vector distance metric used by similarity search.
///
/// It is chosen when a context is created and read back from the dataset schema when the
/// context is reopened.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DistanceMetric {
    /// Euclidean distance (the default).
    #[default]
    L2,
    /// Cosine distance.
    Cosine,
    /// Dot-product based distance.
    Dot,
}
120
121impl DistanceMetric {
122 pub fn parse(value: &str) -> LanceResult<Self> {
128 match value.to_ascii_lowercase().as_str() {
129 "l2" | "euclidean" => Ok(Self::L2),
130 "cosine" => Ok(Self::Cosine),
131 "dot" | "dot_product" => Ok(Self::Dot),
132 other => Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
133 "invalid distance metric '{other}': valid values are 'l2', 'cosine', 'dot'"
134 )))),
135 }
136 }
137
138 #[must_use]
142 pub fn distance(self, query: &[f32], candidate: &[f32]) -> f32 {
143 match self {
144 Self::L2 => l2_distance(query, candidate),
145 Self::Cosine => cosine_distance(query, candidate),
146 Self::Dot => dot_distance(query, candidate),
147 }
148 }
149
150 #[must_use]
153 pub fn as_str(self) -> &'static str {
154 match self {
155 Self::L2 => "l2",
156 Self::Cosine => "cosine",
157 Self::Dot => "dot",
158 }
159 }
160}
161
162#[derive(Debug, Clone)]
164pub struct CompactionStats {
165 pub total_fragments: usize,
167 pub is_compacting: bool,
169 pub last_compaction: Option<DateTime<Utc>>,
171 pub last_error: Option<String>,
173 pub total_compactions: u64,
175}
176
177struct CompactionState {
179 background_task: Option<JoinHandle<()>>,
180 is_compacting: bool,
181 last_compaction: Option<DateTime<Utc>>,
182 last_error: Option<String>,
183 total_compactions: u64,
184}
185
/// Payload columns that may be requested in `ContextStoreOptions::blob_columns`.
const VALID_BLOB_COLUMNS: &[&str] = &[
    "text_payload",
    "binary_payload",
];
188
189#[derive(Clone)]
191pub struct ContextStore {
192 dataset: Dataset,
193 compaction_state: Arc<Mutex<CompactionState>>,
194 pub compaction_config: CompactionConfig,
195 blob_columns: HashSet<String>,
196 id_index_type: IdIndexType,
197 embedding_dim: i32,
198 distance_metric: DistanceMetric,
199}
200
201#[derive(Debug, Clone, Default)]
203pub struct ContextStoreOptions {
204 pub storage_options: Option<HashMap<String, String>>,
205 pub compaction: CompactionConfig,
206 pub embedding_dim: Option<i32>,
209 pub blob_columns: HashSet<String>,
212 pub id_index_type: IdIndexType,
214 pub distance_metric: Option<DistanceMetric>,
221}
222
223impl ContextStoreOptions {
224 #[must_use]
225 pub fn storage_options(&self) -> Option<HashMap<String, String>> {
226 self.storage_options.clone()
227 }
228}
229
230fn relationship_struct_fields() -> Vec<Field> {
231 vec![
232 Field::new("target_id", DataType::Utf8, true),
233 Field::new("relation", DataType::Utf8, true),
234 Field::new("weight", DataType::Float32, true),
235 ]
236}
237
238fn relationship_struct_data_type() -> DataType {
239 DataType::Struct(relationship_struct_fields().into())
240}
241
242fn relationship_list_item_field() -> FieldRef {
243 Arc::new(Field::new("item", relationship_struct_data_type(), true))
244}
245
246fn relationship_field() -> Field {
247 Field::new(
248 RELATIONSHIPS_COLUMN,
249 DataType::List(relationship_list_item_field()),
250 true,
251 )
252}
253
254fn relationship_struct_builder() -> StructBuilder {
255 let fields: Vec<FieldRef> = relationship_struct_fields()
256 .into_iter()
257 .map(|field| Arc::new(field) as FieldRef)
258 .collect();
259 StructBuilder::new(
260 fields,
261 vec![
262 Box::new(StringBuilder::new()),
263 Box::new(StringBuilder::new()),
264 Box::new(Float32Builder::new()),
265 ],
266 )
267}
268
/// Per-`external_id` summary produced by `ContextStore::external_id_states`.
#[derive(Default)]
struct ExternalIdState {
    /// Ids of rows that pass the default lifecycle visibility check and are not superseded.
    visible_ids: Vec<String>,
    /// True when at least one row with this external_id is not a tombstone.
    has_non_tombstone: bool,
}
279
280impl ContextStore {
281 pub async fn open(uri: &str) -> LanceResult<Self> {
283 Self::open_with_options(uri, ContextStoreOptions::default()).await
284 }
285
286 pub async fn open_with_options(uri: &str, options: ContextStoreOptions) -> LanceResult<Self> {
288 for col in &options.blob_columns {
290 if !VALID_BLOB_COLUMNS.contains(&col.as_str()) {
291 return Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
292 "invalid blob column '{}': valid columns are {:?}",
293 col, VALID_BLOB_COLUMNS
294 ))));
295 }
296 }
297
298 let requested_embedding_dim = match options.embedding_dim {
299 Some(dim) => {
300 validate_embedding_dim(dim)?;
301 dim
302 }
303 None => DEFAULT_EMBEDDING_DIM,
304 };
305 let storage_options = options.storage_options();
306 let blob_columns = options.blob_columns.clone();
307 let (dataset, created) = match Self::load_with_options(uri, storage_options.clone()).await {
308 Ok(dataset) => (dataset, false),
309 Err(LanceError::DatasetNotFound { .. }) => {
310 let dataset = Self::create_with_options(
311 uri,
312 storage_options,
313 &blob_columns,
314 requested_embedding_dim,
315 options.distance_metric.unwrap_or_default(),
316 )
317 .await?;
318 (dataset, true)
319 }
320 Err(err) => return Err(err),
321 };
322 let arrow_schema: Schema = dataset.schema().into();
323 let embedding_dim = embedding_dim_from_schema(&arrow_schema)?;
324 if !created && options.embedding_dim.is_some() && embedding_dim != requested_embedding_dim {
325 return Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
326 "existing context embedding dimension {} does not match requested dimension {}",
327 embedding_dim, requested_embedding_dim
328 ))));
329 }
330 let distance_metric = distance_metric_from_schema(&arrow_schema)?;
331 if !created {
332 if let Some(requested) = options.distance_metric {
333 if requested != distance_metric {
334 return Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
335 "existing context distance metric '{}' does not match requested metric '{}'",
336 distance_metric.as_str(),
337 requested.as_str()
338 ))));
339 }
340 }
341 }
342
343 let mut store = Self {
344 dataset,
345 compaction_state: Arc::new(Mutex::new(CompactionState {
346 background_task: None,
347 is_compacting: false,
348 last_compaction: None,
349 last_error: None,
350 total_compactions: 0,
351 })),
352 compaction_config: options.compaction,
353 blob_columns,
354 id_index_type: options.id_index_type,
355 embedding_dim,
356 distance_metric,
357 };
358
359 store.ensure_id_index().await?;
361
362 store.start_background_compaction().await?;
364
365 Ok(store)
366 }
367
368 #[must_use]
370 pub fn embedding_dim(&self) -> i32 {
371 self.embedding_dim
372 }
373
374 #[must_use]
376 pub fn uri(&self) -> &str {
377 self.dataset.uri()
378 }
379
380 #[must_use]
382 pub fn distance_metric(&self) -> DistanceMetric {
383 self.distance_metric
384 }
385
386 pub async fn add(&mut self, entries: &[ContextRecord]) -> LanceResult<u64> {
388 if entries.is_empty() {
389 return Ok(self.dataset.manifest.version);
390 }
391
392 self.validate_unique_ids(entries).await?;
393 self.write_entries(entries).await
394 }
395
396 async fn write_entries(&mut self, entries: &[ContextRecord]) -> LanceResult<u64> {
397 if entries.is_empty() {
398 return Ok(self.dataset.manifest.version);
399 }
400
401 let mut groups: HashMap<(Option<String>, Option<String>), Vec<ContextRecord>> =
403 HashMap::new();
404 for entry in entries {
405 let key = (entry.bot_id.clone(), entry.session_id.clone());
406 groups.entry(key).or_default().push(entry.clone());
407 }
408
409 {
411 let indices = self.dataset.load_indices().await?;
412 let has_mem_wal = indices.iter().any(|i| i.name == MEM_WAL_INDEX_NAME);
413
414 if !has_mem_wal {
415 let maintained_indexes: Vec<String> = indices
417 .iter()
418 .filter(|i| {
419 !(self.id_index_type == IdIndexType::ZoneMap && i.name == ID_INDEX_NAME)
420 })
421 .map(|i| i.name.clone())
422 .collect();
423 self.dataset
424 .initialize_mem_wal()
425 .unsharded()
426 .maintained_indexes(maintained_indexes)
427 .execute()
428 .await?;
429 }
430 }
431
432 for ((bot_id, session_id), group_entries) in groups {
433 let region_id = Self::derive_region_id(&bot_id, &session_id);
434 let batch = self.records_to_batch(&group_entries)?;
435 let config = ShardWriterConfig {
436 shard_id: region_id,
437 ..Default::default()
438 };
439
440 let writer = self.dataset.mem_wal_writer(region_id, config).await?;
441 writer.put(vec![batch]).await?;
442 writer.close().await?;
443 }
444
445 Ok(self.dataset.manifest.version)
446 }
447
448 pub async fn delete_by_id(&mut self, id: &str) -> LanceResult<bool> {
453 let Some(record) = self.get_by_id(id).await? else {
454 return Ok(false);
455 };
456 self.write_tombstone_for(record).await?;
457 Ok(true)
458 }
459
460 pub async fn delete_by_external_id(&mut self, external_id: &str) -> LanceResult<bool> {
462 let Some(record) = self.get_by_external_id(external_id).await? else {
463 return Ok(false);
464 };
465 self.write_tombstone_for(record).await?;
466 Ok(true)
467 }
468
469 pub async fn upsert_by_external_id(
477 &mut self,
478 mut record: ContextRecord,
479 ) -> LanceResult<UpsertResult> {
480 let Some(external_id) = record.external_id.clone() else {
481 return Err(ArrowError::InvalidArgumentError(
482 "upsert_by_external_id requires external_id".to_string(),
483 )
484 .into());
485 };
486 if external_id.is_empty() {
487 return Err(ArrowError::InvalidArgumentError(
488 "upsert_by_external_id requires a non-empty external_id".to_string(),
489 )
490 .into());
491 }
492 if record.is_tombstone() {
493 return Err(ArrowError::InvalidArgumentError(format!(
494 "content_type '{}' is reserved for internal tombstones",
495 CONTENT_TYPE_TOMBSTONE
496 ))
497 .into());
498 }
499 record.supersedes_id = None;
500 record.superseded_by_id = None;
501 self.validate_new_record_id(&record).await?;
502
503 let matches: Vec<ContextRecord> = self
504 .list(None, None)
505 .await?
506 .into_iter()
507 .filter(|existing| existing.external_id.as_deref() == Some(external_id.as_str()))
508 .collect();
509
510 match matches.as_slice() {
511 [] => {
512 let version = self.add(std::slice::from_ref(&record)).await?;
513 Ok(UpsertResult {
514 record,
515 inserted: true,
516 replaced_id: None,
517 version,
518 })
519 }
520 [existing] => {
521 record.supersedes_id = Some(existing.id.clone());
522 let version = self.write_entries(std::slice::from_ref(&record)).await?;
523 Ok(UpsertResult {
524 record,
525 inserted: false,
526 replaced_id: Some(existing.id.clone()),
527 version,
528 })
529 }
530 _ => Err(ArrowError::InvalidArgumentError(format!(
531 "external_id '{}' matches multiple visible records",
532 external_id
533 ))
534 .into()),
535 }
536 }
537
    /// Upserts a batch of records keyed by `external_id`, writing all of them in one call.
    ///
    /// Nothing is written until every check passes. Checks run in this order, and the first
    /// failure is returned:
    /// 1. Per record: a non-empty `external_id` and no reserved tombstone `content_type`.
    ///    Across the batch: ids and external_ids must be unique.
    /// 2. No record's `id` may already exist as a non-tombstone row.
    /// 3. Each `external_id` is resolved through `external_id_states`:
    ///    - more than one visible match is an error;
    ///    - exactly one visible match is replaced (the new record's `supersedes_id` points
    ///      at it);
    ///    - non-tombstone rows with no visible match are rejected as already existing;
    ///    - anything else is inserted.
    ///
    /// Caller-supplied `supersedes_id` / `superseded_by_id` values are discarded.
    ///
    /// # Returns
    /// One `UpsertResult` per input record, in input order. All of them carry the version
    /// returned by the single `write_entries` call. An empty input returns an empty vector.
    ///
    /// # Errors
    /// Returns `InvalidArgumentError` for any failed check above; scan and write errors are
    /// propagated.
    pub async fn upsert_many_by_external_id(
        &mut self,
        mut records: Vec<ContextRecord>,
    ) -> LanceResult<Vec<UpsertResult>> {
        if records.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 1 (no I/O): per-record checks plus uniqueness of id / external_id in the batch.
        let mut seen_ids: HashSet<&str> = HashSet::with_capacity(records.len());
        let mut seen_external_ids: HashSet<&str> = HashSet::with_capacity(records.len());
        for record in &records {
            let Some(external_id) = record.external_id.as_deref() else {
                return Err(ArrowError::InvalidArgumentError(
                    "upsert_many_by_external_id requires external_id on every record".to_string(),
                )
                .into());
            };
            if external_id.is_empty() {
                return Err(ArrowError::InvalidArgumentError(
                    "upsert_many_by_external_id requires a non-empty external_id".to_string(),
                )
                .into());
            }
            if record.is_tombstone() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "content_type '{}' is reserved for internal tombstones",
                    CONTENT_TYPE_TOMBSTONE
                ))
                .into());
            }
            if !seen_ids.insert(record.id.as_str()) {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "duplicate id '{}' in batch",
                    record.id
                ))
                .into());
            }
            if !seen_external_ids.insert(external_id) {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "duplicate external_id '{}' in batch",
                    external_id
                ))
                .into());
            }
        }

        // The store assigns lifecycle links itself; discard whatever the caller supplied.
        for record in &mut records {
            record.supersedes_id = None;
            record.superseded_by_id = None;
        }

        // Pass 2: no record's id may already exist (`find_existing_keys` skips tombstone rows).
        let id_list: Vec<&str> = records.iter().map(|r| r.id.as_str()).collect();
        let (existing_ids, _) = self.find_existing_keys(&id_list, &[]).await?;
        if let Some(record) = records
            .iter()
            .find(|r| existing_ids.contains(r.id.as_str()))
        {
            return Err(ArrowError::InvalidArgumentError(format!(
                "id '{}' already exists",
                record.id
            ))
            .into());
        }

        // Pass 3: resolve every external_id against stored rows with a single scan.
        // `unwrap_or_default` never yields "" here, because pass 1 rejected missing/empty values.
        let external_id_list: Vec<&str> = records
            .iter()
            .map(|r| r.external_id.as_deref().unwrap_or_default())
            .collect();
        let states = self.external_id_states(&external_id_list).await?;

        // `(inserted, replaced_id)` per record, kept index-aligned with `records` for the zip.
        let mut outcomes: Vec<(bool, Option<String>)> = Vec::with_capacity(records.len());
        for record in &mut records {
            let external_id = record.external_id.as_deref().unwrap_or_default();
            match states.get(external_id) {
                Some(state) if state.visible_ids.len() > 1 => {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "external_id '{}' matches multiple visible records",
                        external_id
                    ))
                    .into());
                }
                // Exactly one visible record: the new record supersedes it.
                Some(state) if state.visible_ids.len() == 1 => {
                    let existing_id = state.visible_ids[0].clone();
                    record.supersedes_id = Some(existing_id.clone());
                    outcomes.push((false, Some(existing_id)));
                }
                // Non-tombstone rows exist, but none is visible (per default lifecycle options)
                // and un-superseded: refuse instead of inserting alongside them.
                Some(state) if state.has_non_tombstone => {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "external_id '{}' already exists",
                        external_id
                    ))
                    .into());
                }
                // Unknown external_id, or one with only tombstone rows: plain insert.
                _ => outcomes.push((true, None)),
            }
        }

        // One write for the whole batch, so every result shares the same version.
        let version = self.write_entries(&records).await?;

        Ok(records
            .into_iter()
            .zip(outcomes)
            .map(|(record, (inserted, replaced_id))| UpsertResult {
                record,
                inserted,
                replaced_id,
                version,
            })
            .collect())
    }
684
685 async fn external_id_states(
693 &self,
694 external_ids: &[&str],
695 ) -> LanceResult<HashMap<String, ExternalIdState>> {
696 let mut states: HashMap<String, ExternalIdState> = HashMap::new();
697 let candidates: HashSet<&str> = external_ids
698 .iter()
699 .copied()
700 .filter(|value| !value.is_empty())
701 .collect();
702 if candidates.is_empty() {
703 return Ok(states);
704 }
705
706 let filter_values: Vec<&str> = candidates.iter().copied().collect();
707 let filter = format!("external_id IN ({})", sql_quoted_list(&filter_values));
708 let scanner = self.lsm_scanner().await?.filter(&filter)?;
709 let mut stream = scanner.try_into_stream().await?;
710 let mut rows: Vec<ContextRecord> = Vec::new();
711 while let Some(batch) = stream.try_next().await? {
712 rows.extend(batch_to_records(&batch)?);
713 }
714
715 let superseded_ids: HashSet<String> = rows
716 .iter()
717 .filter_map(|record| {
718 let supersedes_id = record.supersedes_id.as_ref()?;
719 if supersedes_id == &record.id {
720 None
721 } else {
722 Some(supersedes_id.clone())
723 }
724 })
725 .collect();
726
727 let options = LifecycleQueryOptions::default();
728 for record in rows {
729 let Some(external_id) = record.external_id.as_deref() else {
730 continue;
731 };
732 if !candidates.contains(external_id) {
733 continue;
734 }
735 let entry = states.entry(external_id.to_string()).or_default();
736 if !record.is_tombstone() {
737 entry.has_non_tombstone = true;
738 }
739 if options.is_visible(&record) && !superseded_ids.contains(&record.id) {
740 entry.visible_ids.push(record.id);
741 }
742 }
743
744 Ok(states)
745 }
746
747 pub async fn update_by_id(
753 &mut self,
754 id: &str,
755 patch: RecordPatch,
756 ) -> LanceResult<Option<UpdateResult>> {
757 if id.is_empty() {
758 return Err(ArrowError::InvalidArgumentError(
759 "update_by_id requires a non-empty id".to_string(),
760 )
761 .into());
762 }
763 let Some(existing) = self.get_by_id(id).await? else {
764 return Ok(None);
765 };
766 self.update_visible_record(existing, patch).await.map(Some)
767 }
768
769 pub async fn update_by_external_id(
773 &mut self,
774 external_id: &str,
775 patch: RecordPatch,
776 ) -> LanceResult<Option<UpdateResult>> {
777 if external_id.is_empty() {
778 return Err(ArrowError::InvalidArgumentError(
779 "update_by_external_id requires a non-empty external_id".to_string(),
780 )
781 .into());
782 }
783
784 let matches: Vec<ContextRecord> = self
785 .list(None, None)
786 .await?
787 .into_iter()
788 .filter(|existing| existing.external_id.as_deref() == Some(external_id))
789 .collect();
790
791 match matches.as_slice() {
792 [] => Ok(None),
793 [existing] => self
794 .update_visible_record(existing.clone(), patch)
795 .await
796 .map(Some),
797 _ => Err(ArrowError::InvalidArgumentError(format!(
798 "external_id '{}' matches multiple visible records",
799 external_id
800 ))
801 .into()),
802 }
803 }
804
805 async fn update_visible_record(
806 &mut self,
807 existing: ContextRecord,
808 patch: RecordPatch,
809 ) -> LanceResult<UpdateResult> {
810 if patch.is_empty() {
811 return Err(ArrowError::InvalidArgumentError(
812 "update requires at least one patch field".to_string(),
813 )
814 .into());
815 }
816
817 let mut record = existing.clone();
818 record.id = Uuid::new_v4().to_string();
819 record.run_id = Uuid::new_v4().to_string();
820 record.created_at = Utc::now();
821 record.supersedes_id = Some(existing.id.clone());
822 record.superseded_by_id = None;
823
824 if let Some(bot_id) = patch.bot_id {
825 record.bot_id = Some(bot_id);
826 }
827 if let Some(session_id) = patch.session_id {
828 record.session_id = Some(session_id);
829 }
830 if let Some(tenant) = patch.tenant {
831 record.tenant = Some(tenant);
832 }
833 if let Some(source) = patch.source {
834 record.source = Some(source);
835 }
836 if let Some(state_metadata) = patch.state_metadata {
837 record.state_metadata = Some(state_metadata);
838 }
839 if let Some(metadata) = patch.metadata {
840 record.metadata = Some(metadata);
841 }
842 if let Some(relationships) = patch.relationships {
843 record.relationships = relationships;
844 }
845 if let Some(expires_at) = patch.expires_at {
846 record.expires_at = Some(expires_at);
847 }
848 if let Some(retention_policy) = patch.retention_policy {
849 record.retention_policy = Some(retention_policy);
850 }
851 if let Some(lifecycle_status) = patch.lifecycle_status {
852 record.lifecycle_status = lifecycle_status;
853 }
854 if let Some(retired_at) = patch.retired_at {
855 record.retired_at = Some(retired_at);
856 }
857 if let Some(retired_reason) = patch.retired_reason {
858 record.retired_reason = Some(retired_reason);
859 }
860 if let Some(embedding) = patch.embedding {
861 record.embedding = Some(embedding);
862 }
863
864 self.validate_new_record_id(&record).await?;
865 let version = self.write_entries(std::slice::from_ref(&record)).await?;
866 Ok(UpdateResult {
867 record,
868 replaced_id: existing.id,
869 version,
870 })
871 }
872
873 async fn write_tombstone_for(&mut self, record: ContextRecord) -> LanceResult<u64> {
874 let tombstone = ContextRecord {
875 id: record.id,
876 external_id: record.external_id,
877 run_id: record.run_id,
878 bot_id: record.bot_id,
879 session_id: record.session_id,
880 tenant: record.tenant,
881 source: record.source,
882 created_at: Utc::now(),
883 role: record.role,
884 state_metadata: None,
885 metadata: None,
886 relationships: Vec::new(),
887 expires_at: None,
888 retention_policy: None,
889 lifecycle_status: LIFECYCLE_ACTIVE.to_string(),
890 retired_at: None,
891 retired_reason: None,
892 supersedes_id: None,
893 superseded_by_id: None,
894 content_type: CONTENT_TYPE_TOMBSTONE.to_string(),
895 text_payload: None,
896 binary_payload: None,
897 embedding: None,
898 };
899 self.write_entries(std::slice::from_ref(&tombstone)).await
900 }
901
902 async fn validate_unique_ids(&self, entries: &[ContextRecord]) -> LanceResult<()> {
903 let mut ids = HashSet::new();
904 let mut external_ids = HashSet::new();
905 for entry in entries {
906 if entry.is_tombstone() {
907 return Err(ArrowError::InvalidArgumentError(format!(
908 "content_type '{}' is reserved for internal tombstones",
909 CONTENT_TYPE_TOMBSTONE
910 ))
911 .into());
912 }
913 if !ids.insert(entry.id.as_str()) {
914 return Err(ArrowError::InvalidArgumentError(format!(
915 "duplicate id '{}' in batch",
916 entry.id
917 ))
918 .into());
919 }
920 if let Some(external_id) = &entry.external_id {
921 if !external_ids.insert(external_id.as_str()) {
922 return Err(ArrowError::InvalidArgumentError(format!(
923 "duplicate external_id '{}' in batch",
924 external_id
925 ))
926 .into());
927 }
928 }
929 }
930
931 let id_list: Vec<&str> = ids.iter().copied().collect();
932 let external_id_list: Vec<&str> = external_ids.iter().copied().collect();
933 let (existing_ids, existing_external_ids) =
934 self.find_existing_keys(&id_list, &external_id_list).await?;
935
936 for entry in entries {
938 if existing_ids.contains(entry.id.as_str()) {
939 return Err(ArrowError::InvalidArgumentError(format!(
940 "id '{}' already exists",
941 entry.id
942 ))
943 .into());
944 }
945 if let Some(external_id) = &entry.external_id {
946 if existing_external_ids.contains(external_id.as_str()) {
947 return Err(ArrowError::InvalidArgumentError(format!(
948 "external_id '{}' already exists",
949 external_id
950 ))
951 .into());
952 }
953 }
954 }
955
956 Ok(())
957 }
958
959 async fn validate_new_record_id(&self, entry: &ContextRecord) -> LanceResult<()> {
960 let id = entry.id.as_str();
961 let (existing_ids, _) = self.find_existing_keys(&[id], &[]).await?;
962 if existing_ids.contains(id) {
963 return Err(ArrowError::InvalidArgumentError(format!(
964 "id '{}' already exists",
965 entry.id
966 ))
967 .into());
968 }
969 Ok(())
970 }
971
972 async fn find_existing_keys(
987 &self,
988 ids: &[&str],
989 external_ids: &[&str],
990 ) -> LanceResult<(HashSet<String>, HashSet<String>)> {
991 let mut existing_ids = HashSet::new();
992 let mut existing_external_ids = HashSet::new();
993
994 let candidate_ids: HashSet<&str> = ids.iter().copied().collect();
995 let candidate_external_ids: HashSet<&str> = external_ids.iter().copied().collect();
996
997 if !candidate_ids.is_empty() {
998 let filter = format!("id IN ({})", sql_quoted_list(ids));
999 let scanner = self
1000 .lsm_scanner()
1001 .await?
1002 .project(&["id", "content_type"])
1003 .filter(&filter)?;
1004 let mut stream = scanner.try_into_stream().await?;
1005 while let Some(batch) = stream.try_next().await? {
1006 let id_array = column_as::<StringArray>(&batch, "id")?;
1007 let content_type_array = column_as::<StringArray>(&batch, "content_type")?;
1008 for row in 0..batch.num_rows() {
1009 if content_type_array.value(row) == CONTENT_TYPE_TOMBSTONE {
1010 continue;
1011 }
1012 let id = id_array.value(row);
1013 if candidate_ids.contains(id) {
1014 existing_ids.insert(id.to_string());
1015 }
1016 }
1017 }
1018 }
1019
1020 if !candidate_external_ids.is_empty() && self.has_external_id_column() {
1021 let filter = format!("external_id IN ({})", sql_quoted_list(external_ids));
1022 let scanner = self
1023 .lsm_scanner()
1024 .await?
1025 .project(&["external_id", "content_type"])
1026 .filter(&filter)?;
1027 let mut stream = scanner.try_into_stream().await?;
1028 while let Some(batch) = stream.try_next().await? {
1029 let content_type_array = column_as::<StringArray>(&batch, "content_type")?;
1030 let Some(external_id_array) =
1031 column_as_optional::<StringArray>(&batch, "external_id")
1032 else {
1033 continue;
1034 };
1035 for row in 0..batch.num_rows() {
1036 if content_type_array.value(row) == CONTENT_TYPE_TOMBSTONE {
1037 continue;
1038 }
1039 if external_id_array.is_null(row) {
1040 continue;
1041 }
1042 let external_id = external_id_array.value(row);
1043 if candidate_external_ids.contains(external_id) {
1044 existing_external_ids.insert(external_id.to_string());
1045 }
1046 }
1047 }
1048 }
1049
1050 Ok((existing_ids, existing_external_ids))
1051 }
1052
1053 fn derive_region_id(bot_id: &Option<String>, session_id: &Option<String>) -> Uuid {
1054 let mut input = String::new();
1055
1056 if let Some(bid) = bot_id {
1057 input.push_str(bid);
1058 }
1059 input.push('#');
1060 if let Some(sid) = session_id {
1061 input.push_str(sid);
1062 }
1063
1064 Uuid::new_v5(&Uuid::NAMESPACE_OID, input.as_bytes())
1066 }
1067
1068 fn has_relationships_column(&self) -> bool {
1069 self.dataset
1070 .schema()
1071 .field_paths()
1072 .iter()
1073 .any(|path| path == RELATIONSHIPS_COLUMN)
1074 }
1075
1076 fn has_external_id_column(&self) -> bool {
1077 self.dataset
1078 .schema()
1079 .field_paths()
1080 .iter()
1081 .any(|path| path == "external_id")
1082 }
1083
1084 pub fn version(&self) -> u64 {
1086 self.dataset.manifest.version
1087 }
1088
1089 pub async fn migrate_relationships_column(&mut self) -> LanceResult<bool> {
1094 if self.has_relationships_column() {
1095 return Ok(false);
1096 }
1097
1098 let schema = Arc::new(Schema::new(vec![relationship_field()]));
1099 self.dataset
1100 .add_columns(NewColumnTransform::AllNulls(schema), None, None)
1101 .await?;
1102 Ok(true)
1103 }
1104
1105 pub async fn checkout(&mut self, version_id: u64) -> LanceResult<()> {
1107 let dataset = self.dataset.checkout_version(version_id).await?;
1108 self.dataset = dataset;
1109 Ok(())
1110 }
1111
1112 pub async fn get(&self, id: &str) -> LanceResult<Option<ContextRecord>> {
1114 let escaped_id = id.replace('\'', "''");
1115 let mut scanner = self.dataset.scan();
1116 scanner.filter(&format!("id = '{}'", escaped_id))?;
1117 scanner.limit(Some(1), None)?;
1118
1119 let mut stream = scanner.try_into_stream().await?;
1120 if let Some(batch) = stream.try_next().await? {
1121 let records = batch_to_records(&batch)?;
1122 return Ok(records.into_iter().next());
1123 }
1124 Ok(None)
1125 }
1126
1127 pub async fn list(
1129 &self,
1130 limit: Option<usize>,
1131 offset: Option<usize>,
1132 ) -> LanceResult<Vec<ContextRecord>> {
1133 self.list_filtered_with_options(limit, offset, None, LifecycleQueryOptions::default())
1134 .await
1135 }
1136
1137 pub async fn list_filtered(
1139 &self,
1140 limit: Option<usize>,
1141 offset: Option<usize>,
1142 filters: Option<&RecordFilters>,
1143 ) -> LanceResult<Vec<ContextRecord>> {
1144 self.list_filtered_with_options(limit, offset, filters, LifecycleQueryOptions::default())
1145 .await
1146 }
1147
1148 pub async fn list_with_options(
1150 &self,
1151 limit: Option<usize>,
1152 offset: Option<usize>,
1153 options: LifecycleQueryOptions,
1154 ) -> LanceResult<Vec<ContextRecord>> {
1155 self.list_filtered_with_options(limit, offset, None, options)
1156 .await
1157 }
1158
1159 pub async fn list_filtered_with_options(
1161 &self,
1162 limit: Option<usize>,
1163 offset: Option<usize>,
1164 filters: Option<&RecordFilters>,
1165 options: LifecycleQueryOptions,
1166 ) -> LanceResult<Vec<ContextRecord>> {
1167 let scanner = self.lsm_scanner().await?;
1168 let mut stream = scanner.try_into_stream().await?;
1169 let mut results = Vec::new();
1170 while let Some(batch) = stream.try_next().await? {
1171 results.extend(batch_to_records(&batch)?);
1172 }
1173
1174 let superseded_ids: HashSet<String> = results
1175 .iter()
1176 .filter_map(|record| {
1177 let supersedes_id = record.supersedes_id.as_ref()?;
1178 if supersedes_id == &record.id {
1179 None
1180 } else {
1181 Some(supersedes_id.clone())
1182 }
1183 })
1184 .collect();
1185 results.retain(|record| {
1186 options.is_visible(record)
1187 && (options.include_retired || !superseded_ids.contains(&record.id))
1188 });
1189 if let Some(filters) = filters.filter(|filters| !filters.is_empty()) {
1190 results.retain(|record| filters.matches(record));
1191 }
1192
1193 if let Some(offset) = offset {
1194 results = results.into_iter().skip(offset).collect();
1195 }
1196 if let Some(limit) = limit {
1197 results.truncate(limit);
1198 }
1199 Ok(results)
1200 }
1201
1202 pub async fn get_by_id(&self, id: &str) -> LanceResult<Option<ContextRecord>> {
1204 Ok(self
1205 .list(None, None)
1206 .await?
1207 .into_iter()
1208 .find(|record| record.id == id))
1209 }
1210
1211 pub async fn get_by_external_id(
1213 &self,
1214 external_id: &str,
1215 ) -> LanceResult<Option<ContextRecord>> {
1216 Ok(self
1217 .list(None, None)
1218 .await?
1219 .into_iter()
1220 .find(|record| record.external_id.as_deref() == Some(external_id)))
1221 }
1222
1223 pub async fn list_related(
1225 &self,
1226 target_id: &str,
1227 relation: Option<&str>,
1228 limit: Option<usize>,
1229 ) -> LanceResult<Vec<ContextRecord>> {
1230 self.list_related_with_options(target_id, relation, limit, LifecycleQueryOptions::default())
1231 .await
1232 }
1233
1234 pub async fn list_related_with_options(
1236 &self,
1237 target_id: &str,
1238 relation: Option<&str>,
1239 limit: Option<usize>,
1240 options: LifecycleQueryOptions,
1241 ) -> LanceResult<Vec<ContextRecord>> {
1242 let mut results: Vec<ContextRecord> = self
1243 .list_with_options(None, None, options)
1244 .await?
1245 .into_iter()
1246 .filter(|record| {
1247 record.relationships.iter().any(|relationship| {
1248 relationship.target_id == target_id
1249 && relation.is_none_or(|value| relationship.relation == value)
1250 })
1251 })
1252 .collect();
1253
1254 if let Some(limit) = limit {
1255 results.truncate(limit);
1256 }
1257 Ok(results)
1258 }
1259
1260 pub async fn search(
1262 &self,
1263 query: &[f32],
1264 limit: Option<usize>,
1265 ) -> LanceResult<Vec<SearchResult>> {
1266 self.search_filtered_with_options(query, limit, None, LifecycleQueryOptions::default())
1267 .await
1268 }
1269
1270 pub async fn search_filtered(
1272 &self,
1273 query: &[f32],
1274 limit: Option<usize>,
1275 filters: Option<&RecordFilters>,
1276 ) -> LanceResult<Vec<SearchResult>> {
1277 self.search_filtered_with_options(query, limit, filters, LifecycleQueryOptions::default())
1278 .await
1279 }
1280
1281 pub async fn search_with_options(
1283 &self,
1284 query: &[f32],
1285 limit: Option<usize>,
1286 options: LifecycleQueryOptions,
1287 ) -> LanceResult<Vec<SearchResult>> {
1288 self.search_filtered_with_options(query, limit, None, options)
1289 .await
1290 }
1291
1292 pub async fn search_filtered_with_options(
1294 &self,
1295 query: &[f32],
1296 limit: Option<usize>,
1297 filters: Option<&RecordFilters>,
1298 options: LifecycleQueryOptions,
1299 ) -> LanceResult<Vec<SearchResult>> {
1300 validate_query_dimension(query, self.embedding_dim)?;
1301
1302 let top_k = limit.unwrap_or(DEFAULT_SEARCH_LIMIT);
1303 if top_k == 0 {
1304 return Ok(Vec::new());
1305 }
1306
1307 let mut results: Vec<SearchResult> = self
1308 .list_filtered_with_options(None, None, filters, options)
1309 .await?
1310 .into_iter()
1311 .filter_map(|record| {
1312 let distance = self
1313 .distance_metric
1314 .distance(query, record.embedding.as_ref()?);
1315 Some(SearchResult { record, distance })
1316 })
1317 .collect();
1318 results.sort_by(|left, right| left.distance.total_cmp(&right.distance));
1319 results.truncate(top_k);
1320 Ok(results)
1321 }
1322
1323 pub async fn retrieve_filtered_with_options(
1325 &self,
1326 text: Option<&str>,
1327 vector: Option<&[f32]>,
1328 limit: Option<usize>,
1329 filters: Option<&RecordFilters>,
1330 options: LifecycleQueryOptions,
1331 ) -> LanceResult<Vec<RetrieveResult>> {
1332 let text_terms = text.map(unique_query_terms).unwrap_or_default();
1333 let has_text = !text_terms.is_empty();
1334
1335 if !has_text && vector.is_none() {
1336 return Err(ArrowError::InvalidArgumentError(
1337 "retrieve requires text or vector".to_string(),
1338 )
1339 .into());
1340 }
1341
1342 if let Some(query) = vector {
1343 validate_query_dimension(query, self.embedding_dim)?;
1344 }
1345
1346 let top_k = limit.unwrap_or(DEFAULT_SEARCH_LIMIT);
1347 if top_k == 0 {
1348 return Ok(Vec::new());
1349 }
1350
1351 let records = self
1352 .list_filtered_with_options(None, None, filters, options)
1353 .await?;
1354 let mut candidates: HashMap<String, RetrieveResult> = HashMap::new();
1355
1356 if let Some(query) = vector {
1357 let mut vector_hits: Vec<(usize, f32)> = records
1358 .iter()
1359 .enumerate()
1360 .filter_map(|(index, record)| {
1361 let distance = self
1362 .distance_metric
1363 .distance(query, record.embedding.as_ref()?);
1364 Some((index, distance))
1365 })
1366 .collect();
1367 vector_hits.sort_by(|left, right| {
1368 left.1
1369 .total_cmp(&right.1)
1370 .then_with(|| records[left.0].id.cmp(&records[right.0].id))
1371 });
1372
1373 for (rank, (index, distance)) in vector_hits.into_iter().enumerate() {
1374 add_retrieve_channel(
1375 &mut candidates,
1376 &records[index],
1377 rank + 1,
1378 "vector",
1379 Some(distance),
1380 None,
1381 );
1382 }
1383 }
1384
1385 if has_text {
1386 let mut text_hits: Vec<(usize, f32)> = records
1387 .iter()
1388 .enumerate()
1389 .filter_map(|(index, record)| {
1390 lexical_score(&text_terms, record.text_payload.as_deref())
1391 .map(|score| (index, score))
1392 })
1393 .collect();
1394 text_hits.sort_by(|left, right| {
1395 right
1396 .1
1397 .total_cmp(&left.1)
1398 .then_with(|| records[left.0].id.cmp(&records[right.0].id))
1399 });
1400
1401 for (rank, (index, score)) in text_hits.into_iter().enumerate() {
1402 add_retrieve_channel(
1403 &mut candidates,
1404 &records[index],
1405 rank + 1,
1406 "text",
1407 None,
1408 Some(score),
1409 );
1410 }
1411 }
1412
1413 let mut results: Vec<RetrieveResult> = candidates.into_values().collect();
1414 results.sort_by(compare_retrieve_results);
1415 results.truncate(top_k);
1416 Ok(results)
1417 }
1418
1419 async fn lsm_scanner(&self) -> LanceResult<LsmScanner> {
1420 let object_store = self.dataset.object_store(None).await?;
1421 let branch_location = self.dataset.branch_location();
1422 let shard_ids = self.dataset.list_mem_wal_latest_shard_ids().await?;
1423
1424 let mut shard_snapshots = Vec::with_capacity(shard_ids.len());
1425 for shard_id in shard_ids {
1426 let manifest_store = ShardManifestStore::new(
1427 object_store.clone(),
1428 &branch_location.path,
1429 shard_id,
1430 DEFAULT_MANIFEST_SCAN_BATCH_SIZE,
1431 );
1432 let Some(manifest) = manifest_store.read_latest().await? else {
1433 continue;
1434 };
1435
1436 let mut snapshot = ShardSnapshot::new(shard_id)
1437 .with_spec_id(manifest.shard_spec_id)
1438 .with_current_generation(manifest.current_generation);
1439 for flushed in manifest.flushed_generations {
1440 snapshot = snapshot.with_flushed_generation(flushed.generation, flushed.path);
1441 }
1442 shard_snapshots.push(snapshot);
1443 }
1444
1445 Ok(LsmScanner::new(
1446 Arc::new(self.dataset.clone()),
1447 shard_snapshots,
1448 vec!["id".to_string()],
1449 ))
1450 }
1451
1452 pub async fn compact(
1454 &mut self,
1455 options: Option<CompactionConfig>,
1456 ) -> LanceResult<CompactionMetrics> {
1457 let config = options.unwrap_or_else(|| self.compaction_config.clone());
1458
1459 info!(
1460 "Starting compaction: {} fragments",
1461 self.dataset.count_fragments()
1462 );
1463 let start = std::time::Instant::now();
1464
1465 {
1467 let mut state = self.compaction_state.lock().await;
1468 if state.is_compacting {
1469 warn!("Compaction already in progress, skipping");
1470 return Err(LanceError::from(ArrowError::InvalidArgumentError(
1471 "Compaction already in progress".to_string(),
1472 )));
1473 }
1474 state.is_compacting = true;
1475 }
1476
1477 let lance_options = CompactionOptions {
1479 target_rows_per_fragment: config.target_rows_per_fragment,
1480 max_rows_per_group: config.max_rows_per_group,
1481 materialize_deletions: config.materialize_deletions,
1482 materialize_deletions_threshold: config.materialize_deletions_threshold,
1483 num_threads: config.num_threads,
1484 ..Default::default()
1485 };
1486
1487 let result = compact_files(&mut self.dataset, lance_options, None).await;
1489
1490 let mut state = self.compaction_state.lock().await;
1492 state.is_compacting = false;
1493
1494 match result {
1495 Ok(metrics) => {
1496 state.last_compaction = Some(Utc::now());
1497 state.total_compactions += 1;
1498 state.last_error = None;
1499 drop(state); info!(
1502 "Compaction completed in {:?}: removed {} fragments ({}files), added {} fragments ({} files)",
1503 start.elapsed(),
1504 metrics.fragments_removed,
1505 metrics.files_removed,
1506 metrics.fragments_added,
1507 metrics.files_added
1508 );
1509
1510 self.dataset = Dataset::open(self.dataset.uri()).await?;
1512
1513 if let Err(e) = self.ensure_id_index().await {
1516 warn!("Failed to ensure id index after compaction: {}", e);
1517 }
1518
1519 Ok(metrics)
1520 }
1521 Err(e) => {
1522 error!("Compaction failed: {}", e);
1523 state.last_error = Some(e.to_string());
1524 Err(e)
1525 }
1526 }
1527 }
1528
1529 pub async fn should_compact(&self) -> LanceResult<bool> {
1531 let fragment_count = self.dataset.count_fragments();
1532
1533 if fragment_count < self.compaction_config.min_fragments {
1534 return Ok(false);
1535 }
1536
1537 if !self.compaction_config.quiet_hours.is_empty() {
1539 let now = Utc::now();
1540 let current_hour = now.hour() as u8;
1541
1542 for (start, end) in &self.compaction_config.quiet_hours {
1543 if current_hour >= *start && current_hour < *end {
1544 info!("Skipping compaction during quiet hours ({}-{})", start, end);
1545 return Ok(false);
1546 }
1547 }
1548 }
1549
1550 Ok(true)
1551 }
1552
1553 pub async fn compaction_stats(&self) -> LanceResult<CompactionStats> {
1555 let state = self.compaction_state.lock().await;
1556
1557 Ok(CompactionStats {
1558 total_fragments: self.dataset.count_fragments(),
1559 is_compacting: state.is_compacting,
1560 last_compaction: state.last_compaction,
1561 last_error: state.last_error.clone(),
1562 total_compactions: state.total_compactions,
1563 })
1564 }
1565
1566 async fn ensure_id_index(&mut self) -> LanceResult<()> {
1568 if self.id_index_type == IdIndexType::None {
1569 return Ok(());
1570 }
1571
1572 let indices = self.dataset.load_indices().await?;
1573 if indices.iter().any(|i| i.name == ID_INDEX_NAME) {
1574 return Ok(());
1575 }
1576
1577 self.create_id_index().await
1578 }
1579
1580 pub async fn create_id_index(&mut self) -> LanceResult<()> {
1582 let index_type = match self.id_index_type {
1583 IdIndexType::ZoneMap => IndexType::ZoneMap,
1584 IdIndexType::BTree => IndexType::BTree,
1585 IdIndexType::None => return Ok(()),
1586 };
1587
1588 info!("Creating {:?} index on id column", index_type);
1589
1590 let params = ScalarIndexParams::default();
1591
1592 self.dataset
1593 .create_index_builder(&["id"], index_type, ¶ms)
1594 .name(ID_INDEX_NAME.to_string())
1595 .replace(true)
1596 .await?;
1597
1598 self.dataset = Dataset::open(self.dataset.uri()).await?;
1600
1601 Ok(())
1602 }
1603
1604 async fn start_background_compaction(&mut self) -> LanceResult<()> {
1606 if !self.compaction_config.enabled {
1607 return Ok(());
1608 }
1609
1610 let mut state = self.compaction_state.lock().await;
1611 if state.background_task.is_some() {
1612 warn!("Background compaction already running");
1613 return Ok(());
1614 }
1615
1616 info!(
1617 "Starting background compaction (interval: {}s, min fragments: {})",
1618 self.compaction_config.check_interval_secs, self.compaction_config.min_fragments
1619 );
1620
1621 let mut store_clone = self.clone();
1622 let interval_secs = self.compaction_config.check_interval_secs;
1623
1624 let task = tokio::spawn(async move {
1625 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1626
1627 loop {
1628 interval.tick().await;
1629
1630 match store_clone.should_compact().await {
1631 Ok(true) => {
1632 info!("Background compaction triggered");
1633 if let Err(e) = store_clone.compact(None).await {
1634 error!("Background compaction failed: {}", e);
1635 }
1636 }
1637 Ok(false) => {
1638 }
1640 Err(e) => {
1641 error!("Error checking compaction need: {}", e);
1642 }
1643 }
1644 }
1645 });
1646
1647 state.background_task = Some(task);
1648 Ok(())
1649 }
1650
1651 pub async fn stop_background_compaction(&mut self) -> LanceResult<()> {
1653 let mut state = self.compaction_state.lock().await;
1654
1655 if let Some(task) = state.background_task.take() {
1656 info!("Stopping background compaction");
1657 task.abort();
1658 }
1659
1660 Ok(())
1661 }
1662
1663 pub fn schema(blob_columns: &HashSet<String>) -> Schema {
1669 Self::schema_with_embedding_dim(blob_columns, DEFAULT_EMBEDDING_DIM)
1670 }
1671
1672 pub fn schema_with_embedding_dim(blob_columns: &HashSet<String>, embedding_dim: i32) -> Schema {
1674 Self::schema_with_options(
1675 blob_columns,
1676 true,
1677 true,
1678 true,
1679 true,
1680 embedding_dim,
1681 DistanceMetric::default(),
1682 )
1683 }
1684
1685 fn schema_with_options(
1686 blob_columns: &HashSet<String>,
1687 include_external_id: bool,
1688 include_metadata: bool,
1689 include_relationships: bool,
1690 include_lifecycle: bool,
1691 embedding_dim: i32,
1692 distance_metric: DistanceMetric,
1693 ) -> Schema {
1694 let mut id_metadata = HashMap::new();
1695 id_metadata.insert(
1696 "lance-schema:unenforced-primary-key".to_string(),
1697 "true".to_string(),
1698 );
1699
1700 let text_field = if blob_columns.contains("text_payload") {
1701 let mut metadata = HashMap::new();
1702 metadata.insert("lance-encoding:blob".to_string(), "true".to_string());
1703 Field::new("text_payload", DataType::LargeBinary, true).with_metadata(metadata)
1704 } else {
1705 Field::new("text_payload", DataType::LargeUtf8, true)
1706 };
1707
1708 let binary_field = if blob_columns.contains("binary_payload") {
1709 let mut metadata = HashMap::new();
1710 metadata.insert("lance-encoding:blob".to_string(), "true".to_string());
1711 Field::new("binary_payload", DataType::LargeBinary, true).with_metadata(metadata)
1712 } else {
1713 Field::new("binary_payload", DataType::LargeBinary, true)
1714 };
1715
1716 let mut fields = vec![Field::new("id", DataType::Utf8, false).with_metadata(id_metadata)];
1717 if include_external_id {
1718 fields.push(Field::new("external_id", DataType::Utf8, true));
1719 }
1720 fields.extend([
1721 Field::new("run_id", DataType::Utf8, false),
1722 Field::new("bot_id", DataType::Utf8, true),
1723 Field::new("session_id", DataType::Utf8, true),
1724 Field::new("tenant", DataType::Utf8, true),
1725 Field::new("source", DataType::Utf8, true),
1726 Field::new(
1727 "created_at",
1728 DataType::Timestamp(TimeUnit::Microsecond, None),
1729 false,
1730 ),
1731 Field::new(
1732 "role",
1733 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1734 false,
1735 ),
1736 Field::new(
1737 "state_metadata",
1738 DataType::Struct(
1739 vec![
1740 Field::new("step", DataType::Int32, true),
1741 Field::new("active_plan_id", DataType::Utf8, true),
1742 Field::new("tokens_used", DataType::Int32, true),
1743 Field::new("custom", DataType::Utf8, true),
1744 ]
1745 .into(),
1746 ),
1747 true,
1748 ),
1749 ]);
1750 if include_metadata {
1751 fields.push(Field::new("metadata", DataType::LargeUtf8, true));
1752 }
1753 if include_relationships {
1754 fields.push(relationship_field());
1755 }
1756 if include_lifecycle {
1757 fields.extend([
1758 Field::new(
1759 "expires_at",
1760 DataType::Timestamp(TimeUnit::Microsecond, None),
1761 true,
1762 ),
1763 Field::new("retention_policy", DataType::Utf8, true),
1764 Field::new("lifecycle_status", DataType::Utf8, false),
1765 Field::new(
1766 "retired_at",
1767 DataType::Timestamp(TimeUnit::Microsecond, None),
1768 true,
1769 ),
1770 Field::new("retired_reason", DataType::Utf8, true),
1771 Field::new("supersedes_id", DataType::Utf8, true),
1772 Field::new("superseded_by_id", DataType::Utf8, true),
1773 ]);
1774 }
1775 fields.extend([
1776 Field::new("content_type", DataType::Utf8, false),
1777 text_field,
1778 binary_field,
1779 Field::new(
1780 "embedding",
1781 DataType::FixedSizeList(
1782 Arc::new(Field::new("item", DataType::Float32, true)),
1783 embedding_dim,
1784 ),
1785 true,
1786 ),
1787 ]);
1788
1789 let schema_metadata = HashMap::from([(
1790 DISTANCE_METRIC_METADATA_KEY.to_string(),
1791 distance_metric.as_str().to_string(),
1792 )]);
1793
1794 Schema::new_with_metadata(fields, schema_metadata)
1795 }
1796
1797 async fn load_with_options(
1798 uri: &str,
1799 storage_options: Option<HashMap<String, String>>,
1800 ) -> LanceResult<Dataset> {
1801 if let Some(options) = storage_options {
1802 DatasetBuilder::from_uri(uri)
1803 .with_storage_options(options)
1804 .load()
1805 .await
1806 } else {
1807 Dataset::open(uri).await
1808 }
1809 }
1810
1811 async fn create_with_options(
1812 uri: &str,
1813 storage_options: Option<HashMap<String, String>>,
1814 blob_columns: &HashSet<String>,
1815 embedding_dim: i32,
1816 distance_metric: DistanceMetric,
1817 ) -> LanceResult<Dataset> {
1818 let schema = Arc::new(Self::schema_with_options(
1819 blob_columns,
1820 true,
1821 true,
1822 true,
1823 true,
1824 embedding_dim,
1825 distance_metric,
1826 ));
1827 let empty_batch = RecordBatch::new_empty(schema.clone());
1828 let batches = RecordBatchIterator::new(
1829 vec![Ok::<RecordBatch, ArrowError>(empty_batch)].into_iter(),
1830 schema.clone(),
1831 );
1832
1833 let mut params = WriteParams {
1834 mode: WriteMode::Create,
1835 ..Default::default()
1836 };
1837
1838 if let Some(options) = storage_options {
1839 let store_params = ObjectStoreParams {
1840 storage_options_accessor: Some(Arc::new(
1841 StorageOptionsAccessor::with_static_options(options),
1842 )),
1843 ..Default::default()
1844 };
1845 params.store_params = Some(store_params);
1846 }
1847
1848 Dataset::write(batches, uri, Some(params)).await
1849 }
1850
1851 fn records_to_batch(&self, entries: &[ContextRecord]) -> LanceResult<RecordBatch> {
1852 let include_external_id = self
1853 .dataset
1854 .schema()
1855 .field_paths()
1856 .iter()
1857 .any(|path| path == "external_id");
1858 let include_lifecycle = self
1859 .dataset
1860 .schema()
1861 .field_paths()
1862 .iter()
1863 .any(|path| path == "expires_at");
1864 let include_metadata = self
1865 .dataset
1866 .schema()
1867 .field_paths()
1868 .iter()
1869 .any(|path| path == "metadata");
1870 let include_tenant = self
1871 .dataset
1872 .schema()
1873 .field_paths()
1874 .iter()
1875 .any(|path| path == "tenant");
1876 let include_source = self
1877 .dataset
1878 .schema()
1879 .field_paths()
1880 .iter()
1881 .any(|path| path == "source");
1882 let include_relationships = self.has_relationships_column();
1883 if !include_external_id && entries.iter().any(|entry| entry.external_id.is_some()) {
1884 return Err(ArrowError::InvalidArgumentError(
1885 "external_id requires a context dataset created with external_id support"
1886 .to_string(),
1887 )
1888 .into());
1889 }
1890 if !include_metadata && entries.iter().any(|entry| entry.metadata.is_some()) {
1891 return Err(ArrowError::InvalidArgumentError(
1892 "metadata requires a context dataset created with metadata support".to_string(),
1893 )
1894 .into());
1895 }
1896 if !include_tenant && entries.iter().any(|entry| entry.tenant.is_some()) {
1897 return Err(ArrowError::InvalidArgumentError(
1898 "tenant requires a context dataset created with partition-key column support"
1899 .to_string(),
1900 )
1901 .into());
1902 }
1903 if !include_source && entries.iter().any(|entry| entry.source.is_some()) {
1904 return Err(ArrowError::InvalidArgumentError(
1905 "source requires a context dataset created with partition-key column support"
1906 .to_string(),
1907 )
1908 .into());
1909 }
1910 if !include_relationships && entries.iter().any(|entry| !entry.relationships.is_empty()) {
1911 return Err(ArrowError::InvalidArgumentError(
1912 "relationships require a context dataset with relationships support; run migrate_relationships_column() on older datasets".to_string(),
1913 )
1914 .into());
1915 }
1916 if !include_lifecycle && entries.iter().any(ContextRecord::has_non_default_lifecycle) {
1917 return Err(ArrowError::InvalidArgumentError(
1918 "lifecycle fields require a context dataset created with lifecycle support"
1919 .to_string(),
1920 )
1921 .into());
1922 }
1923
1924 let mut id_builder = StringBuilder::new();
1925 let mut external_id_builder = StringBuilder::new();
1926 let mut run_id_builder = StringBuilder::new();
1927 let mut bot_id_builder = StringBuilder::new();
1928 let mut session_id_builder = StringBuilder::new();
1929 let mut tenant_builder = StringBuilder::new();
1930 let mut source_builder = StringBuilder::new();
1931 let mut created_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len());
1932 let mut role_builder = StringDictionaryBuilder::<Int8Type>::new();
1933 let mut metadata_builder = LargeStringBuilder::new();
1934 let mut relationships_builder = ListBuilder::new(relationship_struct_builder())
1935 .with_field(relationship_list_item_field());
1936 let mut expires_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len());
1937 let mut retention_policy_builder = StringBuilder::new();
1938 let mut lifecycle_status_builder = StringBuilder::new();
1939 let mut retired_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len());
1940 let mut retired_reason_builder = StringBuilder::new();
1941 let mut supersedes_id_builder = StringBuilder::new();
1942 let mut superseded_by_id_builder = StringBuilder::new();
1943 let mut content_type_builder = StringBuilder::new();
1944 let mut binary_builder = LargeBinaryBuilder::new();
1945
1946 let text_is_blob = self.blob_columns.contains("text_payload");
1947 let mut text_string_builder = if !text_is_blob {
1948 Some(LargeStringBuilder::new())
1949 } else {
1950 None
1951 };
1952 let mut text_binary_builder = if text_is_blob {
1953 Some(LargeBinaryBuilder::new())
1954 } else {
1955 None
1956 };
1957
1958 let state_fields: Vec<FieldRef> = vec![
1959 Arc::new(Field::new("step", DataType::Int32, true)),
1960 Arc::new(Field::new("active_plan_id", DataType::Utf8, true)),
1961 Arc::new(Field::new("tokens_used", DataType::Int32, true)),
1962 Arc::new(Field::new("custom", DataType::Utf8, true)),
1963 ];
1964 let mut state_builder = StructBuilder::new(
1965 state_fields,
1966 vec![
1967 Box::new(Int32Builder::new()),
1968 Box::new(StringBuilder::new()),
1969 Box::new(Int32Builder::new()),
1970 Box::new(StringBuilder::new()),
1971 ],
1972 );
1973
1974 let mut embedding_builder =
1975 FixedSizeListBuilder::new(Float32Builder::new(), self.embedding_dim);
1976
1977 for entry in entries {
1978 id_builder.append_value(&entry.id);
1979 external_id_builder.append_option(entry.external_id.as_deref());
1980 run_id_builder.append_value(&entry.run_id);
1981 bot_id_builder.append_option(entry.bot_id.as_deref());
1982 session_id_builder.append_option(entry.session_id.as_deref());
1983 tenant_builder.append_option(entry.tenant.as_deref());
1984 source_builder.append_option(entry.source.as_deref());
1985 created_at_builder.append_value(entry.created_at.timestamp_micros());
1986 role_builder.append(&entry.role)?;
1987 match &entry.metadata {
1988 Some(metadata) => metadata_builder.append_value(metadata.to_string()),
1989 None => metadata_builder.append_null(),
1990 }
1991 for relationship in &entry.relationships {
1992 let values_builder = relationships_builder.values();
1993 values_builder
1994 .field_builder::<StringBuilder>(0)
1995 .unwrap()
1996 .append_value(&relationship.target_id);
1997 values_builder
1998 .field_builder::<StringBuilder>(1)
1999 .unwrap()
2000 .append_value(&relationship.relation);
2001 values_builder
2002 .field_builder::<Float32Builder>(2)
2003 .unwrap()
2004 .append_option(relationship.weight);
2005 values_builder.append(true);
2006 }
2007 relationships_builder.append(true);
2008 expires_at_builder
2009 .append_option(entry.expires_at.map(|value| value.timestamp_micros()));
2010 retention_policy_builder.append_option(entry.retention_policy.as_deref());
2011 lifecycle_status_builder.append_value(&entry.lifecycle_status);
2012 retired_at_builder
2013 .append_option(entry.retired_at.map(|value| value.timestamp_micros()));
2014 retired_reason_builder.append_option(entry.retired_reason.as_deref());
2015 supersedes_id_builder.append_option(entry.supersedes_id.as_deref());
2016 superseded_by_id_builder.append_option(entry.superseded_by_id.as_deref());
2017 content_type_builder.append_value(&entry.content_type);
2018
2019 if text_is_blob {
2020 match &entry.text_payload {
2021 Some(value) => text_binary_builder
2022 .as_mut()
2023 .unwrap()
2024 .append_value(value.as_bytes()),
2025 None => text_binary_builder.as_mut().unwrap().append_null(),
2026 }
2027 } else {
2028 match &entry.text_payload {
2029 Some(value) => text_string_builder.as_mut().unwrap().append_value(value),
2030 None => text_string_builder.as_mut().unwrap().append_null(),
2031 }
2032 }
2033
2034 match &entry.binary_payload {
2035 Some(value) => binary_builder.append_value(value),
2036 None => binary_builder.append_null(),
2037 }
2038
2039 if let Some(metadata) = &entry.state_metadata {
2040 state_builder
2041 .field_builder::<Int32Builder>(0)
2042 .unwrap()
2043 .append_option(metadata.step);
2044 state_builder
2045 .field_builder::<StringBuilder>(1)
2046 .unwrap()
2047 .append_option(metadata.active_plan_id.as_deref());
2048 state_builder
2049 .field_builder::<Int32Builder>(2)
2050 .unwrap()
2051 .append_option(metadata.tokens_used);
2052 state_builder
2053 .field_builder::<StringBuilder>(3)
2054 .unwrap()
2055 .append_option(metadata.custom.as_deref());
2056 state_builder.append(true);
2057 } else {
2058 state_builder
2059 .field_builder::<Int32Builder>(0)
2060 .unwrap()
2061 .append_null();
2062 state_builder
2063 .field_builder::<StringBuilder>(1)
2064 .unwrap()
2065 .append_null();
2066 state_builder
2067 .field_builder::<Int32Builder>(2)
2068 .unwrap()
2069 .append_null();
2070 state_builder
2071 .field_builder::<StringBuilder>(3)
2072 .unwrap()
2073 .append_null();
2074 state_builder.append(false);
2075 }
2076
2077 if let Some(embedding) = &entry.embedding {
2078 if embedding.len() != self.embedding_dim as usize {
2079 return Err(ArrowError::InvalidArgumentError(format!(
2080 "embedding length {} does not match expected dimension {}",
2081 embedding.len(),
2082 self.embedding_dim
2083 ))
2084 .into());
2085 }
2086 {
2087 let values_builder = embedding_builder.values();
2088 for value in embedding {
2089 values_builder.append_value(*value);
2090 }
2091 }
2092 embedding_builder.append(true);
2093 } else {
2094 let values_builder = embedding_builder.values();
2096 for _ in 0..self.embedding_dim {
2097 values_builder.append_null();
2098 }
2099 embedding_builder.append(false);
2100 }
2101 }
2102
2103 let id_array: ArrayRef = Arc::new(id_builder.finish());
2104 let external_id_array: ArrayRef = Arc::new(external_id_builder.finish());
2105 let run_id_array: ArrayRef = Arc::new(run_id_builder.finish());
2106 let bot_id_array: ArrayRef = Arc::new(bot_id_builder.finish());
2107 let session_id_array: ArrayRef = Arc::new(session_id_builder.finish());
2108 let tenant_array: ArrayRef = Arc::new(tenant_builder.finish());
2109 let source_array: ArrayRef = Arc::new(source_builder.finish());
2110 let created_at_array: ArrayRef = Arc::new(created_at_builder.finish());
2111 let role_array: ArrayRef = Arc::new(role_builder.finish());
2112 let metadata_array: ArrayRef = Arc::new(metadata_builder.finish());
2113 let relationships_array: ArrayRef = Arc::new(relationships_builder.finish());
2114 let expires_at_array: ArrayRef = Arc::new(expires_at_builder.finish());
2115 let retention_policy_array: ArrayRef = Arc::new(retention_policy_builder.finish());
2116 let lifecycle_status_array: ArrayRef = Arc::new(lifecycle_status_builder.finish());
2117 let retired_at_array: ArrayRef = Arc::new(retired_at_builder.finish());
2118 let retired_reason_array: ArrayRef = Arc::new(retired_reason_builder.finish());
2119 let supersedes_id_array: ArrayRef = Arc::new(supersedes_id_builder.finish());
2120 let superseded_by_id_array: ArrayRef = Arc::new(superseded_by_id_builder.finish());
2121 let content_type_array: ArrayRef = Arc::new(content_type_builder.finish());
2122 let text_array: ArrayRef = if text_is_blob {
2123 Arc::new(text_binary_builder.unwrap().finish())
2124 } else {
2125 Arc::new(text_string_builder.unwrap().finish())
2126 };
2127 let binary_array: ArrayRef = Arc::new(binary_builder.finish());
2128 let state_array: ArrayRef = Arc::new(state_builder.finish());
2129 let embedding_array: ArrayRef = Arc::new(embedding_builder.finish());
2130
2131 let mut arrays_by_name = HashMap::from([("id".to_string(), id_array)]);
2132 if include_external_id {
2133 arrays_by_name.insert("external_id".to_string(), external_id_array);
2134 }
2135 arrays_by_name.extend([
2136 ("run_id".to_string(), run_id_array),
2137 ("bot_id".to_string(), bot_id_array),
2138 ("session_id".to_string(), session_id_array),
2139 ("created_at".to_string(), created_at_array),
2140 ("role".to_string(), role_array),
2141 ("state_metadata".to_string(), state_array),
2142 ]);
2143 if include_tenant {
2144 arrays_by_name.insert("tenant".to_string(), tenant_array);
2145 }
2146 if include_source {
2147 arrays_by_name.insert("source".to_string(), source_array);
2148 }
2149 if include_metadata {
2150 arrays_by_name.insert("metadata".to_string(), metadata_array);
2151 }
2152 if include_relationships {
2153 arrays_by_name.insert(RELATIONSHIPS_COLUMN.to_string(), relationships_array);
2154 }
2155 if include_lifecycle {
2156 arrays_by_name.extend([
2157 ("expires_at".to_string(), expires_at_array),
2158 ("retention_policy".to_string(), retention_policy_array),
2159 ("lifecycle_status".to_string(), lifecycle_status_array),
2160 ("retired_at".to_string(), retired_at_array),
2161 ("retired_reason".to_string(), retired_reason_array),
2162 ("supersedes_id".to_string(), supersedes_id_array),
2163 ("superseded_by_id".to_string(), superseded_by_id_array),
2164 ]);
2165 }
2166 arrays_by_name.extend([
2167 ("content_type".to_string(), content_type_array),
2168 ("text_payload".to_string(), text_array),
2169 ("binary_payload".to_string(), binary_array),
2170 ("embedding".to_string(), embedding_array),
2171 ]);
2172
2173 let schema: Arc<Schema> = Arc::new(self.dataset.schema().into());
2174 let arrays = schema
2175 .fields()
2176 .iter()
2177 .map(|field| {
2178 arrays_by_name.remove(field.name().as_str()).ok_or_else(|| {
2179 LanceError::from(ArrowError::InvalidArgumentError(format!(
2180 "unsupported dataset column '{}'",
2181 field.name()
2182 )))
2183 })
2184 })
2185 .collect::<LanceResult<Vec<_>>>()?;
2186 let batch = RecordBatch::try_new(schema, arrays)?;
2187
2188 Ok(batch)
2189 }
2190}
2191
2192impl Drop for ContextStore {
2193 fn drop(&mut self) {
2194 if let Ok(mut state) = self.compaction_state.try_lock() {
2196 if let Some(task) = state.background_task.take() {
2197 task.abort();
2198 }
2199 }
2200 }
2201}
2202
2203fn batch_to_records(batch: &RecordBatch) -> LanceResult<Vec<ContextRecord>> {
2205 let id_array = column_as::<StringArray>(batch, "id")?;
2206 let external_id_array = column_as_optional::<StringArray>(batch, "external_id");
2207 let run_id_array = column_as::<StringArray>(batch, "run_id")?;
2208 let bot_id_array = column_as_optional::<StringArray>(batch, "bot_id");
2209 let session_id_array = column_as_optional::<StringArray>(batch, "session_id");
2210 let tenant_array = column_as_optional::<StringArray>(batch, "tenant");
2211 let source_array = column_as_optional::<StringArray>(batch, "source");
2212 let created_at_array = column_as::<TimestampMicrosecondArray>(batch, "created_at")?;
2213 let role_array = column_as::<DictionaryArray<Int8Type>>(batch, "role")?;
2214 let state_array = column_as::<StructArray>(batch, "state_metadata")?;
2215 let metadata_array = column_as_optional::<LargeStringArray>(batch, "metadata");
2216 let relationships_array = column_as_optional::<ListArray>(batch, RELATIONSHIPS_COLUMN);
2217 let expires_at_array = column_as_optional::<TimestampMicrosecondArray>(batch, "expires_at");
2218 let retention_policy_array = column_as_optional::<StringArray>(batch, "retention_policy");
2219 let lifecycle_status_array = column_as_optional::<StringArray>(batch, "lifecycle_status");
2220 let retired_at_array = column_as_optional::<TimestampMicrosecondArray>(batch, "retired_at");
2221 let retired_reason_array = column_as_optional::<StringArray>(batch, "retired_reason");
2222 let supersedes_id_array = column_as_optional::<StringArray>(batch, "supersedes_id");
2223 let superseded_by_id_array = column_as_optional::<StringArray>(batch, "superseded_by_id");
2224 let content_type_array = column_as::<StringArray>(batch, "content_type")?;
2225 let binary_array = column_as::<LargeBinaryArray>(batch, "binary_payload")?;
2226 let embedding_array = column_as::<FixedSizeListArray>(batch, "embedding")?;
2227
2228 let text_is_binary = batch
2230 .schema()
2231 .field_with_name("text_payload")
2232 .is_ok_and(|f| f.data_type() == &DataType::LargeBinary);
2233
2234 let text_string_array = if !text_is_binary {
2235 Some(column_as::<LargeStringArray>(batch, "text_payload")?)
2236 } else {
2237 None
2238 };
2239 let text_binary_array = if text_is_binary {
2240 Some(column_as::<LargeBinaryArray>(batch, "text_payload")?)
2241 } else {
2242 None
2243 };
2244
2245 let step_array = state_array
2246 .column(0)
2247 .as_ref()
2248 .as_any()
2249 .downcast_ref::<Int32Array>()
2250 .ok_or_else(|| {
2251 LanceError::from(ArrowError::InvalidArgumentError(
2252 "step column has unexpected data type".to_string(),
2253 ))
2254 })?;
2255 let active_plan_array = state_array
2256 .column(1)
2257 .as_ref()
2258 .as_any()
2259 .downcast_ref::<StringArray>()
2260 .ok_or_else(|| {
2261 LanceError::from(ArrowError::InvalidArgumentError(
2262 "active_plan_id column has unexpected data type".to_string(),
2263 ))
2264 })?;
2265 let tokens_used_array = state_array
2266 .column(2)
2267 .as_ref()
2268 .as_any()
2269 .downcast_ref::<Int32Array>()
2270 .ok_or_else(|| {
2271 LanceError::from(ArrowError::InvalidArgumentError(
2272 "tokens_used column has unexpected data type".to_string(),
2273 ))
2274 })?;
2275 let custom_array = state_array
2276 .column(3)
2277 .as_ref()
2278 .as_any()
2279 .downcast_ref::<StringArray>()
2280 .ok_or_else(|| {
2281 LanceError::from(ArrowError::InvalidArgumentError(
2282 "custom column has unexpected data type".to_string(),
2283 ))
2284 })?;
2285
2286 let mut results = Vec::with_capacity(batch.num_rows());
2287 for row in 0..batch.num_rows() {
2288 let created_at = timestamp_from_micros(created_at_array.value(row), "created_at")?;
2289
2290 let state_metadata = if state_array.is_null(row) {
2291 None
2292 } else {
2293 Some(StateMetadata {
2294 step: if step_array.is_null(row) {
2295 None
2296 } else {
2297 Some(step_array.value(row))
2298 },
2299 active_plan_id: if active_plan_array.is_null(row) {
2300 None
2301 } else {
2302 Some(active_plan_array.value(row).to_string())
2303 },
2304 tokens_used: if tokens_used_array.is_null(row) {
2305 None
2306 } else {
2307 Some(tokens_used_array.value(row))
2308 },
2309 custom: if custom_array.is_null(row) {
2310 None
2311 } else {
2312 Some(custom_array.value(row).to_string())
2313 },
2314 })
2315 };
2316
2317 let text_payload = if text_is_binary {
2318 let arr = text_binary_array.unwrap();
2319 if arr.is_null(row) {
2320 None
2321 } else {
2322 Some(String::from_utf8_lossy(arr.value(row)).to_string())
2323 }
2324 } else {
2325 let arr = text_string_array.unwrap();
2326 if arr.is_null(row) {
2327 None
2328 } else {
2329 Some(arr.value(row).to_string())
2330 }
2331 };
2332
2333 let binary_payload = if binary_array.is_null(row) {
2334 None
2335 } else {
2336 Some(binary_array.value(row).to_vec())
2337 };
2338
2339 let embedding = if embedding_array.is_null(row) {
2340 None
2341 } else {
2342 Some(embedding_from_list(embedding_array, row)?)
2343 };
2344
2345 let role = if role_array.is_null(row) {
2346 return Err(LanceError::from(ArrowError::InvalidArgumentError(
2347 "role column contains null values".to_string(),
2348 )));
2349 } else {
2350 let role_values = role_array
2351 .values()
2352 .as_any()
2353 .downcast_ref::<StringArray>()
2354 .ok_or_else(|| {
2355 LanceError::from(ArrowError::InvalidArgumentError(
2356 "role dictionary values are not strings".to_string(),
2357 ))
2358 })?;
2359 let key = role_array.keys().value(row) as usize;
2360 role_values.value(key).to_string()
2361 };
2362
2363 let bot_id = bot_id_array.and_then(|arr| {
2364 if arr.is_null(row) {
2365 None
2366 } else {
2367 Some(arr.value(row).to_string())
2368 }
2369 });
2370
2371 let session_id = session_id_array.and_then(|arr| {
2372 if arr.is_null(row) {
2373 None
2374 } else {
2375 Some(arr.value(row).to_string())
2376 }
2377 });
2378
2379 let tenant = tenant_array.and_then(|arr| {
2380 if arr.is_null(row) {
2381 None
2382 } else {
2383 Some(arr.value(row).to_string())
2384 }
2385 });
2386
2387 let source = source_array.and_then(|arr| {
2388 if arr.is_null(row) {
2389 None
2390 } else {
2391 Some(arr.value(row).to_string())
2392 }
2393 });
2394
2395 let metadata = match metadata_array {
2396 Some(arr) if !arr.is_null(row) => {
2397 Some(serde_json::from_str(arr.value(row)).map_err(|err| {
2398 LanceError::from(ArrowError::InvalidArgumentError(format!(
2399 "invalid metadata JSON for record {}: {}",
2400 id_array.value(row),
2401 err
2402 )))
2403 })?)
2404 }
2405 _ => None,
2406 };
2407 let relationships = match relationships_array {
2408 Some(arr) if !arr.is_null(row) => relationships_from_list(arr, row)?,
2409 _ => Vec::new(),
2410 };
2411 let expires_at = optional_timestamp_from_array(expires_at_array, row, "expires_at")?;
2412 let retention_policy = optional_string_from_array(retention_policy_array, row);
2413 let lifecycle_status = optional_string_from_array(lifecycle_status_array, row)
2414 .unwrap_or_else(|| LIFECYCLE_ACTIVE.to_string());
2415 let retired_at = optional_timestamp_from_array(retired_at_array, row, "retired_at")?;
2416 let retired_reason = optional_string_from_array(retired_reason_array, row);
2417 let supersedes_id = optional_string_from_array(supersedes_id_array, row);
2418 let superseded_by_id = optional_string_from_array(superseded_by_id_array, row);
2419
2420 results.push(ContextRecord {
2421 id: id_array.value(row).to_string(),
2422 external_id: external_id_array.and_then(|arr| {
2423 if arr.is_null(row) {
2424 None
2425 } else {
2426 Some(arr.value(row).to_string())
2427 }
2428 }),
2429 run_id: run_id_array.value(row).to_string(),
2430 bot_id,
2431 session_id,
2432 tenant,
2433 source,
2434 created_at,
2435 role,
2436 state_metadata,
2437 metadata,
2438 relationships,
2439 expires_at,
2440 retention_policy,
2441 lifecycle_status,
2442 retired_at,
2443 retired_reason,
2444 supersedes_id,
2445 superseded_by_id,
2446 content_type: content_type_array.value(row).to_string(),
2447 text_payload,
2448 binary_payload,
2449 embedding,
2450 });
2451 }
2452
2453 Ok(results)
2454}
2455
2456fn embedding_from_list(list: &FixedSizeListArray, row: usize) -> LanceResult<Vec<f32>> {
2457 let values = list.value(row);
2458 let float_array = values
2459 .as_ref()
2460 .as_any()
2461 .downcast_ref::<Float32Array>()
2462 .ok_or_else(|| {
2463 LanceError::from(ArrowError::InvalidArgumentError(
2464 "embedding column does not contain float32 values".to_string(),
2465 ))
2466 })?;
2467 let mut embedding = Vec::with_capacity(float_array.len());
2468 for idx in 0..float_array.len() {
2469 embedding.push(float_array.value(idx));
2470 }
2471 Ok(embedding)
2472}
2473
2474fn relationships_from_list(list: &ListArray, row: usize) -> LanceResult<Vec<Relationship>> {
2475 let values = list.value(row);
2476 let struct_array = values
2477 .as_ref()
2478 .as_any()
2479 .downcast_ref::<StructArray>()
2480 .ok_or_else(|| {
2481 LanceError::from(ArrowError::InvalidArgumentError(
2482 "relationships column does not contain struct values".to_string(),
2483 ))
2484 })?;
2485
2486 let target_id_array = struct_array
2487 .column(0)
2488 .as_ref()
2489 .as_any()
2490 .downcast_ref::<StringArray>()
2491 .ok_or_else(|| {
2492 LanceError::from(ArrowError::InvalidArgumentError(
2493 "relationships.target_id column has unexpected data type".to_string(),
2494 ))
2495 })?;
2496 let relation_array = struct_array
2497 .column(1)
2498 .as_ref()
2499 .as_any()
2500 .downcast_ref::<StringArray>()
2501 .ok_or_else(|| {
2502 LanceError::from(ArrowError::InvalidArgumentError(
2503 "relationships.relation column has unexpected data type".to_string(),
2504 ))
2505 })?;
2506 let weight_array = struct_array
2507 .column(2)
2508 .as_ref()
2509 .as_any()
2510 .downcast_ref::<Float32Array>()
2511 .ok_or_else(|| {
2512 LanceError::from(ArrowError::InvalidArgumentError(
2513 "relationships.weight column has unexpected data type".to_string(),
2514 ))
2515 })?;
2516
2517 let mut relationships = Vec::with_capacity(struct_array.len());
2518 for idx in 0..struct_array.len() {
2519 if struct_array.is_null(idx) {
2520 continue;
2521 }
2522 if target_id_array.is_null(idx) {
2523 return Err(LanceError::from(ArrowError::InvalidArgumentError(
2524 "relationships.target_id contains null values".to_string(),
2525 )));
2526 }
2527 if relation_array.is_null(idx) {
2528 return Err(LanceError::from(ArrowError::InvalidArgumentError(
2529 "relationships.relation contains null values".to_string(),
2530 )));
2531 }
2532
2533 relationships.push(Relationship {
2534 target_id: target_id_array.value(idx).to_string(),
2535 relation: relation_array.value(idx).to_string(),
2536 weight: if weight_array.is_null(idx) {
2537 None
2538 } else {
2539 Some(weight_array.value(idx))
2540 },
2541 });
2542 }
2543 Ok(relationships)
2544}
2545
2546fn timestamp_from_micros(value: i64, column: &str) -> LanceResult<DateTime<Utc>> {
2547 DateTime::from_timestamp_micros(value).ok_or_else(|| {
2548 LanceError::from(ArrowError::InvalidArgumentError(format!(
2549 "invalid timestamp value {value} in column '{column}'"
2550 )))
2551 })
2552}
2553
2554fn optional_timestamp_from_array(
2555 array: Option<&TimestampMicrosecondArray>,
2556 row: usize,
2557 column: &str,
2558) -> LanceResult<Option<DateTime<Utc>>> {
2559 let Some(array) = array else {
2560 return Ok(None);
2561 };
2562 if array.is_null(row) {
2563 Ok(None)
2564 } else {
2565 timestamp_from_micros(array.value(row), column).map(Some)
2566 }
2567}
2568
2569fn optional_string_from_array(array: Option<&StringArray>, row: usize) -> Option<String> {
2570 array.and_then(|arr| {
2571 if arr.is_null(row) {
2572 None
2573 } else {
2574 Some(arr.value(row).to_string())
2575 }
2576 })
2577}
2578
/// Euclidean (L2) distance between two vectors.
///
/// Pairs are formed with `zip`, so a longer input is silently truncated to the shorter
/// one's length. Callers are expected to validate dimensions first.
fn l2_distance(left: &[f32], right: &[f32]) -> f32 {
    let squared_sum: f32 = left
        .iter()
        .zip(right.iter())
        .map(|(a, b)| {
            let diff = a - b;
            diff * diff
        })
        .sum();
    squared_sum.sqrt()
}
2589
2590fn validate_embedding_dim(embedding_dim: i32) -> LanceResult<()> {
2591 if embedding_dim <= 0 {
2592 return Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
2593 "embedding_dim must be positive, got {embedding_dim}"
2594 ))));
2595 }
2596 Ok(())
2597}
2598
2599fn validate_query_dimension(query: &[f32], embedding_dim: i32) -> LanceResult<()> {
2600 if query.len() != embedding_dim as usize {
2601 return Err(ArrowError::InvalidArgumentError(format!(
2602 "query length {} does not match embedding dimension {}",
2603 query.len(),
2604 embedding_dim
2605 ))
2606 .into());
2607 }
2608 Ok(())
2609}
2610
2611fn unique_query_terms(text: &str) -> Vec<String> {
2612 let mut seen = HashSet::new();
2613 tokenize_for_retrieval(text)
2614 .into_iter()
2615 .filter(|term| seen.insert(term.clone()))
2616 .collect()
2617}
2618
/// Splits `text` into maximal runs of alphanumeric characters and lowercases each run.
///
/// Lowercasing is applied per character (`char::to_lowercase`), not via
/// `str::to_lowercase`, so no context-sensitive rules such as Greek final sigma are used.
fn tokenize_for_retrieval(text: &str) -> Vec<String> {
    text.split(|character: char| !character.is_alphanumeric())
        .filter(|chunk| !chunk.is_empty())
        .map(|chunk| chunk.chars().flat_map(char::to_lowercase).collect::<String>())
        .collect()
}
2637
2638fn lexical_score(query_terms: &[String], text: Option<&str>) -> Option<f32> {
2639 let text = text?;
2640 if query_terms.is_empty() {
2641 return None;
2642 }
2643
2644 let payload_terms: HashSet<String> = tokenize_for_retrieval(text).into_iter().collect();
2645 if payload_terms.is_empty() {
2646 return None;
2647 }
2648
2649 let matched_terms = query_terms
2650 .iter()
2651 .filter(|term| payload_terms.contains(*term))
2652 .count();
2653 if matched_terms == 0 {
2654 return None;
2655 }
2656
2657 Some(matched_terms as f32 / query_terms.len() as f32)
2658}
2659
2660fn add_retrieve_channel(
2661 candidates: &mut HashMap<String, RetrieveResult>,
2662 record: &ContextRecord,
2663 rank: usize,
2664 channel: &str,
2665 vector_distance: Option<f32>,
2666 text_score: Option<f32>,
2667) {
2668 let candidate = candidates
2669 .entry(record.id.clone())
2670 .or_insert_with(|| RetrieveResult {
2671 record: record.clone(),
2672 score: 0.0,
2673 vector_distance: None,
2674 text_score: None,
2675 matched_channels: Vec::new(),
2676 });
2677 candidate.score += 1.0 / (RRF_K + rank as f32);
2678 if let Some(distance) = vector_distance {
2679 candidate.vector_distance = Some(distance);
2680 }
2681 if let Some(score) = text_score {
2682 candidate.text_score = Some(score);
2683 }
2684 if !candidate
2685 .matched_channels
2686 .iter()
2687 .any(|existing| existing == channel)
2688 {
2689 candidate.matched_channels.push(channel.to_string());
2690 }
2691}
2692
2693fn compare_retrieve_results(left: &RetrieveResult, right: &RetrieveResult) -> Ordering {
2694 right
2695 .score
2696 .total_cmp(&left.score)
2697 .then_with(|| compare_optional_distance(left.vector_distance, right.vector_distance))
2698 .then_with(|| compare_optional_score(left.text_score, right.text_score))
2699 .then_with(|| left.record.id.cmp(&right.record.id))
2700}
2701
/// Orders optional distances ascending; a present distance sorts before a missing one.
fn compare_optional_distance(left: Option<f32>, right: Option<f32>) -> Ordering {
    if let (Some(l), Some(r)) = (left, right) {
        return l.total_cmp(&r);
    }
    // At most one side is present here: `false < true`, so the present side comes first.
    right.is_some().cmp(&left.is_some())
}
2710
/// Orders optional scores descending; a present score sorts before a missing one.
fn compare_optional_score(left: Option<f32>, right: Option<f32>) -> Ordering {
    if let (Some(l), Some(r)) = (left, right) {
        return r.total_cmp(&l);
    }
    // At most one side is present here: `false < true`, so the present side comes first.
    right.is_some().cmp(&left.is_some())
}
2719
2720fn embedding_dim_from_schema(schema: &Schema) -> LanceResult<i32> {
2721 let field = schema
2722 .field_with_name("embedding")
2723 .map_err(LanceError::from)?;
2724 let DataType::FixedSizeList(item_field, embedding_dim) = field.data_type() else {
2725 return Err(LanceError::from(ArrowError::InvalidArgumentError(
2726 "embedding column must be a FixedSizeList<Float32>".to_string(),
2727 )));
2728 };
2729 if item_field.data_type() != &DataType::Float32 {
2730 return Err(LanceError::from(ArrowError::InvalidArgumentError(
2731 "embedding column must contain Float32 values".to_string(),
2732 )));
2733 }
2734 validate_embedding_dim(*embedding_dim)?;
2735 Ok(*embedding_dim)
2736}
2737
2738fn distance_metric_from_schema(schema: &Schema) -> LanceResult<DistanceMetric> {
2743 match schema.metadata.get(DISTANCE_METRIC_METADATA_KEY) {
2744 Some(value) => DistanceMetric::parse(value),
2745 None => Ok(DistanceMetric::default()),
2746 }
2747}
2748
/// Inner product of two vectors; extra trailing components of the longer one are ignored.
fn dot_product(left: &[f32], right: &[f32]) -> f32 {
    let products = left.iter().zip(right.iter()).map(|(&a, &b)| a * b);
    products.sum::<f32>()
}
2756
2757fn cosine_distance(left: &[f32], right: &[f32]) -> f32 {
2762 let dot = dot_product(left, right);
2763 let left_norm = dot_product(left, left).sqrt();
2764 let right_norm = dot_product(right, right).sqrt();
2765 if left_norm == 0.0 || right_norm == 0.0 {
2766 return 1.0;
2767 }
2768 1.0 - (dot / (left_norm * right_norm))
2769}
2770
2771fn dot_distance(left: &[f32], right: &[f32]) -> f32 {
2774 -dot_product(left, right)
2775}
2776
2777fn column_as<'a, A>(batch: &'a RecordBatch, name: &str) -> LanceResult<&'a A>
2778where
2779 A: Array + 'static,
2780{
2781 let column = batch.column_by_name(name).ok_or_else(|| {
2782 LanceError::from(ArrowError::InvalidArgumentError(format!(
2783 "column '{name}' not found"
2784 )))
2785 })?;
2786 column.as_ref().as_any().downcast_ref::<A>().ok_or_else(|| {
2787 LanceError::from(ArrowError::InvalidArgumentError(format!(
2788 "column '{name}' has unexpected data type"
2789 )))
2790 })
2791}
2792
2793fn column_as_optional<'a, A>(batch: &'a RecordBatch, name: &str) -> Option<&'a A>
2794where
2795 A: Array + 'static,
2796{
2797 batch
2798 .column_by_name(name)
2799 .and_then(|col| col.as_ref().as_any().downcast_ref::<A>())
2800}
2801
/// Renders `values` as comma-separated SQL single-quoted string literals.
///
/// Embedded single quotes are doubled per SQL escaping rules, e.g. `["a", "o'b"]` becomes
/// `'a','o''b'`. An empty slice yields an empty string.
/// NOTE(review): presumably spliced into an `IN (...)` filter — confirm at call sites.
fn sql_quoted_list(values: &[&str]) -> String {
    let mut rendered = String::new();
    for (index, value) in values.iter().enumerate() {
        if index > 0 {
            rendered.push(',');
        }
        rendered.push('\'');
        rendered.push_str(&value.replace('\'', "''"));
        rendered.push('\'');
    }
    rendered
}
2812
2813#[cfg(test)]
2814mod tests {
2815 use super::*;
2816 use crate::serde::CONTENT_TYPE_TEXT;
2817 use chrono::{Duration as ChronoDuration, Utc};
2818 use tempfile::TempDir;
2819
2820 fn make_embedding_with_dim(dim: usize, pivot: f32) -> Vec<f32> {
2821 let mut values = vec![0.0; dim];
2822 if !values.is_empty() {
2823 values[0] = pivot;
2824 }
2825 values
2826 }
2827
2828 fn make_embedding(pivot: f32) -> Vec<f32> {
2829 make_embedding_with_dim(DEFAULT_EMBEDDING_DIM as usize, pivot)
2830 }
2831
2832 fn text_record(id: &str, embedding_pivot: f32) -> ContextRecord {
2833 ContextRecord {
2834 id: id.to_string(),
2835 external_id: None,
2836 run_id: format!("run-{id}"),
2837 bot_id: None,
2838 session_id: None,
2839 tenant: None,
2840 source: None,
2841 created_at: Utc::now(),
2842 role: "user".to_string(),
2843 state_metadata: Some(StateMetadata {
2844 step: Some(1),
2845 active_plan_id: Some("plan".to_string()),
2846 tokens_used: Some(10),
2847 custom: None,
2848 }),
2849 metadata: None,
2850 relationships: Vec::new(),
2851 expires_at: None,
2852 retention_policy: None,
2853 lifecycle_status: LIFECYCLE_ACTIVE.to_string(),
2854 retired_at: None,
2855 retired_reason: None,
2856 supersedes_id: None,
2857 superseded_by_id: None,
2858 content_type: CONTENT_TYPE_TEXT.to_string(),
2859 text_payload: Some(format!("payload-{id}")),
2860 binary_payload: None,
2861 embedding: Some(make_embedding(embedding_pivot)),
2862 }
2863 }
2864
    /// A query identical to record "b"'s embedding must rank "b" first, and results must
    /// come back in non-decreasing distance order.
    #[test]
    fn search_orders_by_distance() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let mut store = ContextStore::open(&uri).await.unwrap();
            let first = text_record("a", 0.0);
            let second = text_record("b", 1.0);
            store.add(&[first.clone(), second.clone()]).await.unwrap();

            // Same pivot as "b", so "b" is an exact match.
            let query = make_embedding(1.0);
            let results = store.search(&query, Some(2)).await.unwrap();

            assert_eq!(results.len(), 2);
            assert_eq!(results[0].record.id, second.id);
            assert!(
                results[0].distance <= results[1].distance,
                "results not ordered by distance: {:?}",
                results
            );
        });
    }
2888
    /// A query whose length differs from the store's embedding dimension is rejected with
    /// an error that mentions "embedding dimension".
    #[test]
    fn search_validates_query_length() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let store = ContextStore::open(&uri).await.unwrap();
            // A one-element query cannot match the default embedding dimension.
            let err = store.search(&[0.0_f32], None).await.unwrap_err();
            let message = err.to_string();
            assert!(
                message.contains("embedding dimension"),
                "unexpected error message: {message}"
            );
        });
    }
2904
2905 fn make_embedding2(x0: f32, x1: f32) -> Vec<f32> {
2906 let mut values = vec![0.0; DEFAULT_EMBEDDING_DIM as usize];
2907 values[0] = x0;
2908 values[1] = x1;
2909 values
2910 }
2911
2912 fn text_record_with(id: &str, embedding: Vec<f32>) -> ContextRecord {
2913 let mut record = text_record(id, 0.0);
2914 record.embedding = Some(embedding);
2915 record
2916 }
2917
    /// Metric names parse case-insensitively, unknown names are rejected, L2 is the default,
    /// and each metric's distance matches a hand-computed value on small vectors.
    #[test]
    fn distance_metric_parse_and_math() {
        assert_eq!(DistanceMetric::parse("l2").unwrap(), DistanceMetric::L2);
        assert_eq!(DistanceMetric::parse("L2").unwrap(), DistanceMetric::L2);
        assert_eq!(
            DistanceMetric::parse("cosine").unwrap(),
            DistanceMetric::Cosine
        );
        assert_eq!(DistanceMetric::parse("DOT").unwrap(), DistanceMetric::Dot);
        assert!(DistanceMetric::parse("manhattan").is_err());
        assert_eq!(DistanceMetric::default(), DistanceMetric::L2);

        let a = [1.0_f32, 0.0];
        let b = [1.0_f32, 1.0];
        // |(1,0) - (1,1)| = 1.
        assert!((DistanceMetric::L2.distance(&a, &b) - 1.0).abs() < 1e-6);
        // cos(a, b) = 1/sqrt(2) ≈ 0.70710677, so the cosine distance is 1 minus that.
        assert!((DistanceMetric::Cosine.distance(&a, &b) - (1.0 - 0.707_106_77)).abs() < 1e-5);
        // Dot distance is the negated inner product: -(1*1 + 0*1) = -1.
        assert!((DistanceMetric::Dot.distance(&a, &b) + 1.0).abs() < 1e-6);
        // A zero vector has no direction; cosine distance is expected to fall back to 1.0.
        let zero = [0.0_f32, 0.0];
        assert!((DistanceMetric::Cosine.distance(&a, &zero) - 1.0).abs() < 1e-6);
    }
2942
    /// The same two records rank differently depending on the store's distance metric:
    /// L2 prefers the Euclidean-near vector, while cosine and dot prefer the vector pointing
    /// in the query's direction.
    #[test]
    fn search_metric_changes_ranking() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let query = make_embedding2(1.0, 0.0);
            // Same direction as the query, but 9 units away.
            let aligned = make_embedding2(10.0, 0.0);
            // 1 unit away, but 45 degrees off the query direction.
            let near = make_embedding2(1.0, 1.0);

            // Default options: L2 gives near = 1 < aligned = 9.
            let l2_dir = TempDir::new().unwrap();
            let mut l2_store = ContextStore::open(&l2_dir.path().to_string_lossy())
                .await
                .unwrap();
            l2_store
                .add(&[
                    text_record_with("aligned", aligned.clone()),
                    text_record_with("near", near.clone()),
                ])
                .await
                .unwrap();
            let l2_results = l2_store.search(&query, Some(2)).await.unwrap();
            assert_eq!(l2_results[0].record.id, "near");

            // Cosine: aligned = 0, near = 1 - 1/sqrt(2).
            let cos_dir = TempDir::new().unwrap();
            let cos_opts = ContextStoreOptions {
                distance_metric: Some(DistanceMetric::Cosine),
                ..Default::default()
            };
            let mut cos_store =
                ContextStore::open_with_options(&cos_dir.path().to_string_lossy(), cos_opts)
                    .await
                    .unwrap();
            cos_store
                .add(&[
                    text_record_with("aligned", aligned.clone()),
                    text_record_with("near", near.clone()),
                ])
                .await
                .unwrap();
            let cos_results = cos_store.search(&query, Some(2)).await.unwrap();
            assert_eq!(cos_results[0].record.id, "aligned");

            // Dot: aligned = -10 < near = -1.
            let dot_dir = TempDir::new().unwrap();
            let dot_opts = ContextStoreOptions {
                distance_metric: Some(DistanceMetric::Dot),
                ..Default::default()
            };
            let mut dot_store =
                ContextStore::open_with_options(&dot_dir.path().to_string_lossy(), dot_opts)
                    .await
                    .unwrap();
            dot_store
                .add(&[
                    text_record_with("aligned", aligned),
                    text_record_with("near", near),
                ])
                .await
                .unwrap();
            let dot_results = dot_store.search(&query, Some(2)).await.unwrap();
            assert_eq!(dot_results[0].record.id, "aligned");
        });
    }
3011
    /// A metric chosen at creation time is restored when the store is reopened without
    /// options, and it keeps governing search ranking.
    #[test]
    fn distance_metric_persists_across_reopen() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let dir = TempDir::new().unwrap();
            let uri = dir.path().to_string_lossy().to_string();
            let query = make_embedding2(1.0, 0.0);
            let aligned = make_embedding2(10.0, 0.0);
            let near = make_embedding2(1.0, 1.0);

            {
                // Create the store with cosine; it is dropped at the end of this scope.
                let opts = ContextStoreOptions {
                    distance_metric: Some(DistanceMetric::Cosine),
                    ..Default::default()
                };
                let mut store = ContextStore::open_with_options(&uri, opts).await.unwrap();
                store
                    .add(&[
                        text_record_with("aligned", aligned.clone()),
                        text_record_with("near", near.clone()),
                    ])
                    .await
                    .unwrap();
            }

            // Reopen without options: the metric must be the persisted one, not the L2 default.
            let store = ContextStore::open(&uri).await.unwrap();
            assert_eq!(store.distance_metric, DistanceMetric::Cosine);
            // Under L2 "near" would win; "aligned" winning confirms cosine is in effect.
            let results = store.search(&query, Some(2)).await.unwrap();
            assert_eq!(results[0].record.id, "aligned");
        });
    }
3046
    /// Reopening an existing store while explicitly requesting a different metric must fail
    /// with an error that mentions "distance metric".
    #[test]
    fn distance_metric_mismatch_errors() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let dir = TempDir::new().unwrap();
            let uri = dir.path().to_string_lossy().to_string();
            ContextStore::open_with_options(
                &uri,
                ContextStoreOptions {
                    distance_metric: Some(DistanceMetric::Cosine),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

            // Same URI, conflicting metric.
            let result = ContextStore::open_with_options(
                &uri,
                ContextStoreOptions {
                    distance_metric: Some(DistanceMetric::Dot),
                    ..Default::default()
                },
            )
            .await;
            let err = match result {
                Ok(_) => panic!("expected a distance-metric mismatch error"),
                Err(err) => err,
            };
            assert!(
                err.to_string().contains("distance metric"),
                "unexpected error: {err}"
            );
        });
    }
3081
    /// A schema without the distance-metric metadata key resolves to L2.
    #[test]
    fn distance_metric_from_schema_defaults_l2_when_absent() {
        let schema = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
        // `Schema::new` attaches no metadata, so the metric key is absent.
        assert_eq!(
            distance_metric_from_schema(&schema).unwrap(),
            DistanceMetric::L2
        );
    }
3091
    /// A record matched by both the text and vector channels outranks one matched only by
    /// the vector channel, even though the latter is the closer vector match.
    #[test]
    fn retrieve_fuses_text_and_vector_channels() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let mut store = ContextStore::open(&uri).await.unwrap();
            // Embedding equals the query below, but none of the query's terms occur in its text.
            let mut semantic_near = text_record("semantic-near", 0.0);
            semantic_near.text_payload = Some("general rollout risk guidance".to_string());
            // Further away in vector space, but its text contains every query term.
            let mut exact_policy = text_record("exact-policy", 1.0);
            exact_policy.text_payload = Some("POLICY-123 blocks service-a rollouts".to_string());

            store
                .add(&[semantic_near.clone(), exact_policy.clone()])
                .await
                .unwrap();

            let query = make_embedding(0.0);
            let results = store
                .retrieve_filtered_with_options(
                    Some("POLICY-123 service-a"),
                    Some(&query),
                    Some(2),
                    None,
                    LifecycleQueryOptions::default(),
                )
                .await
                .unwrap();

            assert_eq!(results.len(), 2);
            assert_eq!(results[0].record.id, exact_policy.id);
            assert!(results[0].score > results[1].score);
            assert!(results[0].vector_distance.is_some());
            // All query terms (policy, 123, service, a) matched.
            assert_eq!(results[0].text_score, Some(1.0));
            assert_eq!(results[0].matched_channels, ["vector", "text"]);
        });
    }
3129
    /// A non-default embedding dimension is honoured for add and search, survives a reopen
    /// without options, and is enforced against queries of the default dimension.
    #[test]
    fn custom_embedding_dimension_round_trips_add_search_and_reopen() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let options = ContextStoreOptions {
                embedding_dim: Some(3),
                ..Default::default()
            };
            let mut store = ContextStore::open_with_options(&uri, options)
                .await
                .unwrap();
            assert_eq!(store.embedding_dim(), 3);

            let mut first = text_record("custom-a", 0.0);
            first.embedding = Some(make_embedding_with_dim(3, 0.0));
            let mut second = text_record("custom-b", 0.0);
            second.embedding = Some(make_embedding_with_dim(3, 1.0));
            store.add(&[first.clone(), second.clone()]).await.unwrap();

            let query = make_embedding_with_dim(3, 1.0);
            let results = store.search(&query, Some(2)).await.unwrap();
            assert_eq!(results[0].record.id, second.id);

            // Reopen without options: the dimension must come from the existing dataset.
            let reopened = ContextStore::open(&uri).await.unwrap();
            assert_eq!(reopened.embedding_dim(), 3);
            let results = reopened.search(&query, Some(1)).await.unwrap();
            assert_eq!(results[0].record.id, second.id);

            // A DEFAULT_EMBEDDING_DIM-length query does not fit the 3-dimensional store.
            let err = reopened
                .search(&make_embedding(1.0), None)
                .await
                .unwrap_err();
            assert!(
                err.to_string().contains("embedding dimension 3"),
                "unexpected error message: {err}"
            );
        });
    }
3170
    /// A store created with default options reopens with the default dimension and accepts
    /// default-dimension queries.
    #[test]
    fn existing_default_dimension_dataset_opens_without_options() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let mut store = ContextStore::open(&uri).await.unwrap();
            assert_eq!(store.embedding_dim(), DEFAULT_EMBEDDING_DIM);
            store.add(&[text_record("default-dim", 0.0)]).await.unwrap();
            drop(store);

            let reopened = ContextStore::open(&uri).await.unwrap();
            assert_eq!(reopened.embedding_dim(), DEFAULT_EMBEDDING_DIM);
            // Only success is checked; the result itself is not inspected.
            reopened
                .search(&make_embedding(0.0), Some(1))
                .await
                .unwrap();
        });
    }
3190
    /// Reopening a 3-dimensional store while requesting dimension 4 must fail with an error
    /// naming the requested dimension.
    #[test]
    fn opening_existing_dataset_rejects_mismatched_requested_dimension() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let options = ContextStoreOptions {
                embedding_dim: Some(3),
                ..Default::default()
            };
            ContextStore::open_with_options(&uri, options)
                .await
                .unwrap();

            // Same URI, conflicting dimension.
            let mismatched = ContextStoreOptions {
                embedding_dim: Some(4),
                ..Default::default()
            };
            let err = match ContextStore::open_with_options(&uri, mismatched).await {
                Ok(_) => panic!("expected mismatched embedding dimension to fail"),
                Err(err) => err,
            };
            assert!(
                err.to_string()
                    .contains("does not match requested dimension 4"),
                "unexpected error message: {err}"
            );
        });
    }
3220
    /// By default `list` returns only active, unexpired records. With both lifecycle flags
    /// enabled, every record is returned and its lifecycle fields round-trip.
    #[test]
    fn list_hides_expired_and_retired_records_by_default() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let mut store = ContextStore::open(&uri).await.unwrap();
            let active = text_record("active", 0.0);
            // Expired one minute ago.
            let mut expired = text_record("expired", 0.0);
            expired.expires_at = Some(Utc::now() - ChronoDuration::minutes(1));
            // Explicitly marked superseded via its own lifecycle status.
            let mut superseded = text_record("superseded", 0.0);
            superseded.lifecycle_status = "superseded".to_string();
            superseded.retired_reason = Some("replaced by newer fact".to_string());
            superseded.superseded_by_id = Some("active".to_string());

            store
                .add(&[active.clone(), expired.clone(), superseded.clone()])
                .await
                .unwrap();

            let visible = store.list(None, None).await.unwrap();
            assert_eq!(visible.len(), 1);
            assert_eq!(visible[0].id, active.id);

            let all = store
                .list_with_options(None, None, LifecycleQueryOptions::new(true, true))
                .await
                .unwrap();
            assert_eq!(all.len(), 3);
            let expired_roundtrip = all.iter().find(|record| record.id == expired.id).unwrap();
            // Compare at microsecond precision: `expires_at` is read back from a
            // TimestampMicrosecond column, so sub-microsecond digits of `Utc::now()` are lost.
            assert_eq!(
                expired_roundtrip
                    .expires_at
                    .map(|value| value.timestamp_micros()),
                expired.expires_at.map(|value| value.timestamp_micros())
            );
            let superseded_roundtrip = all
                .iter()
                .find(|record| record.id == superseded.id)
                .unwrap();
            assert_eq!(superseded_roundtrip.lifecycle_status, "superseded");
            assert_eq!(
                superseded_roundtrip.superseded_by_id.as_deref(),
                Some("active")
            );
        });
    }
3268
    /// A record is hidden from the default listing when a newer record points at it via
    /// `supersedes_id`, even though its own status is still active. The history view
    /// (`new(false, true)`) shows both records.
    #[test]
    fn list_hides_records_superseded_by_newer_pointer() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let mut store = ContextStore::open(&uri).await.unwrap();
            // "old" keeps LIFECYCLE_ACTIVE; only the replacement carries the pointer.
            let old = text_record("old", 0.0);
            let mut replacement = text_record("new", 1.0);
            replacement.supersedes_id = Some(old.id.clone());
            store
                .add(&[old.clone(), replacement.clone()])
                .await
                .unwrap();

            let visible = store.list(None, None).await.unwrap();
            assert_eq!(visible.len(), 1);
            assert_eq!(visible[0].id, replacement.id);

            let history = store
                .list_with_options(None, None, LifecycleQueryOptions::new(false, true))
                .await
                .unwrap();
            assert_eq!(history.len(), 2);
            assert!(history.iter().any(|record| record.id == old.id));
            assert!(history.iter().any(|record| record.id == replacement.id));
        });
    }
3297
    /// Lifecycle filtering must happen before the top-k cut. With limit 1, the expired
    /// record is the nearest neighbour, so the active record can only be returned if
    /// expired rows are excluded first.
    #[test]
    fn search_filters_lifecycle_before_ranking() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let mut store = ContextStore::open(&uri).await.unwrap();
            let active = text_record("active", 1.0);
            // Exact vector match for the query below, but already expired.
            let mut expired_better_match = text_record("expired", 0.0);
            expired_better_match.expires_at = Some(Utc::now() - ChronoDuration::minutes(1));
            store
                .add(&[active.clone(), expired_better_match.clone()])
                .await
                .unwrap();

            let query = make_embedding(0.0);
            let visible = store.search(&query, Some(1)).await.unwrap();
            assert_eq!(visible.len(), 1);
            assert_eq!(visible[0].record.id, active.id);

            // With expired records admitted, the closer expired record wins again.
            let all = store
                .search_with_options(&query, Some(1), LifecycleQueryOptions::new(true, false))
                .await
                .unwrap();
            assert_eq!(all.len(), 1);
            assert_eq!(all[0].record.id, expired_better_match.id);
        });
    }
3326
    /// `external_id` survives a write/read round trip, can be used for lookup, and an
    /// unknown external id yields `Ok(None)` rather than an error.
    #[test]
    fn external_id_roundtrips_and_supports_lookup() {
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_string_lossy().to_string();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let mut store = ContextStore::open(&uri).await.unwrap();
            let mut record = text_record("a", 0.0);
            record.external_id = Some("doc-123#chunk-1".to_string());
            store.add(std::slice::from_ref(&record)).await.unwrap();

            let by_external_id = store
                .get_by_external_id("doc-123#chunk-1")
                .await
                .unwrap()
                .unwrap();
            assert_eq!(by_external_id.id, record.id);
            assert_eq!(by_external_id.external_id, record.external_id);

            // Lookup by primary id also returns the external id.
            let by_id = store.get_by_id(&record.id).await.unwrap().unwrap();
            assert_eq!(by_id.external_id.as_deref(), Some("doc-123#chunk-1"));

            let missing = store.get_by_external_id("missing").await.unwrap();
            assert!(missing.is_none());
        });
    }
3353
#[test]
fn upsert_by_external_id_inserts_then_replaces_visible_record() {
    // The first upsert for a key inserts. The second supersedes it: default
    // reads see only the successor, while the history view returns both.
    const KEY: &str = "doc-123#chunk-1";

    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut first = text_record("first", 0.0);
        first.external_id = Some(KEY.to_string());
        let insert_outcome = store.upsert_by_external_id(first.clone()).await.unwrap();
        assert!(insert_outcome.inserted);
        assert_eq!(insert_outcome.replaced_id, None);
        assert_eq!(insert_outcome.record.id, first.id);

        let mut replacement = text_record("replacement", 1.0);
        replacement.external_id = first.external_id.clone();
        let replace_outcome = store
            .upsert_by_external_id(replacement.clone())
            .await
            .unwrap();
        let first_id = Some(first.id.as_str());
        assert!(!replace_outcome.inserted);
        assert_eq!(replace_outcome.replaced_id.as_deref(), first_id);
        assert_eq!(replace_outcome.record.supersedes_id.as_deref(), first_id);

        let visible = store.list(None, None).await.unwrap();
        assert_eq!(visible.len(), 1);
        assert_eq!(visible[0].id, replacement.id);

        let current = store.get_by_external_id(KEY).await.unwrap().unwrap();
        assert_eq!(current.id, replacement.id);

        // `(false, true)` is the history view: superseded records are included.
        let with_superseded = LifecycleQueryOptions::new(false, true);
        let history = store
            .list_with_options(None, None, with_superseded)
            .await
            .unwrap();
        assert_eq!(history.len(), 2);
        for expected in [&first.id, &replacement.id] {
            assert!(history.iter().any(|record| &record.id == expected));
        }
    });
}
3402
3403 fn upsert_record(id: &str, external_id: &str, pivot: f32) -> ContextRecord {
3404 let mut record = text_record(id, pivot);
3405 record.external_id = Some(external_id.to_string());
3406 record
3407 }
3408
#[test]
fn upsert_many_inserts_new_records() {
    // Two fresh external_ids in one call: both are inserts, neither replaces
    // anything, and both outcomes report the same `version` (presumably one
    // commit for the whole batch).
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let outcomes = store
            .upsert_many_by_external_id(vec![
                upsert_record("a", "ext-a", 0.0),
                upsert_record("b", "ext-b", 1.0),
            ])
            .await
            .unwrap();

        assert_eq!(outcomes.len(), 2);
        assert!(outcomes.iter().all(|outcome| outcome.inserted));
        assert!(outcomes.iter().all(|outcome| outcome.replaced_id.is_none()));
        assert_eq!(outcomes[0].version, outcomes[1].version);

        let visible_count = store.list(None, None).await.unwrap().len();
        assert_eq!(visible_count, 2);
    });
}
3432
#[test]
fn upsert_many_replaces_existing_and_is_idempotent() {
    // Re-upserting the same external_ids supersedes the previous generation
    // each time. Results line up with the input order, each successor points
    // at its predecessor, and only the newest generation stays visible.
    let dir = TempDir::new().unwrap();
    let uri = dir.path().to_string_lossy().to_string();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let mut store = ContextStore::open(&uri).await.unwrap();

        let first = vec![
            upsert_record("a1", "ext-a", 0.0),
            upsert_record("b1", "ext-b", 1.0),
        ];
        store.upsert_many_by_external_id(first).await.unwrap();

        let second = vec![
            upsert_record("a2", "ext-a", 2.0),
            upsert_record("b2", "ext-b", 3.0),
        ];
        let results = store.upsert_many_by_external_id(second).await.unwrap();

        // Check the length first so a short result fails with a clear
        // assertion instead of an index-out-of-bounds panic below.
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| !r.inserted));
        assert_eq!(results[0].replaced_id.as_deref(), Some("a1"));
        assert_eq!(results[1].replaced_id.as_deref(), Some("b1"));
        assert_eq!(results[0].record.supersedes_id.as_deref(), Some("a1"));
        assert_eq!(results[1].record.supersedes_id.as_deref(), Some("b1"));

        let visible = store.list(None, None).await.unwrap();
        assert_eq!(visible.len(), 2);
        let visible_ids: HashSet<&str> = visible.iter().map(|r| r.id.as_str()).collect();
        assert_eq!(
            visible_ids,
            HashSet::from(["a2", "b2"]),
            "only the successors should be visible"
        );

        let third = vec![
            upsert_record("a3", "ext-a", 4.0),
            upsert_record("b3", "ext-b", 5.0),
        ];
        store.upsert_many_by_external_id(third).await.unwrap();

        // A further round must not accumulate visible records, and the two
        // that remain must be the newest generation, not just any two.
        let visible = store.list(None, None).await.unwrap();
        assert_eq!(visible.len(), 2);
        let visible_ids: HashSet<&str> = visible.iter().map(|r| r.id.as_str()).collect();
        assert_eq!(
            visible_ids,
            HashSet::from(["a3", "b3"]),
            "only the latest successors should be visible"
        );
    });
}
3478
#[test]
fn upsert_many_handles_mixed_insert_and_replace() {
    // A single batch may both replace an existing key ("ext-a") and insert a
    // brand-new one ("ext-c"). Outcomes follow the batch order.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();
        let seed = vec![upsert_record("a1", "ext-a", 0.0)];
        store.upsert_many_by_external_id(seed).await.unwrap();

        let mixed_batch = vec![
            upsert_record("a2", "ext-a", 1.0), // replaces a1
            upsert_record("c1", "ext-c", 2.0), // brand-new key
        ];
        let outcomes = store.upsert_many_by_external_id(mixed_batch).await.unwrap();

        assert_eq!(outcomes.len(), 2);
        let (replace, insert) = (&outcomes[0], &outcomes[1]);
        assert!(!replace.inserted);
        assert_eq!(replace.replaced_id.as_deref(), Some("a1"));
        assert!(insert.inserted);
        assert!(insert.replaced_id.is_none());

        let mut visible_ids: HashSet<String> = HashSet::new();
        for record in store.list(None, None).await.unwrap() {
            visible_ids.insert(record.id);
        }
        assert_eq!(
            visible_ids,
            HashSet::from(["a2".to_string(), "c1".to_string()])
        );
    });
}
3516
#[test]
fn upsert_many_rejects_within_batch_duplicate_external_id() {
    // Two records sharing an external_id in one batch are rejected, and the
    // store is left empty (nothing from the batch was written).
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let colliding = vec![
            upsert_record("a", "dup", 0.0),
            upsert_record("b", "dup", 1.0),
        ];
        let message = store
            .upsert_many_by_external_id(colliding)
            .await
            .unwrap_err()
            .to_string();
        assert!(
            message.contains("duplicate external_id 'dup' in batch"),
            "unexpected error: {message}"
        );

        let stored = store.list(None, None).await.unwrap();
        assert_eq!(stored.len(), 0);
    });
}
3538
#[test]
fn upsert_many_rejects_within_batch_duplicate_id() {
    // Two records sharing an internal id in one batch must be rejected. As in
    // the duplicate-external_id case, the rejection must leave the store
    // untouched rather than partially applying the batch.
    let dir = TempDir::new().unwrap();
    let uri = dir.path().to_string_lossy().to_string();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let mut store = ContextStore::open(&uri).await.unwrap();
        let batch = vec![
            upsert_record("same", "ext-a", 0.0),
            upsert_record("same", "ext-b", 1.0),
        ];
        let err = store.upsert_many_by_external_id(batch).await.unwrap_err();
        assert!(
            err.to_string().contains("duplicate id 'same' in batch"),
            "unexpected error: {err}"
        );
        assert_eq!(
            store.list(None, None).await.unwrap().len(),
            0,
            "a rejected batch must not write any records"
        );
    });
}
3557
#[test]
fn upsert_many_rejects_missing_external_id() {
    // upsert-by-external-id needs a key: both an absent external_id and an
    // empty one must be rejected.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        // Case 1: external_id absent entirely.
        let err = store
            .upsert_many_by_external_id(vec![text_record("a", 0.0)])
            .await
            .unwrap_err();
        assert!(err.to_string().contains("external_id"), "unexpected: {err}");

        // Case 2: external_id present but empty.
        let blank_key = {
            let mut record = text_record("b", 0.0);
            record.external_id = Some(String::new());
            record
        };
        let err = store
            .upsert_many_by_external_id(vec![blank_key])
            .await
            .unwrap_err();
        let mentions_non_empty = err.to_string().contains("non-empty external_id");
        assert!(mentions_non_empty, "unexpected: {err}");
    });
}
3582
#[test]
fn upsert_many_rejects_id_collision_with_store() {
    // A new external_id is still rejected when its internal id is already
    // used by a record in the store.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();
        let occupant = text_record("taken", 0.0);
        store.add(&[occupant]).await.unwrap();

        let message = store
            .upsert_many_by_external_id(vec![upsert_record("taken", "ext-a", 1.0)])
            .await
            .unwrap_err()
            .to_string();
        let names_id = message.contains("id 'taken'");
        let says_exists = message.contains("already exists");
        assert!(names_id && says_exists, "unexpected error: {message}");
    });
}
3601
#[test]
fn upsert_many_empty_batch_is_noop() {
    // An empty batch returns no outcomes and must not write anything. The
    // original test only checked the return value, not the store contents.
    let dir = TempDir::new().unwrap();
    let uri = dir.path().to_string_lossy().to_string();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let mut store = ContextStore::open(&uri).await.unwrap();
        let results = store.upsert_many_by_external_id(Vec::new()).await.unwrap();
        assert!(results.is_empty());
        assert!(
            store.list(None, None).await.unwrap().is_empty(),
            "an empty upsert must not write any records"
        );
    });
}
3613
#[test]
fn upsert_many_matches_single_upsert_with_btree_index() {
    // With a BTree id index configured, a batch upsert must recognise a key
    // that was written through the single-record `upsert_by_external_id` path.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open_with_options(
            &store_uri,
            ContextStoreOptions {
                id_index_type: IdIndexType::BTree,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let seeded = upsert_record("a1", "ext-a", 0.0);
        store.upsert_by_external_id(seeded).await.unwrap();

        let batch = vec![
            upsert_record("a2", "ext-a", 1.0),
            upsert_record("b1", "ext-b", 2.0),
        ];
        let outcomes = store.upsert_many_by_external_id(batch).await.unwrap();
        assert_eq!(outcomes[0].replaced_id.as_deref(), Some("a1"));
        assert!(outcomes[1].inserted);

        let current = store.get_by_external_id("ext-a").await.unwrap().unwrap();
        assert_eq!(current.id, "a2");
    });
}
3651
#[test]
fn update_by_external_id_patches_mutable_fields_and_preserves_payload() {
    // Patching by external_id writes a new record (new id, `supersedes_id`
    // pointing at the old one). Patched fields take the patch values,
    // untouched fields (payload, embedding, external_id) carry over, and the
    // old record remains in the history view.
    let dir = TempDir::new().unwrap();
    let uri = dir.path().to_string_lossy().to_string();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let mut store = ContextStore::open(&uri).await.unwrap();

        let mut record = text_record("stable", 0.0);
        record.external_id = Some("doc-123#chunk-1".to_string());
        record.metadata = Some(serde_json::json!({"revision": 1}));
        store.add(std::slice::from_ref(&record)).await.unwrap();

        // Kept separately so the full value (target, relation, weight) can be
        // compared after the update, not just the vector length.
        let expected_relationships = vec![Relationship {
            target_id: "doc-123".to_string(),
            relation: "derived_from".to_string(),
            weight: None,
        }];
        let patch = RecordPatch {
            bot_id: Some("bot-a".to_string()),
            session_id: Some("session-a".to_string()),
            metadata: Some(serde_json::json!({"revision": 2, "confidence": 0.9})),
            relationships: Some(expected_relationships.clone()),
            ..Default::default()
        };
        let updated = store
            .update_by_external_id("doc-123#chunk-1", patch)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(updated.replaced_id, record.id);
        assert_ne!(updated.record.id, record.id);
        assert_eq!(updated.record.external_id, record.external_id);
        assert_eq!(updated.record.text_payload, record.text_payload);
        assert_eq!(updated.record.embedding, record.embedding);
        assert_eq!(updated.record.bot_id.as_deref(), Some("bot-a"));
        assert_eq!(updated.record.session_id.as_deref(), Some("session-a"));
        assert_eq!(
            updated.record.metadata,
            Some(serde_json::json!({"revision": 2, "confidence": 0.9}))
        );
        assert_eq!(updated.record.relationships, expected_relationships);
        assert_eq!(
            updated.record.supersedes_id.as_deref(),
            Some(record.id.as_str())
        );

        let visible = store
            .get_by_external_id("doc-123#chunk-1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(visible.id, updated.record.id);

        let history = store
            .list_with_options(None, None, LifecycleQueryOptions::new(false, true))
            .await
            .unwrap();
        assert_eq!(history.len(), 2);
        assert!(history.iter().any(|item| item.id == record.id));
        assert!(history.iter().any(|item| item.id == updated.record.id));
    });
}
3715
#[test]
fn deferred_embedding_patch_makes_raw_record_searchable() {
    // Records written without an embedding are invisible to vector search
    // until a patch supplies one, whether the patch is addressed by
    // external_id or by internal id.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut raw_with_ext = text_record("raw-ext", 0.0);
        raw_with_ext.embedding = None;
        raw_with_ext.external_id = Some("doc-1#chunk-1".to_string());

        let mut raw_without_ext = text_record("raw-id", 0.0);
        raw_without_ext.embedding = None;
        raw_without_ext.external_id = None;

        store
            .add(&[raw_with_ext.clone(), raw_without_ext.clone()])
            .await
            .unwrap();

        let query = make_embedding(1.0);
        let before_patch = store.search(&query, Some(10)).await.unwrap();
        assert!(before_patch.is_empty());

        let ext_update = store
            .update_by_external_id(
                "doc-1#chunk-1",
                RecordPatch {
                    embedding: Some(make_embedding(1.0)),
                    ..Default::default()
                },
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(ext_update.record.embedding, Some(make_embedding(1.0)));
        assert_eq!(ext_update.record.text_payload, raw_with_ext.text_payload);

        let id_update = store
            .update_by_id(
                &raw_without_ext.id,
                RecordPatch {
                    embedding: Some(make_embedding(0.0)),
                    ..Default::default()
                },
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(id_update.record.embedding, Some(make_embedding(0.0)));

        let hits = store.search(&query, Some(10)).await.unwrap();
        let hit_ids: Vec<&str> = hits.iter().map(|hit| hit.record.id.as_str()).collect();
        assert!(hit_ids.contains(&ext_update.record.id.as_str()));
        assert!(hit_ids.contains(&id_update.record.id.as_str()));
        // The external-id record was embedded with the query's own pivot, so
        // it must rank first.
        assert_eq!(hits[0].record.id, ext_update.record.id);
    });
}
3776
#[test]
fn relationships_roundtrip_and_support_related_lookup() {
    // Relationships survive a write/read cycle, and `list_related` finds a
    // record by relationship target, optionally narrowed by relation name.
    const TARGET: &str = "doc-1#chunk-1";

    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut related = text_record("related", 0.0);
        related.relationships = vec![
            Relationship {
                target_id: TARGET.to_string(),
                relation: "cites".to_string(),
                weight: Some(0.75),
            },
            Relationship {
                target_id: "service-a".to_string(),
                relation: "mentions".to_string(),
                weight: None,
            },
        ];
        store
            .add(&[related.clone(), text_record("unrelated", 1.0)])
            .await
            .unwrap();

        let listed = store.list(None, None).await.unwrap();
        let roundtrip = listed
            .iter()
            .find(|record| record.id == related.id)
            .unwrap();
        assert_eq!(roundtrip.relationships, related.relationships);

        // Any relation, then the matching relation name: both find the record.
        for relation in [None, Some("cites")] {
            let matches = store.list_related(TARGET, relation, None).await.unwrap();
            assert_eq!(matches.len(), 1);
            assert_eq!(matches[0].id, related.id);
        }

        // "mentions" exists on the record, but for a different target.
        let mismatched = store
            .list_related(TARGET, Some("mentions"), None)
            .await
            .unwrap();
        assert!(mismatched.is_empty());
    });
}
3828
/// A dataset created without the relationships column must refuse to store
/// relationships until `migrate_relationships_column` adds it. The migration
/// reports `true` when it adds the column and `false` when it is already present.
#[test]
fn migrate_relationships_column_adds_missing_column() {
    let dir = TempDir::new().unwrap();
    let uri = dir.path().to_string_lossy().to_string();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        // Create the dataset directly through Lance (not `ContextStore::open`)
        // so its schema lacks the relationships column; the
        // `has_relationships_column()` assertion below confirms this.
        // NOTE(review): the positional flags to `schema_with_options` are opaque
        // here; presumably the fourth (`false`) is what omits the relationships
        // column. Confirm against the function signature.
        let schema = Arc::new(ContextStore::schema_with_options(
            &HashSet::new(),
            true,
            true,
            false,
            true,
            DEFAULT_EMBEDDING_DIM,
            DistanceMetric::default(),
        ));
        let empty_batch = RecordBatch::new_empty(schema.clone());
        let batches = RecordBatchIterator::new(
            vec![Ok::<RecordBatch, ArrowError>(empty_batch)].into_iter(),
            schema,
        );
        Dataset::write(
            batches,
            &uri,
            Some(WriteParams {
                mode: WriteMode::Create,
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        let mut store = ContextStore::open(&uri).await.unwrap();
        assert!(!store.has_relationships_column());

        // Before migrating, writing a record that carries relationships must
        // fail, and the error must point the caller at the migration.
        let mut record = text_record("with-relationships", 0.0);
        record.relationships.push(Relationship {
            target_id: "target".to_string(),
            relation: "mentions".to_string(),
            weight: None,
        });
        let err = store.add(std::slice::from_ref(&record)).await.unwrap_err();
        assert!(
            err.to_string().contains("migrate_relationships_column"),
            "unexpected error: {err}"
        );

        // The first call adds the column; a second call is a no-op that
        // returns false.
        assert!(store.migrate_relationships_column().await.unwrap());
        assert!(store.has_relationships_column());
        assert!(!store.migrate_relationships_column().await.unwrap());

        // After migrating, the same record is accepted and its relationships
        // round-trip.
        store.add(std::slice::from_ref(&record)).await.unwrap();
        let roundtrip = store.get_by_id(&record.id).await.unwrap().unwrap();
        assert_eq!(roundtrip.relationships, record.relationships);
    });
}
3884
#[test]
fn add_rejects_duplicate_external_id() {
    // `add` refuses a second record whose external_id is already in use.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut original = text_record("a", 0.0);
        original.external_id = Some("doc-123#chunk-1".to_string());
        store.add(std::slice::from_ref(&original)).await.unwrap();

        let mut clash = text_record("b", 0.0);
        clash.external_id = original.external_id.clone();
        let message = store.add(&[clash]).await.unwrap_err().to_string();
        let mentions_key = message.contains("external_id");
        let says_exists = message.contains("already exists");
        assert!(
            mentions_key && says_exists,
            "unexpected error message: {message}"
        );
    });
}
3906
#[test]
fn add_rejects_reserved_tombstone_content_type() {
    // The tombstone content type is reserved for internal use; callers
    // cannot add records that carry it.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut forged = text_record("a", 0.0);
        forged.content_type = CONTENT_TYPE_TOMBSTONE.to_string();

        let message = store.add(&[forged]).await.unwrap_err().to_string();
        let is_reserved_tombstone_error =
            message.contains("reserved") && message.contains("tombstone");
        assert!(
            is_reserved_tombstone_error,
            "unexpected error message: {message}"
        );
    });
}
3925
#[test]
fn add_rejects_duplicate_id_against_existing() {
    // An internal id already present in the store cannot be added again.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();
        store.add(&[text_record("dup", 0.0)]).await.unwrap();

        let second_attempt = store.add(&[text_record("dup", 1.0)]).await;
        let message = second_attempt.unwrap_err().to_string();
        assert!(
            message.contains("id 'dup'") && message.contains("already exists"),
            "unexpected error message: {message}"
        );
    });
}
3943
#[test]
fn add_rejects_duplicate_id_within_batch() {
    // One `add` call must not contain the same internal id twice.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let pair = [text_record("same", 0.0), text_record("same", 1.0)];
        let message = store.add(&pair).await.unwrap_err().to_string();
        assert!(
            message.contains("duplicate id 'same' in batch"),
            "unexpected error message: {message}"
        );
    });
}
3962
#[test]
fn add_rejects_duplicate_external_id_within_batch() {
    // One `add` call must not contain the same external_id twice, even on
    // records with distinct internal ids.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut a = text_record("a", 0.0);
        let mut b = text_record("b", 1.0);
        a.external_id = Some("ext".to_string());
        b.external_id = Some("ext".to_string());

        let message = store.add(&[a, b]).await.unwrap_err().to_string();
        assert!(
            message.contains("duplicate external_id 'ext' in batch"),
            "unexpected error message: {message}"
        );
    });
}
3983
#[test]
fn add_allows_external_id_reuse_after_delete() {
    // Once the record holding an external_id is deleted, `add` may assign
    // that external_id to a new record.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut original = text_record("a", 0.0);
        original.external_id = Some("ext".to_string());
        store.add(std::slice::from_ref(&original)).await.unwrap();
        let deleted = store.delete_by_external_id("ext").await.unwrap();
        assert!(deleted);

        let mut reused = text_record("b", 1.0);
        reused.external_id = Some("ext".to_string());
        let readd = store.add(std::slice::from_ref(&reused)).await;
        readd.expect("external_id should be reusable after delete");

        let current = store.get_by_external_id("ext").await.unwrap().unwrap();
        assert_eq!(current.id, reused.id);
    });
}
4010
#[test]
fn add_allows_id_reuse_after_delete() {
    // Once a record is deleted, its internal id may be added again.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let original = text_record("dup", 0.0);
        store.add(std::slice::from_ref(&original)).await.unwrap();
        let deleted = store.delete_by_id("dup").await.unwrap();
        assert!(deleted);

        let readd = store.add(&[text_record("dup", 1.0)]).await;
        readd.expect("id should be reusable after delete");

        let current = store.get_by_id("dup").await.unwrap().unwrap();
        assert_eq!(current.id, "dup");
    });
}
4032
#[test]
fn add_rejects_external_id_after_supersede() {
    // Superseding is not deletion: after two upserts, the external_id still
    // belongs to the live successor, so a plain `add` reusing it fails.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        // Generation "a" is inserted, then superseded by generation "b".
        for (id, pivot) in [("a", 0.0), ("b", 1.0)] {
            let mut generation = text_record(id, pivot);
            generation.external_id = Some("ext".to_string());
            store.upsert_by_external_id(generation).await.unwrap();
        }

        let mut conflict = text_record("c", 2.0);
        conflict.external_id = Some("ext".to_string());
        let message = store.add(&[conflict]).await.unwrap_err().to_string();
        assert!(
            message.contains("external_id 'ext'") && message.contains("already exists"),
            "unexpected error message: {message}"
        );
    });
}
4062
#[test]
fn validate_uniqueness_with_btree_index() {
    // With a BTree id index configured, id and external_id uniqueness is still
    // enforced, and both keys become reusable after a delete.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let btree_options = ContextStoreOptions {
            id_index_type: IdIndexType::BTree,
            ..Default::default()
        };
        let mut store = ContextStore::open_with_options(&store_uri, btree_options)
            .await
            .unwrap();

        let mut seeded = text_record("idx-a", 0.0);
        seeded.external_id = Some("ext".to_string());
        store.add(std::slice::from_ref(&seeded)).await.unwrap();

        // Same internal id, different payload.
        let id_clash = store
            .add(&[text_record("idx-a", 1.0)])
            .await
            .unwrap_err()
            .to_string();
        assert!(id_clash.contains("id 'idx-a'") && id_clash.contains("already exists"));

        // Fresh internal id, but the external_id is taken.
        let mut ext_clash_record = text_record("idx-b", 1.0);
        ext_clash_record.external_id = Some("ext".to_string());
        let ext_clash = store
            .add(&[ext_clash_record])
            .await
            .unwrap_err()
            .to_string();
        assert!(ext_clash.contains("external_id 'ext'") && ext_clash.contains("already exists"));

        // After a delete, both keys are free again.
        assert!(store.delete_by_id("idx-a").await.unwrap());
        let mut reused = text_record("idx-a", 2.0);
        reused.external_id = Some("ext".to_string());
        let readd = store.add(std::slice::from_ref(&reused)).await;
        readd.expect("keys should be reusable after delete with index configured");
    });
}
4110
#[test]
fn validate_uniqueness_against_large_store() {
    // Uniqueness checks still catch id and external_id clashes after 300
    // separate single-record adds, and a fresh key is still accepted.
    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let btree_options = ContextStoreOptions {
            id_index_type: IdIndexType::BTree,
            ..Default::default()
        };
        let mut store = ContextStore::open_with_options(&store_uri, btree_options)
            .await
            .unwrap();

        // One `add` call per record, deliberately not batched.
        for n in 0..300 {
            let mut seeded = text_record(&format!("rec-{n}"), n as f32);
            seeded.external_id = Some(format!("ext-{n}"));
            store.add(std::slice::from_ref(&seeded)).await.unwrap();
        }

        let mut id_clash = text_record("rec-150", 0.0);
        id_clash.external_id = Some("ext-999".to_string());
        let id_err = store.add(&[id_clash]).await.unwrap_err();
        assert!(id_err.to_string().contains("id 'rec-150'"));

        let mut ext_clash = text_record("rec-new", 0.0);
        ext_clash.external_id = Some("ext-42".to_string());
        let ext_err = store.add(&[ext_clash]).await.unwrap_err();
        assert!(ext_err.to_string().contains("external_id 'ext-42'"));

        let mut fresh = text_record("rec-300", 0.0);
        fresh.external_id = Some("ext-300".to_string());
        store.add(std::slice::from_ref(&fresh)).await.unwrap();
        let found = store.get_by_id("rec-300").await.unwrap();
        assert!(found.is_some());
    });
}
4160
/// Timing check: the mean cost of a single-record `add` should stay roughly
/// flat when the store grows about 20x, rather than growing with store size
/// (for example from scanning every row during uniqueness validation).
#[test]
#[ignore = "timing-sensitive benchmark; run explicitly with --ignored"]
fn append_cost_does_not_grow_linearly() {
    use std::time::Instant;

    let dir = TempDir::new().unwrap();
    let uri = dir.path().to_string_lossy().to_string();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let options = ContextStoreOptions {
            id_index_type: IdIndexType::BTree,
            ..Default::default()
        };
        let mut store = ContextStore::open_with_options(&uri, options)
            .await
            .unwrap();

        /// Compacts, then times `window` sequential single-record adds and
        /// returns the mean seconds per add. `tag` keeps probe ids unique
        /// across calls.
        async fn time_window(store: &mut ContextStore, tag: &str, window: usize) -> f64 {
            store.compact(None).await.unwrap();
            let start = Instant::now();
            for i in 0..window {
                let id = format!("{tag}-probe-{i}");
                store.add(&[text_record(&id, i as f32)]).await.unwrap();
            }
            start.elapsed().as_secs_f64() / window as f64
        }

        /// Adds `count` records in batches of 100 (the last batch may be
        /// smaller), then compacts.
        async fn seed(store: &mut ContextStore, tag: &str, count: usize) {
            let chunk = 100;
            let mut i = 0;
            while i < count {
                let batch: Vec<ContextRecord> = (i..(i + chunk).min(count))
                    .map(|j| text_record(&format!("{tag}-seed-{j}"), j as f32))
                    .collect();
                store.add(&batch).await.unwrap();
                i += chunk;
            }
            store.compact(None).await.unwrap();
        }

        // Baseline: about 100 rows in the store.
        let window = 30;
        seed(&mut store, "small", 100).await;
        let small = time_window(&mut store, "small", window).await;

        // Grow the store by 2000 rows (roughly 20x) and measure again.
        seed(&mut store, "big", 2000).await;
        let large = time_window(&mut store, "big", window).await;

        // EPSILON guards against dividing by a zero baseline on very fast
        // machines. A ratio well below the ~20x growth factor means the
        // per-call cost is not scaling with store size.
        let ratio = large / small.max(f64::EPSILON);
        eprintln!(
            "append per-call: small={small:.6}s large={large:.6}s ratio={ratio:.2} (store grew ~20x)"
        );
        assert!(
            ratio < 8.0,
            "append cost appears to scale with store size (ratio {ratio:.2}); \
             expected roughly constant per-call validation"
        );
    });
}
4229
#[test]
fn validation_handles_external_id_with_single_quote() {
    // An apostrophe in an external_id must not break the uniqueness check
    // (duplicates are still caught) or the lookup path (distinct keys still
    // resolve).
    const TRICKY: &str = "o'brien#chunk-1";

    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut original = text_record("a", 0.0);
        original.external_id = Some(TRICKY.to_string());
        store.add(std::slice::from_ref(&original)).await.unwrap();

        let mut duplicate = text_record("b", 1.0);
        duplicate.external_id = Some(TRICKY.to_string());
        let message = store.add(&[duplicate]).await.unwrap_err().to_string();
        assert!(
            message.contains("already exists"),
            "unexpected error message: {message}"
        );

        let mut unrelated = text_record("c", 2.0);
        unrelated.external_id = Some("d'angelo#chunk-2".to_string());
        store.add(std::slice::from_ref(&unrelated)).await.unwrap();
        let found = store
            .get_by_external_id("d'angelo#chunk-2")
            .await
            .unwrap();
        assert!(found.is_some());
    });
}
4266
#[test]
fn delete_by_external_id_hides_record_from_default_reads() {
    // After deleting by external_id, the record is gone from every default
    // read path (key lookup, id lookup, list, search); its sibling remains.
    const KEY: &str = "doc-123#chunk-1";

    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut doomed = text_record("a", 0.0);
        doomed.external_id = Some(KEY.to_string());
        let survivor = text_record("b", 2.0);
        store.add(&[doomed.clone(), survivor.clone()]).await.unwrap();

        let removed = store.delete_by_external_id(KEY).await.unwrap();
        assert!(removed);

        let by_key = store.get_by_external_id(KEY).await.unwrap();
        assert!(by_key.is_none());
        let by_id = store.get_by_id(&doomed.id).await.unwrap();
        assert!(by_id.is_none());

        let listed = store.list(None, None).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, survivor.id);

        let probe = make_embedding(0.0);
        let hits = store.search(&probe, Some(10)).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].record.id, survivor.id);
    });
}
4301
#[test]
fn delete_by_id_hides_record_from_default_reads() {
    // Deleting by internal id also hides the record from external_id lookup,
    // list and search; the untouched sibling remains.
    const KEY: &str = "doc-123#chunk-1";

    let tmp_dir = TempDir::new().unwrap();
    let store_uri = tmp_dir.path().to_string_lossy().into_owned();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        let mut doomed = text_record("a", 0.0);
        doomed.external_id = Some(KEY.to_string());
        let survivor = text_record("b", 2.0);
        store.add(&[doomed.clone(), survivor.clone()]).await.unwrap();

        let removed = store.delete_by_id(&doomed.id).await.unwrap();
        assert!(removed);

        let by_id = store.get_by_id(&doomed.id).await.unwrap();
        assert!(by_id.is_none());
        let by_key = store.get_by_external_id(KEY).await.unwrap();
        assert!(by_key.is_none());

        let listed = store.list(None, None).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, survivor.id);

        let probe = make_embedding(0.0);
        let hits = store.search(&probe, Some(10)).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].record.id, survivor.id);
    });
}
4333
#[test]
fn delete_missing_id_is_noop() {
    // Deleting an id that was never written reports `false` for both the
    // primary-id and external-id variants.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        let mut store = ContextStore::open(&uri).await.unwrap();

        let removed_by_id = store.delete_by_id("missing").await.unwrap();
        assert!(!removed_by_id);

        let removed_by_external_id = store.delete_by_external_id("missing").await.unwrap();
        assert!(!removed_by_external_id);
    });
}
4345
#[test]
fn external_id_can_be_reused_after_delete() {
    // Once a record is deleted through its external id, a new record may claim
    // the same external id and lookups resolve to the newcomer.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let mut store = ContextStore::open(&uri).await.unwrap();
        let shared_key = "doc-123#chunk-1";

        let mut original = text_record("a", 0.0);
        original.external_id = Some(shared_key.to_string());
        store.add(std::slice::from_ref(&original)).await.unwrap();

        let removed = store.delete_by_external_id(shared_key).await.unwrap();
        assert!(removed);

        let mut replacement = text_record("b", 1.0);
        replacement.external_id = original.external_id.clone();
        store.add(std::slice::from_ref(&replacement)).await.unwrap();

        let resolved = store
            .get_by_external_id(shared_key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(resolved.id, replacement.id);

        let live = store.list(None, None).await.unwrap();
        assert_eq!(live.len(), 1);
    });
}
4374
#[test]
fn test_region_id_derivation_explicit() {
    let bot = Some("bot-123".to_string());
    let session_a = Some("session-456".to_string());
    let session_b = Some("session-789".to_string());

    // Identical (bot, session) inputs map to the same region.
    let first = ContextStore::derive_region_id(&bot, &session_a);
    let repeat = ContextStore::derive_region_id(&bot, &session_a);
    assert_eq!(
        first, repeat,
        "Region ID should be deterministic for same inputs"
    );

    // Changing only the session changes the region.
    let other = ContextStore::derive_region_id(&bot, &session_b);
    assert_ne!(first, other, "Region ID should differ for different inputs");

    // With both identifiers absent the derivation is still stable.
    let unscoped = ContextStore::derive_region_id(&None, &None);
    let unscoped_again = ContextStore::derive_region_id(&None, &None);
    assert_eq!(
        unscoped, unscoped_again,
        "Region ID for None/None should be deterministic"
    );
}
4404
#[test]
fn test_add_multiple_regions() {
    // Records scoped to different bot/session pairs are written in one `add`
    // call, and both are visible from a freshly reopened store.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let mut writer = ContextStore::open(&uri).await.unwrap();

        let mut alpha = text_record("r1", 0.0);
        alpha.bot_id = Some("bot-A".to_string());
        alpha.session_id = Some("session-1".to_string());

        let mut beta = text_record("r2", 0.0);
        beta.bot_id = Some("bot-B".to_string());
        beta.session_id = Some("session-2".to_string());

        writer
            .add(&[alpha.clone(), beta.clone()])
            .await
            .unwrap();

        // Reopen from the same URI so the listing reflects what was persisted.
        let reader = ContextStore::open(&uri).await.unwrap();
        let listed = reader.list(None, None).await.unwrap();
        assert_eq!(listed.len(), 2);

        let ids: Vec<&str> = listed.iter().map(|r| r.id.as_str()).collect();
        assert!(ids.contains(&"r1"));
        assert!(ids.contains(&"r2"));
    });
}
4441
#[test]
fn test_blob_binary_payload() {
    // Blob-encoding only `binary_payload` must tag that column in the schema,
    // leave `text_payload` as plain LargeUtf8, and keep the bytes intact
    // through an Arrow round trip.
    let dir = TempDir::new().unwrap();
    let uri = dir.path().to_string_lossy().to_string();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let options = ContextStoreOptions {
            blob_columns: HashSet::from(["binary_payload".to_string()]),
            ..Default::default()
        };
        let mut store = ContextStore::open_with_options(&uri, options)
            .await
            .unwrap();

        let mut record = text_record("blob-bin-1", 0.0);
        record.binary_payload = Some(vec![0xDE, 0xAD, 0xBE, 0xEF]);
        store.add(std::slice::from_ref(&record)).await.unwrap();

        let schema = ContextStore::schema(&store.blob_columns);
        let field = schema.field_with_name("binary_payload").unwrap();
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            field.metadata().get("lance-encoding:blob"),
            Some(&"true".to_string()),
        );
        // The non-blob payload column is untouched.
        let text_field = schema.field_with_name("text_payload").unwrap();
        assert_eq!(text_field.data_type(), &DataType::LargeUtf8);
        assert!(text_field.metadata().get("lance-encoding:blob").is_none());

        // Previously the payload bytes were written but never checked; make sure
        // they survive conversion to and from Arrow with blob encoding enabled.
        let batch = store
            .records_to_batch(std::slice::from_ref(&record))
            .unwrap();
        let roundtripped = batch_to_records(&batch).unwrap();
        assert_eq!(roundtripped.len(), 1);
        assert_eq!(
            roundtripped[0].binary_payload, record.binary_payload,
            "binary payload should survive blob roundtrip"
        );
    });
}
4474
#[test]
fn test_blob_text_payload() {
    // Blob-encoding `text_payload` stores it as raw bytes in Arrow, and decoding
    // the batch must still give back the original text.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let mut blob_columns = HashSet::new();
        blob_columns.insert("text_payload".to_string());
        let mut store = ContextStore::open_with_options(
            &uri,
            ContextStoreOptions {
                blob_columns,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let sample = text_record("blob-txt-1", 0.0);
        store.add(std::slice::from_ref(&sample)).await.unwrap();

        let batch = store
            .records_to_batch(std::slice::from_ref(&sample))
            .unwrap();
        let schema = batch.schema();
        let text_type = schema.field_with_name("text_payload").unwrap().data_type();
        assert_eq!(
            text_type,
            &DataType::LargeBinary,
            "text_payload should be LargeBinary when blob-encoded"
        );

        let decoded = batch_to_records(&batch).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(
            decoded[0].text_payload, sample.text_payload,
            "text payload should survive blob roundtrip"
        );
    });
}
4513
#[test]
fn test_blob_both_columns() {
    // With both payload columns blob-encoded, each carries the Lance blob marker
    // and both payloads survive an Arrow round trip.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let blob_columns: HashSet<String> = ["text_payload", "binary_payload"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let options = ContextStoreOptions {
            blob_columns,
            ..Default::default()
        };
        let mut store = ContextStore::open_with_options(&uri, options)
            .await
            .unwrap();

        let mut sample = text_record("blob-both-1", 0.0);
        sample.binary_payload = Some(b"hello binary".to_vec());
        store.add(std::slice::from_ref(&sample)).await.unwrap();

        // Resolve both fields up front, then check the marker on each.
        let schema = ContextStore::schema(&store.blob_columns);
        let tagged = ["text_payload", "binary_payload"]
            .map(|name| schema.field_with_name(name).unwrap());
        for field in tagged {
            assert_eq!(
                field.metadata().get("lance-encoding:blob"),
                Some(&"true".to_string()),
            );
        }

        let batch = store
            .records_to_batch(std::slice::from_ref(&sample))
            .unwrap();
        let decoded = batch_to_records(&batch).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].text_payload, sample.text_payload);
        assert_eq!(decoded[0].binary_payload, sample.binary_payload);
    });
}
4559
#[test]
fn test_no_blob_default() {
    // With no blob columns requested, both payload columns keep their plain
    // Arrow types and carry no blob-encoding metadata.
    let schema = ContextStore::schema(&HashSet::new());
    let [text_field, bin_field] =
        ["text_payload", "binary_payload"].map(|name| schema.field_with_name(name).unwrap());

    for (field, expected) in [
        (text_field, DataType::LargeUtf8),
        (bin_field, DataType::LargeBinary),
    ] {
        assert_eq!(field.data_type(), &expected);
        assert!(field.metadata().get("lance-encoding:blob").is_none());
    }
}
4572
#[test]
fn test_blob_schema_metadata() {
    // Requesting both payload columns as blobs switches each to LargeBinary and
    // tags it with the blob marker; unrelated columns such as `id` stay untagged.
    let requested: HashSet<String> = ["text_payload", "binary_payload"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let schema = ContextStore::schema(&requested);

    for column in ["text_payload", "binary_payload"] {
        let field = schema.field_with_name(column).unwrap();
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            field.metadata().get("lance-encoding:blob"),
            Some(&"true".to_string()),
        );
    }

    let id_field = schema.field_with_name("id").unwrap();
    assert!(id_field.metadata().get("lance-encoding:blob").is_none());
}
4597
#[test]
fn test_blob_invalid_column_name() {
    // Naming a column that does not exist as a blob column must make
    // `open_with_options` fail with a descriptive error.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let options = ContextStoreOptions {
            blob_columns: HashSet::from(["nonexistent_column".to_string()]),
            ..Default::default()
        };
        // `match` instead of `unwrap_err`, which would require `ContextStore: Debug`.
        let err_msg = match ContextStore::open_with_options(&uri, options).await {
            Ok(_) => panic!("should reject invalid blob column names"),
            Err(err) => err.to_string(),
        };
        assert!(
            err_msg.contains("invalid blob column"),
            "error should mention invalid blob column: {err_msg}"
        );
    });
}
4618
#[test]
fn test_batch_to_records_autodetects_text_type() {
    // `batch_to_records` must decode `text_payload` whether the batch carries it
    // as LargeUtf8 (default store) or LargeBinary (blob-encoded store).
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let record = text_record("auto-1", 0.0);

        // Default store: text_payload is LargeUtf8.
        let dir1 = TempDir::new().unwrap();
        let uri1 = dir1.path().to_string_lossy().to_string();
        let store_default = ContextStore::open(&uri1).await.unwrap();
        let batch_utf8 = store_default
            .records_to_batch(std::slice::from_ref(&record))
            .unwrap();
        // Pin the input type so this half really exercises the UTF-8 decode path.
        let utf8_schema = batch_utf8.schema();
        assert_eq!(
            utf8_schema
                .field_with_name("text_payload")
                .unwrap()
                .data_type(),
            &DataType::LargeUtf8,
            "default store should emit text_payload as LargeUtf8"
        );
        let results_utf8 = batch_to_records(&batch_utf8).unwrap();
        assert_eq!(results_utf8.len(), 1);
        assert_eq!(results_utf8[0].text_payload, record.text_payload);

        // Blob store: text_payload is LargeBinary.
        let dir2 = TempDir::new().unwrap();
        let uri2 = dir2.path().to_string_lossy().to_string();
        let options = ContextStoreOptions {
            blob_columns: HashSet::from(["text_payload".to_string()]),
            ..Default::default()
        };
        let store_blob = ContextStore::open_with_options(&uri2, options)
            .await
            .unwrap();
        let batch_binary = store_blob
            .records_to_batch(std::slice::from_ref(&record))
            .unwrap();
        // Pin the input type so this half really exercises the binary decode path.
        let binary_schema = batch_binary.schema();
        assert_eq!(
            binary_schema
                .field_with_name("text_payload")
                .unwrap()
                .data_type(),
            &DataType::LargeBinary,
            "blob store should emit text_payload as LargeBinary"
        );
        let results_binary = batch_to_records(&batch_binary).unwrap();
        assert_eq!(results_binary.len(), 1);
        assert_eq!(results_binary[0].text_payload, record.text_payload);
    });
}
4653
#[test]
fn test_id_index_btree() {
    // With `IdIndexType::BTree`, the id index exists right after opening the
    // store and is still listed after several appends followed by `compact`.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let options = ContextStoreOptions {
            id_index_type: IdIndexType::BTree,
            ..Default::default()
        };
        let mut store = ContextStore::open_with_options(&uri, options)
            .await
            .unwrap();

        let on_open = store.dataset.load_indices().await.unwrap();
        let indexed_on_open = on_open
            .iter()
            .map(|idx| idx.name.as_str())
            .any(|name| name == ID_INDEX_NAME);
        assert!(indexed_on_open, "btree index should be created on open");

        for seq in 0..5 {
            let id = format!("btree-{seq}");
            store.add(&[text_record(&id, seq as f32)]).await.unwrap();
        }
        store.compact(None).await.unwrap();

        let after_compact = store.dataset.load_indices().await.unwrap();
        let indexed_after_compact = after_compact
            .iter()
            .map(|idx| idx.name.as_str())
            .any(|name| name == ID_INDEX_NAME);
        assert!(
            indexed_after_compact,
            "btree index should persist after compaction"
        );
    });
}
4693
#[test]
fn test_id_index_zonemap() {
    // With `IdIndexType::ZoneMap`, the id index exists right after opening the
    // store and is still listed after several appends followed by `compact`.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let options = ContextStoreOptions {
            id_index_type: IdIndexType::ZoneMap,
            ..Default::default()
        };
        let mut store = ContextStore::open_with_options(&uri, options)
            .await
            .unwrap();

        let on_open = store.dataset.load_indices().await.unwrap();
        let indexed_on_open = on_open
            .iter()
            .map(|idx| idx.name.as_str())
            .any(|name| name == ID_INDEX_NAME);
        assert!(indexed_on_open, "zonemap index should be created on open");

        for seq in 0..5 {
            let id = format!("zm-{seq}");
            store.add(&[text_record(&id, seq as f32)]).await.unwrap();
        }
        store.compact(None).await.unwrap();

        let after_compact = store.dataset.load_indices().await.unwrap();
        let indexed_after_compact = after_compact
            .iter()
            .map(|idx| idx.name.as_str())
            .any(|name| name == ID_INDEX_NAME);
        assert!(
            indexed_after_compact,
            "zonemap index should persist after compaction"
        );
    });
}
4731
#[test]
fn test_id_index_none_by_default() {
    // A store opened with default options has no id index, even after a
    // write and a compaction pass.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let mut store = ContextStore::open(&uri).await.unwrap();

        let only = text_record("no-idx-1", 0.0);
        store.add(&[only]).await.unwrap();
        store.compact(None).await.unwrap();

        let present = store
            .dataset
            .load_indices()
            .await
            .unwrap()
            .iter()
            .any(|idx| idx.name == ID_INDEX_NAME);
        assert!(
            !present,
            "no id index should be created when IdIndexType::None"
        );
    });
}
4751
#[test]
fn test_id_index_idempotent() {
    // After the id index has been (re)built explicitly, `ensure_id_index` must
    // not commit a new dataset version.
    let tmp = TempDir::new().unwrap();
    let uri = tmp.path().to_string_lossy().into_owned();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        let options = ContextStoreOptions {
            id_index_type: IdIndexType::BTree,
            ..Default::default()
        };
        let mut store = ContextStore::open_with_options(&uri, options)
            .await
            .unwrap();

        for seq in 0..5 {
            let id = format!("idem-{seq}");
            store.add(&[text_record(&id, seq as f32)]).await.unwrap();
        }

        store.create_id_index().await.unwrap();
        let before = store.version();
        store.ensure_id_index().await.unwrap();
        let after = store.version();
        assert_eq!(
            before, after,
            "ensure_id_index should not recreate existing index"
        );
    });
}
4782}