gluesql_csv_storage/
lib.rs

1pub mod error;
2mod store;
3mod store_mut;
4
5use {
6    error::{CsvStorageError, ResultExt},
7    gluesql_core::{
8        ast::{ColumnDef, ColumnUniqueOption, DataType},
9        data::{Key, Schema, Value},
10        error::Result,
11        parse_sql::parse_data_type,
12        store::{
13            AlterTable, CustomFunction, CustomFunctionMut, DataRow, Index, IndexMut, Metadata,
14            Transaction,
15        },
16        translate::translate_data_type,
17    },
18    std::{
19        collections::HashMap,
20        fs::{self, File},
21        io::Read,
22        path::{Path, PathBuf},
23    },
24};
25
/// Boxed iterator over table rows: each item is either a `(Key, DataRow)` pair
/// or the error hit while producing that row.
26type RowIter = Box<dyn Iterator<Item = Result<(Key, DataRow)>>>;
27
/// Storage handle whose table files all live under a single directory.
28pub struct CsvStorage {
    /// Directory that table file paths are resolved against; `CsvStorage::new`
    /// creates it if it does not exist yet.
29    pub path: PathBuf,
30}
31
32impl CsvStorage {
33    pub fn new<T: AsRef<Path>>(path: T) -> Result<Self> {
34        let path = path.as_ref();
35        fs::create_dir_all(path).map_storage_err()?;
36        let path = PathBuf::from(path);
37
38        Ok(Self { path })
39    }
40
41    fn fetch_schema(&self, table_name: &str) -> Result<Option<(Schema, bool)>> {
42        let schema_path = self.schema_path(table_name);
43        if !schema_path.exists() {
44            let data_path = self.data_path(table_name);
45            let types_path = self.types_path(table_name);
46
47            let column_defs = match (types_path.exists(), data_path.exists()) {
48                (false, false) => return Ok(None),
49                (false, true) => Some(
50                    csv::Reader::from_path(data_path)
51                        .map_storage_err()?
52                        .headers()
53                        .map_storage_err()?
54                        .into_iter()
55                        .map(|header| ColumnDef {
56                            name: header.to_string(),
57                            data_type: DataType::Text,
58                            unique: None,
59                            default: None,
60                            nullable: true,
61                            comment: None,
62                        })
63                        .collect::<Vec<_>>(),
64                ),
65                (true, _) => None,
66            };
67
68            let schema = Schema {
69                table_name: table_name.to_owned(),
70                column_defs,
71                indexes: Vec::new(),
72                engine: None,
73                foreign_keys: Vec::new(),
74                comment: None,
75            };
76
77            return Ok(Some((schema, true)));
78        }
79
80        let mut file = File::open(&schema_path).map_storage_err()?;
81        let mut ddl = String::new();
82        file.read_to_string(&mut ddl).map_storage_err()?;
83
84        let schema = Schema::from_ddl(&ddl)?;
85        if schema.table_name != table_name {
86            return Err(CsvStorageError::TableNameDoesNotMatchWithFile.into());
87        }
88
89        Ok(Some((schema, false)))
90    }
91
92    fn path_by(&self, table_name: &str, extension: &str) -> PathBuf {
93        let path = self.path.as_path();
94        let mut path = path.join(table_name);
95        path.set_extension(extension);
96
97        path
98    }
99
100    fn schema_path(&self, table_name: &str) -> PathBuf {
101        self.path_by(table_name, "sql")
102    }
103
104    fn data_path(&self, table_name: &str) -> PathBuf {
105        self.path_by(table_name, "csv")
106    }
107
108    fn tmp_data_path(&self, table_name: &str) -> PathBuf {
109        self.path_by(table_name, "tmp.csv")
110    }
111
112    fn types_path(&self, table_name: &str) -> PathBuf {
113        self.path_by(table_name, "types.csv")
114    }
115
116    fn tmp_types_path(&self, table_name: &str) -> PathBuf {
117        self.path_by(table_name, "types.tmp.csv")
118    }
119
120    fn scan_data(&self, table_name: &str) -> Result<(Option<Vec<String>>, RowIter)> {
121        let data_path = self.data_path(table_name);
122        let (schema, generated) = match (self.fetch_schema(table_name)?, data_path.exists()) {
123            (None, _) | (_, false) => return Ok((None, Box::new(std::iter::empty()))),
124            (Some(v), true) => v,
125        };
126
127        let mut data_rdr = csv::Reader::from_path(data_path).map_storage_err()?;
128        let mut fetch_data_header_columns = || -> Result<Vec<String>> {
129            Ok(data_rdr
130                .headers()
131                .map_storage_err()?
132                .into_iter()
133                .map(|header| header.to_string())
134                .collect::<Vec<_>>())
135        };
136
137        if let Schema {
138            column_defs: Some(column_defs),
139            ..
140        } = schema
141        {
142            let columns = column_defs
143                .iter()
144                .map(|column_def| column_def.name.to_owned())
145                .collect::<Vec<_>>();
146
147            let rows = data_rdr
148                .into_records()
149                .enumerate()
150                .map(move |(index, record)| {
151                    let mut key: Option<Key> = None;
152
153                    let values = record
154                        .map_storage_err()?
155                        .into_iter()
156                        .zip(column_defs.iter())
157                        .map(|(value, column_def)| {
158                            let value = match value {
159                                "NULL" => Value::Null,
160                                _ => Value::Str(value.to_owned()),
161                            };
162
163                            let value = match &column_def.data_type {
164                                DataType::Text => value,
165                                data_type => value.cast(data_type)?,
166                            };
167
168                            if column_def.unique == Some(ColumnUniqueOption { is_primary: true }) {
169                                key = Key::try_from(&value).map(Some)?;
170                            }
171
172                            Ok(value)
173                        })
174                        .collect::<Result<Vec<Value>>>()?;
175
176                    let key = key.unwrap_or(Key::U64(index as u64));
177                    let row = DataRow::Vec(values);
178
179                    Ok((key, row))
180                });
181
182            Ok((Some(columns), Box::new(rows)))
183        } else if self.types_path(table_name).exists() {
184            let types_path = self.types_path(table_name);
185            let types_rdr = csv::Reader::from_path(types_path)
186                .map_storage_err()?
187                .into_records();
188
189            let columns = fetch_data_header_columns()?;
190            let rows = data_rdr.into_records().zip(types_rdr).enumerate().map(
191                move |(index, (record, types))| {
192                    let key = Key::U64(index as u64);
193                    let record = record.map_storage_err()?;
194                    let types = types.map_storage_err()?;
195
196                    record
197                        .into_iter()
198                        .zip(columns.iter())
199                        .zip(&types)
200                        .filter_map(|((value, column), data_type)| {
201                            if data_type.is_empty() {
202                                return None;
203                            }
204
205                            let value = if data_type == "NULL" {
206                                Ok(Value::Null)
207                            } else {
208                                parse_data_type(data_type).and_then(|data_type| {
209                                    let data_type = translate_data_type(&data_type)?;
210                                    let value = Value::Str(value.to_owned());
211
212                                    match data_type {
213                                        DataType::Text => Ok(value),
214                                        data_type => value.cast(&data_type),
215                                    }
216                                })
217                            };
218
219                            Some(value.map(|value| (column.clone(), value)))
220                        })
221                        .collect::<Result<HashMap<String, Value>>>()
222                        .map(DataRow::Map)
223                        .map(|row| (key, row))
224                },
225            );
226
227            Ok((None, Box::new(rows)))
228        } else {
229            let columns = fetch_data_header_columns()?;
230            let rows = {
231                let columns = columns.clone();
232
233                data_rdr
234                    .into_records()
235                    .enumerate()
236                    .map(move |(index, record)| {
237                        let key = Key::U64(index as u64);
238                        let row = record
239                            .map_storage_err()?
240                            .into_iter()
241                            .zip(columns.iter())
242                            .map(|(value, column)| (column.clone(), Value::Str(value.to_owned())))
243                            .collect::<HashMap<String, Value>>();
244
245                        Ok((key, DataRow::Map(row)))
246                    })
247            };
248
249            Ok((generated.then_some(columns), Box::new(rows)))
250        }
251    }
252}
253
// Each of the following gluesql-core store traits is implemented with an empty
// body, so `CsvStorage` overrides none of their methods and relies entirely on
// whatever provided (default) methods the traits define.
// NOTE(review): presumably those defaults report the feature as unsupported —
// confirm against the trait definitions in gluesql_core.
254impl AlterTable for CsvStorage {}
255impl CustomFunction for CsvStorage {}
256impl CustomFunctionMut for CsvStorage {}
257impl Index for CsvStorage {}
258impl IndexMut for CsvStorage {}
259impl Transaction for CsvStorage {}
260impl Metadata for CsvStorage {}