uni_store/storage/
json_index.rs1use anyhow::{Result, anyhow};
5use arrow_array::builder::{ListBuilder, StringBuilder, UInt64Builder};
6use arrow_array::{Array, ListArray, RecordBatch, RecordBatchIterator, UInt64Array};
7use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
8use futures::TryStreamExt;
9use lance::dataset::{Dataset, WriteMode, WriteParams};
10use std::sync::Arc;
11use uni_common::core::id::Vid;
12
/// Handle to a single index dataset, identified solely by its dataset URI.
///
/// The handle itself holds no open resources; it only remembers where the
/// dataset lives, so it is cheap to clone and safe to print for diagnostics.
#[derive(Debug, Clone)]
pub struct JsonPathIndex {
    /// Location of the backing dataset.
    uri: String,
}
16
17impl JsonPathIndex {
18 pub fn new(base_uri: &str, label: &str, path: &str) -> Self {
19 let safe_path = path.replace(|c: char| !c.is_alphanumeric(), "_");
21 let uri = format!("{}/indexes/idx_{}_{}", base_uri, label, safe_path);
22 Self { uri }
23 }
24
25 pub async fn open(&self) -> Result<Arc<Dataset>> {
26 let ds = Dataset::open(&self.uri).await?;
27 Ok(Arc::new(ds))
28 }
29
30 pub fn get_arrow_schema() -> Arc<ArrowSchema> {
31 Arc::new(ArrowSchema::new(vec![
32 Field::new("value", ArrowDataType::Utf8, false),
33 Field::new(
34 "vids",
35 ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::UInt64, true))),
36 false,
37 ),
38 ]))
39 }
40
41 pub async fn write_entries(&self, entries: Vec<(String, Vec<Vid>)>) -> Result<()> {
42 let schema = Self::get_arrow_schema();
43
44 let mut value_builder = StringBuilder::new();
45 let mut vids_builder = ListBuilder::new(UInt64Builder::new());
46
47 for (val, vids) in entries {
48 value_builder.append_value(val);
49 for vid in vids {
50 vids_builder.values().append_value(vid.as_u64());
51 }
52 vids_builder.append(true);
53 }
54
55 let batch = RecordBatch::try_new(
56 schema.clone(),
57 vec![
58 Arc::new(value_builder.finish()),
59 Arc::new(vids_builder.finish()),
60 ],
61 )?;
62
63 let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), schema);
64
65 let params = WriteParams {
66 mode: WriteMode::Append,
67 ..Default::default()
68 };
69
70 Dataset::write(Box::new(reader), &self.uri, Some(params)).await?;
71 Ok(())
72 }
73
74 pub async fn get_vids(&self, value: &str) -> Result<Vec<Vid>> {
75 let ds = match self.open().await {
76 Ok(ds) => ds,
77 Err(_) => return Ok(vec![]),
78 };
79
80 let mut stream = ds
82 .scan()
83 .filter(&format!("value = '{}'", value))?
84 .try_into_stream()
85 .await?;
86
87 let mut result = Vec::new();
88
89 while let Some(batch) = stream.try_next().await? {
90 let vids_col = batch
91 .column_by_name("vids")
92 .ok_or(anyhow!("Missing vids column"))?
93 .as_any()
94 .downcast_ref::<ListArray>()
95 .ok_or(anyhow!("Invalid vids column type"))?;
96
97 for i in 0..batch.num_rows() {
98 let list = vids_col.value(i);
99 let uint64_list = list
100 .as_any()
101 .downcast_ref::<UInt64Array>()
102 .ok_or(anyhow!("Invalid inner type"))?;
103
104 for j in 0..uint64_list.len() {
105 result.push(Vid::from(uint64_list.value(j)));
106 }
107 }
108 }
109 Ok(result)
110 }
111}