Skip to main content

uni_store/storage/
json_index.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::{Result, anyhow};
5use arrow_array::builder::{ListBuilder, StringBuilder, UInt64Builder};
6use arrow_array::{Array, ListArray, RecordBatch, RecordBatchIterator, UInt64Array};
7use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
8use futures::TryStreamExt;
9use lance::dataset::{Dataset, WriteMode, WriteParams};
10use std::sync::Arc;
11use uni_common::core::id::Vid;
12
/// Handle to a persistent secondary index for one `(label, JSON path)` pair.
///
/// The index is stored as a Lance dataset of `(value, vids)` rows located at `uri`; see
/// [`JsonPathIndex::new`] for how that location is derived. Constructing the handle performs
/// no I/O.
#[derive(Debug, Clone)]
pub struct JsonPathIndex {
    /// URI of the backing Lance dataset.
    uri: String,
}
16
17impl JsonPathIndex {
18    pub fn new(base_uri: &str, label: &str, path: &str) -> Self {
19        // Path might contain special chars like $.title -> idx_label_title
20        let safe_path = path.replace(|c: char| !c.is_alphanumeric(), "_");
21        let uri = format!("{}/indexes/idx_{}_{}", base_uri, label, safe_path);
22        Self { uri }
23    }
24
25    pub async fn open(&self) -> Result<Arc<Dataset>> {
26        let ds = Dataset::open(&self.uri).await?;
27        Ok(Arc::new(ds))
28    }
29
30    pub fn get_arrow_schema() -> Arc<ArrowSchema> {
31        Arc::new(ArrowSchema::new(vec![
32            Field::new("value", ArrowDataType::Utf8, false),
33            Field::new(
34                "vids",
35                ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::UInt64, true))),
36                false,
37            ),
38        ]))
39    }
40
41    pub async fn write_entries(&self, entries: Vec<(String, Vec<Vid>)>) -> Result<()> {
42        let schema = Self::get_arrow_schema();
43
44        let mut value_builder = StringBuilder::new();
45        let mut vids_builder = ListBuilder::new(UInt64Builder::new());
46
47        for (val, vids) in entries {
48            value_builder.append_value(val);
49            for vid in vids {
50                vids_builder.values().append_value(vid.as_u64());
51            }
52            vids_builder.append(true);
53        }
54
55        let batch = RecordBatch::try_new(
56            schema.clone(),
57            vec![
58                Arc::new(value_builder.finish()),
59                Arc::new(vids_builder.finish()),
60            ],
61        )?;
62
63        let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), schema);
64
65        let params = WriteParams {
66            mode: WriteMode::Append,
67            ..Default::default()
68        };
69
70        Dataset::write(Box::new(reader), &self.uri, Some(params)).await?;
71        Ok(())
72    }
73
74    pub async fn get_vids(&self, value: &str) -> Result<Vec<Vid>> {
75        let ds = match self.open().await {
76            Ok(ds) => ds,
77            Err(_) => return Ok(vec![]),
78        };
79
80        // Scan and filter (MVP)
81        let mut stream = ds
82            .scan()
83            .filter(&format!("value = '{}'", value))?
84            .try_into_stream()
85            .await?;
86
87        let mut result = Vec::new();
88
89        while let Some(batch) = stream.try_next().await? {
90            let vids_col = batch
91                .column_by_name("vids")
92                .ok_or(anyhow!("Missing vids column"))?
93                .as_any()
94                .downcast_ref::<ListArray>()
95                .ok_or(anyhow!("Invalid vids column type"))?;
96
97            for i in 0..batch.num_rows() {
98                let list = vids_col.value(i);
99                let uint64_list = list
100                    .as_any()
101                    .downcast_ref::<UInt64Array>()
102                    .ok_or(anyhow!("Invalid inner type"))?;
103
104                for j in 0..uint64_list.len() {
105                    result.push(Vid::from(uint64_list.value(j)));
106                }
107            }
108        }
109        Ok(result)
110    }
111}