1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3
4use arrow_array::{
5 Array, Int32Array, LargeStringArray, RecordBatch, RecordBatchIterator, StringArray,
6};
7use arrow_schema::{ArrowError, DataType, Field, Schema};
8use futures::TryStreamExt;
9use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams};
10use lance::io::{ObjectStoreParams, StorageOptionsAccessor};
11use lance::{Error as LanceError, Result as LanceResult};
12use uuid::Uuid;
13
14use crate::store::{ContextStore, ContextStoreOptions};
15
/// Name of the Lance table, directly under the namespace root, that lists
/// every partition known to the namespace.
const MANIFEST_TABLE_NAME: &str = "__manifest";

/// Name of the Lance table stored inside each partition's directory.
const PARTITION_TABLE_NAME: &str = "dataset";

/// Partition field name -> value pairs that pick out a single partition.
/// Ordered by key, so its JSON encoding is stable for a given set of pairs.
pub type PartitionSelector = BTreeMap<String, String>;
21
22#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct PartitionSpec {
29 version: i32,
30 fields: Vec<String>,
31}
32
33impl PartitionSpec {
34 pub fn new<I, S>(fields: I) -> LanceResult<Self>
36 where
37 I: IntoIterator<Item = S>,
38 S: Into<String>,
39 {
40 Self::with_version(1, fields)
41 }
42
43 pub fn with_version<I, S>(version: i32, fields: I) -> LanceResult<Self>
45 where
46 I: IntoIterator<Item = S>,
47 S: Into<String>,
48 {
49 if version <= 0 {
50 return Err(invalid_input("partition spec version must be positive"));
51 }
52
53 let mut seen = HashSet::new();
54 let mut normalized = Vec::new();
55 for field in fields {
56 let field = field.into();
57 if field.trim().is_empty() {
58 return Err(invalid_input("partition field names must be non-empty"));
59 }
60 if !seen.insert(field.clone()) {
61 return Err(invalid_input(format!(
62 "duplicate partition field '{field}'"
63 )));
64 }
65 normalized.push(field);
66 }
67 if normalized.is_empty() {
68 return Err(invalid_input("partition spec requires at least one field"));
69 }
70
71 Ok(Self {
72 version,
73 fields: normalized,
74 })
75 }
76
77 #[must_use]
78 pub fn tenant() -> Self {
79 Self {
80 version: 1,
81 fields: vec!["tenant".to_string()],
82 }
83 }
84
85 #[must_use]
86 pub fn tenant_source() -> Self {
87 Self {
88 version: 1,
89 fields: vec!["tenant".to_string(), "source".to_string()],
90 }
91 }
92
93 #[must_use]
94 pub fn version(&self) -> i32 {
95 self.version
96 }
97
98 #[must_use]
99 pub fn fields(&self) -> &[String] {
100 &self.fields
101 }
102}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct PartitionInfo {
107 pub partition_id: String,
108 pub spec_version: i32,
109 pub selector: PartitionSelector,
110 pub dataset_uri: String,
111}
112
113#[derive(Debug, Clone)]
119pub struct ContextNamespace {
120 root_uri: String,
121 partition_spec: PartitionSpec,
122 context_options: ContextStoreOptions,
123}
124
125impl ContextNamespace {
126 pub async fn create(
128 root_uri: impl Into<String>,
129 partition_spec: PartitionSpec,
130 ) -> LanceResult<Self> {
131 Self::create_with_options(root_uri, partition_spec, ContextStoreOptions::default()).await
132 }
133
134 pub async fn create_with_options(
136 root_uri: impl Into<String>,
137 partition_spec: PartitionSpec,
138 context_options: ContextStoreOptions,
139 ) -> LanceResult<Self> {
140 let namespace = Self::new(root_uri, partition_spec, context_options)?;
141 namespace.ensure_manifest().await?;
142 Ok(namespace)
143 }
144
145 pub fn new(
147 root_uri: impl Into<String>,
148 partition_spec: PartitionSpec,
149 context_options: ContextStoreOptions,
150 ) -> LanceResult<Self> {
151 let root_uri = normalize_root_uri(root_uri.into())?;
152 Ok(Self {
153 root_uri,
154 partition_spec,
155 context_options,
156 })
157 }
158
159 #[must_use]
160 pub fn root_uri(&self) -> &str {
161 &self.root_uri
162 }
163
164 #[must_use]
165 pub fn partition_spec(&self) -> &PartitionSpec {
166 &self.partition_spec
167 }
168
169 #[must_use]
170 pub fn manifest_uri(&self) -> String {
171 join_uri(&self.root_uri, MANIFEST_TABLE_NAME)
172 }
173
174 pub fn resolve_partition(&self, selector: &PartitionSelector) -> LanceResult<PartitionInfo> {
176 self.validate_selector(selector)?;
177 let selector_json = selector_json(selector)?;
178 let partition_id =
179 partition_id(&self.root_uri, self.partition_spec.version, &selector_json);
180 let dataset_uri = join_uri(
181 &self.root_uri,
182 &format!(
183 "v{}/{}/{}",
184 self.partition_spec.version, partition_id, PARTITION_TABLE_NAME
185 ),
186 );
187
188 Ok(PartitionInfo {
189 partition_id,
190 spec_version: self.partition_spec.version,
191 selector: selector.clone(),
192 dataset_uri,
193 })
194 }
195
196 pub async fn context(&self, selector: &PartitionSelector) -> LanceResult<ContextStore> {
198 let partition = self.resolve_partition(selector)?;
199 self.ensure_manifest_entry(&partition).await?;
200 ContextStore::open_with_options(&partition.dataset_uri, self.context_options.clone()).await
201 }
202
203 pub async fn partitions(&self) -> LanceResult<Vec<PartitionInfo>> {
205 let manifest = self.ensure_manifest().await?;
206 read_manifest(&manifest).await
207 }
208
209 async fn ensure_manifest(&self) -> LanceResult<Dataset> {
210 match load_dataset(&self.manifest_uri(), self.context_options.storage_options()).await {
211 Ok(dataset) => Ok(dataset),
212 Err(LanceError::DatasetNotFound { .. }) => {
213 create_manifest(&self.manifest_uri(), self.context_options.storage_options()).await
214 }
215 Err(err) => Err(err),
216 }
217 }
218
219 async fn ensure_manifest_entry(&self, partition: &PartitionInfo) -> LanceResult<()> {
220 let manifest = self.ensure_manifest().await?;
221 for existing in read_manifest(&manifest).await? {
222 if existing.partition_id == partition.partition_id {
223 if existing == *partition {
224 return Ok(());
225 }
226 return Err(invalid_input(format!(
227 "partition id '{}' already maps to a different selector",
228 partition.partition_id
229 )));
230 }
231 }
232
233 append_manifest_entry(
234 &self.manifest_uri(),
235 self.context_options.storage_options(),
236 partition,
237 )
238 .await
239 }
240
241 fn validate_selector(&self, selector: &PartitionSelector) -> LanceResult<()> {
242 for field in &self.partition_spec.fields {
243 match selector.get(field) {
244 Some(value) if !value.is_empty() => {}
245 Some(_) => {
246 return Err(invalid_input(format!(
247 "partition selector value for '{field}' must be non-empty"
248 )));
249 }
250 None => {
251 return Err(invalid_input(format!(
252 "partition selector is missing required field '{field}'"
253 )));
254 }
255 }
256 }
257
258 for field in selector.keys() {
259 if !self
260 .partition_spec
261 .fields
262 .iter()
263 .any(|expected| expected == field)
264 {
265 return Err(invalid_input(format!(
266 "partition selector contains unknown field '{field}'"
267 )));
268 }
269 }
270
271 Ok(())
272 }
273}
274
275fn manifest_schema() -> Arc<Schema> {
276 Arc::new(Schema::new(vec![
277 Field::new("partition_id", DataType::Utf8, false),
278 Field::new("spec_version", DataType::Int32, false),
279 Field::new("selector_json", DataType::LargeUtf8, false),
280 Field::new("dataset_uri", DataType::Utf8, false),
281 ]))
282}
283
284async fn load_dataset(
285 uri: &str,
286 storage_options: Option<HashMap<String, String>>,
287) -> LanceResult<Dataset> {
288 if let Some(options) = storage_options {
289 DatasetBuilder::from_uri(uri)
290 .with_storage_options(options)
291 .load()
292 .await
293 } else {
294 Dataset::open(uri).await
295 }
296}
297
298async fn create_manifest(
299 uri: &str,
300 storage_options: Option<HashMap<String, String>>,
301) -> LanceResult<Dataset> {
302 let schema = manifest_schema();
303 let empty_batch = RecordBatch::new_empty(schema.clone());
304 let batches = RecordBatchIterator::new(
305 vec![Ok::<RecordBatch, ArrowError>(empty_batch)].into_iter(),
306 schema,
307 );
308 let params = write_params(WriteMode::Create, storage_options);
309 Dataset::write(batches, uri, Some(params)).await
310}
311
312async fn append_manifest_entry(
313 uri: &str,
314 storage_options: Option<HashMap<String, String>>,
315 partition: &PartitionInfo,
316) -> LanceResult<()> {
317 let selector_json = selector_json(&partition.selector)?;
318 let schema = manifest_schema();
319 let batch = RecordBatch::try_new(
320 schema.clone(),
321 vec![
322 Arc::new(StringArray::from(vec![partition.partition_id.as_str()])),
323 Arc::new(Int32Array::from(vec![partition.spec_version])),
324 Arc::new(LargeStringArray::from(vec![selector_json.as_str()])),
325 Arc::new(StringArray::from(vec![partition.dataset_uri.as_str()])),
326 ],
327 )?;
328 let batches = RecordBatchIterator::new(
329 vec![Ok::<RecordBatch, ArrowError>(batch)].into_iter(),
330 schema,
331 );
332 let params = write_params(WriteMode::Append, storage_options);
333 Dataset::write(batches, uri, Some(params)).await?;
334 Ok(())
335}
336
337fn write_params(mode: WriteMode, storage_options: Option<HashMap<String, String>>) -> WriteParams {
338 let mut params = WriteParams {
339 mode,
340 ..Default::default()
341 };
342
343 if let Some(options) = storage_options {
344 let store_params = ObjectStoreParams {
345 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
346 options,
347 ))),
348 ..Default::default()
349 };
350 params.store_params = Some(store_params);
351 }
352
353 params
354}
355
356async fn read_manifest(dataset: &Dataset) -> LanceResult<Vec<PartitionInfo>> {
357 let mut scanner = dataset.scan();
358 scanner.project(&[
359 "partition_id",
360 "spec_version",
361 "selector_json",
362 "dataset_uri",
363 ])?;
364 let mut stream = scanner.try_into_stream().await?;
365 let mut partitions = Vec::new();
366 while let Some(batch) = stream.try_next().await? {
367 partitions.extend(manifest_batch_to_partitions(&batch)?);
368 }
369 Ok(partitions)
370}
371
372fn manifest_batch_to_partitions(batch: &RecordBatch) -> LanceResult<Vec<PartitionInfo>> {
373 let partition_id = column_as::<StringArray>(batch, "partition_id")?;
374 let spec_version = column_as::<Int32Array>(batch, "spec_version")?;
375 let selector_json = column_as::<LargeStringArray>(batch, "selector_json")?;
376 let dataset_uri = column_as::<StringArray>(batch, "dataset_uri")?;
377
378 let mut partitions = Vec::with_capacity(batch.num_rows());
379 for row in 0..batch.num_rows() {
380 partitions.push(PartitionInfo {
381 partition_id: partition_id.value(row).to_string(),
382 spec_version: spec_version.value(row),
383 selector: serde_json::from_str(selector_json.value(row)).map_err(|err| {
384 LanceError::from(ArrowError::InvalidArgumentError(format!(
385 "invalid partition selector JSON in manifest: {err}"
386 )))
387 })?,
388 dataset_uri: dataset_uri.value(row).to_string(),
389 });
390 }
391 Ok(partitions)
392}
393
394fn column_as<'a, A>(batch: &'a RecordBatch, name: &str) -> LanceResult<&'a A>
395where
396 A: Array + 'static,
397{
398 batch
399 .column_by_name(name)
400 .and_then(|col| col.as_ref().as_any().downcast_ref::<A>())
401 .ok_or_else(|| {
402 LanceError::from(ArrowError::InvalidArgumentError(format!(
403 "column '{name}' has unexpected data type"
404 )))
405 })
406}
407
408fn selector_json(selector: &PartitionSelector) -> LanceResult<String> {
409 serde_json::to_string(selector).map_err(|err| {
410 LanceError::from(ArrowError::InvalidArgumentError(format!(
411 "partition selector is not JSON serializable: {err}"
412 )))
413 })
414}
415
416fn partition_id(root_uri: &str, spec_version: i32, selector_json: &str) -> String {
417 let key = format!("{root_uri}\n{spec_version}\n{selector_json}");
418 let uuid = Uuid::new_v5(&Uuid::NAMESPACE_URL, key.as_bytes())
419 .simple()
420 .to_string();
421 uuid[..16].to_string()
422}
423
424fn normalize_root_uri(root_uri: String) -> LanceResult<String> {
425 let root_uri = root_uri.trim();
426 if root_uri.is_empty() {
427 return Err(invalid_input("namespace root URI must be non-empty"));
428 }
429 if root_uri == "/" {
430 return Ok(root_uri.to_string());
431 }
432 Ok(root_uri.trim_end_matches('/').to_string())
433}
434
/// Appends `child` to `root_uri` with a single `/`. The bare root `/` gets no
/// extra separator, so the result is `/child` rather than `//child`.
fn join_uri(root_uri: &str, child: &str) -> String {
    let separator = if root_uri == "/" { "" } else { "/" };
    format!("{root_uri}{separator}{child}")
}
442
443fn invalid_input(message: impl Into<String>) -> LanceError {
444 LanceError::from(ArrowError::InvalidArgumentError(message.into()))
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::{ContextRecord, RecordFilters, LIFECYCLE_ACTIVE};
    use crate::serde::CONTENT_TYPE_TEXT;
    use chrono::Utc;
    use tempfile::TempDir;

    /// Builds a selector from `(field, value)` pairs.
    fn selector(pairs: &[(&str, &str)]) -> PartitionSelector {
        pairs
            .iter()
            .map(|&(field, value)| (field.to_owned(), value.to_owned()))
            .collect()
    }

    /// Minimal active text record tagged with `tenant` and `source`.
    fn record(id: &str, tenant: &str, source: &str) -> ContextRecord {
        ContextRecord {
            id: id.to_owned(),
            run_id: format!("run-{id}"),
            bot_id: Some(String::from("support-bot")),
            tenant: Some(tenant.to_owned()),
            source: Some(source.to_owned()),
            created_at: Utc::now(),
            role: String::from("user"),
            lifecycle_status: LIFECYCLE_ACTIVE.to_string(),
            content_type: CONTENT_TYPE_TEXT.to_string(),
            text_payload: Some(String::from("hello")),
            // Everything below is intentionally left unset.
            external_id: None,
            session_id: None,
            state_metadata: None,
            metadata: None,
            relationships: Vec::new(),
            expires_at: None,
            retention_policy: None,
            retired_at: None,
            retired_reason: None,
            supersedes_id: None,
            superseded_by_id: None,
            binary_payload: None,
            embedding: None,
        }
    }

    #[tokio::test]
    async fn namespace_context_records_manifest_and_opens_partition() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path().to_string_lossy().into_owned();
        let namespace = ContextNamespace::create(root, PartitionSpec::tenant_source())
            .await
            .unwrap();
        let acme_memory = selector(&[("tenant", "acme"), ("source", "memory")]);
        let expected = namespace.resolve_partition(&acme_memory).unwrap();

        let mut store = namespace.context(&acme_memory).await.unwrap();
        store
            .add(&[record("rec-1", "acme", "memory")])
            .await
            .unwrap();

        let filters = RecordFilters {
            tenant: Some("acme".to_string()),
            source: Some("memory".to_string()),
            ..Default::default()
        };
        let found = store
            .list_filtered(None, None, Some(&filters))
            .await
            .unwrap();
        assert_eq!(found.len(), 1);

        assert_eq!(namespace.partitions().await.unwrap(), vec![expected]);

        // Opening the same selector again must not add a second manifest row.
        namespace.context(&acme_memory).await.unwrap();
        assert_eq!(namespace.partitions().await.unwrap().len(), 1);
    }

    #[test]
    fn namespace_rejects_partial_or_unknown_selectors() {
        let namespace = ContextNamespace::new(
            "/tmp/context-ns",
            PartitionSpec::tenant_source(),
            ContextStoreOptions::default(),
        )
        .unwrap();

        let partial = selector(&[("tenant", "acme")]);
        let missing_err = namespace.resolve_partition(&partial).unwrap_err();
        assert!(missing_err.to_string().contains("source"));

        let extra = selector(&[
            ("tenant", "acme"),
            ("source", "memory"),
            ("session_id", "s1"),
        ]);
        let unknown_err = namespace.resolve_partition(&extra).unwrap_err();
        assert!(unknown_err.to_string().contains("session_id"));
    }
}