Skip to main content

lance_context_core/
store.rs

1use std::cmp::Ordering;
2use std::collections::{HashMap, HashSet};
3use std::sync::Arc;
4use std::time::Duration;
5
6use arrow_array::builder::{
7    FixedSizeListBuilder, Float32Builder, Int32Builder, LargeBinaryBuilder, LargeStringBuilder,
8    ListBuilder, StringBuilder, StringDictionaryBuilder, StructBuilder,
9    TimestampMicrosecondBuilder,
10};
11use arrow_array::types::Int8Type;
12use arrow_array::{
13    Array, ArrayRef, DictionaryArray, FixedSizeListArray, Float32Array, Int32Array,
14    LargeBinaryArray, LargeStringArray, ListArray, RecordBatch, RecordBatchIterator, StringArray,
15    StructArray, TimestampMicrosecondArray,
16};
17use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, TimeUnit};
18use chrono::{DateTime, Timelike, Utc};
19use futures::TryStreamExt;
20use lance::dataset::mem_wal::{
21    DatasetMemWalExt, LsmScanner, ShardManifestStore, ShardSnapshot, ShardWriterConfig,
22};
23use lance::dataset::optimize::{compact_files, CompactionMetrics, CompactionOptions};
24use lance::dataset::NewColumnTransform;
25use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams};
26use lance::index::DatasetIndexExt;
27use lance::io::{ObjectStoreParams, StorageOptionsAccessor};
28use lance::{Error as LanceError, Result as LanceResult};
29use lance_index::mem_wal::MEM_WAL_INDEX_NAME;
30use lance_index::scalar::ScalarIndexParams;
31use lance_index::IndexType;
32use tokio::sync::Mutex;
33use tokio::task::JoinHandle;
34use tracing::{error, info, warn};
35use uuid::Uuid;
36
37use crate::record::{
38    ContextRecord, LifecycleQueryOptions, RecordFilters, RecordPatch, Relationship, RetrieveResult,
39    SearchResult, StateMetadata, UpdateResult, UpsertResult, LIFECYCLE_ACTIVE,
40};
41use crate::serde::CONTENT_TYPE_TOMBSTONE;
42
/// Embedding length used for the semantic index column when a newly-created
/// dataset is opened without an explicit `embedding_dim`.
const DEFAULT_EMBEDDING_DIM: i32 = 1536;
// NOTE(review): the consumers of the next three constants are outside this
// view. Presumably: default result cap for searches, batch size for manifest
// scans, and the reciprocal-rank-fusion smoothing constant — confirm at use.
const DEFAULT_SEARCH_LIMIT: usize = 10;
const DEFAULT_MANIFEST_SCAN_BATCH_SIZE: usize = 16;
const RRF_K: f32 = 60.0;
/// Name of the scalar index on the `id` column; compared against index names
/// when deciding which indexes MemWAL should maintain.
const ID_INDEX_NAME: &str = "id_idx";
/// Name of the list-of-struct column holding record relationships.
const RELATIONSHIPS_COLUMN: &str = "relationships";
/// Schema-metadata key under which the configured [`DistanceMetric`] is persisted
/// so it round-trips on `open` without being re-specified by the caller.
const DISTANCE_METRIC_METADATA_KEY: &str = "lance-context:distance_metric";
53
/// Tuning knobs for the background compaction task.
///
/// The [`Default`] implementation leaves compaction switched off.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Turns background compaction on or off.
    pub enabled: bool,
    /// Fragment count at which a compaction is triggered.
    pub min_fragments: usize,
    /// Row count each fragment should hold after compaction.
    pub target_rows_per_fragment: usize,
    /// Upper bound on rows in a single row group.
    pub max_rows_per_group: usize,
    /// When set, deleted rows are materialized (removed) during compaction.
    pub materialize_deletions: bool,
    /// Fraction of deleted rows (0.0-1.0) that triggers materialization.
    pub materialize_deletions_threshold: f32,
    /// Thread count for compaction; `None` means auto.
    pub num_threads: Option<usize>,
    /// Seconds to wait between compaction checks.
    pub check_interval_secs: u64,
    /// `(start_hour, end_hour)` windows during which compaction is skipped.
    pub quiet_hours: Vec<(u8, u8)>,
}

impl Default for CompactionConfig {
    /// Compaction disabled, checked every 300 seconds, no quiet hours.
    fn default() -> Self {
        let quiet_hours: Vec<(u8, u8)> = Vec::new();
        CompactionConfig {
            enabled: false,
            min_fragments: 5,
            target_rows_per_fragment: 1_000_000,
            max_rows_per_group: 1024,
            materialize_deletions: true,
            materialize_deletions_threshold: 0.1,
            num_threads: None,
            check_interval_secs: 300,
            quiet_hours,
        }
    }
}
92
/// Type of scalar index on the `id` column.
///
/// Defaults to [`IdIndexType::None`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IdIndexType {
    /// No index on the id column.
    #[default]
    None,
    /// Zone-map index (min/max per fragment, lightweight).
    ///
    /// The write path excludes this index from the set MemWAL maintains.
    ZoneMap,
    /// B-tree index (point lookups, heavier).
    BTree,
}
104
/// Distance metric used to rank candidates during vector search.
///
/// All variants are normalized so that a **smaller** value means a closer
/// match, keeping the search ranking ascending regardless of metric.
///
/// Defaults to [`DistanceMetric::L2`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DistanceMetric {
    /// Euclidean (L2) distance. Default for backward compatibility.
    #[default]
    L2,
    /// Cosine distance (`1 - cosine_similarity`). Common for normalized
    /// embeddings from most modern models.
    Cosine,
    /// Negated dot product (maximum inner product search).
    Dot,
}
120
121impl DistanceMetric {
122    /// Parse a metric from its string identifier (`"l2"`, `"cosine"`, `"dot"`).
123    /// Matching is case-insensitive.
124    ///
125    /// # Errors
126    /// Returns an error if the identifier is not a recognized metric.
127    pub fn parse(value: &str) -> LanceResult<Self> {
128        match value.to_ascii_lowercase().as_str() {
129            "l2" | "euclidean" => Ok(Self::L2),
130            "cosine" => Ok(Self::Cosine),
131            "dot" | "dot_product" => Ok(Self::Dot),
132            other => Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
133                "invalid distance metric '{other}': valid values are 'l2', 'cosine', 'dot'"
134            )))),
135        }
136    }
137
138    /// Compute the metric between a query and a candidate vector.
139    ///
140    /// The returned value is always "smaller is better".
141    #[must_use]
142    pub fn distance(self, query: &[f32], candidate: &[f32]) -> f32 {
143        match self {
144            Self::L2 => l2_distance(query, candidate),
145            Self::Cosine => cosine_distance(query, candidate),
146            Self::Dot => dot_distance(query, candidate),
147        }
148    }
149
150    /// Stable string identifier for this metric, used when persisting it in
151    /// dataset schema metadata. Round-trips through [`DistanceMetric::parse`].
152    #[must_use]
153    pub fn as_str(self) -> &'static str {
154        match self {
155            Self::L2 => "l2",
156            Self::Cosine => "cosine",
157            Self::Dot => "dot",
158        }
159    }
160}
161
/// Statistics about compaction status and history.
///
/// The history fields mirror those tracked internally in `CompactionState`.
#[derive(Debug, Clone)]
pub struct CompactionStats {
    /// Current number of fragments in the dataset.
    pub total_fragments: usize,
    /// Whether a compaction is currently in progress.
    pub is_compacting: bool,
    /// Timestamp of the last successful compaction.
    pub last_compaction: Option<DateTime<Utc>>,
    /// Error message from the last failed compaction.
    pub last_error: Option<String>,
    /// Total number of successful compactions performed.
    pub total_compactions: u64,
}
176
/// Internal state for tracking background compaction.
///
/// Held behind `Arc<Mutex<..>>` by [`ContextStore`]; a freshly opened store
/// starts with no task, no history, and a zero counter.
struct CompactionState {
    /// Handle of the spawned background task, if one has been started.
    background_task: Option<JoinHandle<()>>,
    /// Whether a compaction is currently in progress.
    is_compacting: bool,
    /// Timestamp of the last successful compaction.
    last_compaction: Option<DateTime<Utc>>,
    /// Error message from the last failed compaction.
    last_error: Option<String>,
    /// Total number of successful compactions performed.
    total_compactions: u64,
}
185
/// Valid column names that may use blob encoding.
///
/// `ContextStore::open_with_options` rejects any requested blob column that
/// is not in this list.
const VALID_BLOB_COLUMNS: &[&str] = &["text_payload", "binary_payload"];
188
/// Persistent Lance-backed context store.
///
/// Cloning the store clones the dataset handle and configuration; the
/// compaction state is behind an `Arc`, so clones share it.
#[derive(Clone)]
pub struct ContextStore {
    /// Handle to the underlying Lance dataset.
    dataset: Dataset,
    /// Shared bookkeeping for background compaction.
    compaction_state: Arc<Mutex<CompactionState>>,
    /// Compaction settings supplied when the store was opened.
    pub compaction_config: CompactionConfig,
    /// Columns requested to use blob encoding (validated at open time).
    blob_columns: HashSet<String>,
    /// Kind of scalar index configured for the `id` column.
    id_index_type: IdIndexType,
    /// Embedding width read from the dataset schema at open time.
    embedding_dim: i32,
    /// Distance metric read from the dataset schema at open time.
    distance_metric: DistanceMetric,
}
200
/// Additional configuration when opening a [`ContextStore`].
#[derive(Debug, Clone, Default)]
pub struct ContextStoreOptions {
    /// Key/value options forwarded to the object store when loading or
    /// creating the dataset (e.g. S3 credentials). `None` passes nothing.
    pub storage_options: Option<HashMap<String, String>>,
    /// Background compaction settings; stored on the opened store.
    pub compaction: CompactionConfig,
    /// Width of the fixed-size embedding vector for newly-created datasets.
    /// Existing datasets always use the dimension persisted in their schema.
    pub embedding_dim: Option<i32>,
    /// Column names that should use Lance V1 blob encoding.
    /// Valid values: `"text_payload"`, `"binary_payload"`.
    pub blob_columns: HashSet<String>,
    /// Type of scalar index to create on the `id` column.
    pub id_index_type: IdIndexType,
    /// Distance metric used to rank vector-search results.
    ///
    /// For newly-created datasets this is persisted in the schema metadata and
    /// becomes the dataset's metric. For existing datasets the persisted metric
    /// is used; passing a different metric here is an error. `None` defaults to
    /// the persisted metric (or `L2` for datasets created before persistence).
    pub distance_metric: Option<DistanceMetric>,
}
222
223impl ContextStoreOptions {
224    #[must_use]
225    pub fn storage_options(&self) -> Option<HashMap<String, String>> {
226        self.storage_options.clone()
227    }
228}
229
230fn relationship_struct_fields() -> Vec<Field> {
231    vec![
232        Field::new("target_id", DataType::Utf8, true),
233        Field::new("relation", DataType::Utf8, true),
234        Field::new("weight", DataType::Float32, true),
235    ]
236}
237
238fn relationship_struct_data_type() -> DataType {
239    DataType::Struct(relationship_struct_fields().into())
240}
241
242fn relationship_list_item_field() -> FieldRef {
243    Arc::new(Field::new("item", relationship_struct_data_type(), true))
244}
245
246fn relationship_field() -> Field {
247    Field::new(
248        RELATIONSHIPS_COLUMN,
249        DataType::List(relationship_list_item_field()),
250        true,
251    )
252}
253
254fn relationship_struct_builder() -> StructBuilder {
255    let fields: Vec<FieldRef> = relationship_struct_fields()
256        .into_iter()
257        .map(|field| Arc::new(field) as FieldRef)
258        .collect();
259    StructBuilder::new(
260        fields,
261        vec![
262            Box::new(StringBuilder::new()),
263            Box::new(StringBuilder::new()),
264            Box::new(Float32Builder::new()),
265        ],
266    )
267}
268
/// Per-`external_id` resolution computed in a single scan for batch upsert.
///
/// The derived `Default` yields "no visible ids, no non-tombstone row", which
/// is the starting point for each external id seen during the scan.
#[derive(Default)]
struct ExternalIdState {
    /// Ids of currently-visible records carrying this external_id (default
    /// lifecycle: not tombstoned/expired/retired/superseded).
    visible_ids: Vec<String>,
    /// Whether any non-tombstone row (visible or hidden) carries it. Mirrors
    /// the uniqueness check `add` performs on the single-upsert insert path.
    has_non_tombstone: bool,
}
279
280impl ContextStore {
281    /// Open an existing context dataset or create a new one with the project schema.
282    pub async fn open(uri: &str) -> LanceResult<Self> {
283        Self::open_with_options(uri, ContextStoreOptions::default()).await
284    }
285
286    /// Open a dataset with explicit object store configuration (e.g. S3 credentials).
287    pub async fn open_with_options(uri: &str, options: ContextStoreOptions) -> LanceResult<Self> {
288        // Validate blob_columns
289        for col in &options.blob_columns {
290            if !VALID_BLOB_COLUMNS.contains(&col.as_str()) {
291                return Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
292                    "invalid blob column '{}': valid columns are {:?}",
293                    col, VALID_BLOB_COLUMNS
294                ))));
295            }
296        }
297
298        let requested_embedding_dim = match options.embedding_dim {
299            Some(dim) => {
300                validate_embedding_dim(dim)?;
301                dim
302            }
303            None => DEFAULT_EMBEDDING_DIM,
304        };
305        let storage_options = options.storage_options();
306        let blob_columns = options.blob_columns.clone();
307        let (dataset, created) = match Self::load_with_options(uri, storage_options.clone()).await {
308            Ok(dataset) => (dataset, false),
309            Err(LanceError::DatasetNotFound { .. }) => {
310                let dataset = Self::create_with_options(
311                    uri,
312                    storage_options,
313                    &blob_columns,
314                    requested_embedding_dim,
315                    options.distance_metric.unwrap_or_default(),
316                )
317                .await?;
318                (dataset, true)
319            }
320            Err(err) => return Err(err),
321        };
322        let arrow_schema: Schema = dataset.schema().into();
323        let embedding_dim = embedding_dim_from_schema(&arrow_schema)?;
324        if !created && options.embedding_dim.is_some() && embedding_dim != requested_embedding_dim {
325            return Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
326                "existing context embedding dimension {} does not match requested dimension {}",
327                embedding_dim, requested_embedding_dim
328            ))));
329        }
330        let distance_metric = distance_metric_from_schema(&arrow_schema)?;
331        if !created {
332            if let Some(requested) = options.distance_metric {
333                if requested != distance_metric {
334                    return Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
335                        "existing context distance metric '{}' does not match requested metric '{}'",
336                        distance_metric.as_str(),
337                        requested.as_str()
338                    ))));
339                }
340            }
341        }
342
343        let mut store = Self {
344            dataset,
345            compaction_state: Arc::new(Mutex::new(CompactionState {
346                background_task: None,
347                is_compacting: false,
348                last_compaction: None,
349                last_error: None,
350                total_compactions: 0,
351            })),
352            compaction_config: options.compaction,
353            blob_columns,
354            id_index_type: options.id_index_type,
355            embedding_dim,
356            distance_metric,
357        };
358
359        // Ensure id index if configured
360        store.ensure_id_index().await?;
361
362        // Start background compaction if enabled
363        store.start_background_compaction().await?;
364
365        Ok(store)
366    }
367
    /// Embedding vector width persisted in this context dataset schema.
    ///
    /// Returns the value resolved from the schema when the store was opened;
    /// the dataset is not re-read on each call.
    #[must_use]
    pub fn embedding_dim(&self) -> i32 {
        self.embedding_dim
    }
373
    /// URI of the underlying Lance dataset.
    ///
    /// Delegates to the dataset handle's own `uri()`.
    #[must_use]
    pub fn uri(&self) -> &str {
        self.dataset.uri()
    }
379
    /// Distance metric this context ranks vector-search results with.
    ///
    /// Returns the metric resolved from the schema when the store was opened.
    #[must_use]
    pub fn distance_metric(&self) -> DistanceMetric {
        self.distance_metric
    }
385
386    /// Append context records to the store and return the new dataset version.
387    pub async fn add(&mut self, entries: &[ContextRecord]) -> LanceResult<u64> {
388        if entries.is_empty() {
389            return Ok(self.dataset.manifest.version);
390        }
391
392        self.validate_unique_ids(entries).await?;
393        self.write_entries(entries).await
394    }
395
396    async fn write_entries(&mut self, entries: &[ContextRecord]) -> LanceResult<u64> {
397        if entries.is_empty() {
398            return Ok(self.dataset.manifest.version);
399        }
400
401        // Group entries by (bot_id, session_id)
402        let mut groups: HashMap<(Option<String>, Option<String>), Vec<ContextRecord>> =
403            HashMap::new();
404        for entry in entries {
405            let key = (entry.bot_id.clone(), entry.session_id.clone());
406            groups.entry(key).or_default().push(entry.clone());
407        }
408
409        // Ensure MemWAL is initialized (once for the dataset)
410        {
411            let indices = self.dataset.load_indices().await?;
412            let has_mem_wal = indices.iter().any(|i| i.name == MEM_WAL_INDEX_NAME);
413
414            if !has_mem_wal {
415                // ZoneMap indices are not supported by MemWAL; exclude them
416                let maintained_indexes: Vec<String> = indices
417                    .iter()
418                    .filter(|i| {
419                        !(self.id_index_type == IdIndexType::ZoneMap && i.name == ID_INDEX_NAME)
420                    })
421                    .map(|i| i.name.clone())
422                    .collect();
423                self.dataset
424                    .initialize_mem_wal()
425                    .unsharded()
426                    .maintained_indexes(maintained_indexes)
427                    .execute()
428                    .await?;
429            }
430        }
431
432        for ((bot_id, session_id), group_entries) in groups {
433            let region_id = Self::derive_region_id(&bot_id, &session_id);
434            let batch = self.records_to_batch(&group_entries)?;
435            let config = ShardWriterConfig {
436                shard_id: region_id,
437                ..Default::default()
438            };
439
440            let writer = self.dataset.mem_wal_writer(region_id, config).await?;
441            writer.put(vec![batch]).await?;
442            writer.close().await?;
443        }
444
445        Ok(self.dataset.manifest.version)
446    }
447
448    /// Logically forget a record by internal storage id.
449    ///
450    /// This writes a tombstone with the same primary key, preserving prior
451    /// dataset versions while hiding the record from default reads.
452    pub async fn delete_by_id(&mut self, id: &str) -> LanceResult<bool> {
453        let Some(record) = self.get_by_id(id).await? else {
454            return Ok(false);
455        };
456        self.write_tombstone_for(record).await?;
457        Ok(true)
458    }
459
460    /// Logically forget a record by caller-supplied external id.
461    pub async fn delete_by_external_id(&mut self, external_id: &str) -> LanceResult<bool> {
462        let Some(record) = self.get_by_external_id(external_id).await? else {
463            return Ok(false);
464        };
465        self.write_tombstone_for(record).await?;
466        Ok(true)
467    }
468
469    /// Insert a record or replace the currently-visible record with the same external id.
470    ///
471    /// Replacement is append-only: the new record keeps the same `external_id`
472    /// and gets `supersedes_id` set to the old record id. Default reads hide
473    /// the superseded record while `include_retired` reads can still inspect
474    /// both versions. Caller-supplied supersession fields are ignored because
475    /// this method manages replacement by `external_id`.
476    pub async fn upsert_by_external_id(
477        &mut self,
478        mut record: ContextRecord,
479    ) -> LanceResult<UpsertResult> {
480        let Some(external_id) = record.external_id.clone() else {
481            return Err(ArrowError::InvalidArgumentError(
482                "upsert_by_external_id requires external_id".to_string(),
483            )
484            .into());
485        };
486        if external_id.is_empty() {
487            return Err(ArrowError::InvalidArgumentError(
488                "upsert_by_external_id requires a non-empty external_id".to_string(),
489            )
490            .into());
491        }
492        if record.is_tombstone() {
493            return Err(ArrowError::InvalidArgumentError(format!(
494                "content_type '{}' is reserved for internal tombstones",
495                CONTENT_TYPE_TOMBSTONE
496            ))
497            .into());
498        }
499        record.supersedes_id = None;
500        record.superseded_by_id = None;
501        self.validate_new_record_id(&record).await?;
502
503        let matches: Vec<ContextRecord> = self
504            .list(None, None)
505            .await?
506            .into_iter()
507            .filter(|existing| existing.external_id.as_deref() == Some(external_id.as_str()))
508            .collect();
509
510        match matches.as_slice() {
511            [] => {
512                let version = self.add(std::slice::from_ref(&record)).await?;
513                Ok(UpsertResult {
514                    record,
515                    inserted: true,
516                    replaced_id: None,
517                    version,
518                })
519            }
520            [existing] => {
521                record.supersedes_id = Some(existing.id.clone());
522                let version = self.write_entries(std::slice::from_ref(&record)).await?;
523                Ok(UpsertResult {
524                    record,
525                    inserted: false,
526                    replaced_id: Some(existing.id.clone()),
527                    version,
528                })
529            }
530            _ => Err(ArrowError::InvalidArgumentError(format!(
531                "external_id '{}' matches multiple visible records",
532                external_id
533            ))
534            .into()),
535        }
536    }
537
538    /// Insert-or-replace a batch of records keyed by `external_id`, in one
539    /// logical operation.
540    ///
541    /// For each record: if a currently-visible record with the same
542    /// `external_id` exists, it is replaced append-only (the successor gets
543    /// `supersedes_id` set to the existing record id and the original is hidden
544    /// from default reads); otherwise the record is inserted. All rows are
545    /// written in a single pass, so records sharing a shard land in a single
546    /// version bump.
547    ///
548    /// Semantics (parity with [`Self::upsert_by_external_id`] and `add_many`):
549    /// - every record must carry a non-empty `external_id`;
550    /// - duplicate `id`s or `external_id`s *within* the batch are rejected;
551    /// - validation is all-or-nothing — if any record is invalid, nothing is
552    ///   written;
553    /// - caller-supplied supersession fields are ignored (replacement is
554    ///   managed by `external_id`);
555    /// - an insert whose `external_id` already exists on a non-tombstone but
556    ///   hidden record is rejected, exactly as a single insert would be.
557    ///
558    /// Existing-key resolution and `id` uniqueness validation are each done in
559    /// a single scan for the whole batch (not per record), composing with the
560    /// indexed `id` validation so a batch does not full-scan per record.
561    ///
562    /// Returns one [`UpsertResult`] per input record, in input order, all
563    /// carrying the final dataset version.
564    pub async fn upsert_many_by_external_id(
565        &mut self,
566        mut records: Vec<ContextRecord>,
567    ) -> LanceResult<Vec<UpsertResult>> {
568        if records.is_empty() {
569            return Ok(Vec::new());
570        }
571
572        // 1. Per-record validation + within-batch duplicate detection.
573        let mut seen_ids: HashSet<&str> = HashSet::with_capacity(records.len());
574        let mut seen_external_ids: HashSet<&str> = HashSet::with_capacity(records.len());
575        for record in &records {
576            let Some(external_id) = record.external_id.as_deref() else {
577                return Err(ArrowError::InvalidArgumentError(
578                    "upsert_many_by_external_id requires external_id on every record".to_string(),
579                )
580                .into());
581            };
582            if external_id.is_empty() {
583                return Err(ArrowError::InvalidArgumentError(
584                    "upsert_many_by_external_id requires a non-empty external_id".to_string(),
585                )
586                .into());
587            }
588            if record.is_tombstone() {
589                return Err(ArrowError::InvalidArgumentError(format!(
590                    "content_type '{}' is reserved for internal tombstones",
591                    CONTENT_TYPE_TOMBSTONE
592                ))
593                .into());
594            }
595            if !seen_ids.insert(record.id.as_str()) {
596                return Err(ArrowError::InvalidArgumentError(format!(
597                    "duplicate id '{}' in batch",
598                    record.id
599                ))
600                .into());
601            }
602            if !seen_external_ids.insert(external_id) {
603                return Err(ArrowError::InvalidArgumentError(format!(
604                    "duplicate external_id '{}' in batch",
605                    external_id
606                ))
607                .into());
608            }
609        }
610
611        // 2. Replacement is managed here; ignore caller-supplied supersession.
612        for record in &mut records {
613            record.supersedes_id = None;
614            record.superseded_by_id = None;
615        }
616
617        // 3. id uniqueness against the store (indexed). external_id is NOT
618        //    rejected here: an existing external_id means "replace".
619        let id_list: Vec<&str> = records.iter().map(|r| r.id.as_str()).collect();
620        let (existing_ids, _) = self.find_existing_keys(&id_list, &[]).await?;
621        if let Some(record) = records
622            .iter()
623            .find(|r| existing_ids.contains(r.id.as_str()))
624        {
625            return Err(ArrowError::InvalidArgumentError(format!(
626                "id '{}' already exists",
627                record.id
628            ))
629            .into());
630        }
631
632        // 4. Resolve every external_id to its visible record (if any) in one scan.
633        let external_id_list: Vec<&str> = records
634            .iter()
635            .map(|r| r.external_id.as_deref().unwrap_or_default())
636            .collect();
637        let states = self.external_id_states(&external_id_list).await?;
638
639        // 5. Wire supersession + per-record outcomes, mirroring the single path.
640        let mut outcomes: Vec<(bool, Option<String>)> = Vec::with_capacity(records.len());
641        for record in &mut records {
642            let external_id = record.external_id.as_deref().unwrap_or_default();
643            match states.get(external_id) {
644                Some(state) if state.visible_ids.len() > 1 => {
645                    return Err(ArrowError::InvalidArgumentError(format!(
646                        "external_id '{}' matches multiple visible records",
647                        external_id
648                    ))
649                    .into());
650                }
651                Some(state) if state.visible_ids.len() == 1 => {
652                    let existing_id = state.visible_ids[0].clone();
653                    record.supersedes_id = Some(existing_id.clone());
654                    outcomes.push((false, Some(existing_id)));
655                }
656                Some(state) if state.has_non_tombstone => {
657                    // No visible record, but a hidden non-tombstone row already
658                    // holds this external_id — an insert would collide, exactly
659                    // as the single-record insert path (via `add`) rejects it.
660                    return Err(ArrowError::InvalidArgumentError(format!(
661                        "external_id '{}' already exists",
662                        external_id
663                    ))
664                    .into());
665                }
666                _ => outcomes.push((true, None)),
667            }
668        }
669
670        // 6. Single write for the whole batch.
671        let version = self.write_entries(&records).await?;
672
673        Ok(records
674            .into_iter()
675            .zip(outcomes)
676            .map(|(record, (inserted, replaced_id))| UpsertResult {
677                record,
678                inserted,
679                replaced_id,
680                version,
681            })
682            .collect())
683    }
684
685    /// Resolve, for each candidate `external_id`, the set of currently-visible
686    /// record ids and whether any non-tombstone row carries it — in a single
687    /// projected, filtered scan rather than a full dataset list.
688    ///
689    /// A record that supersedes another keeps the same `external_id`, so the
690    /// supersession relation is resolved correctly within the filtered set for
691    /// every flow that creates supersession through the public API.
692    async fn external_id_states(
693        &self,
694        external_ids: &[&str],
695    ) -> LanceResult<HashMap<String, ExternalIdState>> {
696        let mut states: HashMap<String, ExternalIdState> = HashMap::new();
697        let candidates: HashSet<&str> = external_ids
698            .iter()
699            .copied()
700            .filter(|value| !value.is_empty())
701            .collect();
702        if candidates.is_empty() {
703            return Ok(states);
704        }
705
706        let filter_values: Vec<&str> = candidates.iter().copied().collect();
707        let filter = format!("external_id IN ({})", sql_quoted_list(&filter_values));
708        let scanner = self.lsm_scanner().await?.filter(&filter)?;
709        let mut stream = scanner.try_into_stream().await?;
710        let mut rows: Vec<ContextRecord> = Vec::new();
711        while let Some(batch) = stream.try_next().await? {
712            rows.extend(batch_to_records(&batch)?);
713        }
714
715        let superseded_ids: HashSet<String> = rows
716            .iter()
717            .filter_map(|record| {
718                let supersedes_id = record.supersedes_id.as_ref()?;
719                if supersedes_id == &record.id {
720                    None
721                } else {
722                    Some(supersedes_id.clone())
723                }
724            })
725            .collect();
726
727        let options = LifecycleQueryOptions::default();
728        for record in rows {
729            let Some(external_id) = record.external_id.as_deref() else {
730                continue;
731            };
732            if !candidates.contains(external_id) {
733                continue;
734            }
735            let entry = states.entry(external_id.to_string()).or_default();
736            if !record.is_tombstone() {
737                entry.has_non_tombstone = true;
738            }
739            if options.is_visible(&record) && !superseded_ids.contains(&record.id) {
740                entry.visible_ids.push(record.id);
741            }
742        }
743
744        Ok(states)
745    }
746
747    /// Partially update mutable fields on a visible record by internal id.
748    ///
749    /// The update is append-only: it writes a replacement record that
750    /// supersedes the current visible record, preserving the original payload
751    /// and embedding while changing only the requested patch fields.
752    pub async fn update_by_id(
753        &mut self,
754        id: &str,
755        patch: RecordPatch,
756    ) -> LanceResult<Option<UpdateResult>> {
757        if id.is_empty() {
758            return Err(ArrowError::InvalidArgumentError(
759                "update_by_id requires a non-empty id".to_string(),
760            )
761            .into());
762        }
763        let Some(existing) = self.get_by_id(id).await? else {
764            return Ok(None);
765        };
766        self.update_visible_record(existing, patch).await.map(Some)
767    }
768
769    /// Partially update mutable fields on a visible record by external id.
770    ///
771    /// Returns `Ok(None)` when no visible record currently has the external id.
772    pub async fn update_by_external_id(
773        &mut self,
774        external_id: &str,
775        patch: RecordPatch,
776    ) -> LanceResult<Option<UpdateResult>> {
777        if external_id.is_empty() {
778            return Err(ArrowError::InvalidArgumentError(
779                "update_by_external_id requires a non-empty external_id".to_string(),
780            )
781            .into());
782        }
783
784        let matches: Vec<ContextRecord> = self
785            .list(None, None)
786            .await?
787            .into_iter()
788            .filter(|existing| existing.external_id.as_deref() == Some(external_id))
789            .collect();
790
791        match matches.as_slice() {
792            [] => Ok(None),
793            [existing] => self
794                .update_visible_record(existing.clone(), patch)
795                .await
796                .map(Some),
797            _ => Err(ArrowError::InvalidArgumentError(format!(
798                "external_id '{}' matches multiple visible records",
799                external_id
800            ))
801            .into()),
802        }
803    }
804
805    async fn update_visible_record(
806        &mut self,
807        existing: ContextRecord,
808        patch: RecordPatch,
809    ) -> LanceResult<UpdateResult> {
810        if patch.is_empty() {
811            return Err(ArrowError::InvalidArgumentError(
812                "update requires at least one patch field".to_string(),
813            )
814            .into());
815        }
816
817        let mut record = existing.clone();
818        record.id = Uuid::new_v4().to_string();
819        record.run_id = Uuid::new_v4().to_string();
820        record.created_at = Utc::now();
821        record.supersedes_id = Some(existing.id.clone());
822        record.superseded_by_id = None;
823
824        if let Some(bot_id) = patch.bot_id {
825            record.bot_id = Some(bot_id);
826        }
827        if let Some(session_id) = patch.session_id {
828            record.session_id = Some(session_id);
829        }
830        if let Some(tenant) = patch.tenant {
831            record.tenant = Some(tenant);
832        }
833        if let Some(source) = patch.source {
834            record.source = Some(source);
835        }
836        if let Some(state_metadata) = patch.state_metadata {
837            record.state_metadata = Some(state_metadata);
838        }
839        if let Some(metadata) = patch.metadata {
840            record.metadata = Some(metadata);
841        }
842        if let Some(relationships) = patch.relationships {
843            record.relationships = relationships;
844        }
845        if let Some(expires_at) = patch.expires_at {
846            record.expires_at = Some(expires_at);
847        }
848        if let Some(retention_policy) = patch.retention_policy {
849            record.retention_policy = Some(retention_policy);
850        }
851        if let Some(lifecycle_status) = patch.lifecycle_status {
852            record.lifecycle_status = lifecycle_status;
853        }
854        if let Some(retired_at) = patch.retired_at {
855            record.retired_at = Some(retired_at);
856        }
857        if let Some(retired_reason) = patch.retired_reason {
858            record.retired_reason = Some(retired_reason);
859        }
860        if let Some(embedding) = patch.embedding {
861            record.embedding = Some(embedding);
862        }
863
864        self.validate_new_record_id(&record).await?;
865        let version = self.write_entries(std::slice::from_ref(&record)).await?;
866        Ok(UpdateResult {
867            record,
868            replaced_id: existing.id,
869            version,
870        })
871    }
872
873    async fn write_tombstone_for(&mut self, record: ContextRecord) -> LanceResult<u64> {
874        let tombstone = ContextRecord {
875            id: record.id,
876            external_id: record.external_id,
877            run_id: record.run_id,
878            bot_id: record.bot_id,
879            session_id: record.session_id,
880            tenant: record.tenant,
881            source: record.source,
882            created_at: Utc::now(),
883            role: record.role,
884            state_metadata: None,
885            metadata: None,
886            relationships: Vec::new(),
887            expires_at: None,
888            retention_policy: None,
889            lifecycle_status: LIFECYCLE_ACTIVE.to_string(),
890            retired_at: None,
891            retired_reason: None,
892            supersedes_id: None,
893            superseded_by_id: None,
894            content_type: CONTENT_TYPE_TOMBSTONE.to_string(),
895            text_payload: None,
896            binary_payload: None,
897            embedding: None,
898        };
899        self.write_entries(std::slice::from_ref(&tombstone)).await
900    }
901
902    async fn validate_unique_ids(&self, entries: &[ContextRecord]) -> LanceResult<()> {
903        let mut ids = HashSet::new();
904        let mut external_ids = HashSet::new();
905        for entry in entries {
906            if entry.is_tombstone() {
907                return Err(ArrowError::InvalidArgumentError(format!(
908                    "content_type '{}' is reserved for internal tombstones",
909                    CONTENT_TYPE_TOMBSTONE
910                ))
911                .into());
912            }
913            if !ids.insert(entry.id.as_str()) {
914                return Err(ArrowError::InvalidArgumentError(format!(
915                    "duplicate id '{}' in batch",
916                    entry.id
917                ))
918                .into());
919            }
920            if let Some(external_id) = &entry.external_id {
921                if !external_ids.insert(external_id.as_str()) {
922                    return Err(ArrowError::InvalidArgumentError(format!(
923                        "duplicate external_id '{}' in batch",
924                        external_id
925                    ))
926                    .into());
927                }
928            }
929        }
930
931        let id_list: Vec<&str> = ids.iter().copied().collect();
932        let external_id_list: Vec<&str> = external_ids.iter().copied().collect();
933        let (existing_ids, existing_external_ids) =
934            self.find_existing_keys(&id_list, &external_id_list).await?;
935
936        // Report collisions in input order for deterministic, intuitive errors.
937        for entry in entries {
938            if existing_ids.contains(entry.id.as_str()) {
939                return Err(ArrowError::InvalidArgumentError(format!(
940                    "id '{}' already exists",
941                    entry.id
942                ))
943                .into());
944            }
945            if let Some(external_id) = &entry.external_id {
946                if existing_external_ids.contains(external_id.as_str()) {
947                    return Err(ArrowError::InvalidArgumentError(format!(
948                        "external_id '{}' already exists",
949                        external_id
950                    ))
951                    .into());
952                }
953            }
954        }
955
956        Ok(())
957    }
958
959    async fn validate_new_record_id(&self, entry: &ContextRecord) -> LanceResult<()> {
960        let id = entry.id.as_str();
961        let (existing_ids, _) = self.find_existing_keys(&[id], &[]).await?;
962        if existing_ids.contains(id) {
963            return Err(ArrowError::InvalidArgumentError(format!(
964                "id '{}' already exists",
965                entry.id
966            ))
967            .into());
968        }
969        Ok(())
970    }
971
972    /// Resolve which of the candidate `id` / `external_id` values already exist
973    /// among non-tombstone records.
974    ///
975    /// Unlike a full `list` + deserialize, this issues projected, filtered
976    /// scans that read only the `id` / `external_id` / `content_type` columns
977    /// for rows matching the candidate keys. When an `id` scalar index is
978    /// configured the `id` lookup is index-accelerated, so uniqueness
979    /// validation cost is bounded by the candidate batch (and index
980    /// selectivity) rather than the total number of stored records — history,
981    /// retired, expired, and superseded rows included.
982    ///
983    /// Tombstones are skipped to preserve existing behavior: a key whose only
984    /// surviving row is a tombstone is free for reuse, while a
985    /// superseded/retired/expired (non-tombstone) row still reserves its key.
986    async fn find_existing_keys(
987        &self,
988        ids: &[&str],
989        external_ids: &[&str],
990    ) -> LanceResult<(HashSet<String>, HashSet<String>)> {
991        let mut existing_ids = HashSet::new();
992        let mut existing_external_ids = HashSet::new();
993
994        let candidate_ids: HashSet<&str> = ids.iter().copied().collect();
995        let candidate_external_ids: HashSet<&str> = external_ids.iter().copied().collect();
996
997        if !candidate_ids.is_empty() {
998            let filter = format!("id IN ({})", sql_quoted_list(ids));
999            let scanner = self
1000                .lsm_scanner()
1001                .await?
1002                .project(&["id", "content_type"])
1003                .filter(&filter)?;
1004            let mut stream = scanner.try_into_stream().await?;
1005            while let Some(batch) = stream.try_next().await? {
1006                let id_array = column_as::<StringArray>(&batch, "id")?;
1007                let content_type_array = column_as::<StringArray>(&batch, "content_type")?;
1008                for row in 0..batch.num_rows() {
1009                    if content_type_array.value(row) == CONTENT_TYPE_TOMBSTONE {
1010                        continue;
1011                    }
1012                    let id = id_array.value(row);
1013                    if candidate_ids.contains(id) {
1014                        existing_ids.insert(id.to_string());
1015                    }
1016                }
1017            }
1018        }
1019
1020        if !candidate_external_ids.is_empty() && self.has_external_id_column() {
1021            let filter = format!("external_id IN ({})", sql_quoted_list(external_ids));
1022            let scanner = self
1023                .lsm_scanner()
1024                .await?
1025                .project(&["external_id", "content_type"])
1026                .filter(&filter)?;
1027            let mut stream = scanner.try_into_stream().await?;
1028            while let Some(batch) = stream.try_next().await? {
1029                let content_type_array = column_as::<StringArray>(&batch, "content_type")?;
1030                let Some(external_id_array) =
1031                    column_as_optional::<StringArray>(&batch, "external_id")
1032                else {
1033                    continue;
1034                };
1035                for row in 0..batch.num_rows() {
1036                    if content_type_array.value(row) == CONTENT_TYPE_TOMBSTONE {
1037                        continue;
1038                    }
1039                    if external_id_array.is_null(row) {
1040                        continue;
1041                    }
1042                    let external_id = external_id_array.value(row);
1043                    if candidate_external_ids.contains(external_id) {
1044                        existing_external_ids.insert(external_id.to_string());
1045                    }
1046                }
1047            }
1048        }
1049
1050        Ok((existing_ids, existing_external_ids))
1051    }
1052
1053    fn derive_region_id(bot_id: &Option<String>, session_id: &Option<String>) -> Uuid {
1054        let mut input = String::new();
1055
1056        if let Some(bid) = bot_id {
1057            input.push_str(bid);
1058        }
1059        input.push('#');
1060        if let Some(sid) = session_id {
1061            input.push_str(sid);
1062        }
1063
1064        // Use OID namespace as a base for our deterministic UUIDs
1065        Uuid::new_v5(&Uuid::NAMESPACE_OID, input.as_bytes())
1066    }
1067
1068    fn has_relationships_column(&self) -> bool {
1069        self.dataset
1070            .schema()
1071            .field_paths()
1072            .iter()
1073            .any(|path| path == RELATIONSHIPS_COLUMN)
1074    }
1075
1076    fn has_external_id_column(&self) -> bool {
1077        self.dataset
1078            .schema()
1079            .field_paths()
1080            .iter()
1081            .any(|path| path == "external_id")
1082    }
1083
1084    /// Current dataset version.
1085    pub fn version(&self) -> u64 {
1086        self.dataset.manifest.version
1087    }
1088
1089    /// Add the relationships column to an older dataset if it is missing.
1090    ///
1091    /// Existing rows are stored as null in the new column and read back as an
1092    /// empty relationship list.
1093    pub async fn migrate_relationships_column(&mut self) -> LanceResult<bool> {
1094        if self.has_relationships_column() {
1095            return Ok(false);
1096        }
1097
1098        let schema = Arc::new(Schema::new(vec![relationship_field()]));
1099        self.dataset
1100            .add_columns(NewColumnTransform::AllNulls(schema), None, None)
1101            .await?;
1102        Ok(true)
1103    }
1104
1105    /// Checkout a specific dataset version.
1106    pub async fn checkout(&mut self, version_id: u64) -> LanceResult<()> {
1107        let dataset = self.dataset.checkout_version(version_id).await?;
1108        self.dataset = dataset;
1109        Ok(())
1110    }
1111
1112    /// Retrieve a single record by its unique ID.
1113    pub async fn get(&self, id: &str) -> LanceResult<Option<ContextRecord>> {
1114        let escaped_id = id.replace('\'', "''");
1115        let mut scanner = self.dataset.scan();
1116        scanner.filter(&format!("id = '{}'", escaped_id))?;
1117        scanner.limit(Some(1), None)?;
1118
1119        let mut stream = scanner.try_into_stream().await?;
1120        if let Some(batch) = stream.try_next().await? {
1121            let records = batch_to_records(&batch)?;
1122            return Ok(records.into_iter().next());
1123        }
1124        Ok(None)
1125    }
1126
1127    /// List all records in the dataset.
1128    pub async fn list(
1129        &self,
1130        limit: Option<usize>,
1131        offset: Option<usize>,
1132    ) -> LanceResult<Vec<ContextRecord>> {
1133        self.list_filtered_with_options(limit, offset, None, LifecycleQueryOptions::default())
1134            .await
1135    }
1136
1137    /// List records matching filters.
1138    pub async fn list_filtered(
1139        &self,
1140        limit: Option<usize>,
1141        offset: Option<usize>,
1142        filters: Option<&RecordFilters>,
1143    ) -> LanceResult<Vec<ContextRecord>> {
1144        self.list_filtered_with_options(limit, offset, filters, LifecycleQueryOptions::default())
1145            .await
1146    }
1147
1148    /// List records, applying lifecycle visibility and supersession before offset/limit.
1149    pub async fn list_with_options(
1150        &self,
1151        limit: Option<usize>,
1152        offset: Option<usize>,
1153        options: LifecycleQueryOptions,
1154    ) -> LanceResult<Vec<ContextRecord>> {
1155        self.list_filtered_with_options(limit, offset, None, options)
1156            .await
1157    }
1158
1159    /// List records matching filters, applying lifecycle visibility before offset/limit.
1160    pub async fn list_filtered_with_options(
1161        &self,
1162        limit: Option<usize>,
1163        offset: Option<usize>,
1164        filters: Option<&RecordFilters>,
1165        options: LifecycleQueryOptions,
1166    ) -> LanceResult<Vec<ContextRecord>> {
1167        let scanner = self.lsm_scanner().await?;
1168        let mut stream = scanner.try_into_stream().await?;
1169        let mut results = Vec::new();
1170        while let Some(batch) = stream.try_next().await? {
1171            results.extend(batch_to_records(&batch)?);
1172        }
1173
1174        let superseded_ids: HashSet<String> = results
1175            .iter()
1176            .filter_map(|record| {
1177                let supersedes_id = record.supersedes_id.as_ref()?;
1178                if supersedes_id == &record.id {
1179                    None
1180                } else {
1181                    Some(supersedes_id.clone())
1182                }
1183            })
1184            .collect();
1185        results.retain(|record| {
1186            options.is_visible(record)
1187                && (options.include_retired || !superseded_ids.contains(&record.id))
1188        });
1189        if let Some(filters) = filters.filter(|filters| !filters.is_empty()) {
1190            results.retain(|record| filters.matches(record));
1191        }
1192
1193        if let Some(offset) = offset {
1194            results = results.into_iter().skip(offset).collect();
1195        }
1196        if let Some(limit) = limit {
1197            results.truncate(limit);
1198        }
1199        Ok(results)
1200    }
1201
1202    /// Find a record by its internal storage id.
1203    pub async fn get_by_id(&self, id: &str) -> LanceResult<Option<ContextRecord>> {
1204        Ok(self
1205            .list(None, None)
1206            .await?
1207            .into_iter()
1208            .find(|record| record.id == id))
1209    }
1210
1211    /// Find a record by its caller-supplied external id.
1212    pub async fn get_by_external_id(
1213        &self,
1214        external_id: &str,
1215    ) -> LanceResult<Option<ContextRecord>> {
1216        Ok(self
1217            .list(None, None)
1218            .await?
1219            .into_iter()
1220            .find(|record| record.external_id.as_deref() == Some(external_id)))
1221    }
1222
1223    /// List records that have a relationship targeting `target_id`.
1224    pub async fn list_related(
1225        &self,
1226        target_id: &str,
1227        relation: Option<&str>,
1228        limit: Option<usize>,
1229    ) -> LanceResult<Vec<ContextRecord>> {
1230        self.list_related_with_options(target_id, relation, limit, LifecycleQueryOptions::default())
1231            .await
1232    }
1233
1234    /// List related records, applying lifecycle visibility before relationship matching.
1235    pub async fn list_related_with_options(
1236        &self,
1237        target_id: &str,
1238        relation: Option<&str>,
1239        limit: Option<usize>,
1240        options: LifecycleQueryOptions,
1241    ) -> LanceResult<Vec<ContextRecord>> {
1242        let mut results: Vec<ContextRecord> = self
1243            .list_with_options(None, None, options)
1244            .await?
1245            .into_iter()
1246            .filter(|record| {
1247                record.relationships.iter().any(|relationship| {
1248                    relationship.target_id == target_id
1249                        && relation.is_none_or(|value| relationship.relation == value)
1250                })
1251            })
1252            .collect();
1253
1254        if let Some(limit) = limit {
1255            results.truncate(limit);
1256        }
1257        Ok(results)
1258    }
1259
1260    /// Perform a nearest-neighbor search over stored embeddings.
1261    pub async fn search(
1262        &self,
1263        query: &[f32],
1264        limit: Option<usize>,
1265    ) -> LanceResult<Vec<SearchResult>> {
1266        self.search_filtered_with_options(query, limit, None, LifecycleQueryOptions::default())
1267            .await
1268    }
1269
1270    /// Perform a nearest-neighbor search over stored embeddings matching filters.
1271    pub async fn search_filtered(
1272        &self,
1273        query: &[f32],
1274        limit: Option<usize>,
1275        filters: Option<&RecordFilters>,
1276    ) -> LanceResult<Vec<SearchResult>> {
1277        self.search_filtered_with_options(query, limit, filters, LifecycleQueryOptions::default())
1278            .await
1279    }
1280
1281    /// Perform nearest-neighbor search after applying lifecycle visibility.
1282    pub async fn search_with_options(
1283        &self,
1284        query: &[f32],
1285        limit: Option<usize>,
1286        options: LifecycleQueryOptions,
1287    ) -> LanceResult<Vec<SearchResult>> {
1288        self.search_filtered_with_options(query, limit, None, options)
1289            .await
1290    }
1291
1292    /// Perform nearest-neighbor search after applying filters and lifecycle visibility.
1293    pub async fn search_filtered_with_options(
1294        &self,
1295        query: &[f32],
1296        limit: Option<usize>,
1297        filters: Option<&RecordFilters>,
1298        options: LifecycleQueryOptions,
1299    ) -> LanceResult<Vec<SearchResult>> {
1300        validate_query_dimension(query, self.embedding_dim)?;
1301
1302        let top_k = limit.unwrap_or(DEFAULT_SEARCH_LIMIT);
1303        if top_k == 0 {
1304            return Ok(Vec::new());
1305        }
1306
1307        let mut results: Vec<SearchResult> = self
1308            .list_filtered_with_options(None, None, filters, options)
1309            .await?
1310            .into_iter()
1311            .filter_map(|record| {
1312                let distance = self
1313                    .distance_metric
1314                    .distance(query, record.embedding.as_ref()?);
1315                Some(SearchResult { record, distance })
1316            })
1317            .collect();
1318        results.sort_by(|left, right| left.distance.total_cmp(&right.distance));
1319        results.truncate(top_k);
1320        Ok(results)
1321    }
1322
1323    /// Retrieve records using optional text and vector channels, after filters and lifecycle visibility.
1324    pub async fn retrieve_filtered_with_options(
1325        &self,
1326        text: Option<&str>,
1327        vector: Option<&[f32]>,
1328        limit: Option<usize>,
1329        filters: Option<&RecordFilters>,
1330        options: LifecycleQueryOptions,
1331    ) -> LanceResult<Vec<RetrieveResult>> {
1332        let text_terms = text.map(unique_query_terms).unwrap_or_default();
1333        let has_text = !text_terms.is_empty();
1334
1335        if !has_text && vector.is_none() {
1336            return Err(ArrowError::InvalidArgumentError(
1337                "retrieve requires text or vector".to_string(),
1338            )
1339            .into());
1340        }
1341
1342        if let Some(query) = vector {
1343            validate_query_dimension(query, self.embedding_dim)?;
1344        }
1345
1346        let top_k = limit.unwrap_or(DEFAULT_SEARCH_LIMIT);
1347        if top_k == 0 {
1348            return Ok(Vec::new());
1349        }
1350
1351        let records = self
1352            .list_filtered_with_options(None, None, filters, options)
1353            .await?;
1354        let mut candidates: HashMap<String, RetrieveResult> = HashMap::new();
1355
1356        if let Some(query) = vector {
1357            let mut vector_hits: Vec<(usize, f32)> = records
1358                .iter()
1359                .enumerate()
1360                .filter_map(|(index, record)| {
1361                    let distance = self
1362                        .distance_metric
1363                        .distance(query, record.embedding.as_ref()?);
1364                    Some((index, distance))
1365                })
1366                .collect();
1367            vector_hits.sort_by(|left, right| {
1368                left.1
1369                    .total_cmp(&right.1)
1370                    .then_with(|| records[left.0].id.cmp(&records[right.0].id))
1371            });
1372
1373            for (rank, (index, distance)) in vector_hits.into_iter().enumerate() {
1374                add_retrieve_channel(
1375                    &mut candidates,
1376                    &records[index],
1377                    rank + 1,
1378                    "vector",
1379                    Some(distance),
1380                    None,
1381                );
1382            }
1383        }
1384
1385        if has_text {
1386            let mut text_hits: Vec<(usize, f32)> = records
1387                .iter()
1388                .enumerate()
1389                .filter_map(|(index, record)| {
1390                    lexical_score(&text_terms, record.text_payload.as_deref())
1391                        .map(|score| (index, score))
1392                })
1393                .collect();
1394            text_hits.sort_by(|left, right| {
1395                right
1396                    .1
1397                    .total_cmp(&left.1)
1398                    .then_with(|| records[left.0].id.cmp(&records[right.0].id))
1399            });
1400
1401            for (rank, (index, score)) in text_hits.into_iter().enumerate() {
1402                add_retrieve_channel(
1403                    &mut candidates,
1404                    &records[index],
1405                    rank + 1,
1406                    "text",
1407                    None,
1408                    Some(score),
1409                );
1410            }
1411        }
1412
1413        let mut results: Vec<RetrieveResult> = candidates.into_values().collect();
1414        results.sort_by(compare_retrieve_results);
1415        results.truncate(top_k);
1416        Ok(results)
1417    }
1418
1419    async fn lsm_scanner(&self) -> LanceResult<LsmScanner> {
1420        let object_store = self.dataset.object_store(None).await?;
1421        let branch_location = self.dataset.branch_location();
1422        let shard_ids = self.dataset.list_mem_wal_latest_shard_ids().await?;
1423
1424        let mut shard_snapshots = Vec::with_capacity(shard_ids.len());
1425        for shard_id in shard_ids {
1426            let manifest_store = ShardManifestStore::new(
1427                object_store.clone(),
1428                &branch_location.path,
1429                shard_id,
1430                DEFAULT_MANIFEST_SCAN_BATCH_SIZE,
1431            );
1432            let Some(manifest) = manifest_store.read_latest().await? else {
1433                continue;
1434            };
1435
1436            let mut snapshot = ShardSnapshot::new(shard_id)
1437                .with_spec_id(manifest.shard_spec_id)
1438                .with_current_generation(manifest.current_generation);
1439            for flushed in manifest.flushed_generations {
1440                snapshot = snapshot.with_flushed_generation(flushed.generation, flushed.path);
1441            }
1442            shard_snapshots.push(snapshot);
1443        }
1444
1445        Ok(LsmScanner::new(
1446            Arc::new(self.dataset.clone()),
1447            shard_snapshots,
1448            vec!["id".to_string()],
1449        ))
1450    }
1451
1452    /// Manually trigger compaction to merge small fragments.
1453    pub async fn compact(
1454        &mut self,
1455        options: Option<CompactionConfig>,
1456    ) -> LanceResult<CompactionMetrics> {
1457        let config = options.unwrap_or_else(|| self.compaction_config.clone());
1458
1459        info!(
1460            "Starting compaction: {} fragments",
1461            self.dataset.count_fragments()
1462        );
1463        let start = std::time::Instant::now();
1464
1465        // Mark as compacting
1466        {
1467            let mut state = self.compaction_state.lock().await;
1468            if state.is_compacting {
1469                warn!("Compaction already in progress, skipping");
1470                return Err(LanceError::from(ArrowError::InvalidArgumentError(
1471                    "Compaction already in progress".to_string(),
1472                )));
1473            }
1474            state.is_compacting = true;
1475        }
1476
1477        // Build Lance CompactionOptions
1478        let lance_options = CompactionOptions {
1479            target_rows_per_fragment: config.target_rows_per_fragment,
1480            max_rows_per_group: config.max_rows_per_group,
1481            materialize_deletions: config.materialize_deletions,
1482            materialize_deletions_threshold: config.materialize_deletions_threshold,
1483            num_threads: config.num_threads,
1484            ..Default::default()
1485        };
1486
1487        // Run compaction
1488        let result = compact_files(&mut self.dataset, lance_options, None).await;
1489
1490        // Update state
1491        let mut state = self.compaction_state.lock().await;
1492        state.is_compacting = false;
1493
1494        match result {
1495            Ok(metrics) => {
1496                state.last_compaction = Some(Utc::now());
1497                state.total_compactions += 1;
1498                state.last_error = None;
1499                drop(state); // Release lock before ensure_id_index
1500
1501                info!(
1502                    "Compaction completed in {:?}: removed {} fragments ({}files), added {} fragments ({} files)",
1503                    start.elapsed(),
1504                    metrics.fragments_removed,
1505                    metrics.files_removed,
1506                    metrics.fragments_added,
1507                    metrics.files_added
1508                );
1509
1510                // Reload dataset to see new version
1511                self.dataset = Dataset::open(self.dataset.uri()).await?;
1512
1513                // Ensure id index exists after compaction
1514                // (handles first-time creation on previously empty dataset)
1515                if let Err(e) = self.ensure_id_index().await {
1516                    warn!("Failed to ensure id index after compaction: {}", e);
1517                }
1518
1519                Ok(metrics)
1520            }
1521            Err(e) => {
1522                error!("Compaction failed: {}", e);
1523                state.last_error = Some(e.to_string());
1524                Err(e)
1525            }
1526        }
1527    }
1528
1529    /// Check if compaction should run based on configuration thresholds.
1530    pub async fn should_compact(&self) -> LanceResult<bool> {
1531        let fragment_count = self.dataset.count_fragments();
1532
1533        if fragment_count < self.compaction_config.min_fragments {
1534            return Ok(false);
1535        }
1536
1537        // Check quiet hours
1538        if !self.compaction_config.quiet_hours.is_empty() {
1539            let now = Utc::now();
1540            let current_hour = now.hour() as u8;
1541
1542            for (start, end) in &self.compaction_config.quiet_hours {
1543                if current_hour >= *start && current_hour < *end {
1544                    info!("Skipping compaction during quiet hours ({}-{})", start, end);
1545                    return Ok(false);
1546                }
1547            }
1548        }
1549
1550        Ok(true)
1551    }
1552
1553    /// Get current compaction statistics.
1554    pub async fn compaction_stats(&self) -> LanceResult<CompactionStats> {
1555        let state = self.compaction_state.lock().await;
1556
1557        Ok(CompactionStats {
1558            total_fragments: self.dataset.count_fragments(),
1559            is_compacting: state.is_compacting,
1560            last_compaction: state.last_compaction,
1561            last_error: state.last_error.clone(),
1562            total_compactions: state.total_compactions,
1563        })
1564    }
1565
1566    /// Ensure the configured id index exists on the dataset.
1567    async fn ensure_id_index(&mut self) -> LanceResult<()> {
1568        if self.id_index_type == IdIndexType::None {
1569            return Ok(());
1570        }
1571
1572        let indices = self.dataset.load_indices().await?;
1573        if indices.iter().any(|i| i.name == ID_INDEX_NAME) {
1574            return Ok(());
1575        }
1576
1577        self.create_id_index().await
1578    }
1579
1580    /// Create (or replace) the scalar index on the `id` column.
1581    pub async fn create_id_index(&mut self) -> LanceResult<()> {
1582        let index_type = match self.id_index_type {
1583            IdIndexType::ZoneMap => IndexType::ZoneMap,
1584            IdIndexType::BTree => IndexType::BTree,
1585            IdIndexType::None => return Ok(()),
1586        };
1587
1588        info!("Creating {:?} index on id column", index_type);
1589
1590        let params = ScalarIndexParams::default();
1591
1592        self.dataset
1593            .create_index_builder(&["id"], index_type, &params)
1594            .name(ID_INDEX_NAME.to_string())
1595            .replace(true)
1596            .await?;
1597
1598        // Reload dataset to pick up new index
1599        self.dataset = Dataset::open(self.dataset.uri()).await?;
1600
1601        Ok(())
1602    }
1603
1604    /// Start background compaction task if enabled.
1605    async fn start_background_compaction(&mut self) -> LanceResult<()> {
1606        if !self.compaction_config.enabled {
1607            return Ok(());
1608        }
1609
1610        let mut state = self.compaction_state.lock().await;
1611        if state.background_task.is_some() {
1612            warn!("Background compaction already running");
1613            return Ok(());
1614        }
1615
1616        info!(
1617            "Starting background compaction (interval: {}s, min fragments: {})",
1618            self.compaction_config.check_interval_secs, self.compaction_config.min_fragments
1619        );
1620
1621        let mut store_clone = self.clone();
1622        let interval_secs = self.compaction_config.check_interval_secs;
1623
1624        let task = tokio::spawn(async move {
1625            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
1626
1627            loop {
1628                interval.tick().await;
1629
1630                match store_clone.should_compact().await {
1631                    Ok(true) => {
1632                        info!("Background compaction triggered");
1633                        if let Err(e) = store_clone.compact(None).await {
1634                            error!("Background compaction failed: {}", e);
1635                        }
1636                    }
1637                    Ok(false) => {
1638                        // Not needed or in quiet hours
1639                    }
1640                    Err(e) => {
1641                        error!("Error checking compaction need: {}", e);
1642                    }
1643                }
1644            }
1645        });
1646
1647        state.background_task = Some(task);
1648        Ok(())
1649    }
1650
1651    /// Stop background compaction task.
1652    pub async fn stop_background_compaction(&mut self) -> LanceResult<()> {
1653        let mut state = self.compaction_state.lock().await;
1654
1655        if let Some(task) = state.background_task.take() {
1656            info!("Stopping background compaction");
1657            task.abort();
1658        }
1659
1660        Ok(())
1661    }
1662
1663    /// Lance schema for the context store.
1664    ///
1665    /// When `blob_columns` contains a column name, that column is stored using
1666    /// Lance V1 blob encoding (out-of-line binary buffers). For `text_payload`,
1667    /// this also changes the Arrow type from `LargeUtf8` to `LargeBinary`.
1668    pub fn schema(blob_columns: &HashSet<String>) -> Schema {
1669        Self::schema_with_embedding_dim(blob_columns, DEFAULT_EMBEDDING_DIM)
1670    }
1671
1672    /// Lance schema for a context store using a caller-selected embedding width.
1673    pub fn schema_with_embedding_dim(blob_columns: &HashSet<String>, embedding_dim: i32) -> Schema {
1674        Self::schema_with_options(
1675            blob_columns,
1676            true,
1677            true,
1678            true,
1679            true,
1680            embedding_dim,
1681            DistanceMetric::default(),
1682        )
1683    }
1684
1685    fn schema_with_options(
1686        blob_columns: &HashSet<String>,
1687        include_external_id: bool,
1688        include_metadata: bool,
1689        include_relationships: bool,
1690        include_lifecycle: bool,
1691        embedding_dim: i32,
1692        distance_metric: DistanceMetric,
1693    ) -> Schema {
1694        let mut id_metadata = HashMap::new();
1695        id_metadata.insert(
1696            "lance-schema:unenforced-primary-key".to_string(),
1697            "true".to_string(),
1698        );
1699
1700        let text_field = if blob_columns.contains("text_payload") {
1701            let mut metadata = HashMap::new();
1702            metadata.insert("lance-encoding:blob".to_string(), "true".to_string());
1703            Field::new("text_payload", DataType::LargeBinary, true).with_metadata(metadata)
1704        } else {
1705            Field::new("text_payload", DataType::LargeUtf8, true)
1706        };
1707
1708        let binary_field = if blob_columns.contains("binary_payload") {
1709            let mut metadata = HashMap::new();
1710            metadata.insert("lance-encoding:blob".to_string(), "true".to_string());
1711            Field::new("binary_payload", DataType::LargeBinary, true).with_metadata(metadata)
1712        } else {
1713            Field::new("binary_payload", DataType::LargeBinary, true)
1714        };
1715
1716        let mut fields = vec![Field::new("id", DataType::Utf8, false).with_metadata(id_metadata)];
1717        if include_external_id {
1718            fields.push(Field::new("external_id", DataType::Utf8, true));
1719        }
1720        fields.extend([
1721            Field::new("run_id", DataType::Utf8, false),
1722            Field::new("bot_id", DataType::Utf8, true),
1723            Field::new("session_id", DataType::Utf8, true),
1724            Field::new("tenant", DataType::Utf8, true),
1725            Field::new("source", DataType::Utf8, true),
1726            Field::new(
1727                "created_at",
1728                DataType::Timestamp(TimeUnit::Microsecond, None),
1729                false,
1730            ),
1731            Field::new(
1732                "role",
1733                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1734                false,
1735            ),
1736            Field::new(
1737                "state_metadata",
1738                DataType::Struct(
1739                    vec![
1740                        Field::new("step", DataType::Int32, true),
1741                        Field::new("active_plan_id", DataType::Utf8, true),
1742                        Field::new("tokens_used", DataType::Int32, true),
1743                        Field::new("custom", DataType::Utf8, true),
1744                    ]
1745                    .into(),
1746                ),
1747                true,
1748            ),
1749        ]);
1750        if include_metadata {
1751            fields.push(Field::new("metadata", DataType::LargeUtf8, true));
1752        }
1753        if include_relationships {
1754            fields.push(relationship_field());
1755        }
1756        if include_lifecycle {
1757            fields.extend([
1758                Field::new(
1759                    "expires_at",
1760                    DataType::Timestamp(TimeUnit::Microsecond, None),
1761                    true,
1762                ),
1763                Field::new("retention_policy", DataType::Utf8, true),
1764                Field::new("lifecycle_status", DataType::Utf8, false),
1765                Field::new(
1766                    "retired_at",
1767                    DataType::Timestamp(TimeUnit::Microsecond, None),
1768                    true,
1769                ),
1770                Field::new("retired_reason", DataType::Utf8, true),
1771                Field::new("supersedes_id", DataType::Utf8, true),
1772                Field::new("superseded_by_id", DataType::Utf8, true),
1773            ]);
1774        }
1775        fields.extend([
1776            Field::new("content_type", DataType::Utf8, false),
1777            text_field,
1778            binary_field,
1779            Field::new(
1780                "embedding",
1781                DataType::FixedSizeList(
1782                    Arc::new(Field::new("item", DataType::Float32, true)),
1783                    embedding_dim,
1784                ),
1785                true,
1786            ),
1787        ]);
1788
1789        let schema_metadata = HashMap::from([(
1790            DISTANCE_METRIC_METADATA_KEY.to_string(),
1791            distance_metric.as_str().to_string(),
1792        )]);
1793
1794        Schema::new_with_metadata(fields, schema_metadata)
1795    }
1796
1797    async fn load_with_options(
1798        uri: &str,
1799        storage_options: Option<HashMap<String, String>>,
1800    ) -> LanceResult<Dataset> {
1801        if let Some(options) = storage_options {
1802            DatasetBuilder::from_uri(uri)
1803                .with_storage_options(options)
1804                .load()
1805                .await
1806        } else {
1807            Dataset::open(uri).await
1808        }
1809    }
1810
1811    async fn create_with_options(
1812        uri: &str,
1813        storage_options: Option<HashMap<String, String>>,
1814        blob_columns: &HashSet<String>,
1815        embedding_dim: i32,
1816        distance_metric: DistanceMetric,
1817    ) -> LanceResult<Dataset> {
1818        let schema = Arc::new(Self::schema_with_options(
1819            blob_columns,
1820            true,
1821            true,
1822            true,
1823            true,
1824            embedding_dim,
1825            distance_metric,
1826        ));
1827        let empty_batch = RecordBatch::new_empty(schema.clone());
1828        let batches = RecordBatchIterator::new(
1829            vec![Ok::<RecordBatch, ArrowError>(empty_batch)].into_iter(),
1830            schema.clone(),
1831        );
1832
1833        let mut params = WriteParams {
1834            mode: WriteMode::Create,
1835            ..Default::default()
1836        };
1837
1838        if let Some(options) = storage_options {
1839            let store_params = ObjectStoreParams {
1840                storage_options_accessor: Some(Arc::new(
1841                    StorageOptionsAccessor::with_static_options(options),
1842                )),
1843                ..Default::default()
1844            };
1845            params.store_params = Some(store_params);
1846        }
1847
1848        Dataset::write(batches, uri, Some(params)).await
1849    }
1850
1851    fn records_to_batch(&self, entries: &[ContextRecord]) -> LanceResult<RecordBatch> {
1852        let include_external_id = self
1853            .dataset
1854            .schema()
1855            .field_paths()
1856            .iter()
1857            .any(|path| path == "external_id");
1858        let include_lifecycle = self
1859            .dataset
1860            .schema()
1861            .field_paths()
1862            .iter()
1863            .any(|path| path == "expires_at");
1864        let include_metadata = self
1865            .dataset
1866            .schema()
1867            .field_paths()
1868            .iter()
1869            .any(|path| path == "metadata");
1870        let include_tenant = self
1871            .dataset
1872            .schema()
1873            .field_paths()
1874            .iter()
1875            .any(|path| path == "tenant");
1876        let include_source = self
1877            .dataset
1878            .schema()
1879            .field_paths()
1880            .iter()
1881            .any(|path| path == "source");
1882        let include_relationships = self.has_relationships_column();
1883        if !include_external_id && entries.iter().any(|entry| entry.external_id.is_some()) {
1884            return Err(ArrowError::InvalidArgumentError(
1885                "external_id requires a context dataset created with external_id support"
1886                    .to_string(),
1887            )
1888            .into());
1889        }
1890        if !include_metadata && entries.iter().any(|entry| entry.metadata.is_some()) {
1891            return Err(ArrowError::InvalidArgumentError(
1892                "metadata requires a context dataset created with metadata support".to_string(),
1893            )
1894            .into());
1895        }
1896        if !include_tenant && entries.iter().any(|entry| entry.tenant.is_some()) {
1897            return Err(ArrowError::InvalidArgumentError(
1898                "tenant requires a context dataset created with partition-key column support"
1899                    .to_string(),
1900            )
1901            .into());
1902        }
1903        if !include_source && entries.iter().any(|entry| entry.source.is_some()) {
1904            return Err(ArrowError::InvalidArgumentError(
1905                "source requires a context dataset created with partition-key column support"
1906                    .to_string(),
1907            )
1908            .into());
1909        }
1910        if !include_relationships && entries.iter().any(|entry| !entry.relationships.is_empty()) {
1911            return Err(ArrowError::InvalidArgumentError(
1912                "relationships require a context dataset with relationships support; run migrate_relationships_column() on older datasets".to_string(),
1913            )
1914            .into());
1915        }
1916        if !include_lifecycle && entries.iter().any(ContextRecord::has_non_default_lifecycle) {
1917            return Err(ArrowError::InvalidArgumentError(
1918                "lifecycle fields require a context dataset created with lifecycle support"
1919                    .to_string(),
1920            )
1921            .into());
1922        }
1923
1924        let mut id_builder = StringBuilder::new();
1925        let mut external_id_builder = StringBuilder::new();
1926        let mut run_id_builder = StringBuilder::new();
1927        let mut bot_id_builder = StringBuilder::new();
1928        let mut session_id_builder = StringBuilder::new();
1929        let mut tenant_builder = StringBuilder::new();
1930        let mut source_builder = StringBuilder::new();
1931        let mut created_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len());
1932        let mut role_builder = StringDictionaryBuilder::<Int8Type>::new();
1933        let mut metadata_builder = LargeStringBuilder::new();
1934        let mut relationships_builder = ListBuilder::new(relationship_struct_builder())
1935            .with_field(relationship_list_item_field());
1936        let mut expires_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len());
1937        let mut retention_policy_builder = StringBuilder::new();
1938        let mut lifecycle_status_builder = StringBuilder::new();
1939        let mut retired_at_builder = TimestampMicrosecondBuilder::with_capacity(entries.len());
1940        let mut retired_reason_builder = StringBuilder::new();
1941        let mut supersedes_id_builder = StringBuilder::new();
1942        let mut superseded_by_id_builder = StringBuilder::new();
1943        let mut content_type_builder = StringBuilder::new();
1944        let mut binary_builder = LargeBinaryBuilder::new();
1945
1946        let text_is_blob = self.blob_columns.contains("text_payload");
1947        let mut text_string_builder = if !text_is_blob {
1948            Some(LargeStringBuilder::new())
1949        } else {
1950            None
1951        };
1952        let mut text_binary_builder = if text_is_blob {
1953            Some(LargeBinaryBuilder::new())
1954        } else {
1955            None
1956        };
1957
1958        let state_fields: Vec<FieldRef> = vec![
1959            Arc::new(Field::new("step", DataType::Int32, true)),
1960            Arc::new(Field::new("active_plan_id", DataType::Utf8, true)),
1961            Arc::new(Field::new("tokens_used", DataType::Int32, true)),
1962            Arc::new(Field::new("custom", DataType::Utf8, true)),
1963        ];
1964        let mut state_builder = StructBuilder::new(
1965            state_fields,
1966            vec![
1967                Box::new(Int32Builder::new()),
1968                Box::new(StringBuilder::new()),
1969                Box::new(Int32Builder::new()),
1970                Box::new(StringBuilder::new()),
1971            ],
1972        );
1973
1974        let mut embedding_builder =
1975            FixedSizeListBuilder::new(Float32Builder::new(), self.embedding_dim);
1976
1977        for entry in entries {
1978            id_builder.append_value(&entry.id);
1979            external_id_builder.append_option(entry.external_id.as_deref());
1980            run_id_builder.append_value(&entry.run_id);
1981            bot_id_builder.append_option(entry.bot_id.as_deref());
1982            session_id_builder.append_option(entry.session_id.as_deref());
1983            tenant_builder.append_option(entry.tenant.as_deref());
1984            source_builder.append_option(entry.source.as_deref());
1985            created_at_builder.append_value(entry.created_at.timestamp_micros());
1986            role_builder.append(&entry.role)?;
1987            match &entry.metadata {
1988                Some(metadata) => metadata_builder.append_value(metadata.to_string()),
1989                None => metadata_builder.append_null(),
1990            }
1991            for relationship in &entry.relationships {
1992                let values_builder = relationships_builder.values();
1993                values_builder
1994                    .field_builder::<StringBuilder>(0)
1995                    .unwrap()
1996                    .append_value(&relationship.target_id);
1997                values_builder
1998                    .field_builder::<StringBuilder>(1)
1999                    .unwrap()
2000                    .append_value(&relationship.relation);
2001                values_builder
2002                    .field_builder::<Float32Builder>(2)
2003                    .unwrap()
2004                    .append_option(relationship.weight);
2005                values_builder.append(true);
2006            }
2007            relationships_builder.append(true);
2008            expires_at_builder
2009                .append_option(entry.expires_at.map(|value| value.timestamp_micros()));
2010            retention_policy_builder.append_option(entry.retention_policy.as_deref());
2011            lifecycle_status_builder.append_value(&entry.lifecycle_status);
2012            retired_at_builder
2013                .append_option(entry.retired_at.map(|value| value.timestamp_micros()));
2014            retired_reason_builder.append_option(entry.retired_reason.as_deref());
2015            supersedes_id_builder.append_option(entry.supersedes_id.as_deref());
2016            superseded_by_id_builder.append_option(entry.superseded_by_id.as_deref());
2017            content_type_builder.append_value(&entry.content_type);
2018
2019            if text_is_blob {
2020                match &entry.text_payload {
2021                    Some(value) => text_binary_builder
2022                        .as_mut()
2023                        .unwrap()
2024                        .append_value(value.as_bytes()),
2025                    None => text_binary_builder.as_mut().unwrap().append_null(),
2026                }
2027            } else {
2028                match &entry.text_payload {
2029                    Some(value) => text_string_builder.as_mut().unwrap().append_value(value),
2030                    None => text_string_builder.as_mut().unwrap().append_null(),
2031                }
2032            }
2033
2034            match &entry.binary_payload {
2035                Some(value) => binary_builder.append_value(value),
2036                None => binary_builder.append_null(),
2037            }
2038
2039            if let Some(metadata) = &entry.state_metadata {
2040                state_builder
2041                    .field_builder::<Int32Builder>(0)
2042                    .unwrap()
2043                    .append_option(metadata.step);
2044                state_builder
2045                    .field_builder::<StringBuilder>(1)
2046                    .unwrap()
2047                    .append_option(metadata.active_plan_id.as_deref());
2048                state_builder
2049                    .field_builder::<Int32Builder>(2)
2050                    .unwrap()
2051                    .append_option(metadata.tokens_used);
2052                state_builder
2053                    .field_builder::<StringBuilder>(3)
2054                    .unwrap()
2055                    .append_option(metadata.custom.as_deref());
2056                state_builder.append(true);
2057            } else {
2058                state_builder
2059                    .field_builder::<Int32Builder>(0)
2060                    .unwrap()
2061                    .append_null();
2062                state_builder
2063                    .field_builder::<StringBuilder>(1)
2064                    .unwrap()
2065                    .append_null();
2066                state_builder
2067                    .field_builder::<Int32Builder>(2)
2068                    .unwrap()
2069                    .append_null();
2070                state_builder
2071                    .field_builder::<StringBuilder>(3)
2072                    .unwrap()
2073                    .append_null();
2074                state_builder.append(false);
2075            }
2076
2077            if let Some(embedding) = &entry.embedding {
2078                if embedding.len() != self.embedding_dim as usize {
2079                    return Err(ArrowError::InvalidArgumentError(format!(
2080                        "embedding length {} does not match expected dimension {}",
2081                        embedding.len(),
2082                        self.embedding_dim
2083                    ))
2084                    .into());
2085                }
2086                {
2087                    let values_builder = embedding_builder.values();
2088                    for value in embedding {
2089                        values_builder.append_value(*value);
2090                    }
2091                }
2092                embedding_builder.append(true);
2093            } else {
2094                // FixedSizeListBuilder requires padding values for null slots.
2095                let values_builder = embedding_builder.values();
2096                for _ in 0..self.embedding_dim {
2097                    values_builder.append_null();
2098                }
2099                embedding_builder.append(false);
2100            }
2101        }
2102
2103        let id_array: ArrayRef = Arc::new(id_builder.finish());
2104        let external_id_array: ArrayRef = Arc::new(external_id_builder.finish());
2105        let run_id_array: ArrayRef = Arc::new(run_id_builder.finish());
2106        let bot_id_array: ArrayRef = Arc::new(bot_id_builder.finish());
2107        let session_id_array: ArrayRef = Arc::new(session_id_builder.finish());
2108        let tenant_array: ArrayRef = Arc::new(tenant_builder.finish());
2109        let source_array: ArrayRef = Arc::new(source_builder.finish());
2110        let created_at_array: ArrayRef = Arc::new(created_at_builder.finish());
2111        let role_array: ArrayRef = Arc::new(role_builder.finish());
2112        let metadata_array: ArrayRef = Arc::new(metadata_builder.finish());
2113        let relationships_array: ArrayRef = Arc::new(relationships_builder.finish());
2114        let expires_at_array: ArrayRef = Arc::new(expires_at_builder.finish());
2115        let retention_policy_array: ArrayRef = Arc::new(retention_policy_builder.finish());
2116        let lifecycle_status_array: ArrayRef = Arc::new(lifecycle_status_builder.finish());
2117        let retired_at_array: ArrayRef = Arc::new(retired_at_builder.finish());
2118        let retired_reason_array: ArrayRef = Arc::new(retired_reason_builder.finish());
2119        let supersedes_id_array: ArrayRef = Arc::new(supersedes_id_builder.finish());
2120        let superseded_by_id_array: ArrayRef = Arc::new(superseded_by_id_builder.finish());
2121        let content_type_array: ArrayRef = Arc::new(content_type_builder.finish());
2122        let text_array: ArrayRef = if text_is_blob {
2123            Arc::new(text_binary_builder.unwrap().finish())
2124        } else {
2125            Arc::new(text_string_builder.unwrap().finish())
2126        };
2127        let binary_array: ArrayRef = Arc::new(binary_builder.finish());
2128        let state_array: ArrayRef = Arc::new(state_builder.finish());
2129        let embedding_array: ArrayRef = Arc::new(embedding_builder.finish());
2130
2131        let mut arrays_by_name = HashMap::from([("id".to_string(), id_array)]);
2132        if include_external_id {
2133            arrays_by_name.insert("external_id".to_string(), external_id_array);
2134        }
2135        arrays_by_name.extend([
2136            ("run_id".to_string(), run_id_array),
2137            ("bot_id".to_string(), bot_id_array),
2138            ("session_id".to_string(), session_id_array),
2139            ("created_at".to_string(), created_at_array),
2140            ("role".to_string(), role_array),
2141            ("state_metadata".to_string(), state_array),
2142        ]);
2143        if include_tenant {
2144            arrays_by_name.insert("tenant".to_string(), tenant_array);
2145        }
2146        if include_source {
2147            arrays_by_name.insert("source".to_string(), source_array);
2148        }
2149        if include_metadata {
2150            arrays_by_name.insert("metadata".to_string(), metadata_array);
2151        }
2152        if include_relationships {
2153            arrays_by_name.insert(RELATIONSHIPS_COLUMN.to_string(), relationships_array);
2154        }
2155        if include_lifecycle {
2156            arrays_by_name.extend([
2157                ("expires_at".to_string(), expires_at_array),
2158                ("retention_policy".to_string(), retention_policy_array),
2159                ("lifecycle_status".to_string(), lifecycle_status_array),
2160                ("retired_at".to_string(), retired_at_array),
2161                ("retired_reason".to_string(), retired_reason_array),
2162                ("supersedes_id".to_string(), supersedes_id_array),
2163                ("superseded_by_id".to_string(), superseded_by_id_array),
2164            ]);
2165        }
2166        arrays_by_name.extend([
2167            ("content_type".to_string(), content_type_array),
2168            ("text_payload".to_string(), text_array),
2169            ("binary_payload".to_string(), binary_array),
2170            ("embedding".to_string(), embedding_array),
2171        ]);
2172
2173        let schema: Arc<Schema> = Arc::new(self.dataset.schema().into());
2174        let arrays = schema
2175            .fields()
2176            .iter()
2177            .map(|field| {
2178                arrays_by_name.remove(field.name().as_str()).ok_or_else(|| {
2179                    LanceError::from(ArrowError::InvalidArgumentError(format!(
2180                        "unsupported dataset column '{}'",
2181                        field.name()
2182                    )))
2183                })
2184            })
2185            .collect::<LanceResult<Vec<_>>>()?;
2186        let batch = RecordBatch::try_new(schema, arrays)?;
2187
2188        Ok(batch)
2189    }
2190}
2191
2192impl Drop for ContextStore {
2193    fn drop(&mut self) {
2194        // Best-effort cleanup of background task
2195        if let Ok(mut state) = self.compaction_state.try_lock() {
2196            if let Some(task) = state.background_task.take() {
2197                task.abort();
2198            }
2199        }
2200    }
2201}
2202
2203/// Convert a record batch to context records.
2204fn batch_to_records(batch: &RecordBatch) -> LanceResult<Vec<ContextRecord>> {
2205    let id_array = column_as::<StringArray>(batch, "id")?;
2206    let external_id_array = column_as_optional::<StringArray>(batch, "external_id");
2207    let run_id_array = column_as::<StringArray>(batch, "run_id")?;
2208    let bot_id_array = column_as_optional::<StringArray>(batch, "bot_id");
2209    let session_id_array = column_as_optional::<StringArray>(batch, "session_id");
2210    let tenant_array = column_as_optional::<StringArray>(batch, "tenant");
2211    let source_array = column_as_optional::<StringArray>(batch, "source");
2212    let created_at_array = column_as::<TimestampMicrosecondArray>(batch, "created_at")?;
2213    let role_array = column_as::<DictionaryArray<Int8Type>>(batch, "role")?;
2214    let state_array = column_as::<StructArray>(batch, "state_metadata")?;
2215    let metadata_array = column_as_optional::<LargeStringArray>(batch, "metadata");
2216    let relationships_array = column_as_optional::<ListArray>(batch, RELATIONSHIPS_COLUMN);
2217    let expires_at_array = column_as_optional::<TimestampMicrosecondArray>(batch, "expires_at");
2218    let retention_policy_array = column_as_optional::<StringArray>(batch, "retention_policy");
2219    let lifecycle_status_array = column_as_optional::<StringArray>(batch, "lifecycle_status");
2220    let retired_at_array = column_as_optional::<TimestampMicrosecondArray>(batch, "retired_at");
2221    let retired_reason_array = column_as_optional::<StringArray>(batch, "retired_reason");
2222    let supersedes_id_array = column_as_optional::<StringArray>(batch, "supersedes_id");
2223    let superseded_by_id_array = column_as_optional::<StringArray>(batch, "superseded_by_id");
2224    let content_type_array = column_as::<StringArray>(batch, "content_type")?;
2225    let binary_array = column_as::<LargeBinaryArray>(batch, "binary_payload")?;
2226    let embedding_array = column_as::<FixedSizeListArray>(batch, "embedding")?;
2227
2228    // Auto-detect whether text_payload is LargeBinary (blob) or LargeUtf8 (default)
2229    let text_is_binary = batch
2230        .schema()
2231        .field_with_name("text_payload")
2232        .is_ok_and(|f| f.data_type() == &DataType::LargeBinary);
2233
2234    let text_string_array = if !text_is_binary {
2235        Some(column_as::<LargeStringArray>(batch, "text_payload")?)
2236    } else {
2237        None
2238    };
2239    let text_binary_array = if text_is_binary {
2240        Some(column_as::<LargeBinaryArray>(batch, "text_payload")?)
2241    } else {
2242        None
2243    };
2244
2245    let step_array = state_array
2246        .column(0)
2247        .as_ref()
2248        .as_any()
2249        .downcast_ref::<Int32Array>()
2250        .ok_or_else(|| {
2251            LanceError::from(ArrowError::InvalidArgumentError(
2252                "step column has unexpected data type".to_string(),
2253            ))
2254        })?;
2255    let active_plan_array = state_array
2256        .column(1)
2257        .as_ref()
2258        .as_any()
2259        .downcast_ref::<StringArray>()
2260        .ok_or_else(|| {
2261            LanceError::from(ArrowError::InvalidArgumentError(
2262                "active_plan_id column has unexpected data type".to_string(),
2263            ))
2264        })?;
2265    let tokens_used_array = state_array
2266        .column(2)
2267        .as_ref()
2268        .as_any()
2269        .downcast_ref::<Int32Array>()
2270        .ok_or_else(|| {
2271            LanceError::from(ArrowError::InvalidArgumentError(
2272                "tokens_used column has unexpected data type".to_string(),
2273            ))
2274        })?;
2275    let custom_array = state_array
2276        .column(3)
2277        .as_ref()
2278        .as_any()
2279        .downcast_ref::<StringArray>()
2280        .ok_or_else(|| {
2281            LanceError::from(ArrowError::InvalidArgumentError(
2282                "custom column has unexpected data type".to_string(),
2283            ))
2284        })?;
2285
2286    let mut results = Vec::with_capacity(batch.num_rows());
2287    for row in 0..batch.num_rows() {
2288        let created_at = timestamp_from_micros(created_at_array.value(row), "created_at")?;
2289
2290        let state_metadata = if state_array.is_null(row) {
2291            None
2292        } else {
2293            Some(StateMetadata {
2294                step: if step_array.is_null(row) {
2295                    None
2296                } else {
2297                    Some(step_array.value(row))
2298                },
2299                active_plan_id: if active_plan_array.is_null(row) {
2300                    None
2301                } else {
2302                    Some(active_plan_array.value(row).to_string())
2303                },
2304                tokens_used: if tokens_used_array.is_null(row) {
2305                    None
2306                } else {
2307                    Some(tokens_used_array.value(row))
2308                },
2309                custom: if custom_array.is_null(row) {
2310                    None
2311                } else {
2312                    Some(custom_array.value(row).to_string())
2313                },
2314            })
2315        };
2316
2317        let text_payload = if text_is_binary {
2318            let arr = text_binary_array.unwrap();
2319            if arr.is_null(row) {
2320                None
2321            } else {
2322                Some(String::from_utf8_lossy(arr.value(row)).to_string())
2323            }
2324        } else {
2325            let arr = text_string_array.unwrap();
2326            if arr.is_null(row) {
2327                None
2328            } else {
2329                Some(arr.value(row).to_string())
2330            }
2331        };
2332
2333        let binary_payload = if binary_array.is_null(row) {
2334            None
2335        } else {
2336            Some(binary_array.value(row).to_vec())
2337        };
2338
2339        let embedding = if embedding_array.is_null(row) {
2340            None
2341        } else {
2342            Some(embedding_from_list(embedding_array, row)?)
2343        };
2344
2345        let role = if role_array.is_null(row) {
2346            return Err(LanceError::from(ArrowError::InvalidArgumentError(
2347                "role column contains null values".to_string(),
2348            )));
2349        } else {
2350            let role_values = role_array
2351                .values()
2352                .as_any()
2353                .downcast_ref::<StringArray>()
2354                .ok_or_else(|| {
2355                    LanceError::from(ArrowError::InvalidArgumentError(
2356                        "role dictionary values are not strings".to_string(),
2357                    ))
2358                })?;
2359            let key = role_array.keys().value(row) as usize;
2360            role_values.value(key).to_string()
2361        };
2362
2363        let bot_id = bot_id_array.and_then(|arr| {
2364            if arr.is_null(row) {
2365                None
2366            } else {
2367                Some(arr.value(row).to_string())
2368            }
2369        });
2370
2371        let session_id = session_id_array.and_then(|arr| {
2372            if arr.is_null(row) {
2373                None
2374            } else {
2375                Some(arr.value(row).to_string())
2376            }
2377        });
2378
2379        let tenant = tenant_array.and_then(|arr| {
2380            if arr.is_null(row) {
2381                None
2382            } else {
2383                Some(arr.value(row).to_string())
2384            }
2385        });
2386
2387        let source = source_array.and_then(|arr| {
2388            if arr.is_null(row) {
2389                None
2390            } else {
2391                Some(arr.value(row).to_string())
2392            }
2393        });
2394
2395        let metadata = match metadata_array {
2396            Some(arr) if !arr.is_null(row) => {
2397                Some(serde_json::from_str(arr.value(row)).map_err(|err| {
2398                    LanceError::from(ArrowError::InvalidArgumentError(format!(
2399                        "invalid metadata JSON for record {}: {}",
2400                        id_array.value(row),
2401                        err
2402                    )))
2403                })?)
2404            }
2405            _ => None,
2406        };
2407        let relationships = match relationships_array {
2408            Some(arr) if !arr.is_null(row) => relationships_from_list(arr, row)?,
2409            _ => Vec::new(),
2410        };
2411        let expires_at = optional_timestamp_from_array(expires_at_array, row, "expires_at")?;
2412        let retention_policy = optional_string_from_array(retention_policy_array, row);
2413        let lifecycle_status = optional_string_from_array(lifecycle_status_array, row)
2414            .unwrap_or_else(|| LIFECYCLE_ACTIVE.to_string());
2415        let retired_at = optional_timestamp_from_array(retired_at_array, row, "retired_at")?;
2416        let retired_reason = optional_string_from_array(retired_reason_array, row);
2417        let supersedes_id = optional_string_from_array(supersedes_id_array, row);
2418        let superseded_by_id = optional_string_from_array(superseded_by_id_array, row);
2419
2420        results.push(ContextRecord {
2421            id: id_array.value(row).to_string(),
2422            external_id: external_id_array.and_then(|arr| {
2423                if arr.is_null(row) {
2424                    None
2425                } else {
2426                    Some(arr.value(row).to_string())
2427                }
2428            }),
2429            run_id: run_id_array.value(row).to_string(),
2430            bot_id,
2431            session_id,
2432            tenant,
2433            source,
2434            created_at,
2435            role,
2436            state_metadata,
2437            metadata,
2438            relationships,
2439            expires_at,
2440            retention_policy,
2441            lifecycle_status,
2442            retired_at,
2443            retired_reason,
2444            supersedes_id,
2445            superseded_by_id,
2446            content_type: content_type_array.value(row).to_string(),
2447            text_payload,
2448            binary_payload,
2449            embedding,
2450        });
2451    }
2452
2453    Ok(results)
2454}
2455
2456fn embedding_from_list(list: &FixedSizeListArray, row: usize) -> LanceResult<Vec<f32>> {
2457    let values = list.value(row);
2458    let float_array = values
2459        .as_ref()
2460        .as_any()
2461        .downcast_ref::<Float32Array>()
2462        .ok_or_else(|| {
2463            LanceError::from(ArrowError::InvalidArgumentError(
2464                "embedding column does not contain float32 values".to_string(),
2465            ))
2466        })?;
2467    let mut embedding = Vec::with_capacity(float_array.len());
2468    for idx in 0..float_array.len() {
2469        embedding.push(float_array.value(idx));
2470    }
2471    Ok(embedding)
2472}
2473
2474fn relationships_from_list(list: &ListArray, row: usize) -> LanceResult<Vec<Relationship>> {
2475    let values = list.value(row);
2476    let struct_array = values
2477        .as_ref()
2478        .as_any()
2479        .downcast_ref::<StructArray>()
2480        .ok_or_else(|| {
2481            LanceError::from(ArrowError::InvalidArgumentError(
2482                "relationships column does not contain struct values".to_string(),
2483            ))
2484        })?;
2485
2486    let target_id_array = struct_array
2487        .column(0)
2488        .as_ref()
2489        .as_any()
2490        .downcast_ref::<StringArray>()
2491        .ok_or_else(|| {
2492            LanceError::from(ArrowError::InvalidArgumentError(
2493                "relationships.target_id column has unexpected data type".to_string(),
2494            ))
2495        })?;
2496    let relation_array = struct_array
2497        .column(1)
2498        .as_ref()
2499        .as_any()
2500        .downcast_ref::<StringArray>()
2501        .ok_or_else(|| {
2502            LanceError::from(ArrowError::InvalidArgumentError(
2503                "relationships.relation column has unexpected data type".to_string(),
2504            ))
2505        })?;
2506    let weight_array = struct_array
2507        .column(2)
2508        .as_ref()
2509        .as_any()
2510        .downcast_ref::<Float32Array>()
2511        .ok_or_else(|| {
2512            LanceError::from(ArrowError::InvalidArgumentError(
2513                "relationships.weight column has unexpected data type".to_string(),
2514            ))
2515        })?;
2516
2517    let mut relationships = Vec::with_capacity(struct_array.len());
2518    for idx in 0..struct_array.len() {
2519        if struct_array.is_null(idx) {
2520            continue;
2521        }
2522        if target_id_array.is_null(idx) {
2523            return Err(LanceError::from(ArrowError::InvalidArgumentError(
2524                "relationships.target_id contains null values".to_string(),
2525            )));
2526        }
2527        if relation_array.is_null(idx) {
2528            return Err(LanceError::from(ArrowError::InvalidArgumentError(
2529                "relationships.relation contains null values".to_string(),
2530            )));
2531        }
2532
2533        relationships.push(Relationship {
2534            target_id: target_id_array.value(idx).to_string(),
2535            relation: relation_array.value(idx).to_string(),
2536            weight: if weight_array.is_null(idx) {
2537                None
2538            } else {
2539                Some(weight_array.value(idx))
2540            },
2541        });
2542    }
2543    Ok(relationships)
2544}
2545
2546fn timestamp_from_micros(value: i64, column: &str) -> LanceResult<DateTime<Utc>> {
2547    DateTime::from_timestamp_micros(value).ok_or_else(|| {
2548        LanceError::from(ArrowError::InvalidArgumentError(format!(
2549            "invalid timestamp value {value} in column '{column}'"
2550        )))
2551    })
2552}
2553
2554fn optional_timestamp_from_array(
2555    array: Option<&TimestampMicrosecondArray>,
2556    row: usize,
2557    column: &str,
2558) -> LanceResult<Option<DateTime<Utc>>> {
2559    let Some(array) = array else {
2560        return Ok(None);
2561    };
2562    if array.is_null(row) {
2563        Ok(None)
2564    } else {
2565        timestamp_from_micros(array.value(row), column).map(Some)
2566    }
2567}
2568
2569fn optional_string_from_array(array: Option<&StringArray>, row: usize) -> Option<String> {
2570    array.and_then(|arr| {
2571        if arr.is_null(row) {
2572            None
2573        } else {
2574            Some(arr.value(row).to_string())
2575        }
2576    })
2577}
2578
/// Euclidean (L2) distance between `left` and `right`.
///
/// Components are paired positionally with `zip`, so when the slices differ
/// in length the trailing components of the longer one are ignored.
fn l2_distance(left: &[f32], right: &[f32]) -> f32 {
    let squared_sum: f32 = left
        .iter()
        .zip(right.iter())
        .map(|(a, b)| (a - b) * (a - b))
        .sum();
    squared_sum.sqrt()
}
2589
2590fn validate_embedding_dim(embedding_dim: i32) -> LanceResult<()> {
2591    if embedding_dim <= 0 {
2592        return Err(LanceError::from(ArrowError::InvalidArgumentError(format!(
2593            "embedding_dim must be positive, got {embedding_dim}"
2594        ))));
2595    }
2596    Ok(())
2597}
2598
2599fn validate_query_dimension(query: &[f32], embedding_dim: i32) -> LanceResult<()> {
2600    if query.len() != embedding_dim as usize {
2601        return Err(ArrowError::InvalidArgumentError(format!(
2602            "query length {} does not match embedding dimension {}",
2603            query.len(),
2604            embedding_dim
2605        ))
2606        .into());
2607    }
2608    Ok(())
2609}
2610
2611fn unique_query_terms(text: &str) -> Vec<String> {
2612    let mut seen = HashSet::new();
2613    tokenize_for_retrieval(text)
2614        .into_iter()
2615        .filter(|term| seen.insert(term.clone()))
2616        .collect()
2617}
2618
/// Splits `text` into lowercase terms.
///
/// A term is a maximal run of alphanumeric characters; every other character
/// acts as a separator and is discarded. Each character is lowercased
/// individually with `char::to_lowercase`.
fn tokenize_for_retrieval(text: &str) -> Vec<String> {
    text.split(|character: char| !character.is_alphanumeric())
        // Adjacent separators produce empty pieces, which are not terms.
        .filter(|run| !run.is_empty())
        .map(|run| run.chars().flat_map(char::to_lowercase).collect())
        .collect()
}
2637
2638fn lexical_score(query_terms: &[String], text: Option<&str>) -> Option<f32> {
2639    let text = text?;
2640    if query_terms.is_empty() {
2641        return None;
2642    }
2643
2644    let payload_terms: HashSet<String> = tokenize_for_retrieval(text).into_iter().collect();
2645    if payload_terms.is_empty() {
2646        return None;
2647    }
2648
2649    let matched_terms = query_terms
2650        .iter()
2651        .filter(|term| payload_terms.contains(*term))
2652        .count();
2653    if matched_terms == 0 {
2654        return None;
2655    }
2656
2657    Some(matched_terms as f32 / query_terms.len() as f32)
2658}
2659
2660fn add_retrieve_channel(
2661    candidates: &mut HashMap<String, RetrieveResult>,
2662    record: &ContextRecord,
2663    rank: usize,
2664    channel: &str,
2665    vector_distance: Option<f32>,
2666    text_score: Option<f32>,
2667) {
2668    let candidate = candidates
2669        .entry(record.id.clone())
2670        .or_insert_with(|| RetrieveResult {
2671            record: record.clone(),
2672            score: 0.0,
2673            vector_distance: None,
2674            text_score: None,
2675            matched_channels: Vec::new(),
2676        });
2677    candidate.score += 1.0 / (RRF_K + rank as f32);
2678    if let Some(distance) = vector_distance {
2679        candidate.vector_distance = Some(distance);
2680    }
2681    if let Some(score) = text_score {
2682        candidate.text_score = Some(score);
2683    }
2684    if !candidate
2685        .matched_channels
2686        .iter()
2687        .any(|existing| existing == channel)
2688    {
2689        candidate.matched_channels.push(channel.to_string());
2690    }
2691}
2692
2693fn compare_retrieve_results(left: &RetrieveResult, right: &RetrieveResult) -> Ordering {
2694    right
2695        .score
2696        .total_cmp(&left.score)
2697        .then_with(|| compare_optional_distance(left.vector_distance, right.vector_distance))
2698        .then_with(|| compare_optional_score(left.text_score, right.text_score))
2699        .then_with(|| left.record.id.cmp(&right.record.id))
2700}
2701
/// Orders optional distances ascending, placing a present distance before a
/// missing one. Two present values are compared with `f32::total_cmp`.
fn compare_optional_distance(left: Option<f32>, right: Option<f32>) -> Ordering {
    match left {
        Some(left_distance) => match right {
            Some(right_distance) => left_distance.total_cmp(&right_distance),
            None => Ordering::Less,
        },
        None => {
            if right.is_some() {
                Ordering::Greater
            } else {
                Ordering::Equal
            }
        }
    }
}
2710
/// Orders optional scores descending, placing a present score before a
/// missing one. Two present values are compared with `f32::total_cmp`, with
/// the operands swapped so the larger score sorts first.
fn compare_optional_score(left: Option<f32>, right: Option<f32>) -> Ordering {
    match left {
        Some(left_score) => match right {
            Some(right_score) => right_score.total_cmp(&left_score),
            None => Ordering::Less,
        },
        None => {
            if right.is_some() {
                Ordering::Greater
            } else {
                Ordering::Equal
            }
        }
    }
}
2719
2720fn embedding_dim_from_schema(schema: &Schema) -> LanceResult<i32> {
2721    let field = schema
2722        .field_with_name("embedding")
2723        .map_err(LanceError::from)?;
2724    let DataType::FixedSizeList(item_field, embedding_dim) = field.data_type() else {
2725        return Err(LanceError::from(ArrowError::InvalidArgumentError(
2726            "embedding column must be a FixedSizeList<Float32>".to_string(),
2727        )));
2728    };
2729    if item_field.data_type() != &DataType::Float32 {
2730        return Err(LanceError::from(ArrowError::InvalidArgumentError(
2731            "embedding column must contain Float32 values".to_string(),
2732        )));
2733    }
2734    validate_embedding_dim(*embedding_dim)?;
2735    Ok(*embedding_dim)
2736}
2737
2738/// Read the persisted [`DistanceMetric`] from the dataset's schema metadata.
2739///
2740/// Datasets created before metric persistence (no key present) default to
2741/// [`DistanceMetric::L2`], preserving historical ranking behavior.
2742fn distance_metric_from_schema(schema: &Schema) -> LanceResult<DistanceMetric> {
2743    match schema.metadata.get(DISTANCE_METRIC_METADATA_KEY) {
2744        Some(value) => DistanceMetric::parse(value),
2745        None => Ok(DistanceMetric::default()),
2746    }
2747}
2748
/// Inner product of `left` and `right`.
///
/// Components are paired positionally with `zip`, so when the slices differ
/// in length the trailing components of the longer one are ignored.
fn dot_product(left: &[f32], right: &[f32]) -> f32 {
    let products = left.iter().zip(right.iter()).map(|(a, b)| a * b);
    products.sum()
}
2756
2757/// Cosine distance (`1 - cosine_similarity`), ranging from 0 (identical
2758/// direction) to 2 (opposite). If either vector has zero magnitude the
2759/// similarity is undefined, so we return the maximum distance (`1.0`) to keep
2760/// such records ranked last without producing `NaN`.
2761fn cosine_distance(left: &[f32], right: &[f32]) -> f32 {
2762    let dot = dot_product(left, right);
2763    let left_norm = dot_product(left, left).sqrt();
2764    let right_norm = dot_product(right, right).sqrt();
2765    if left_norm == 0.0 || right_norm == 0.0 {
2766        return 1.0;
2767    }
2768    1.0 - (dot / (left_norm * right_norm))
2769}
2770
2771/// Negated dot product, so that a larger inner product (a closer match for
2772/// maximum-inner-product search) sorts first under ascending ordering.
2773fn dot_distance(left: &[f32], right: &[f32]) -> f32 {
2774    -dot_product(left, right)
2775}
2776
2777fn column_as<'a, A>(batch: &'a RecordBatch, name: &str) -> LanceResult<&'a A>
2778where
2779    A: Array + 'static,
2780{
2781    let column = batch.column_by_name(name).ok_or_else(|| {
2782        LanceError::from(ArrowError::InvalidArgumentError(format!(
2783            "column '{name}' not found"
2784        )))
2785    })?;
2786    column.as_ref().as_any().downcast_ref::<A>().ok_or_else(|| {
2787        LanceError::from(ArrowError::InvalidArgumentError(format!(
2788            "column '{name}' has unexpected data type"
2789        )))
2790    })
2791}
2792
2793fn column_as_optional<'a, A>(batch: &'a RecordBatch, name: &str) -> Option<&'a A>
2794where
2795    A: Array + 'static,
2796{
2797    batch
2798        .column_by_name(name)
2799        .and_then(|col| col.as_ref().as_any().downcast_ref::<A>())
2800}
2801
/// Build the value list of a SQL `IN (...)` clause: each input becomes a
/// single-quoted string literal with embedded single quotes doubled, and the
/// literals are joined with commas (no surrounding parentheses, no spaces).
/// An empty input renders as an empty string, so callers are expected to
/// check for emptiness before wrapping the result in `IN ()`.
fn sql_quoted_list(values: &[&str]) -> String {
    let mut rendered = String::new();
    for (index, value) in values.iter().enumerate() {
        if index > 0 {
            rendered.push(',');
        }
        rendered.push('\'');
        rendered.push_str(&value.replace('\'', "''"));
        rendered.push('\'');
    }
    rendered
}
2812
2813#[cfg(test)]
2814mod tests {
2815    use super::*;
2816    use crate::serde::CONTENT_TYPE_TEXT;
2817    use chrono::{Duration as ChronoDuration, Utc};
2818    use tempfile::TempDir;
2819
2820    fn make_embedding_with_dim(dim: usize, pivot: f32) -> Vec<f32> {
2821        let mut values = vec![0.0; dim];
2822        if !values.is_empty() {
2823            values[0] = pivot;
2824        }
2825        values
2826    }
2827
2828    fn make_embedding(pivot: f32) -> Vec<f32> {
2829        make_embedding_with_dim(DEFAULT_EMBEDDING_DIM as usize, pivot)
2830    }
2831
2832    fn text_record(id: &str, embedding_pivot: f32) -> ContextRecord {
2833        ContextRecord {
2834            id: id.to_string(),
2835            external_id: None,
2836            run_id: format!("run-{id}"),
2837            bot_id: None,
2838            session_id: None,
2839            tenant: None,
2840            source: None,
2841            created_at: Utc::now(),
2842            role: "user".to_string(),
2843            state_metadata: Some(StateMetadata {
2844                step: Some(1),
2845                active_plan_id: Some("plan".to_string()),
2846                tokens_used: Some(10),
2847                custom: None,
2848            }),
2849            metadata: None,
2850            relationships: Vec::new(),
2851            expires_at: None,
2852            retention_policy: None,
2853            lifecycle_status: LIFECYCLE_ACTIVE.to_string(),
2854            retired_at: None,
2855            retired_reason: None,
2856            supersedes_id: None,
2857            superseded_by_id: None,
2858            content_type: CONTENT_TYPE_TEXT.to_string(),
2859            text_payload: Some(format!("payload-{id}")),
2860            binary_payload: None,
2861            embedding: Some(make_embedding(embedding_pivot)),
2862        }
2863    }
2864
2865    #[test]
2866    fn search_orders_by_distance() {
2867        let dir = TempDir::new().unwrap();
2868        let uri = dir.path().to_string_lossy().to_string();
2869        let runtime = tokio::runtime::Runtime::new().unwrap();
2870        runtime.block_on(async {
2871            let mut store = ContextStore::open(&uri).await.unwrap();
2872            let first = text_record("a", 0.0);
2873            let second = text_record("b", 1.0);
2874            store.add(&[first.clone(), second.clone()]).await.unwrap();
2875
2876            let query = make_embedding(1.0);
2877            let results = store.search(&query, Some(2)).await.unwrap();
2878
2879            assert_eq!(results.len(), 2);
2880            assert_eq!(results[0].record.id, second.id);
2881            assert!(
2882                results[0].distance <= results[1].distance,
2883                "results not ordered by distance: {:?}",
2884                results
2885            );
2886        });
2887    }
2888
2889    #[test]
2890    fn search_validates_query_length() {
2891        let dir = TempDir::new().unwrap();
2892        let uri = dir.path().to_string_lossy().to_string();
2893        let runtime = tokio::runtime::Runtime::new().unwrap();
2894        runtime.block_on(async {
2895            let store = ContextStore::open(&uri).await.unwrap();
2896            let err = store.search(&[0.0_f32], None).await.unwrap_err();
2897            let message = err.to_string();
2898            assert!(
2899                message.contains("embedding dimension"),
2900                "unexpected error message: {message}"
2901            );
2902        });
2903    }
2904
2905    fn make_embedding2(x0: f32, x1: f32) -> Vec<f32> {
2906        let mut values = vec![0.0; DEFAULT_EMBEDDING_DIM as usize];
2907        values[0] = x0;
2908        values[1] = x1;
2909        values
2910    }
2911
2912    fn text_record_with(id: &str, embedding: Vec<f32>) -> ContextRecord {
2913        let mut record = text_record(id, 0.0);
2914        record.embedding = Some(embedding);
2915        record
2916    }
2917
2918    #[test]
2919    fn distance_metric_parse_and_math() {
2920        assert_eq!(DistanceMetric::parse("l2").unwrap(), DistanceMetric::L2);
2921        assert_eq!(DistanceMetric::parse("L2").unwrap(), DistanceMetric::L2);
2922        assert_eq!(
2923            DistanceMetric::parse("cosine").unwrap(),
2924            DistanceMetric::Cosine
2925        );
2926        assert_eq!(DistanceMetric::parse("DOT").unwrap(), DistanceMetric::Dot);
2927        assert!(DistanceMetric::parse("manhattan").is_err());
2928        assert_eq!(DistanceMetric::default(), DistanceMetric::L2);
2929
2930        let a = [1.0_f32, 0.0];
2931        let b = [1.0_f32, 1.0];
2932        // L2: sqrt(0 + 1) = 1
2933        assert!((DistanceMetric::L2.distance(&a, &b) - 1.0).abs() < 1e-6);
2934        // Cosine distance: 1 - (1 / (1 * sqrt(2))) = 1 - 0.70710677
2935        assert!((DistanceMetric::Cosine.distance(&a, &b) - (1.0 - 0.707_106_77)).abs() < 1e-5);
2936        // Dot: -(1*1 + 0*1) = -1
2937        assert!((DistanceMetric::Dot.distance(&a, &b) + 1.0).abs() < 1e-6);
2938        // Zero-magnitude vectors yield max cosine distance, never NaN.
2939        let zero = [0.0_f32, 0.0];
2940        assert!((DistanceMetric::Cosine.distance(&a, &zero) - 1.0).abs() < 1e-6);
2941    }
2942
2943    #[test]
2944    fn search_metric_changes_ranking() {
2945        let runtime = tokio::runtime::Runtime::new().unwrap();
2946        runtime.block_on(async {
2947            // query direction matches "aligned" but "near" is closer in L2.
2948            let query = make_embedding2(1.0, 0.0);
2949            // aligned: same direction as query, larger magnitude -> far in L2,
2950            //          but cosine distance 0 and largest dot product.
2951            let aligned = make_embedding2(10.0, 0.0);
2952            // near: closest in L2, but off-axis -> larger cosine distance.
2953            let near = make_embedding2(1.0, 1.0);
2954
2955            // Default (L2): `near` should rank first.
2956            let l2_dir = TempDir::new().unwrap();
2957            let mut l2_store = ContextStore::open(&l2_dir.path().to_string_lossy())
2958                .await
2959                .unwrap();
2960            l2_store
2961                .add(&[
2962                    text_record_with("aligned", aligned.clone()),
2963                    text_record_with("near", near.clone()),
2964                ])
2965                .await
2966                .unwrap();
2967            let l2_results = l2_store.search(&query, Some(2)).await.unwrap();
2968            assert_eq!(l2_results[0].record.id, "near");
2969
2970            // Cosine: `aligned` should rank first despite the larger L2 distance.
2971            let cos_dir = TempDir::new().unwrap();
2972            let cos_opts = ContextStoreOptions {
2973                distance_metric: Some(DistanceMetric::Cosine),
2974                ..Default::default()
2975            };
2976            let mut cos_store =
2977                ContextStore::open_with_options(&cos_dir.path().to_string_lossy(), cos_opts)
2978                    .await
2979                    .unwrap();
2980            cos_store
2981                .add(&[
2982                    text_record_with("aligned", aligned.clone()),
2983                    text_record_with("near", near.clone()),
2984                ])
2985                .await
2986                .unwrap();
2987            let cos_results = cos_store.search(&query, Some(2)).await.unwrap();
2988            assert_eq!(cos_results[0].record.id, "aligned");
2989
2990            // Dot: `aligned` has the largest inner product -> first.
2991            let dot_dir = TempDir::new().unwrap();
2992            let dot_opts = ContextStoreOptions {
2993                distance_metric: Some(DistanceMetric::Dot),
2994                ..Default::default()
2995            };
2996            let mut dot_store =
2997                ContextStore::open_with_options(&dot_dir.path().to_string_lossy(), dot_opts)
2998                    .await
2999                    .unwrap();
3000            dot_store
3001                .add(&[
3002                    text_record_with("aligned", aligned),
3003                    text_record_with("near", near),
3004                ])
3005                .await
3006                .unwrap();
3007            let dot_results = dot_store.search(&query, Some(2)).await.unwrap();
3008            assert_eq!(dot_results[0].record.id, "aligned");
3009        });
3010    }
3011
3012    #[test]
3013    fn distance_metric_persists_across_reopen() {
3014        let runtime = tokio::runtime::Runtime::new().unwrap();
3015        runtime.block_on(async {
3016            let dir = TempDir::new().unwrap();
3017            let uri = dir.path().to_string_lossy().to_string();
3018            let query = make_embedding2(1.0, 0.0);
3019            let aligned = make_embedding2(10.0, 0.0);
3020            let near = make_embedding2(1.0, 1.0);
3021
3022            // Create with cosine and write records.
3023            {
3024                let opts = ContextStoreOptions {
3025                    distance_metric: Some(DistanceMetric::Cosine),
3026                    ..Default::default()
3027                };
3028                let mut store = ContextStore::open_with_options(&uri, opts).await.unwrap();
3029                store
3030                    .add(&[
3031                        text_record_with("aligned", aligned.clone()),
3032                        text_record_with("near", near.clone()),
3033                    ])
3034                    .await
3035                    .unwrap();
3036            }
3037
3038            // Reopen WITHOUT passing the metric: it must be recovered from the
3039            // schema, so cosine ranking (`aligned` first) still applies.
3040            let store = ContextStore::open(&uri).await.unwrap();
3041            assert_eq!(store.distance_metric, DistanceMetric::Cosine);
3042            let results = store.search(&query, Some(2)).await.unwrap();
3043            assert_eq!(results[0].record.id, "aligned");
3044        });
3045    }
3046
3047    #[test]
3048    fn distance_metric_mismatch_errors() {
3049        let runtime = tokio::runtime::Runtime::new().unwrap();
3050        runtime.block_on(async {
3051            let dir = TempDir::new().unwrap();
3052            let uri = dir.path().to_string_lossy().to_string();
3053            ContextStore::open_with_options(
3054                &uri,
3055                ContextStoreOptions {
3056                    distance_metric: Some(DistanceMetric::Cosine),
3057                    ..Default::default()
3058                },
3059            )
3060            .await
3061            .unwrap();
3062
3063            let result = ContextStore::open_with_options(
3064                &uri,
3065                ContextStoreOptions {
3066                    distance_metric: Some(DistanceMetric::Dot),
3067                    ..Default::default()
3068                },
3069            )
3070            .await;
3071            let err = match result {
3072                Ok(_) => panic!("expected a distance-metric mismatch error"),
3073                Err(err) => err,
3074            };
3075            assert!(
3076                err.to_string().contains("distance metric"),
3077                "unexpected error: {err}"
3078            );
3079        });
3080    }
3081
3082    #[test]
3083    fn distance_metric_from_schema_defaults_l2_when_absent() {
3084        // Datasets created before metric persistence carry no metadata key.
3085        let schema = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
3086        assert_eq!(
3087            distance_metric_from_schema(&schema).unwrap(),
3088            DistanceMetric::L2
3089        );
3090    }
3091
3092    #[test]
3093    fn retrieve_fuses_text_and_vector_channels() {
3094        let dir = TempDir::new().unwrap();
3095        let uri = dir.path().to_string_lossy().to_string();
3096        let runtime = tokio::runtime::Runtime::new().unwrap();
3097        runtime.block_on(async {
3098            let mut store = ContextStore::open(&uri).await.unwrap();
3099            let mut semantic_near = text_record("semantic-near", 0.0);
3100            semantic_near.text_payload = Some("general rollout risk guidance".to_string());
3101            let mut exact_policy = text_record("exact-policy", 1.0);
3102            exact_policy.text_payload = Some("POLICY-123 blocks service-a rollouts".to_string());
3103
3104            store
3105                .add(&[semantic_near.clone(), exact_policy.clone()])
3106                .await
3107                .unwrap();
3108
3109            let query = make_embedding(0.0);
3110            let results = store
3111                .retrieve_filtered_with_options(
3112                    Some("POLICY-123 service-a"),
3113                    Some(&query),
3114                    Some(2),
3115                    None,
3116                    LifecycleQueryOptions::default(),
3117                )
3118                .await
3119                .unwrap();
3120
3121            assert_eq!(results.len(), 2);
3122            assert_eq!(results[0].record.id, exact_policy.id);
3123            assert!(results[0].score > results[1].score);
3124            assert!(results[0].vector_distance.is_some());
3125            assert_eq!(results[0].text_score, Some(1.0));
3126            assert_eq!(results[0].matched_channels, ["vector", "text"]);
3127        });
3128    }
3129
3130    #[test]
3131    fn custom_embedding_dimension_round_trips_add_search_and_reopen() {
3132        let dir = TempDir::new().unwrap();
3133        let uri = dir.path().to_string_lossy().to_string();
3134        let runtime = tokio::runtime::Runtime::new().unwrap();
3135        runtime.block_on(async {
3136            let options = ContextStoreOptions {
3137                embedding_dim: Some(3),
3138                ..Default::default()
3139            };
3140            let mut store = ContextStore::open_with_options(&uri, options)
3141                .await
3142                .unwrap();
3143            assert_eq!(store.embedding_dim(), 3);
3144
3145            let mut first = text_record("custom-a", 0.0);
3146            first.embedding = Some(make_embedding_with_dim(3, 0.0));
3147            let mut second = text_record("custom-b", 0.0);
3148            second.embedding = Some(make_embedding_with_dim(3, 1.0));
3149            store.add(&[first.clone(), second.clone()]).await.unwrap();
3150
3151            let query = make_embedding_with_dim(3, 1.0);
3152            let results = store.search(&query, Some(2)).await.unwrap();
3153            assert_eq!(results[0].record.id, second.id);
3154
3155            let reopened = ContextStore::open(&uri).await.unwrap();
3156            assert_eq!(reopened.embedding_dim(), 3);
3157            let results = reopened.search(&query, Some(1)).await.unwrap();
3158            assert_eq!(results[0].record.id, second.id);
3159
3160            let err = reopened
3161                .search(&make_embedding(1.0), None)
3162                .await
3163                .unwrap_err();
3164            assert!(
3165                err.to_string().contains("embedding dimension 3"),
3166                "unexpected error message: {err}"
3167            );
3168        });
3169    }
3170
3171    #[test]
3172    fn existing_default_dimension_dataset_opens_without_options() {
3173        let dir = TempDir::new().unwrap();
3174        let uri = dir.path().to_string_lossy().to_string();
3175        let runtime = tokio::runtime::Runtime::new().unwrap();
3176        runtime.block_on(async {
3177            let mut store = ContextStore::open(&uri).await.unwrap();
3178            assert_eq!(store.embedding_dim(), DEFAULT_EMBEDDING_DIM);
3179            store.add(&[text_record("default-dim", 0.0)]).await.unwrap();
3180            drop(store);
3181
3182            let reopened = ContextStore::open(&uri).await.unwrap();
3183            assert_eq!(reopened.embedding_dim(), DEFAULT_EMBEDDING_DIM);
3184            reopened
3185                .search(&make_embedding(0.0), Some(1))
3186                .await
3187                .unwrap();
3188        });
3189    }
3190
3191    #[test]
3192    fn opening_existing_dataset_rejects_mismatched_requested_dimension() {
3193        let dir = TempDir::new().unwrap();
3194        let uri = dir.path().to_string_lossy().to_string();
3195        let runtime = tokio::runtime::Runtime::new().unwrap();
3196        runtime.block_on(async {
3197            let options = ContextStoreOptions {
3198                embedding_dim: Some(3),
3199                ..Default::default()
3200            };
3201            ContextStore::open_with_options(&uri, options)
3202                .await
3203                .unwrap();
3204
3205            let mismatched = ContextStoreOptions {
3206                embedding_dim: Some(4),
3207                ..Default::default()
3208            };
3209            let err = match ContextStore::open_with_options(&uri, mismatched).await {
3210                Ok(_) => panic!("expected mismatched embedding dimension to fail"),
3211                Err(err) => err,
3212            };
3213            assert!(
3214                err.to_string()
3215                    .contains("does not match requested dimension 4"),
3216                "unexpected error message: {err}"
3217            );
3218        });
3219    }
3220
3221    #[test]
3222    fn list_hides_expired_and_retired_records_by_default() {
3223        let dir = TempDir::new().unwrap();
3224        let uri = dir.path().to_string_lossy().to_string();
3225        let runtime = tokio::runtime::Runtime::new().unwrap();
3226        runtime.block_on(async {
3227            let mut store = ContextStore::open(&uri).await.unwrap();
3228            let active = text_record("active", 0.0);
3229            let mut expired = text_record("expired", 0.0);
3230            expired.expires_at = Some(Utc::now() - ChronoDuration::minutes(1));
3231            let mut superseded = text_record("superseded", 0.0);
3232            superseded.lifecycle_status = "superseded".to_string();
3233            superseded.retired_reason = Some("replaced by newer fact".to_string());
3234            superseded.superseded_by_id = Some("active".to_string());
3235
3236            store
3237                .add(&[active.clone(), expired.clone(), superseded.clone()])
3238                .await
3239                .unwrap();
3240
3241            let visible = store.list(None, None).await.unwrap();
3242            assert_eq!(visible.len(), 1);
3243            assert_eq!(visible[0].id, active.id);
3244
3245            let all = store
3246                .list_with_options(None, None, LifecycleQueryOptions::new(true, true))
3247                .await
3248                .unwrap();
3249            assert_eq!(all.len(), 3);
3250            let expired_roundtrip = all.iter().find(|record| record.id == expired.id).unwrap();
3251            assert_eq!(
3252                expired_roundtrip
3253                    .expires_at
3254                    .map(|value| value.timestamp_micros()),
3255                expired.expires_at.map(|value| value.timestamp_micros())
3256            );
3257            let superseded_roundtrip = all
3258                .iter()
3259                .find(|record| record.id == superseded.id)
3260                .unwrap();
3261            assert_eq!(superseded_roundtrip.lifecycle_status, "superseded");
3262            assert_eq!(
3263                superseded_roundtrip.superseded_by_id.as_deref(),
3264                Some("active")
3265            );
3266        });
3267    }
3268
3269    #[test]
3270    fn list_hides_records_superseded_by_newer_pointer() {
3271        let dir = TempDir::new().unwrap();
3272        let uri = dir.path().to_string_lossy().to_string();
3273        let runtime = tokio::runtime::Runtime::new().unwrap();
3274        runtime.block_on(async {
3275            let mut store = ContextStore::open(&uri).await.unwrap();
3276            let old = text_record("old", 0.0);
3277            let mut replacement = text_record("new", 1.0);
3278            replacement.supersedes_id = Some(old.id.clone());
3279            store
3280                .add(&[old.clone(), replacement.clone()])
3281                .await
3282                .unwrap();
3283
3284            let visible = store.list(None, None).await.unwrap();
3285            assert_eq!(visible.len(), 1);
3286            assert_eq!(visible[0].id, replacement.id);
3287
3288            let history = store
3289                .list_with_options(None, None, LifecycleQueryOptions::new(false, true))
3290                .await
3291                .unwrap();
3292            assert_eq!(history.len(), 2);
3293            assert!(history.iter().any(|record| record.id == old.id));
3294            assert!(history.iter().any(|record| record.id == replacement.id));
3295        });
3296    }
3297
3298    #[test]
3299    fn search_filters_lifecycle_before_ranking() {
3300        let dir = TempDir::new().unwrap();
3301        let uri = dir.path().to_string_lossy().to_string();
3302        let runtime = tokio::runtime::Runtime::new().unwrap();
3303        runtime.block_on(async {
3304            let mut store = ContextStore::open(&uri).await.unwrap();
3305            let active = text_record("active", 1.0);
3306            let mut expired_better_match = text_record("expired", 0.0);
3307            expired_better_match.expires_at = Some(Utc::now() - ChronoDuration::minutes(1));
3308            store
3309                .add(&[active.clone(), expired_better_match.clone()])
3310                .await
3311                .unwrap();
3312
3313            let query = make_embedding(0.0);
3314            let visible = store.search(&query, Some(1)).await.unwrap();
3315            assert_eq!(visible.len(), 1);
3316            assert_eq!(visible[0].record.id, active.id);
3317
3318            let all = store
3319                .search_with_options(&query, Some(1), LifecycleQueryOptions::new(true, false))
3320                .await
3321                .unwrap();
3322            assert_eq!(all.len(), 1);
3323            assert_eq!(all[0].record.id, expired_better_match.id);
3324        });
3325    }
3326
3327    #[test]
3328    fn external_id_roundtrips_and_supports_lookup() {
3329        let dir = TempDir::new().unwrap();
3330        let uri = dir.path().to_string_lossy().to_string();
3331        let runtime = tokio::runtime::Runtime::new().unwrap();
3332        runtime.block_on(async {
3333            let mut store = ContextStore::open(&uri).await.unwrap();
3334            let mut record = text_record("a", 0.0);
3335            record.external_id = Some("doc-123#chunk-1".to_string());
3336            store.add(std::slice::from_ref(&record)).await.unwrap();
3337
3338            let by_external_id = store
3339                .get_by_external_id("doc-123#chunk-1")
3340                .await
3341                .unwrap()
3342                .unwrap();
3343            assert_eq!(by_external_id.id, record.id);
3344            assert_eq!(by_external_id.external_id, record.external_id);
3345
3346            let by_id = store.get_by_id(&record.id).await.unwrap().unwrap();
3347            assert_eq!(by_id.external_id.as_deref(), Some("doc-123#chunk-1"));
3348
3349            let missing = store.get_by_external_id("missing").await.unwrap();
3350            assert!(missing.is_none());
3351        });
3352    }
3353
3354    #[test]
3355    fn upsert_by_external_id_inserts_then_replaces_visible_record() {
3356        let dir = TempDir::new().unwrap();
3357        let uri = dir.path().to_string_lossy().to_string();
3358        let runtime = tokio::runtime::Runtime::new().unwrap();
3359        runtime.block_on(async {
3360            let mut store = ContextStore::open(&uri).await.unwrap();
3361
3362            let mut first = text_record("first", 0.0);
3363            first.external_id = Some("doc-123#chunk-1".to_string());
3364            let inserted = store.upsert_by_external_id(first.clone()).await.unwrap();
3365            assert!(inserted.inserted);
3366            assert_eq!(inserted.replaced_id, None);
3367            assert_eq!(inserted.record.id, first.id);
3368
3369            let mut replacement = text_record("replacement", 1.0);
3370            replacement.external_id = first.external_id.clone();
3371            let replaced = store
3372                .upsert_by_external_id(replacement.clone())
3373                .await
3374                .unwrap();
3375            assert!(!replaced.inserted);
3376            assert_eq!(replaced.replaced_id.as_deref(), Some(first.id.as_str()));
3377            assert_eq!(
3378                replaced.record.supersedes_id.as_deref(),
3379                Some(first.id.as_str())
3380            );
3381
3382            let visible = store.list(None, None).await.unwrap();
3383            assert_eq!(visible.len(), 1);
3384            assert_eq!(visible[0].id, replacement.id);
3385
3386            let by_external_id = store
3387                .get_by_external_id("doc-123#chunk-1")
3388                .await
3389                .unwrap()
3390                .unwrap();
3391            assert_eq!(by_external_id.id, replacement.id);
3392
3393            let history = store
3394                .list_with_options(None, None, LifecycleQueryOptions::new(false, true))
3395                .await
3396                .unwrap();
3397            assert_eq!(history.len(), 2);
3398            assert!(history.iter().any(|record| record.id == first.id));
3399            assert!(history.iter().any(|record| record.id == replacement.id));
3400        });
3401    }
3402
3403    fn upsert_record(id: &str, external_id: &str, pivot: f32) -> ContextRecord {
3404        let mut record = text_record(id, pivot);
3405        record.external_id = Some(external_id.to_string());
3406        record
3407    }
3408
3409    #[test]
3410    fn upsert_many_inserts_new_records() {
3411        let dir = TempDir::new().unwrap();
3412        let uri = dir.path().to_string_lossy().to_string();
3413        let runtime = tokio::runtime::Runtime::new().unwrap();
3414        runtime.block_on(async {
3415            let mut store = ContextStore::open(&uri).await.unwrap();
3416
3417            let batch = vec![
3418                upsert_record("a", "ext-a", 0.0),
3419                upsert_record("b", "ext-b", 1.0),
3420            ];
3421            let results = store.upsert_many_by_external_id(batch).await.unwrap();
3422
3423            assert_eq!(results.len(), 2);
3424            assert!(results.iter().all(|r| r.inserted));
3425            assert!(results.iter().all(|r| r.replaced_id.is_none()));
3426            assert_eq!(results[0].version, results[1].version);
3427
3428            let visible = store.list(None, None).await.unwrap();
3429            assert_eq!(visible.len(), 2);
3430        });
3431    }
3432
3433    #[test]
3434    fn upsert_many_replaces_existing_and_is_idempotent() {
3435        let dir = TempDir::new().unwrap();
3436        let uri = dir.path().to_string_lossy().to_string();
3437        let runtime = tokio::runtime::Runtime::new().unwrap();
3438        runtime.block_on(async {
3439            let mut store = ContextStore::open(&uri).await.unwrap();
3440
3441            let first = vec![
3442                upsert_record("a1", "ext-a", 0.0),
3443                upsert_record("b1", "ext-b", 1.0),
3444            ];
3445            store.upsert_many_by_external_id(first).await.unwrap();
3446
3447            // Re-apply with new ids: both should replace (supersede) the
3448            // originals, and only the successors remain visible.
3449            let second = vec![
3450                upsert_record("a2", "ext-a", 2.0),
3451                upsert_record("b2", "ext-b", 3.0),
3452            ];
3453            let results = store.upsert_many_by_external_id(second).await.unwrap();
3454
3455            assert!(results.iter().all(|r| !r.inserted));
3456            assert_eq!(results[0].replaced_id.as_deref(), Some("a1"));
3457            assert_eq!(results[1].replaced_id.as_deref(), Some("b1"));
3458            assert_eq!(results[0].record.supersedes_id.as_deref(), Some("a1"));
3459
3460            let visible = store.list(None, None).await.unwrap();
3461            assert_eq!(visible.len(), 2);
3462            let visible_ids: HashSet<&str> = visible.iter().map(|r| r.id.as_str()).collect();
3463            assert_eq!(
3464                visible_ids,
3465                HashSet::from(["a2", "b2"]),
3466                "only the successors should be visible"
3467            );
3468
3469            // Idempotent re-application again still leaves one visible per key.
3470            let third = vec![
3471                upsert_record("a3", "ext-a", 4.0),
3472                upsert_record("b3", "ext-b", 5.0),
3473            ];
3474            store.upsert_many_by_external_id(third).await.unwrap();
3475            assert_eq!(store.list(None, None).await.unwrap().len(), 2);
3476        });
3477    }
3478
3479    #[test]
3480    fn upsert_many_handles_mixed_insert_and_replace() {
3481        let dir = TempDir::new().unwrap();
3482        let uri = dir.path().to_string_lossy().to_string();
3483        let runtime = tokio::runtime::Runtime::new().unwrap();
3484        runtime.block_on(async {
3485            let mut store = ContextStore::open(&uri).await.unwrap();
3486            store
3487                .upsert_many_by_external_id(vec![upsert_record("a1", "ext-a", 0.0)])
3488                .await
3489                .unwrap();
3490
3491            let batch = vec![
3492                upsert_record("a2", "ext-a", 1.0), // replace
3493                upsert_record("c1", "ext-c", 2.0), // insert
3494            ];
3495            let results = store.upsert_many_by_external_id(batch).await.unwrap();
3496
3497            assert_eq!(results.len(), 2);
3498            assert!(!results[0].inserted);
3499            assert_eq!(results[0].replaced_id.as_deref(), Some("a1"));
3500            assert!(results[1].inserted);
3501            assert!(results[1].replaced_id.is_none());
3502
3503            let visible_ids: HashSet<String> = store
3504                .list(None, None)
3505                .await
3506                .unwrap()
3507                .into_iter()
3508                .map(|r| r.id)
3509                .collect();
3510            assert_eq!(
3511                visible_ids,
3512                HashSet::from(["a2".to_string(), "c1".to_string()])
3513            );
3514        });
3515    }
3516
3517    #[test]
3518    fn upsert_many_rejects_within_batch_duplicate_external_id() {
3519        let dir = TempDir::new().unwrap();
3520        let uri = dir.path().to_string_lossy().to_string();
3521        let runtime = tokio::runtime::Runtime::new().unwrap();
3522        runtime.block_on(async {
3523            let mut store = ContextStore::open(&uri).await.unwrap();
3524            let batch = vec![
3525                upsert_record("a", "dup", 0.0),
3526                upsert_record("b", "dup", 1.0),
3527            ];
3528            let err = store.upsert_many_by_external_id(batch).await.unwrap_err();
3529            assert!(
3530                err.to_string()
3531                    .contains("duplicate external_id 'dup' in batch"),
3532                "unexpected error: {err}"
3533            );
3534            // Nothing was written (all-or-nothing).
3535            assert_eq!(store.list(None, None).await.unwrap().len(), 0);
3536        });
3537    }
3538
3539    #[test]
3540    fn upsert_many_rejects_within_batch_duplicate_id() {
3541        let dir = TempDir::new().unwrap();
3542        let uri = dir.path().to_string_lossy().to_string();
3543        let runtime = tokio::runtime::Runtime::new().unwrap();
3544        runtime.block_on(async {
3545            let mut store = ContextStore::open(&uri).await.unwrap();
3546            let batch = vec![
3547                upsert_record("same", "ext-a", 0.0),
3548                upsert_record("same", "ext-b", 1.0),
3549            ];
3550            let err = store.upsert_many_by_external_id(batch).await.unwrap_err();
3551            assert!(
3552                err.to_string().contains("duplicate id 'same' in batch"),
3553                "unexpected error: {err}"
3554            );
3555        });
3556    }
3557
3558    #[test]
3559    fn upsert_many_rejects_missing_external_id() {
3560        let dir = TempDir::new().unwrap();
3561        let uri = dir.path().to_string_lossy().to_string();
3562        let runtime = tokio::runtime::Runtime::new().unwrap();
3563        runtime.block_on(async {
3564            let mut store = ContextStore::open(&uri).await.unwrap();
3565
3566            let no_ext = vec![text_record("a", 0.0)];
3567            let err = store.upsert_many_by_external_id(no_ext).await.unwrap_err();
3568            assert!(err.to_string().contains("external_id"), "unexpected: {err}");
3569
3570            let mut empty = text_record("b", 0.0);
3571            empty.external_id = Some(String::new());
3572            let err = store
3573                .upsert_many_by_external_id(vec![empty])
3574                .await
3575                .unwrap_err();
3576            assert!(
3577                err.to_string().contains("non-empty external_id"),
3578                "unexpected: {err}"
3579            );
3580        });
3581    }
3582
3583    #[test]
3584    fn upsert_many_rejects_id_collision_with_store() {
3585        let dir = TempDir::new().unwrap();
3586        let uri = dir.path().to_string_lossy().to_string();
3587        let runtime = tokio::runtime::Runtime::new().unwrap();
3588        runtime.block_on(async {
3589            let mut store = ContextStore::open(&uri).await.unwrap();
3590            store.add(&[text_record("taken", 0.0)]).await.unwrap();
3591
3592            let batch = vec![upsert_record("taken", "ext-a", 1.0)];
3593            let err = store.upsert_many_by_external_id(batch).await.unwrap_err();
3594            assert!(
3595                err.to_string().contains("id 'taken'")
3596                    && err.to_string().contains("already exists"),
3597                "unexpected error: {err}"
3598            );
3599        });
3600    }
3601
3602    #[test]
3603    fn upsert_many_empty_batch_is_noop() {
3604        let dir = TempDir::new().unwrap();
3605        let uri = dir.path().to_string_lossy().to_string();
3606        let runtime = tokio::runtime::Runtime::new().unwrap();
3607        runtime.block_on(async {
3608            let mut store = ContextStore::open(&uri).await.unwrap();
3609            let results = store.upsert_many_by_external_id(Vec::new()).await.unwrap();
3610            assert!(results.is_empty());
3611        });
3612    }
3613
3614    #[test]
3615    fn upsert_many_matches_single_upsert_with_btree_index() {
3616        let dir = TempDir::new().unwrap();
3617        let uri = dir.path().to_string_lossy().to_string();
3618        let runtime = tokio::runtime::Runtime::new().unwrap();
3619        runtime.block_on(async {
3620            let options = ContextStoreOptions {
3621                id_index_type: IdIndexType::BTree,
3622                ..Default::default()
3623            };
3624            let mut store = ContextStore::open_with_options(&uri, options)
3625                .await
3626                .unwrap();
3627
3628            // Seed one record via the single-record path.
3629            store
3630                .upsert_by_external_id(upsert_record("a1", "ext-a", 0.0))
3631                .await
3632                .unwrap();
3633
3634            // Batch replaces ext-a and inserts ext-b through the indexed path.
3635            let results = store
3636                .upsert_many_by_external_id(vec![
3637                    upsert_record("a2", "ext-a", 1.0),
3638                    upsert_record("b1", "ext-b", 2.0),
3639                ])
3640                .await
3641                .unwrap();
3642            assert_eq!(results[0].replaced_id.as_deref(), Some("a1"));
3643            assert!(results[1].inserted);
3644
3645            assert_eq!(
3646                store.get_by_external_id("ext-a").await.unwrap().unwrap().id,
3647                "a2"
3648            );
3649        });
3650    }
3651
3652    #[test]
3653    fn update_by_external_id_patches_mutable_fields_and_preserves_payload() {
3654        let dir = TempDir::new().unwrap();
3655        let uri = dir.path().to_string_lossy().to_string();
3656        let runtime = tokio::runtime::Runtime::new().unwrap();
3657        runtime.block_on(async {
3658            let mut store = ContextStore::open(&uri).await.unwrap();
3659
3660            let mut record = text_record("stable", 0.0);
3661            record.external_id = Some("doc-123#chunk-1".to_string());
3662            record.metadata = Some(serde_json::json!({"revision": 1}));
3663            store.add(std::slice::from_ref(&record)).await.unwrap();
3664
3665            let patch = RecordPatch {
3666                bot_id: Some("bot-a".to_string()),
3667                session_id: Some("session-a".to_string()),
3668                metadata: Some(serde_json::json!({"revision": 2, "confidence": 0.9})),
3669                relationships: Some(vec![Relationship {
3670                    target_id: "doc-123".to_string(),
3671                    relation: "derived_from".to_string(),
3672                    weight: None,
3673                }]),
3674                ..Default::default()
3675            };
3676            let updated = store
3677                .update_by_external_id("doc-123#chunk-1", patch)
3678                .await
3679                .unwrap()
3680                .unwrap();
3681
3682            assert_eq!(updated.replaced_id, record.id);
3683            assert_ne!(updated.record.id, record.id);
3684            assert_eq!(updated.record.external_id, record.external_id);
3685            assert_eq!(updated.record.text_payload, record.text_payload);
3686            assert_eq!(updated.record.embedding, record.embedding);
3687            assert_eq!(updated.record.bot_id.as_deref(), Some("bot-a"));
3688            assert_eq!(updated.record.session_id.as_deref(), Some("session-a"));
3689            assert_eq!(
3690                updated.record.metadata,
3691                Some(serde_json::json!({"revision": 2, "confidence": 0.9}))
3692            );
3693            assert_eq!(updated.record.relationships.len(), 1);
3694            assert_eq!(
3695                updated.record.supersedes_id.as_deref(),
3696                Some(record.id.as_str())
3697            );
3698
3699            let visible = store
3700                .get_by_external_id("doc-123#chunk-1")
3701                .await
3702                .unwrap()
3703                .unwrap();
3704            assert_eq!(visible.id, updated.record.id);
3705
3706            let history = store
3707                .list_with_options(None, None, LifecycleQueryOptions::new(false, true))
3708                .await
3709                .unwrap();
3710            assert_eq!(history.len(), 2);
3711            assert!(history.iter().any(|item| item.id == record.id));
3712            assert!(history.iter().any(|item| item.id == updated.record.id));
3713        });
3714    }
3715
3716    #[test]
3717    fn deferred_embedding_patch_makes_raw_record_searchable() {
3718        let dir = TempDir::new().unwrap();
3719        let uri = dir.path().to_string_lossy().to_string();
3720        let runtime = tokio::runtime::Runtime::new().unwrap();
3721        runtime.block_on(async {
3722            let mut store = ContextStore::open(&uri).await.unwrap();
3723
3724            // Raw-first capture: append source chunks without embeddings.
3725            let mut by_ext = text_record("raw-ext", 0.0);
3726            by_ext.embedding = None;
3727            by_ext.external_id = Some("doc-1#chunk-1".to_string());
3728            let mut by_id = text_record("raw-id", 0.0);
3729            by_id.embedding = None;
3730            by_id.external_id = None;
3731            store.add(&[by_ext.clone(), by_id.clone()]).await.unwrap();
3732
3733            // Records without an embedding are invisible to vector search.
3734            let query = make_embedding(1.0);
3735            assert!(store.search(&query, Some(10)).await.unwrap().is_empty());
3736
3737            // Enrich-later: patch the embedding by external_id...
3738            let enriched_ext = store
3739                .update_by_external_id(
3740                    "doc-1#chunk-1",
3741                    RecordPatch {
3742                        embedding: Some(make_embedding(1.0)),
3743                        ..Default::default()
3744                    },
3745                )
3746                .await
3747                .unwrap()
3748                .unwrap();
3749            assert_eq!(enriched_ext.record.embedding, Some(make_embedding(1.0)));
3750            // Raw payload is carried forward onto the superseding record.
3751            assert_eq!(enriched_ext.record.text_payload, by_ext.text_payload);
3752
3753            // ...and by internal id.
3754            let enriched_id = store
3755                .update_by_id(
3756                    &by_id.id,
3757                    RecordPatch {
3758                        embedding: Some(make_embedding(0.0)),
3759                        ..Default::default()
3760                    },
3761                )
3762                .await
3763                .unwrap()
3764                .unwrap();
3765            assert_eq!(enriched_id.record.embedding, Some(make_embedding(0.0)));
3766
3767            // Both records now participate in vector search.
3768            let results = store.search(&query, Some(10)).await.unwrap();
3769            let ids: Vec<&str> = results.iter().map(|r| r.record.id.as_str()).collect();
3770            assert!(ids.contains(&enriched_ext.record.id.as_str()));
3771            assert!(ids.contains(&enriched_id.record.id.as_str()));
3772            // The query matches the external_id record exactly (distance 0).
3773            assert_eq!(results[0].record.id, enriched_ext.record.id);
3774        });
3775    }
3776
3777    #[test]
3778    fn relationships_roundtrip_and_support_related_lookup() {
3779        let dir = TempDir::new().unwrap();
3780        let uri = dir.path().to_string_lossy().to_string();
3781        let runtime = tokio::runtime::Runtime::new().unwrap();
3782        runtime.block_on(async {
3783            let mut store = ContextStore::open(&uri).await.unwrap();
3784            let mut related = text_record("related", 0.0);
3785            related.relationships = vec![
3786                Relationship {
3787                    target_id: "doc-1#chunk-1".to_string(),
3788                    relation: "cites".to_string(),
3789                    weight: Some(0.75),
3790                },
3791                Relationship {
3792                    target_id: "service-a".to_string(),
3793                    relation: "mentions".to_string(),
3794                    weight: None,
3795                },
3796            ];
3797            let unrelated = text_record("unrelated", 1.0);
3798            store.add(&[related.clone(), unrelated]).await.unwrap();
3799
3800            let listed = store.list(None, None).await.unwrap();
3801            let roundtrip = listed
3802                .iter()
3803                .find(|record| record.id == related.id)
3804                .unwrap();
3805            assert_eq!(roundtrip.relationships, related.relationships);
3806
3807            let by_target = store
3808                .list_related("doc-1#chunk-1", None, None)
3809                .await
3810                .unwrap();
3811            assert_eq!(by_target.len(), 1);
3812            assert_eq!(by_target[0].id, related.id);
3813
3814            let by_relation = store
3815                .list_related("doc-1#chunk-1", Some("cites"), None)
3816                .await
3817                .unwrap();
3818            assert_eq!(by_relation.len(), 1);
3819            assert_eq!(by_relation[0].id, related.id);
3820
3821            let wrong_relation = store
3822                .list_related("doc-1#chunk-1", Some("mentions"), None)
3823                .await
3824                .unwrap();
3825            assert!(wrong_relation.is_empty());
3826        });
3827    }
3828
3829    #[test]
3830    fn migrate_relationships_column_adds_missing_column() {
3831        let dir = TempDir::new().unwrap();
3832        let uri = dir.path().to_string_lossy().to_string();
3833        let runtime = tokio::runtime::Runtime::new().unwrap();
3834        runtime.block_on(async {
3835            let schema = Arc::new(ContextStore::schema_with_options(
3836                &HashSet::new(),
3837                true,
3838                true,
3839                false,
3840                true,
3841                DEFAULT_EMBEDDING_DIM,
3842                DistanceMetric::default(),
3843            ));
3844            let empty_batch = RecordBatch::new_empty(schema.clone());
3845            let batches = RecordBatchIterator::new(
3846                vec![Ok::<RecordBatch, ArrowError>(empty_batch)].into_iter(),
3847                schema,
3848            );
3849            Dataset::write(
3850                batches,
3851                &uri,
3852                Some(WriteParams {
3853                    mode: WriteMode::Create,
3854                    ..Default::default()
3855                }),
3856            )
3857            .await
3858            .unwrap();
3859
3860            let mut store = ContextStore::open(&uri).await.unwrap();
3861            assert!(!store.has_relationships_column());
3862
3863            let mut record = text_record("with-relationships", 0.0);
3864            record.relationships.push(Relationship {
3865                target_id: "target".to_string(),
3866                relation: "mentions".to_string(),
3867                weight: None,
3868            });
3869            let err = store.add(std::slice::from_ref(&record)).await.unwrap_err();
3870            assert!(
3871                err.to_string().contains("migrate_relationships_column"),
3872                "unexpected error: {err}"
3873            );
3874
3875            assert!(store.migrate_relationships_column().await.unwrap());
3876            assert!(store.has_relationships_column());
3877            assert!(!store.migrate_relationships_column().await.unwrap());
3878
3879            store.add(std::slice::from_ref(&record)).await.unwrap();
3880            let roundtrip = store.get_by_id(&record.id).await.unwrap().unwrap();
3881            assert_eq!(roundtrip.relationships, record.relationships);
3882        });
3883    }
3884
3885    #[test]
3886    fn add_rejects_duplicate_external_id() {
3887        let dir = TempDir::new().unwrap();
3888        let uri = dir.path().to_string_lossy().to_string();
3889        let runtime = tokio::runtime::Runtime::new().unwrap();
3890        runtime.block_on(async {
3891            let mut store = ContextStore::open(&uri).await.unwrap();
3892            let mut first = text_record("a", 0.0);
3893            first.external_id = Some("doc-123#chunk-1".to_string());
3894            store.add(std::slice::from_ref(&first)).await.unwrap();
3895
3896            let mut duplicate = text_record("b", 0.0);
3897            duplicate.external_id = first.external_id.clone();
3898            let err = store.add(&[duplicate]).await.unwrap_err();
3899            let message = err.to_string();
3900            assert!(
3901                message.contains("external_id") && message.contains("already exists"),
3902                "unexpected error message: {message}"
3903            );
3904        });
3905    }
3906
3907    #[test]
3908    fn add_rejects_reserved_tombstone_content_type() {
3909        let dir = TempDir::new().unwrap();
3910        let uri = dir.path().to_string_lossy().to_string();
3911        let runtime = tokio::runtime::Runtime::new().unwrap();
3912        runtime.block_on(async {
3913            let mut store = ContextStore::open(&uri).await.unwrap();
3914            let mut record = text_record("a", 0.0);
3915            record.content_type = CONTENT_TYPE_TOMBSTONE.to_string();
3916
3917            let err = store.add(&[record]).await.unwrap_err();
3918            let message = err.to_string();
3919            assert!(
3920                message.contains("reserved") && message.contains("tombstone"),
3921                "unexpected error message: {message}"
3922            );
3923        });
3924    }
3925
3926    #[test]
3927    fn add_rejects_duplicate_id_against_existing() {
3928        let dir = TempDir::new().unwrap();
3929        let uri = dir.path().to_string_lossy().to_string();
3930        let runtime = tokio::runtime::Runtime::new().unwrap();
3931        runtime.block_on(async {
3932            let mut store = ContextStore::open(&uri).await.unwrap();
3933            store.add(&[text_record("dup", 0.0)]).await.unwrap();
3934
3935            let err = store.add(&[text_record("dup", 1.0)]).await.unwrap_err();
3936            let message = err.to_string();
3937            assert!(
3938                message.contains("id 'dup'") && message.contains("already exists"),
3939                "unexpected error message: {message}"
3940            );
3941        });
3942    }
3943
3944    #[test]
3945    fn add_rejects_duplicate_id_within_batch() {
3946        let dir = TempDir::new().unwrap();
3947        let uri = dir.path().to_string_lossy().to_string();
3948        let runtime = tokio::runtime::Runtime::new().unwrap();
3949        runtime.block_on(async {
3950            let mut store = ContextStore::open(&uri).await.unwrap();
3951            let err = store
3952                .add(&[text_record("same", 0.0), text_record("same", 1.0)])
3953                .await
3954                .unwrap_err();
3955            let message = err.to_string();
3956            assert!(
3957                message.contains("duplicate id 'same' in batch"),
3958                "unexpected error message: {message}"
3959            );
3960        });
3961    }
3962
3963    #[test]
3964    fn add_rejects_duplicate_external_id_within_batch() {
3965        let dir = TempDir::new().unwrap();
3966        let uri = dir.path().to_string_lossy().to_string();
3967        let runtime = tokio::runtime::Runtime::new().unwrap();
3968        runtime.block_on(async {
3969            let mut store = ContextStore::open(&uri).await.unwrap();
3970            let mut first = text_record("a", 0.0);
3971            first.external_id = Some("ext".to_string());
3972            let mut second = text_record("b", 1.0);
3973            second.external_id = Some("ext".to_string());
3974
3975            let err = store.add(&[first, second]).await.unwrap_err();
3976            let message = err.to_string();
3977            assert!(
3978                message.contains("duplicate external_id 'ext' in batch"),
3979                "unexpected error message: {message}"
3980            );
3981        });
3982    }
3983
3984    /// A record removed via tombstone frees its `external_id` for reuse:
3985    /// validation skips tombstones, so a later insert with the same
3986    /// `external_id` succeeds.
3987    #[test]
3988    fn add_allows_external_id_reuse_after_delete() {
3989        let dir = TempDir::new().unwrap();
3990        let uri = dir.path().to_string_lossy().to_string();
3991        let runtime = tokio::runtime::Runtime::new().unwrap();
3992        runtime.block_on(async {
3993            let mut store = ContextStore::open(&uri).await.unwrap();
3994            let mut first = text_record("a", 0.0);
3995            first.external_id = Some("ext".to_string());
3996            store.add(std::slice::from_ref(&first)).await.unwrap();
3997            assert!(store.delete_by_external_id("ext").await.unwrap());
3998
3999            let mut reused = text_record("b", 1.0);
4000            reused.external_id = Some("ext".to_string());
4001            store
4002                .add(std::slice::from_ref(&reused))
4003                .await
4004                .expect("external_id should be reusable after delete");
4005
4006            let visible = store.get_by_external_id("ext").await.unwrap().unwrap();
4007            assert_eq!(visible.id, reused.id);
4008        });
4009    }
4010
4011    /// A record removed via tombstone frees its `id` for reuse.
4012    #[test]
4013    fn add_allows_id_reuse_after_delete() {
4014        let dir = TempDir::new().unwrap();
4015        let uri = dir.path().to_string_lossy().to_string();
4016        let runtime = tokio::runtime::Runtime::new().unwrap();
4017        runtime.block_on(async {
4018            let mut store = ContextStore::open(&uri).await.unwrap();
4019            let first = text_record("dup", 0.0);
4020            store.add(std::slice::from_ref(&first)).await.unwrap();
4021            assert!(store.delete_by_id("dup").await.unwrap());
4022
4023            store
4024                .add(&[text_record("dup", 1.0)])
4025                .await
4026                .expect("id should be reusable after delete");
4027
4028            let visible = store.get_by_id("dup").await.unwrap().unwrap();
4029            assert_eq!(visible.id, "dup");
4030        });
4031    }
4032
4033    /// A superseded (non-tombstone) record still reserves its `external_id`,
4034    /// so a plain `add` with that `external_id` is rejected.
4035    #[test]
4036    fn add_rejects_external_id_after_supersede() {
4037        let dir = TempDir::new().unwrap();
4038        let uri = dir.path().to_string_lossy().to_string();
4039        let runtime = tokio::runtime::Runtime::new().unwrap();
4040        runtime.block_on(async {
4041            let mut store = ContextStore::open(&uri).await.unwrap();
4042            let mut first = text_record("a", 0.0);
4043            first.external_id = Some("ext".to_string());
4044            store.upsert_by_external_id(first).await.unwrap();
4045
4046            let mut successor = text_record("b", 1.0);
4047            successor.external_id = Some("ext".to_string());
4048            store.upsert_by_external_id(successor).await.unwrap();
4049
4050            // The original is now superseded (non-tombstone) and still present,
4051            // so its external_id is taken.
4052            let mut conflict = text_record("c", 2.0);
4053            conflict.external_id = Some("ext".to_string());
4054            let err = store.add(&[conflict]).await.unwrap_err();
4055            let message = err.to_string();
4056            assert!(
4057                message.contains("external_id 'ext'") && message.contains("already exists"),
4058                "unexpected error message: {message}"
4059            );
4060        });
4061    }
4062
4063    /// Uniqueness validation must behave identically when an `id` scalar index
4064    /// is configured (the indexed-lookup path), covering both a rejected
4065    /// collision and a permitted reuse-after-delete.
4066    #[test]
4067    fn validate_uniqueness_with_btree_index() {
4068        let dir = TempDir::new().unwrap();
4069        let uri = dir.path().to_string_lossy().to_string();
4070        let runtime = tokio::runtime::Runtime::new().unwrap();
4071        runtime.block_on(async {
4072            let options = ContextStoreOptions {
4073                id_index_type: IdIndexType::BTree,
4074                ..Default::default()
4075            };
4076            let mut store = ContextStore::open_with_options(&uri, options)
4077                .await
4078                .unwrap();
4079
4080            let mut first = text_record("idx-a", 0.0);
4081            first.external_id = Some("ext".to_string());
4082            store.add(std::slice::from_ref(&first)).await.unwrap();
4083
4084            // Duplicate id rejected via the indexed lookup.
4085            let dup_id = store.add(&[text_record("idx-a", 1.0)]).await.unwrap_err();
4086            assert!(
4087                dup_id.to_string().contains("id 'idx-a'")
4088                    && dup_id.to_string().contains("already exists")
4089            );
4090
4091            // Duplicate external_id rejected.
4092            let mut dup_ext = text_record("idx-b", 1.0);
4093            dup_ext.external_id = Some("ext".to_string());
4094            let dup_ext_err = store.add(&[dup_ext]).await.unwrap_err();
4095            assert!(
4096                dup_ext_err.to_string().contains("external_id 'ext'")
4097                    && dup_ext_err.to_string().contains("already exists")
4098            );
4099
4100            // After delete, both keys become reusable.
4101            assert!(store.delete_by_id("idx-a").await.unwrap());
4102            let mut reused = text_record("idx-a", 2.0);
4103            reused.external_id = Some("ext".to_string());
4104            store
4105                .add(std::slice::from_ref(&reused))
4106                .await
4107                .expect("keys should be reusable after delete with index configured");
4108        });
4109    }
4110
4111    /// Validation stays correct as the store grows: a duplicate is rejected and
4112    /// a fresh key accepted against a store with many historical records,
4113    /// exercising the projected/filtered scan path over a larger dataset.
4114    #[test]
4115    fn validate_uniqueness_against_large_store() {
4116        let dir = TempDir::new().unwrap();
4117        let uri = dir.path().to_string_lossy().to_string();
4118        let runtime = tokio::runtime::Runtime::new().unwrap();
4119        runtime.block_on(async {
4120            let options = ContextStoreOptions {
4121                id_index_type: IdIndexType::BTree,
4122                ..Default::default()
4123            };
4124            let mut store = ContextStore::open_with_options(&uri, options)
4125                .await
4126                .unwrap();
4127
4128            for i in 0..300 {
4129                let mut record = text_record(&format!("rec-{i}"), i as f32);
4130                record.external_id = Some(format!("ext-{i}"));
4131                store.add(std::slice::from_ref(&record)).await.unwrap();
4132            }
4133
4134            // Existing id and external_id both rejected.
4135            let mut dup = text_record("rec-150", 0.0);
4136            dup.external_id = Some("ext-999".to_string());
4137            assert!(store
4138                .add(&[dup])
4139                .await
4140                .unwrap_err()
4141                .to_string()
4142                .contains("id 'rec-150'"));
4143
4144            let mut dup_ext = text_record("rec-new", 0.0);
4145            dup_ext.external_id = Some("ext-42".to_string());
4146            assert!(store
4147                .add(&[dup_ext])
4148                .await
4149                .unwrap_err()
4150                .to_string()
4151                .contains("external_id 'ext-42'"));
4152
4153            // A fresh record still inserts cleanly.
4154            let mut fresh = text_record("rec-300", 0.0);
4155            fresh.external_id = Some("ext-300".to_string());
4156            store.add(std::slice::from_ref(&fresh)).await.unwrap();
4157            assert!(store.get_by_id("rec-300").await.unwrap().is_some());
4158        });
4159    }
4160
4161    /// Benchmark guarding against the O(N) regression: with an `id` index,
4162    /// per-append validation cost should not grow proportionally to store size.
4163    /// Ignored by default (timing-sensitive); run with
4164    /// `cargo test -p lance-context-core -- --ignored append_cost`.
4165    #[test]
4166    #[ignore = "timing-sensitive benchmark; run explicitly with --ignored"]
4167    fn append_cost_does_not_grow_linearly() {
4168        use std::time::Instant;
4169
4170        let dir = TempDir::new().unwrap();
4171        let uri = dir.path().to_string_lossy().to_string();
4172        let runtime = tokio::runtime::Runtime::new().unwrap();
4173        runtime.block_on(async {
4174            let options = ContextStoreOptions {
4175                id_index_type: IdIndexType::BTree,
4176                ..Default::default()
4177            };
4178            let mut store = ContextStore::open_with_options(&uri, options)
4179                .await
4180                .unwrap();
4181
4182            // Time a window of single-record appends at a given store size,
4183            // compacting first so the cost reflects validation against the base
4184            // table rather than accumulated MemWAL generations.
4185            async fn time_window(store: &mut ContextStore, tag: &str, window: usize) -> f64 {
4186                store.compact(None).await.unwrap();
4187                let start = Instant::now();
4188                for i in 0..window {
4189                    let id = format!("{tag}-probe-{i}");
4190                    store.add(&[text_record(&id, i as f32)]).await.unwrap();
4191                }
4192                start.elapsed().as_secs_f64() / window as f64
4193            }
4194
4195            // Grow the store to `count` rows using batched appends (few commits)
4196            // so the benchmark isolates per-call validation cost, not raw write
4197            // throughput.
4198            async fn seed(store: &mut ContextStore, tag: &str, count: usize) {
4199                let chunk = 100;
4200                let mut i = 0;
4201                while i < count {
4202                    let batch: Vec<ContextRecord> = (i..(i + chunk).min(count))
4203                        .map(|j| text_record(&format!("{tag}-seed-{j}"), j as f32))
4204                        .collect();
4205                    store.add(&batch).await.unwrap();
4206                    i += chunk;
4207                }
4208                store.compact(None).await.unwrap();
4209            }
4210
4211            let window = 30;
4212            seed(&mut store, "small", 100).await;
4213            let small = time_window(&mut store, "small", window).await;
4214
4215            seed(&mut store, "big", 2000).await;
4216            let large = time_window(&mut store, "big", window).await;
4217
4218            let ratio = large / small.max(f64::EPSILON);
4219            eprintln!(
4220                "append per-call: small={small:.6}s large={large:.6}s ratio={ratio:.2} (store grew ~20x)"
4221            );
4222            assert!(
4223                ratio < 8.0,
4224                "append cost appears to scale with store size (ratio {ratio:.2}); \
4225                 expected roughly constant per-call validation"
4226            );
4227        });
4228    }
4229
4230    /// `external_id` values are caller-supplied and flow into a SQL `IN (...)`
4231    /// filter, so embedded single quotes must be escaped rather than break the
4232    /// scan. Both insertion and duplicate detection must handle them.
4233    #[test]
4234    fn validation_handles_external_id_with_single_quote() {
4235        let dir = TempDir::new().unwrap();
4236        let uri = dir.path().to_string_lossy().to_string();
4237        let runtime = tokio::runtime::Runtime::new().unwrap();
4238        runtime.block_on(async {
4239            let mut store = ContextStore::open(&uri).await.unwrap();
4240            let tricky = "o'brien#chunk-1";
4241
4242            let mut first = text_record("a", 0.0);
4243            first.external_id = Some(tricky.to_string());
4244            store.add(std::slice::from_ref(&first)).await.unwrap();
4245
4246            // Re-using the same quoted external_id is still detected as a dup.
4247            let mut dup = text_record("b", 1.0);
4248            dup.external_id = Some(tricky.to_string());
4249            let err = store.add(&[dup]).await.unwrap_err();
4250            assert!(
4251                err.to_string().contains("already exists"),
4252                "unexpected error message: {err}"
4253            );
4254
4255            // A different quoted external_id inserts cleanly.
4256            let mut other = text_record("c", 2.0);
4257            other.external_id = Some("d'angelo#chunk-2".to_string());
4258            store.add(std::slice::from_ref(&other)).await.unwrap();
4259            assert!(store
4260                .get_by_external_id("d'angelo#chunk-2")
4261                .await
4262                .unwrap()
4263                .is_some());
4264        });
4265    }
4266
4267    #[test]
4268    fn delete_by_external_id_hides_record_from_default_reads() {
4269        let dir = TempDir::new().unwrap();
4270        let uri = dir.path().to_string_lossy().to_string();
4271        let runtime = tokio::runtime::Runtime::new().unwrap();
4272        runtime.block_on(async {
4273            let mut store = ContextStore::open(&uri).await.unwrap();
4274            let mut first = text_record("a", 0.0);
4275            first.external_id = Some("doc-123#chunk-1".to_string());
4276            let second = text_record("b", 2.0);
4277            store.add(&[first.clone(), second.clone()]).await.unwrap();
4278
4279            assert!(store
4280                .delete_by_external_id("doc-123#chunk-1")
4281                .await
4282                .unwrap());
4283
4284            assert!(store
4285                .get_by_external_id("doc-123#chunk-1")
4286                .await
4287                .unwrap()
4288                .is_none());
4289            assert!(store.get_by_id(&first.id).await.unwrap().is_none());
4290
4291            let records = store.list(None, None).await.unwrap();
4292            assert_eq!(records.len(), 1);
4293            assert_eq!(records[0].id, second.id);
4294
4295            let query = make_embedding(0.0);
4296            let hits = store.search(&query, Some(10)).await.unwrap();
4297            assert_eq!(hits.len(), 1);
4298            assert_eq!(hits[0].record.id, second.id);
4299        });
4300    }
4301
4302    #[test]
4303    fn delete_by_id_hides_record_from_default_reads() {
4304        let dir = TempDir::new().unwrap();
4305        let uri = dir.path().to_string_lossy().to_string();
4306        let runtime = tokio::runtime::Runtime::new().unwrap();
4307        runtime.block_on(async {
4308            let mut store = ContextStore::open(&uri).await.unwrap();
4309            let mut first = text_record("a", 0.0);
4310            first.external_id = Some("doc-123#chunk-1".to_string());
4311            let second = text_record("b", 2.0);
4312            store.add(&[first.clone(), second.clone()]).await.unwrap();
4313
4314            assert!(store.delete_by_id(&first.id).await.unwrap());
4315
4316            assert!(store.get_by_id(&first.id).await.unwrap().is_none());
4317            assert!(store
4318                .get_by_external_id("doc-123#chunk-1")
4319                .await
4320                .unwrap()
4321                .is_none());
4322
4323            let records = store.list(None, None).await.unwrap();
4324            assert_eq!(records.len(), 1);
4325            assert_eq!(records[0].id, second.id);
4326
4327            let query = make_embedding(0.0);
4328            let hits = store.search(&query, Some(10)).await.unwrap();
4329            assert_eq!(hits.len(), 1);
4330            assert_eq!(hits[0].record.id, second.id);
4331        });
4332    }
4333
4334    #[test]
4335    fn delete_missing_id_is_noop() {
4336        let dir = TempDir::new().unwrap();
4337        let uri = dir.path().to_string_lossy().to_string();
4338        let runtime = tokio::runtime::Runtime::new().unwrap();
4339        runtime.block_on(async {
4340            let mut store = ContextStore::open(&uri).await.unwrap();
4341            assert!(!store.delete_by_id("missing").await.unwrap());
4342            assert!(!store.delete_by_external_id("missing").await.unwrap());
4343        });
4344    }
4345
4346    #[test]
4347    fn external_id_can_be_reused_after_delete() {
4348        let dir = TempDir::new().unwrap();
4349        let uri = dir.path().to_string_lossy().to_string();
4350        let runtime = tokio::runtime::Runtime::new().unwrap();
4351        runtime.block_on(async {
4352            let mut store = ContextStore::open(&uri).await.unwrap();
4353            let mut first = text_record("a", 0.0);
4354            first.external_id = Some("doc-123#chunk-1".to_string());
4355            store.add(std::slice::from_ref(&first)).await.unwrap();
4356            assert!(store
4357                .delete_by_external_id("doc-123#chunk-1")
4358                .await
4359                .unwrap());
4360
4361            let mut replacement = text_record("b", 1.0);
4362            replacement.external_id = first.external_id.clone();
4363            store.add(std::slice::from_ref(&replacement)).await.unwrap();
4364
4365            let by_external_id = store
4366                .get_by_external_id("doc-123#chunk-1")
4367                .await
4368                .unwrap()
4369                .unwrap();
4370            assert_eq!(by_external_id.id, replacement.id);
4371            assert_eq!(store.list(None, None).await.unwrap().len(), 1);
4372        });
4373    }
4374
4375    #[test]
4376    fn test_region_id_derivation_explicit() {
4377        let bot_id = Some("bot-123".to_string());
4378        let session_id = Some("session-456".to_string());
4379
4380        let region_id_1 = ContextStore::derive_region_id(&bot_id, &session_id);
4381        let region_id_2 = ContextStore::derive_region_id(&bot_id, &session_id);
4382
4383        assert_eq!(
4384            region_id_1, region_id_2,
4385            "Region ID should be deterministic for same inputs"
4386        );
4387
4388        let other_session = Some("session-789".to_string());
4389        let region_id_3 = ContextStore::derive_region_id(&bot_id, &other_session);
4390
4391        assert_ne!(
4392            region_id_1, region_id_3,
4393            "Region ID should differ for different inputs"
4394        );
4395
4396        // Test None/None case (now deterministic based on empty strings)
4397        let region_id_none = ContextStore::derive_region_id(&None, &None);
4398        let region_id_none_2 = ContextStore::derive_region_id(&None, &None);
4399        assert_eq!(
4400            region_id_none, region_id_none_2,
4401            "Region ID for None/None should be deterministic"
4402        );
4403    }
4404
4405    #[test]
4406    fn test_add_multiple_regions() {
4407        let dir = TempDir::new().unwrap();
4408        let uri = dir.path().to_string_lossy().to_string();
4409        let runtime = tokio::runtime::Runtime::new().unwrap();
4410
4411        runtime.block_on(async {
4412            let mut store = ContextStore::open(&uri).await.unwrap();
4413
4414            // Create records for different regions
4415            let mut record1 = text_record("r1", 0.0);
4416            record1.bot_id = Some("bot-A".to_string());
4417            record1.session_id = Some("session-1".to_string());
4418
4419            let mut record2 = text_record("r2", 0.0);
4420            record2.bot_id = Some("bot-B".to_string());
4421            record2.session_id = Some("session-2".to_string());
4422
4423            // Add them in a single batch
4424            store
4425                .add(&[record1.clone(), record2.clone()])
4426                .await
4427                .unwrap();
4428
4429            // Reload store to verify persistence
4430            let store = ContextStore::open(&uri).await.unwrap();
4431
4432            // Verify we can list them back
4433            let results = store.list(None, None).await.unwrap();
4434            assert_eq!(results.len(), 2);
4435
4436            let ids: Vec<String> = results.iter().map(|r| r.id.clone()).collect();
4437            assert!(ids.contains(&"r1".to_string()));
4438            assert!(ids.contains(&"r2".to_string()));
4439        });
4440    }
4441
4442    #[test]
4443    fn test_blob_binary_payload() {
4444        let dir = TempDir::new().unwrap();
4445        let uri = dir.path().to_string_lossy().to_string();
4446        let runtime = tokio::runtime::Runtime::new().unwrap();
4447
4448        runtime.block_on(async {
4449            let options = ContextStoreOptions {
4450                blob_columns: HashSet::from(["binary_payload".to_string()]),
4451                ..Default::default()
4452            };
4453            let mut store = ContextStore::open_with_options(&uri, options)
4454                .await
4455                .unwrap();
4456
4457            let mut record = text_record("blob-bin-1", 0.0);
4458            record.binary_payload = Some(vec![0xDE, 0xAD, 0xBE, 0xEF]);
4459            store.add(std::slice::from_ref(&record)).await.unwrap();
4460
4461            // Verify schema has blob metadata on binary_payload
4462            let schema = ContextStore::schema(&store.blob_columns);
4463            let field = schema.field_with_name("binary_payload").unwrap();
4464            assert_eq!(
4465                field.metadata().get("lance-encoding:blob"),
4466                Some(&"true".to_string()),
4467            );
4468            // text_payload should remain LargeUtf8 without blob metadata
4469            let text_field = schema.field_with_name("text_payload").unwrap();
4470            assert_eq!(text_field.data_type(), &DataType::LargeUtf8);
4471            assert!(text_field.metadata().get("lance-encoding:blob").is_none());
4472        });
4473    }
4474
4475    #[test]
4476    fn test_blob_text_payload() {
4477        let dir = TempDir::new().unwrap();
4478        let uri = dir.path().to_string_lossy().to_string();
4479        let runtime = tokio::runtime::Runtime::new().unwrap();
4480
4481        runtime.block_on(async {
4482            let options = ContextStoreOptions {
4483                blob_columns: HashSet::from(["text_payload".to_string()]),
4484                ..Default::default()
4485            };
4486            let mut store = ContextStore::open_with_options(&uri, options)
4487                .await
4488                .unwrap();
4489
4490            let record = text_record("blob-txt-1", 0.0);
4491            store.add(std::slice::from_ref(&record)).await.unwrap();
4492
4493            // Roundtrip: records_to_batch -> batch_to_records
4494            let batch = store
4495                .records_to_batch(std::slice::from_ref(&record))
4496                .unwrap();
4497            let batch_schema = batch.schema();
4498            let text_field = batch_schema.field_with_name("text_payload").unwrap();
4499            assert_eq!(
4500                text_field.data_type(),
4501                &DataType::LargeBinary,
4502                "text_payload should be LargeBinary when blob-encoded"
4503            );
4504
4505            let roundtripped = batch_to_records(&batch).unwrap();
4506            assert_eq!(roundtripped.len(), 1);
4507            assert_eq!(
4508                roundtripped[0].text_payload, record.text_payload,
4509                "text payload should survive blob roundtrip"
4510            );
4511        });
4512    }
4513
4514    #[test]
4515    fn test_blob_both_columns() {
4516        let dir = TempDir::new().unwrap();
4517        let uri = dir.path().to_string_lossy().to_string();
4518        let runtime = tokio::runtime::Runtime::new().unwrap();
4519
4520        runtime.block_on(async {
4521            let options = ContextStoreOptions {
4522                blob_columns: HashSet::from([
4523                    "text_payload".to_string(),
4524                    "binary_payload".to_string(),
4525                ]),
4526                ..Default::default()
4527            };
4528            let mut store = ContextStore::open_with_options(&uri, options)
4529                .await
4530                .unwrap();
4531
4532            let mut record = text_record("blob-both-1", 0.0);
4533            record.binary_payload = Some(b"hello binary".to_vec());
4534            store.add(std::slice::from_ref(&record)).await.unwrap();
4535
4536            // Both columns should have blob metadata
4537            let schema = ContextStore::schema(&store.blob_columns);
4538            let text_field = schema.field_with_name("text_payload").unwrap();
4539            let bin_field = schema.field_with_name("binary_payload").unwrap();
4540            assert_eq!(
4541                text_field.metadata().get("lance-encoding:blob"),
4542                Some(&"true".to_string()),
4543            );
4544            assert_eq!(
4545                bin_field.metadata().get("lance-encoding:blob"),
4546                Some(&"true".to_string()),
4547            );
4548
4549            // Roundtrip via batch
4550            let batch = store
4551                .records_to_batch(std::slice::from_ref(&record))
4552                .unwrap();
4553            let roundtripped = batch_to_records(&batch).unwrap();
4554            assert_eq!(roundtripped.len(), 1);
4555            assert_eq!(roundtripped[0].text_payload, record.text_payload);
4556            assert_eq!(roundtripped[0].binary_payload, record.binary_payload);
4557        });
4558    }
4559
4560    #[test]
4561    fn test_no_blob_default() {
4562        // Default options should produce no blob metadata
4563        let schema = ContextStore::schema(&HashSet::new());
4564        let text_field = schema.field_with_name("text_payload").unwrap();
4565        let bin_field = schema.field_with_name("binary_payload").unwrap();
4566
4567        assert_eq!(text_field.data_type(), &DataType::LargeUtf8);
4568        assert!(text_field.metadata().get("lance-encoding:blob").is_none());
4569        assert_eq!(bin_field.data_type(), &DataType::LargeBinary);
4570        assert!(bin_field.metadata().get("lance-encoding:blob").is_none());
4571    }
4572
4573    #[test]
4574    fn test_blob_schema_metadata() {
4575        let blob_columns =
4576            HashSet::from(["text_payload".to_string(), "binary_payload".to_string()]);
4577        let schema = ContextStore::schema(&blob_columns);
4578
4579        let text_field = schema.field_with_name("text_payload").unwrap();
4580        assert_eq!(text_field.data_type(), &DataType::LargeBinary);
4581        assert_eq!(
4582            text_field.metadata().get("lance-encoding:blob"),
4583            Some(&"true".to_string()),
4584        );
4585
4586        let bin_field = schema.field_with_name("binary_payload").unwrap();
4587        assert_eq!(bin_field.data_type(), &DataType::LargeBinary);
4588        assert_eq!(
4589            bin_field.metadata().get("lance-encoding:blob"),
4590            Some(&"true".to_string()),
4591        );
4592
4593        // Non-blob fields should have no blob metadata
4594        let id_field = schema.field_with_name("id").unwrap();
4595        assert!(id_field.metadata().get("lance-encoding:blob").is_none());
4596    }
4597
4598    #[test]
4599    fn test_blob_invalid_column_name() {
4600        let dir = TempDir::new().unwrap();
4601        let uri = dir.path().to_string_lossy().to_string();
4602        let runtime = tokio::runtime::Runtime::new().unwrap();
4603
4604        runtime.block_on(async {
4605            let options = ContextStoreOptions {
4606                blob_columns: HashSet::from(["nonexistent_column".to_string()]),
4607                ..Default::default()
4608            };
4609            let result = ContextStore::open_with_options(&uri, options).await;
4610            assert!(result.is_err(), "should reject invalid blob column names");
4611            let err_msg = result.err().unwrap().to_string();
4612            assert!(
4613                err_msg.contains("invalid blob column"),
4614                "error should mention invalid blob column: {err_msg}"
4615            );
4616        });
4617    }
4618
4619    #[test]
4620    fn test_batch_to_records_autodetects_text_type() {
4621        // Verify that batch_to_records works on both LargeUtf8 and LargeBinary
4622        // text_payload without needing configuration.
4623        let runtime = tokio::runtime::Runtime::new().unwrap();
4624        runtime.block_on(async {
4625            // Build a batch with text_payload as LargeUtf8 (default)
4626            let dir1 = TempDir::new().unwrap();
4627            let uri1 = dir1.path().to_string_lossy().to_string();
4628            let store_default = ContextStore::open(&uri1).await.unwrap();
4629            let record = text_record("auto-1", 0.0);
4630            let batch_utf8 = store_default
4631                .records_to_batch(std::slice::from_ref(&record))
4632                .unwrap();
4633            let results_utf8 = batch_to_records(&batch_utf8).unwrap();
4634            assert_eq!(results_utf8[0].text_payload, record.text_payload);
4635
4636            // Build a batch with text_payload as LargeBinary (blob)
4637            let dir2 = TempDir::new().unwrap();
4638            let uri2 = dir2.path().to_string_lossy().to_string();
4639            let options = ContextStoreOptions {
4640                blob_columns: HashSet::from(["text_payload".to_string()]),
4641                ..Default::default()
4642            };
4643            let store_blob = ContextStore::open_with_options(&uri2, options)
4644                .await
4645                .unwrap();
4646            let batch_binary = store_blob
4647                .records_to_batch(std::slice::from_ref(&record))
4648                .unwrap();
4649            let results_binary = batch_to_records(&batch_binary).unwrap();
4650            assert_eq!(results_binary[0].text_payload, record.text_payload);
4651        });
4652    }
4653
4654    #[test]
4655    fn test_id_index_btree() {
4656        let dir = TempDir::new().unwrap();
4657        let uri = dir.path().to_string_lossy().to_string();
4658        let runtime = tokio::runtime::Runtime::new().unwrap();
4659
4660        runtime.block_on(async {
4661            let options = ContextStoreOptions {
4662                id_index_type: IdIndexType::BTree,
4663                ..Default::default()
4664            };
4665            let mut store = ContextStore::open_with_options(&uri, options)
4666                .await
4667                .unwrap();
4668
4669            // Index should be created eagerly on open
4670            let indices = store.dataset.load_indices().await.unwrap();
4671            assert!(
4672                indices.iter().any(|i| i.name == ID_INDEX_NAME),
4673                "btree index should be created on open"
4674            );
4675
4676            // Add data and verify it still works with the index
4677            for i in 0..5 {
4678                store
4679                    .add(&[text_record(&format!("btree-{i}"), i as f32)])
4680                    .await
4681                    .unwrap();
4682            }
4683            store.compact(None).await.unwrap();
4684
4685            // Index should still exist after compaction
4686            let indices = store.dataset.load_indices().await.unwrap();
4687            assert!(
4688                indices.iter().any(|i| i.name == ID_INDEX_NAME),
4689                "btree index should persist after compaction"
4690            );
4691        });
4692    }
4693
4694    #[test]
4695    fn test_id_index_zonemap() {
4696        let dir = TempDir::new().unwrap();
4697        let uri = dir.path().to_string_lossy().to_string();
4698        let runtime = tokio::runtime::Runtime::new().unwrap();
4699
4700        runtime.block_on(async {
4701            let options = ContextStoreOptions {
4702                id_index_type: IdIndexType::ZoneMap,
4703                ..Default::default()
4704            };
4705            let mut store = ContextStore::open_with_options(&uri, options)
4706                .await
4707                .unwrap();
4708
4709            // Index should be created eagerly on open
4710            let indices = store.dataset.load_indices().await.unwrap();
4711            assert!(
4712                indices.iter().any(|i| i.name == ID_INDEX_NAME),
4713                "zonemap index should be created on open"
4714            );
4715
4716            for i in 0..5 {
4717                store
4718                    .add(&[text_record(&format!("zm-{i}"), i as f32)])
4719                    .await
4720                    .unwrap();
4721            }
4722            store.compact(None).await.unwrap();
4723
4724            let indices = store.dataset.load_indices().await.unwrap();
4725            assert!(
4726                indices.iter().any(|i| i.name == ID_INDEX_NAME),
4727                "zonemap index should persist after compaction"
4728            );
4729        });
4730    }
4731
4732    #[test]
4733    fn test_id_index_none_by_default() {
4734        let dir = TempDir::new().unwrap();
4735        let uri = dir.path().to_string_lossy().to_string();
4736        let runtime = tokio::runtime::Runtime::new().unwrap();
4737
4738        runtime.block_on(async {
4739            let mut store = ContextStore::open(&uri).await.unwrap();
4740
4741            store.add(&[text_record("no-idx-1", 0.0)]).await.unwrap();
4742            store.compact(None).await.unwrap();
4743
4744            let indices = store.dataset.load_indices().await.unwrap();
4745            assert!(
4746                !indices.iter().any(|i| i.name == ID_INDEX_NAME),
4747                "no id index should be created when IdIndexType::None"
4748            );
4749        });
4750    }
4751
4752    #[test]
4753    fn test_id_index_idempotent() {
4754        let dir = TempDir::new().unwrap();
4755        let uri = dir.path().to_string_lossy().to_string();
4756        let runtime = tokio::runtime::Runtime::new().unwrap();
4757
4758        runtime.block_on(async {
4759            let options = ContextStoreOptions {
4760                id_index_type: IdIndexType::BTree,
4761                ..Default::default()
4762            };
4763            let mut store = ContextStore::open_with_options(&uri, options)
4764                .await
4765                .unwrap();
4766
4767            for i in 0..5 {
4768                store
4769                    .add(&[text_record(&format!("idem-{i}"), i as f32)])
4770                    .await
4771                    .unwrap();
4772            }
4773
4774            // Create index twice -- second call should be a no-op
4775            store.create_id_index().await.unwrap();
4776            let v1 = store.version();
4777            store.ensure_id_index().await.unwrap();
4778            let v2 = store.version();
4779            assert_eq!(v1, v2, "ensure_id_index should not recreate existing index");
4780        });
4781    }
4782}