Skip to main content

lance_context_core/
namespace.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3
4use arrow_array::{
5    Array, Int32Array, LargeStringArray, RecordBatch, RecordBatchIterator, StringArray,
6};
7use arrow_schema::{ArrowError, DataType, Field, Schema};
8use futures::TryStreamExt;
9use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams};
10use lance::io::{ObjectStoreParams, StorageOptionsAccessor};
11use lance::{Error as LanceError, Result as LanceResult};
12use uuid::Uuid;
13
14use crate::store::{ContextStore, ContextStoreOptions};
15
/// Name of the manifest table, joined onto the namespace root URI.
const MANIFEST_TABLE_NAME: &str = "__manifest";
/// Final path segment of every partition dataset URI.
const PARTITION_TABLE_NAME: &str = "dataset";

/// Field-name to value mapping that fully identifies one context partition.
///
/// A `BTreeMap` keeps the keys in sorted order.
pub type PartitionSelector = BTreeMap<String, String>;
21
/// Phase-1 description of how a context namespace is partitioned.
///
/// Only identity partition fields are modelled on purpose. Richer Lance
/// partition transforms (`bucket`, `truncate`, time transforms) can be added
/// once the namespace resolver supports more than the single-partition path.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PartitionSpec {
    /// Spec version; it becomes part of every partition id and dataset path.
    version: i32,
    /// Ordered partition field names.
    fields: Vec<String>,
}
32
33impl PartitionSpec {
34    /// Create a v1 identity partition spec from ordered field names.
35    pub fn new<I, S>(fields: I) -> LanceResult<Self>
36    where
37        I: IntoIterator<Item = S>,
38        S: Into<String>,
39    {
40        Self::with_version(1, fields)
41    }
42
43    /// Create an identity partition spec with an explicit version.
44    pub fn with_version<I, S>(version: i32, fields: I) -> LanceResult<Self>
45    where
46        I: IntoIterator<Item = S>,
47        S: Into<String>,
48    {
49        if version <= 0 {
50            return Err(invalid_input("partition spec version must be positive"));
51        }
52
53        let mut seen = HashSet::new();
54        let mut normalized = Vec::new();
55        for field in fields {
56            let field = field.into();
57            if field.trim().is_empty() {
58                return Err(invalid_input("partition field names must be non-empty"));
59            }
60            if !seen.insert(field.clone()) {
61                return Err(invalid_input(format!(
62                    "duplicate partition field '{field}'"
63                )));
64            }
65            normalized.push(field);
66        }
67        if normalized.is_empty() {
68            return Err(invalid_input("partition spec requires at least one field"));
69        }
70
71        Ok(Self {
72            version,
73            fields: normalized,
74        })
75    }
76
77    #[must_use]
78    pub fn tenant() -> Self {
79        Self {
80            version: 1,
81            fields: vec!["tenant".to_string()],
82        }
83    }
84
85    #[must_use]
86    pub fn tenant_source() -> Self {
87        Self {
88            version: 1,
89            fields: vec!["tenant".to_string(), "source".to_string()],
90        }
91    }
92
93    #[must_use]
94    pub fn version(&self) -> i32 {
95        self.version
96    }
97
98    #[must_use]
99    pub fn fields(&self) -> &[String] {
100        &self.fields
101    }
102}
103
104/// Manifest row describing one resolved context partition.
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct PartitionInfo {
107    pub partition_id: String,
108    pub spec_version: i32,
109    pub selector: PartitionSelector,
110    pub dataset_uri: String,
111}
112
113/// Thin resolver for scoped context partitions under one namespace root.
114///
115/// Phase 1 supports opening exactly one partition from a complete selector. The
116/// returned value is today's [`ContextStore`], preserving existing per-context
117/// behavior while putting each selector in a physically separate Lance dataset.
118#[derive(Debug, Clone)]
119pub struct ContextNamespace {
120    root_uri: String,
121    partition_spec: PartitionSpec,
122    context_options: ContextStoreOptions,
123}
124
125impl ContextNamespace {
126    /// Create or open a namespace root with default context-store options.
127    pub async fn create(
128        root_uri: impl Into<String>,
129        partition_spec: PartitionSpec,
130    ) -> LanceResult<Self> {
131        Self::create_with_options(root_uri, partition_spec, ContextStoreOptions::default()).await
132    }
133
134    /// Create or open a namespace root with explicit context-store options.
135    pub async fn create_with_options(
136        root_uri: impl Into<String>,
137        partition_spec: PartitionSpec,
138        context_options: ContextStoreOptions,
139    ) -> LanceResult<Self> {
140        let namespace = Self::new(root_uri, partition_spec, context_options)?;
141        namespace.ensure_manifest().await?;
142        Ok(namespace)
143    }
144
145    /// Build a namespace resolver without touching storage.
146    pub fn new(
147        root_uri: impl Into<String>,
148        partition_spec: PartitionSpec,
149        context_options: ContextStoreOptions,
150    ) -> LanceResult<Self> {
151        let root_uri = normalize_root_uri(root_uri.into())?;
152        Ok(Self {
153            root_uri,
154            partition_spec,
155            context_options,
156        })
157    }
158
159    #[must_use]
160    pub fn root_uri(&self) -> &str {
161        &self.root_uri
162    }
163
164    #[must_use]
165    pub fn partition_spec(&self) -> &PartitionSpec {
166        &self.partition_spec
167    }
168
169    #[must_use]
170    pub fn manifest_uri(&self) -> String {
171        join_uri(&self.root_uri, MANIFEST_TABLE_NAME)
172    }
173
174    /// Resolve a complete selector to its opaque partition dataset URI.
175    pub fn resolve_partition(&self, selector: &PartitionSelector) -> LanceResult<PartitionInfo> {
176        self.validate_selector(selector)?;
177        let selector_json = selector_json(selector)?;
178        let partition_id =
179            partition_id(&self.root_uri, self.partition_spec.version, &selector_json);
180        let dataset_uri = join_uri(
181            &self.root_uri,
182            &format!(
183                "v{}/{}/{}",
184                self.partition_spec.version, partition_id, PARTITION_TABLE_NAME
185            ),
186        );
187
188        Ok(PartitionInfo {
189            partition_id,
190            spec_version: self.partition_spec.version,
191            selector: selector.clone(),
192            dataset_uri,
193        })
194    }
195
196    /// Open one partition as a normal [`ContextStore`].
197    pub async fn context(&self, selector: &PartitionSelector) -> LanceResult<ContextStore> {
198        let partition = self.resolve_partition(selector)?;
199        self.ensure_manifest_entry(&partition).await?;
200        ContextStore::open_with_options(&partition.dataset_uri, self.context_options.clone()).await
201    }
202
203    /// List partition mappings currently recorded in `__manifest`.
204    pub async fn partitions(&self) -> LanceResult<Vec<PartitionInfo>> {
205        let manifest = self.ensure_manifest().await?;
206        read_manifest(&manifest).await
207    }
208
209    async fn ensure_manifest(&self) -> LanceResult<Dataset> {
210        match load_dataset(&self.manifest_uri(), self.context_options.storage_options()).await {
211            Ok(dataset) => Ok(dataset),
212            Err(LanceError::DatasetNotFound { .. }) => {
213                create_manifest(&self.manifest_uri(), self.context_options.storage_options()).await
214            }
215            Err(err) => Err(err),
216        }
217    }
218
219    async fn ensure_manifest_entry(&self, partition: &PartitionInfo) -> LanceResult<()> {
220        let manifest = self.ensure_manifest().await?;
221        for existing in read_manifest(&manifest).await? {
222            if existing.partition_id == partition.partition_id {
223                if existing == *partition {
224                    return Ok(());
225                }
226                return Err(invalid_input(format!(
227                    "partition id '{}' already maps to a different selector",
228                    partition.partition_id
229                )));
230            }
231        }
232
233        append_manifest_entry(
234            &self.manifest_uri(),
235            self.context_options.storage_options(),
236            partition,
237        )
238        .await
239    }
240
241    fn validate_selector(&self, selector: &PartitionSelector) -> LanceResult<()> {
242        for field in &self.partition_spec.fields {
243            match selector.get(field) {
244                Some(value) if !value.is_empty() => {}
245                Some(_) => {
246                    return Err(invalid_input(format!(
247                        "partition selector value for '{field}' must be non-empty"
248                    )));
249                }
250                None => {
251                    return Err(invalid_input(format!(
252                        "partition selector is missing required field '{field}'"
253                    )));
254                }
255            }
256        }
257
258        for field in selector.keys() {
259            if !self
260                .partition_spec
261                .fields
262                .iter()
263                .any(|expected| expected == field)
264            {
265                return Err(invalid_input(format!(
266                    "partition selector contains unknown field '{field}'"
267                )));
268            }
269        }
270
271        Ok(())
272    }
273}
274
275fn manifest_schema() -> Arc<Schema> {
276    Arc::new(Schema::new(vec![
277        Field::new("partition_id", DataType::Utf8, false),
278        Field::new("spec_version", DataType::Int32, false),
279        Field::new("selector_json", DataType::LargeUtf8, false),
280        Field::new("dataset_uri", DataType::Utf8, false),
281    ]))
282}
283
284async fn load_dataset(
285    uri: &str,
286    storage_options: Option<HashMap<String, String>>,
287) -> LanceResult<Dataset> {
288    if let Some(options) = storage_options {
289        DatasetBuilder::from_uri(uri)
290            .with_storage_options(options)
291            .load()
292            .await
293    } else {
294        Dataset::open(uri).await
295    }
296}
297
298async fn create_manifest(
299    uri: &str,
300    storage_options: Option<HashMap<String, String>>,
301) -> LanceResult<Dataset> {
302    let schema = manifest_schema();
303    let empty_batch = RecordBatch::new_empty(schema.clone());
304    let batches = RecordBatchIterator::new(
305        vec![Ok::<RecordBatch, ArrowError>(empty_batch)].into_iter(),
306        schema,
307    );
308    let params = write_params(WriteMode::Create, storage_options);
309    Dataset::write(batches, uri, Some(params)).await
310}
311
312async fn append_manifest_entry(
313    uri: &str,
314    storage_options: Option<HashMap<String, String>>,
315    partition: &PartitionInfo,
316) -> LanceResult<()> {
317    let selector_json = selector_json(&partition.selector)?;
318    let schema = manifest_schema();
319    let batch = RecordBatch::try_new(
320        schema.clone(),
321        vec![
322            Arc::new(StringArray::from(vec![partition.partition_id.as_str()])),
323            Arc::new(Int32Array::from(vec![partition.spec_version])),
324            Arc::new(LargeStringArray::from(vec![selector_json.as_str()])),
325            Arc::new(StringArray::from(vec![partition.dataset_uri.as_str()])),
326        ],
327    )?;
328    let batches = RecordBatchIterator::new(
329        vec![Ok::<RecordBatch, ArrowError>(batch)].into_iter(),
330        schema,
331    );
332    let params = write_params(WriteMode::Append, storage_options);
333    Dataset::write(batches, uri, Some(params)).await?;
334    Ok(())
335}
336
337fn write_params(mode: WriteMode, storage_options: Option<HashMap<String, String>>) -> WriteParams {
338    let mut params = WriteParams {
339        mode,
340        ..Default::default()
341    };
342
343    if let Some(options) = storage_options {
344        let store_params = ObjectStoreParams {
345            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
346                options,
347            ))),
348            ..Default::default()
349        };
350        params.store_params = Some(store_params);
351    }
352
353    params
354}
355
356async fn read_manifest(dataset: &Dataset) -> LanceResult<Vec<PartitionInfo>> {
357    let mut scanner = dataset.scan();
358    scanner.project(&[
359        "partition_id",
360        "spec_version",
361        "selector_json",
362        "dataset_uri",
363    ])?;
364    let mut stream = scanner.try_into_stream().await?;
365    let mut partitions = Vec::new();
366    while let Some(batch) = stream.try_next().await? {
367        partitions.extend(manifest_batch_to_partitions(&batch)?);
368    }
369    Ok(partitions)
370}
371
372fn manifest_batch_to_partitions(batch: &RecordBatch) -> LanceResult<Vec<PartitionInfo>> {
373    let partition_id = column_as::<StringArray>(batch, "partition_id")?;
374    let spec_version = column_as::<Int32Array>(batch, "spec_version")?;
375    let selector_json = column_as::<LargeStringArray>(batch, "selector_json")?;
376    let dataset_uri = column_as::<StringArray>(batch, "dataset_uri")?;
377
378    let mut partitions = Vec::with_capacity(batch.num_rows());
379    for row in 0..batch.num_rows() {
380        partitions.push(PartitionInfo {
381            partition_id: partition_id.value(row).to_string(),
382            spec_version: spec_version.value(row),
383            selector: serde_json::from_str(selector_json.value(row)).map_err(|err| {
384                LanceError::from(ArrowError::InvalidArgumentError(format!(
385                    "invalid partition selector JSON in manifest: {err}"
386                )))
387            })?,
388            dataset_uri: dataset_uri.value(row).to_string(),
389        });
390    }
391    Ok(partitions)
392}
393
394fn column_as<'a, A>(batch: &'a RecordBatch, name: &str) -> LanceResult<&'a A>
395where
396    A: Array + 'static,
397{
398    batch
399        .column_by_name(name)
400        .and_then(|col| col.as_ref().as_any().downcast_ref::<A>())
401        .ok_or_else(|| {
402            LanceError::from(ArrowError::InvalidArgumentError(format!(
403                "column '{name}' has unexpected data type"
404            )))
405        })
406}
407
408fn selector_json(selector: &PartitionSelector) -> LanceResult<String> {
409    serde_json::to_string(selector).map_err(|err| {
410        LanceError::from(ArrowError::InvalidArgumentError(format!(
411            "partition selector is not JSON serializable: {err}"
412        )))
413    })
414}
415
416fn partition_id(root_uri: &str, spec_version: i32, selector_json: &str) -> String {
417    let key = format!("{root_uri}\n{spec_version}\n{selector_json}");
418    let uuid = Uuid::new_v5(&Uuid::NAMESPACE_URL, key.as_bytes())
419        .simple()
420        .to_string();
421    uuid[..16].to_string()
422}
423
424fn normalize_root_uri(root_uri: String) -> LanceResult<String> {
425    let root_uri = root_uri.trim();
426    if root_uri.is_empty() {
427        return Err(invalid_input("namespace root URI must be non-empty"));
428    }
429    if root_uri == "/" {
430        return Ok(root_uri.to_string());
431    }
432    Ok(root_uri.trim_end_matches('/').to_string())
433}
434
/// Join `child` onto `root_uri` with a single `/` separator.
///
/// The filesystem root `"/"` is special-cased so the result does not start
/// with a doubled slash.
fn join_uri(root_uri: &str, child: &str) -> String {
    let prefix = match root_uri {
        "/" => "",
        other => other,
    };

    let mut joined = String::with_capacity(prefix.len() + 1 + child.len());
    joined.push_str(prefix);
    joined.push('/');
    joined.push_str(child);
    joined
}
442
443fn invalid_input(message: impl Into<String>) -> LanceError {
444    LanceError::from(ArrowError::InvalidArgumentError(message.into()))
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::{ContextRecord, LIFECYCLE_ACTIVE};
    use crate::serde::CONTENT_TYPE_TEXT;
    use chrono::Utc;
    use tempfile::TempDir;

    /// Build a selector from `(field, value)` pairs.
    fn selector(pairs: &[(&str, &str)]) -> PartitionSelector {
        let mut built = PartitionSelector::new();
        for (key, value) in pairs {
            built.insert((*key).to_string(), (*value).to_string());
        }
        built
    }

    /// Minimal active text record tagged with the given tenant and source.
    fn record(id: &str, tenant: &str, source: &str) -> ContextRecord {
        ContextRecord {
            id: id.to_string(),
            external_id: None,
            run_id: format!("run-{id}"),
            bot_id: Some("support-bot".to_string()),
            session_id: None,
            tenant: Some(tenant.to_string()),
            source: Some(source.to_string()),
            created_at: Utc::now(),
            role: "user".to_string(),
            state_metadata: None,
            metadata: None,
            relationships: Vec::new(),
            expires_at: None,
            retention_policy: None,
            lifecycle_status: LIFECYCLE_ACTIVE.to_string(),
            retired_at: None,
            retired_reason: None,
            supersedes_id: None,
            superseded_by_id: None,
            content_type: CONTENT_TYPE_TEXT.to_string(),
            text_payload: Some("hello".to_string()),
            binary_payload: None,
            embedding: None,
        }
    }

    /// Opening a partition records it in the manifest exactly once and yields
    /// a usable store.
    #[tokio::test]
    async fn namespace_context_records_manifest_and_opens_partition() {
        let dir = TempDir::new().unwrap();
        let root_uri = dir.path().to_string_lossy().into_owned();
        let namespace = ContextNamespace::create(root_uri, PartitionSpec::tenant_source())
            .await
            .unwrap();
        let acme_memory = selector(&[("tenant", "acme"), ("source", "memory")]);
        let expected_partition = namespace.resolve_partition(&acme_memory).unwrap();

        let mut store = namespace.context(&acme_memory).await.unwrap();
        let new_records = [record("rec-1", "acme", "memory")];
        store.add(&new_records).await.unwrap();

        let filters = crate::record::RecordFilters {
            tenant: Some("acme".to_string()),
            source: Some("memory".to_string()),
            ..Default::default()
        };
        let records = store
            .list_filtered(None, None, Some(&filters))
            .await
            .unwrap();
        assert_eq!(records.len(), 1);

        let partitions = namespace.partitions().await.unwrap();
        assert_eq!(partitions, vec![expected_partition]);

        // Re-opening the same selector must not add a second manifest row.
        namespace.context(&acme_memory).await.unwrap();
        let after_reopen = namespace.partitions().await.unwrap();
        assert_eq!(after_reopen.len(), 1);
    }

    /// Selectors that miss a spec field or carry an extra one are rejected,
    /// and the error names the offending field.
    #[test]
    fn namespace_rejects_partial_or_unknown_selectors() {
        let namespace = ContextNamespace::new(
            "/tmp/context-ns",
            PartitionSpec::tenant_source(),
            ContextStoreOptions::default(),
        )
        .unwrap();

        let partial = selector(&[("tenant", "acme")]);
        let err = namespace.resolve_partition(&partial).unwrap_err();
        assert!(err.to_string().contains("source"));

        let with_extra = selector(&[
            ("tenant", "acme"),
            ("source", "memory"),
            ("session_id", "s1"),
        ]);
        let err = namespace.resolve_partition(&with_extra).unwrap_err();
        assert!(err.to_string().contains("session_id"));
    }
}