gluesql_json_storage/
lib.rs

1mod alter_table;
2pub mod error;
3mod function;
4mod index;
5mod store;
6mod store_mut;
7mod transaction;
8
9use {
10    error::{JsonStorageError, OptionExt, ResultExt},
11    gluesql_core::{
12        ast::ColumnUniqueOption,
13        data::{Key, Schema, value::BTreeMapJsonExt},
14        error::{Error, Result},
15        store::{DataRow, Metadata},
16    },
17    iter_enum::Iterator,
18    serde_json::Value as JsonValue,
19    std::{
20        collections::BTreeMap,
21        fs::{self, File},
22        io::{self, BufRead, Read},
23        path::{Path, PathBuf},
24    },
25};
26
27type RowIter = Box<dyn Iterator<Item = Result<(Key, DataRow)>> + Send>;
28
/// Storage handle whose table files are resolved inside a single directory.
#[derive(Clone, Debug)]
pub struct JsonStorage {
    /// Directory under which the per-table data and schema files are looked up.
    pub path: PathBuf,
}
33
34impl JsonStorage {
35    pub fn new<T: AsRef<Path>>(path: T) -> Result<Self> {
36        let path = path.as_ref();
37        fs::create_dir_all(path).map_storage_err()?;
38
39        Ok(Self { path: path.into() })
40    }
41
42    fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
43        match (
44            self.jsonl_path(table_name).exists(),
45            self.json_path(table_name).exists(),
46        ) {
47            (true, true) => {
48                return Err(Error::StorageMsg(
49                    JsonStorageError::BothJsonlAndJsonExist(table_name.to_owned()).to_string(),
50                ));
51            }
52            (false, false) => return Ok(None),
53            _ => {}
54        }
55
56        let schema_path = self.schema_path(table_name);
57        let (column_defs, foreign_keys, comment) = match schema_path.exists() {
58            true => {
59                let mut file = File::open(&schema_path).map_storage_err()?;
60                let mut ddl = String::new();
61                file.read_to_string(&mut ddl).map_storage_err()?;
62
63                let schema = Schema::from_ddl(&ddl)?;
64                if schema.table_name != table_name {
65                    return Err(Error::StorageMsg(
66                        JsonStorageError::TableNameDoesNotMatchWithFile.to_string(),
67                    ));
68                }
69
70                (schema.column_defs, schema.foreign_keys, schema.comment)
71            }
72            false => (None, Vec::new(), None),
73        };
74
75        Ok(Some(Schema {
76            table_name: table_name.to_owned(),
77            column_defs,
78            indexes: vec![],
79            engine: None,
80            foreign_keys,
81            comment,
82        }))
83    }
84
85    fn jsonl_path(&self, table_name: &str) -> PathBuf {
86        self.path_by(table_name, "jsonl")
87    }
88
89    fn json_path(&self, table_name: &str) -> PathBuf {
90        self.path_by(table_name, "json")
91    }
92
93    fn schema_path(&self, table_name: &str) -> PathBuf {
94        self.path_by(table_name, "sql")
95    }
96
97    fn path_by(&self, table_name: &str, extension: &str) -> PathBuf {
98        let path = self.path.as_path();
99        let mut path = path.join(table_name);
100        path.set_extension(extension);
101
102        path
103    }
104
105    fn scan_data(&self, table_name: &str) -> Result<(RowIter, Schema)> {
106        let schema = self
107            .fetch_schema(table_name)?
108            .map_storage_err(JsonStorageError::TableDoesNotExist)?;
109
110        #[derive(Iterator)]
111        enum Extension<I1, I2> {
112            Json(I1),
113            Jsonl(I2),
114        }
115        let json_path = self.json_path(table_name);
116        let jsons = match fs::read_to_string(json_path) {
117            Ok(json_file_str) => {
118                let value = serde_json::from_str(&json_file_str).map_err(|_| {
119                    Error::StorageMsg(
120                        JsonStorageError::InvalidJsonContent(format!("{table_name}.json"))
121                            .to_string(),
122                    )
123                })?;
124
125                let jsons = match value {
126                    JsonValue::Array(values) => values
127                        .into_iter()
128                        .map(|value| match value {
129                            JsonValue::Object(json_map) => BTreeMap::try_from_json_map(json_map),
130                            _ => Err(Error::StorageMsg(
131                                JsonStorageError::JsonObjectTypeRequired.to_string(),
132                            )),
133                        })
134                        .collect::<Result<Vec<_>>>(),
135                    JsonValue::Object(json_map) => Ok(vec![BTreeMap::try_from_json_map(json_map)?]),
136                    _ => Err(Error::StorageMsg(
137                        JsonStorageError::JsonArrayTypeRequired.to_string(),
138                    )),
139                }?;
140
141                Extension::Json(jsons.into_iter().map(Ok))
142            }
143            Err(_) => {
144                let jsonl_path = self.jsonl_path(table_name);
145                let lines = read_lines(jsonl_path).map_storage_err()?;
146                let jsons = lines.map(|line| BTreeMap::parse_json_object(&line.map_storage_err()?));
147
148                Extension::Jsonl(jsons)
149            }
150        };
151
152        let schema2 = schema.clone();
153        let rows = jsons.enumerate().map(move |(index, json)| -> Result<_> {
154            let json = json?;
155            let get_index_key = || index.try_into().map(Key::I64).map_storage_err();
156
157            let column_defs = match &schema2.column_defs {
158                Some(column_defs) => column_defs,
159                None => {
160                    let key = get_index_key()?;
161                    let row = DataRow::Map(json);
162
163                    return Ok((key, row));
164                }
165            };
166
167            let mut key: Option<Key> = None;
168            let mut values = Vec::with_capacity(column_defs.len());
169            for column_def in column_defs {
170                let value = json.get(&column_def.name).map_storage_err(
171                    JsonStorageError::ColumnDoesNotExist(column_def.name.clone()),
172                )?;
173
174                if column_def.unique == Some(ColumnUniqueOption { is_primary: true }) {
175                    let value = value.cast(&column_def.data_type)?;
176                    key = Some(value.try_into().map_storage_err()?);
177                }
178
179                let value = match value.get_type() {
180                    Some(data_type) if data_type != column_def.data_type => {
181                        value.cast(&column_def.data_type)?
182                    }
183                    Some(_) | None => value.clone(),
184                };
185
186                values.push(value);
187            }
188
189            let key = match key {
190                Some(key) => key,
191                None => get_index_key()?,
192            };
193            let row = DataRow::Vec(values);
194
195            Ok((key, row))
196        });
197
198        Ok((Box::new(rows), schema))
199    }
200}
201
/// Opens `filename` and returns a buffered iterator over its lines.
///
/// Each item is an `io::Result<String>`; line terminators are stripped by `BufRead::lines`.
///
/// # Errors
///
/// Returns the underlying I/O error when the file cannot be opened.
fn read_lines<P>(filename: P) -> io::Result<io::Lines<io::BufReader<File>>>
where
    P: AsRef<Path>,
{
    File::open(filename).map(|file| io::BufReader::new(file).lines())
}
209
210impl Metadata for JsonStorage {}