1pub mod error;
2mod store;
3mod store_mut;
4
5use {
6 error::{CsvStorageError, ResultExt},
7 gluesql_core::{
8 ast::{ColumnDef, ColumnUniqueOption, DataType},
9 data::{Key, Schema, Value},
10 error::Result,
11 parse_sql::parse_data_type,
12 store::{
13 AlterTable, CustomFunction, CustomFunctionMut, DataRow, Index, IndexMut, Metadata,
14 Transaction,
15 },
16 translate::translate_data_type,
17 },
18 std::{
19 collections::HashMap,
20 fs::{self, File},
21 io::Read,
22 path::{Path, PathBuf},
23 },
24};
25
/// Boxed, lazily evaluated stream of table rows.
///
/// Each item is either a `(Key, DataRow)` pair or the error hit while
/// producing that row. This is the row half of the tuple returned by
/// `CsvStorage::scan_data`.
type RowIter = Box<dyn Iterator<Item = Result<(Key, DataRow)>>>;
27
/// Storage handle whose tables are files inside a single directory.
///
/// The handle only holds the directory path, so cloning it is cheap and
/// does not touch the file system.
#[derive(Debug, Clone)]
pub struct CsvStorage {
    /// Directory that contains the schema (`.sql`), data (`.csv`) and
    /// types (`.types.csv`) files of every table.
    pub path: PathBuf,
}
31
32impl CsvStorage {
33 pub fn new<T: AsRef<Path>>(path: T) -> Result<Self> {
34 let path = path.as_ref();
35 fs::create_dir_all(path).map_storage_err()?;
36 let path = PathBuf::from(path);
37
38 Ok(Self { path })
39 }
40
41 fn fetch_schema(&self, table_name: &str) -> Result<Option<(Schema, bool)>> {
42 let schema_path = self.schema_path(table_name);
43 if !schema_path.exists() {
44 let data_path = self.data_path(table_name);
45 let types_path = self.types_path(table_name);
46
47 let column_defs = match (types_path.exists(), data_path.exists()) {
48 (false, false) => return Ok(None),
49 (false, true) => Some(
50 csv::Reader::from_path(data_path)
51 .map_storage_err()?
52 .headers()
53 .map_storage_err()?
54 .into_iter()
55 .map(|header| ColumnDef {
56 name: header.to_string(),
57 data_type: DataType::Text,
58 unique: None,
59 default: None,
60 nullable: true,
61 comment: None,
62 })
63 .collect::<Vec<_>>(),
64 ),
65 (true, _) => None,
66 };
67
68 let schema = Schema {
69 table_name: table_name.to_owned(),
70 column_defs,
71 indexes: Vec::new(),
72 engine: None,
73 foreign_keys: Vec::new(),
74 comment: None,
75 };
76
77 return Ok(Some((schema, true)));
78 }
79
80 let mut file = File::open(&schema_path).map_storage_err()?;
81 let mut ddl = String::new();
82 file.read_to_string(&mut ddl).map_storage_err()?;
83
84 let schema = Schema::from_ddl(&ddl)?;
85 if schema.table_name != table_name {
86 return Err(CsvStorageError::TableNameDoesNotMatchWithFile.into());
87 }
88
89 Ok(Some((schema, false)))
90 }
91
92 fn path_by(&self, table_name: &str, extension: &str) -> PathBuf {
93 let path = self.path.as_path();
94 let mut path = path.join(table_name);
95 path.set_extension(extension);
96
97 path
98 }
99
100 fn schema_path(&self, table_name: &str) -> PathBuf {
101 self.path_by(table_name, "sql")
102 }
103
104 fn data_path(&self, table_name: &str) -> PathBuf {
105 self.path_by(table_name, "csv")
106 }
107
108 fn tmp_data_path(&self, table_name: &str) -> PathBuf {
109 self.path_by(table_name, "tmp.csv")
110 }
111
112 fn types_path(&self, table_name: &str) -> PathBuf {
113 self.path_by(table_name, "types.csv")
114 }
115
116 fn tmp_types_path(&self, table_name: &str) -> PathBuf {
117 self.path_by(table_name, "types.tmp.csv")
118 }
119
120 fn scan_data(&self, table_name: &str) -> Result<(Option<Vec<String>>, RowIter)> {
121 let data_path = self.data_path(table_name);
122 let (schema, generated) = match (self.fetch_schema(table_name)?, data_path.exists()) {
123 (None, _) | (_, false) => return Ok((None, Box::new(std::iter::empty()))),
124 (Some(v), true) => v,
125 };
126
127 let mut data_rdr = csv::Reader::from_path(data_path).map_storage_err()?;
128 let mut fetch_data_header_columns = || -> Result<Vec<String>> {
129 Ok(data_rdr
130 .headers()
131 .map_storage_err()?
132 .into_iter()
133 .map(|header| header.to_string())
134 .collect::<Vec<_>>())
135 };
136
137 if let Schema {
138 column_defs: Some(column_defs),
139 ..
140 } = schema
141 {
142 let columns = column_defs
143 .iter()
144 .map(|column_def| column_def.name.to_owned())
145 .collect::<Vec<_>>();
146
147 let rows = data_rdr
148 .into_records()
149 .enumerate()
150 .map(move |(index, record)| {
151 let mut key: Option<Key> = None;
152
153 let values = record
154 .map_storage_err()?
155 .into_iter()
156 .zip(column_defs.iter())
157 .map(|(value, column_def)| {
158 let value = match value {
159 "NULL" => Value::Null,
160 _ => Value::Str(value.to_owned()),
161 };
162
163 let value = match &column_def.data_type {
164 DataType::Text => value,
165 data_type => value.cast(data_type)?,
166 };
167
168 if column_def.unique == Some(ColumnUniqueOption { is_primary: true }) {
169 key = Key::try_from(&value).map(Some)?;
170 }
171
172 Ok(value)
173 })
174 .collect::<Result<Vec<Value>>>()?;
175
176 let key = key.unwrap_or(Key::U64(index as u64));
177 let row = DataRow::Vec(values);
178
179 Ok((key, row))
180 });
181
182 Ok((Some(columns), Box::new(rows)))
183 } else if self.types_path(table_name).exists() {
184 let types_path = self.types_path(table_name);
185 let types_rdr = csv::Reader::from_path(types_path)
186 .map_storage_err()?
187 .into_records();
188
189 let columns = fetch_data_header_columns()?;
190 let rows = data_rdr.into_records().zip(types_rdr).enumerate().map(
191 move |(index, (record, types))| {
192 let key = Key::U64(index as u64);
193 let record = record.map_storage_err()?;
194 let types = types.map_storage_err()?;
195
196 record
197 .into_iter()
198 .zip(columns.iter())
199 .zip(&types)
200 .filter_map(|((value, column), data_type)| {
201 if data_type.is_empty() {
202 return None;
203 }
204
205 let value = if data_type == "NULL" {
206 Ok(Value::Null)
207 } else {
208 parse_data_type(data_type).and_then(|data_type| {
209 let data_type = translate_data_type(&data_type)?;
210 let value = Value::Str(value.to_owned());
211
212 match data_type {
213 DataType::Text => Ok(value),
214 data_type => value.cast(&data_type),
215 }
216 })
217 };
218
219 Some(value.map(|value| (column.clone(), value)))
220 })
221 .collect::<Result<HashMap<String, Value>>>()
222 .map(DataRow::Map)
223 .map(|row| (key, row))
224 },
225 );
226
227 Ok((None, Box::new(rows)))
228 } else {
229 let columns = fetch_data_header_columns()?;
230 let rows = {
231 let columns = columns.clone();
232
233 data_rdr
234 .into_records()
235 .enumerate()
236 .map(move |(index, record)| {
237 let key = Key::U64(index as u64);
238 let row = record
239 .map_storage_err()?
240 .into_iter()
241 .zip(columns.iter())
242 .map(|(value, column)| (column.clone(), Value::Str(value.to_owned())))
243 .collect::<HashMap<String, Value>>();
244
245 Ok((key, DataRow::Map(row)))
246 })
247 };
248
249 Ok((generated.then_some(columns), Box::new(rows)))
250 }
251 }
252}
253
254impl AlterTable for CsvStorage {}
255impl CustomFunction for CsvStorage {}
256impl CustomFunctionMut for CsvStorage {}
257impl Index for CsvStorage {}
258impl IndexMut for CsvStorage {}
259impl Transaction for CsvStorage {}
260impl Metadata for CsvStorage {}