gluesql_json_storage/
lib.rs1mod alter_table;
2pub mod error;
3mod function;
4mod index;
5mod store;
6mod store_mut;
7mod transaction;
8
9use {
10 error::{JsonStorageError, OptionExt, ResultExt},
11 gluesql_core::{
12 ast::ColumnUniqueOption,
13 data::{Key, Schema, value::BTreeMapJsonExt},
14 error::{Error, Result},
15 store::{DataRow, Metadata},
16 },
17 iter_enum::Iterator,
18 serde_json::Value as JsonValue,
19 std::{
20 collections::BTreeMap,
21 fs::{self, File},
22 io::{self, BufRead, Read},
23 path::{Path, PathBuf},
24 },
25};
26
27type RowIter = Box<dyn Iterator<Item = Result<(Key, DataRow)>> + Send>;
28
/// Storage handle whose tables live as files inside a single directory.
#[derive(Debug, Clone)]
pub struct JsonStorage {
    /// Directory that holds the table files of this storage.
    pub path: PathBuf,
}
33
34impl JsonStorage {
35 pub fn new<T: AsRef<Path>>(path: T) -> Result<Self> {
36 let path = path.as_ref();
37 fs::create_dir_all(path).map_storage_err()?;
38
39 Ok(Self { path: path.into() })
40 }
41
42 fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
43 match (
44 self.jsonl_path(table_name).exists(),
45 self.json_path(table_name).exists(),
46 ) {
47 (true, true) => {
48 return Err(Error::StorageMsg(
49 JsonStorageError::BothJsonlAndJsonExist(table_name.to_owned()).to_string(),
50 ));
51 }
52 (false, false) => return Ok(None),
53 _ => {}
54 }
55
56 let schema_path = self.schema_path(table_name);
57 let (column_defs, foreign_keys, comment) = match schema_path.exists() {
58 true => {
59 let mut file = File::open(&schema_path).map_storage_err()?;
60 let mut ddl = String::new();
61 file.read_to_string(&mut ddl).map_storage_err()?;
62
63 let schema = Schema::from_ddl(&ddl)?;
64 if schema.table_name != table_name {
65 return Err(Error::StorageMsg(
66 JsonStorageError::TableNameDoesNotMatchWithFile.to_string(),
67 ));
68 }
69
70 (schema.column_defs, schema.foreign_keys, schema.comment)
71 }
72 false => (None, Vec::new(), None),
73 };
74
75 Ok(Some(Schema {
76 table_name: table_name.to_owned(),
77 column_defs,
78 indexes: vec![],
79 engine: None,
80 foreign_keys,
81 comment,
82 }))
83 }
84
85 fn jsonl_path(&self, table_name: &str) -> PathBuf {
86 self.path_by(table_name, "jsonl")
87 }
88
89 fn json_path(&self, table_name: &str) -> PathBuf {
90 self.path_by(table_name, "json")
91 }
92
93 fn schema_path(&self, table_name: &str) -> PathBuf {
94 self.path_by(table_name, "sql")
95 }
96
97 fn path_by(&self, table_name: &str, extension: &str) -> PathBuf {
98 let path = self.path.as_path();
99 let mut path = path.join(table_name);
100 path.set_extension(extension);
101
102 path
103 }
104
105 fn scan_data(&self, table_name: &str) -> Result<(RowIter, Schema)> {
106 let schema = self
107 .fetch_schema(table_name)?
108 .map_storage_err(JsonStorageError::TableDoesNotExist)?;
109
110 #[derive(Iterator)]
111 enum Extension<I1, I2> {
112 Json(I1),
113 Jsonl(I2),
114 }
115 let json_path = self.json_path(table_name);
116 let jsons = match fs::read_to_string(json_path) {
117 Ok(json_file_str) => {
118 let value = serde_json::from_str(&json_file_str).map_err(|_| {
119 Error::StorageMsg(
120 JsonStorageError::InvalidJsonContent(format!("{table_name}.json"))
121 .to_string(),
122 )
123 })?;
124
125 let jsons = match value {
126 JsonValue::Array(values) => values
127 .into_iter()
128 .map(|value| match value {
129 JsonValue::Object(json_map) => BTreeMap::try_from_json_map(json_map),
130 _ => Err(Error::StorageMsg(
131 JsonStorageError::JsonObjectTypeRequired.to_string(),
132 )),
133 })
134 .collect::<Result<Vec<_>>>(),
135 JsonValue::Object(json_map) => Ok(vec![BTreeMap::try_from_json_map(json_map)?]),
136 _ => Err(Error::StorageMsg(
137 JsonStorageError::JsonArrayTypeRequired.to_string(),
138 )),
139 }?;
140
141 Extension::Json(jsons.into_iter().map(Ok))
142 }
143 Err(_) => {
144 let jsonl_path = self.jsonl_path(table_name);
145 let lines = read_lines(jsonl_path).map_storage_err()?;
146 let jsons = lines.map(|line| BTreeMap::parse_json_object(&line.map_storage_err()?));
147
148 Extension::Jsonl(jsons)
149 }
150 };
151
152 let schema2 = schema.clone();
153 let rows = jsons.enumerate().map(move |(index, json)| -> Result<_> {
154 let json = json?;
155 let get_index_key = || index.try_into().map(Key::I64).map_storage_err();
156
157 let column_defs = match &schema2.column_defs {
158 Some(column_defs) => column_defs,
159 None => {
160 let key = get_index_key()?;
161 let row = DataRow::Map(json);
162
163 return Ok((key, row));
164 }
165 };
166
167 let mut key: Option<Key> = None;
168 let mut values = Vec::with_capacity(column_defs.len());
169 for column_def in column_defs {
170 let value = json.get(&column_def.name).map_storage_err(
171 JsonStorageError::ColumnDoesNotExist(column_def.name.clone()),
172 )?;
173
174 if column_def.unique == Some(ColumnUniqueOption { is_primary: true }) {
175 let value = value.cast(&column_def.data_type)?;
176 key = Some(value.try_into().map_storage_err()?);
177 }
178
179 let value = match value.get_type() {
180 Some(data_type) if data_type != column_def.data_type => {
181 value.cast(&column_def.data_type)?
182 }
183 Some(_) | None => value.clone(),
184 };
185
186 values.push(value);
187 }
188
189 let key = match key {
190 Some(key) => key,
191 None => get_index_key()?,
192 };
193 let row = DataRow::Vec(values);
194
195 Ok((key, row))
196 });
197
198 Ok((Box::new(rows), schema))
199 }
200}
201
/// Opens `filename` and returns a buffered iterator over its lines.
///
/// # Errors
/// Returns the I/O error raised while opening the file. Errors hit while
/// reading individual lines surface through the iterator items instead.
fn read_lines<P>(filename: P) -> io::Result<io::Lines<io::BufReader<File>>>
where
    P: AsRef<Path>,
{
    File::open(filename).map(|file| io::BufReader::new(file).lines())
}
209
210impl Metadata for JsonStorage {}