gluesql_csv_storage/
store_mut.rs

1use {
2    crate::{
3        error::{CsvStorageError, ResultExt},
4        CsvStorage,
5    },
6    async_trait::async_trait,
7    csv::Writer,
8    gluesql_core::{
9        data::{Key, Schema},
10        error::Result,
11        store::{DataRow, StoreMut},
12    },
13    std::{
14        cmp::Ordering,
15        collections::BTreeSet,
16        fs::{remove_file, rename, File, OpenOptions},
17        io::Write,
18        iter::Peekable,
19        vec::IntoIter,
20    },
21};
22
23#[async_trait(?Send)]
24impl StoreMut for CsvStorage {
25    async fn insert_schema(&mut self, schema: &Schema) -> Result<()> {
26        let schema_path = self.schema_path(schema.table_name.as_str());
27        let ddl = schema.to_ddl();
28        let mut file = File::create(schema_path).map_storage_err()?;
29        file.write_all(ddl.as_bytes()).map_storage_err()?;
30
31        let column_defs = match &schema.column_defs {
32            Some(column_defs) => column_defs,
33            None => {
34                return Ok(());
35            }
36        };
37
38        let columns = column_defs
39            .iter()
40            .map(|column_def| column_def.name.as_str())
41            .collect::<Vec<&str>>();
42        let data_path = self.data_path(schema.table_name.as_str());
43
44        File::create(data_path)
45            .map_storage_err()
46            .map(Writer::from_writer)?
47            .write_record(&columns)
48            .map_storage_err()
49    }
50
51    async fn delete_schema(&mut self, table_name: &str) -> Result<()> {
52        let data_path = self.data_path(table_name);
53        if data_path.exists() {
54            remove_file(data_path).map_storage_err()?;
55        }
56
57        let types_path = self.types_path(table_name);
58        if types_path.exists() {
59            remove_file(types_path).map_storage_err()?;
60        }
61
62        let schema_path = self.schema_path(table_name);
63        if schema_path.exists() {
64            remove_file(schema_path).map_storage_err()?;
65        }
66
67        Ok(())
68    }
69
70    async fn append_data(&mut self, table_name: &str, rows: Vec<DataRow>) -> Result<()> {
71        let (columns, prev_rows) = self.scan_data(table_name)?;
72
73        if columns.is_some() {
74            let data_path = self.data_path(table_name);
75            let mut wtr = OpenOptions::new()
76                .append(true)
77                .open(data_path)
78                .map_storage_err()
79                .map(Writer::from_writer)?;
80
81            for row in rows {
82                let row = convert(row)?;
83
84                wtr.write_record(&row).map_storage_err()?;
85            }
86
87            Ok(())
88        } else {
89            let rows = prev_rows
90                .map(|item| item.map(|(_, row)| row))
91                .chain(rows.into_iter().map(Ok));
92
93            self.write(table_name, columns, rows)
94        }
95    }
96
97    async fn insert_data(&mut self, table_name: &str, mut rows: Vec<(Key, DataRow)>) -> Result<()> {
98        let (columns, prev_rows) = self.scan_data(table_name)?;
99
100        rows.sort_by(|(key_a, _), (key_b, _)| key_a.cmp(key_b));
101
102        let merged = SortMerge::new(prev_rows, rows.into_iter());
103
104        self.write(table_name, columns, merged)
105    }
106
107    async fn delete_data(&mut self, table_name: &str, keys: Vec<Key>) -> Result<()> {
108        let (columns, prev_rows) = self.scan_data(table_name)?;
109        let rows = prev_rows.filter_map(|item| {
110            let (key, data_row) = match item {
111                Ok(item) => item,
112                Err(e) => return Some(Err(e)),
113            };
114
115            keys.iter()
116                .all(|target_key| target_key != &key)
117                .then_some(Ok(data_row))
118        });
119
120        self.write(table_name, columns, rows)
121    }
122}
123
124impl CsvStorage {
125    fn write<T: Iterator<Item = Result<DataRow>>>(
126        &self,
127        table_name: &str,
128        columns: Option<Vec<String>>,
129        rows: T,
130    ) -> Result<()> {
131        let tmp_data_path = self.tmp_data_path(table_name);
132        let mut data_wtr = File::create(&tmp_data_path)
133            .map_storage_err()
134            .map(Writer::from_writer)?;
135
136        if let Some(columns) = columns {
137            data_wtr.write_record(&columns).map_storage_err()?;
138
139            for row in rows {
140                let row = convert(row?)?;
141
142                data_wtr.write_record(&row).map_storage_err()?;
143            }
144        } else {
145            let tmp_types_path = self.tmp_types_path(table_name);
146            let mut types_wtr = File::create(&tmp_types_path)
147                .map(Writer::from_writer)
148                .map_storage_err()?;
149
150            let mut columns = BTreeSet::new();
151            let rows = rows
152                .map(|row| match row? {
153                    DataRow::Vec(_) => {
154                        Err(CsvStorageError::UnreachableVecTypeDataRowTypeFound.into())
155                    }
156                    DataRow::Map(values) => Ok(values),
157                })
158                .collect::<Result<Vec<_>>>()?;
159
160            for row in &rows {
161                columns.extend(row.keys());
162            }
163
164            data_wtr.write_record(&columns).map_storage_err()?;
165            types_wtr.write_record(&columns).map_storage_err()?;
166
167            for row in &rows {
168                let (row, data_types): (Vec<_>, Vec<_>) = columns
169                    .iter()
170                    .map(|key| {
171                        row.get(key.as_str())
172                            .map(|value| {
173                                let data_type = value
174                                    .get_type()
175                                    .map(|t| t.to_string())
176                                    .unwrap_or("NULL".to_owned());
177
178                                (String::from(value), data_type)
179                            })
180                            .unwrap_or(("NULL".to_owned(), "".to_owned()))
181                    })
182                    .unzip();
183
184                data_wtr.write_record(&row).map_storage_err()?;
185                types_wtr.write_record(&data_types).map_storage_err()?;
186            }
187
188            rename(tmp_types_path, self.types_path(table_name)).map_storage_err()?
189        }
190
191        rename(tmp_data_path, self.data_path(table_name)).map_storage_err()
192    }
193}
194
195fn convert(data_row: DataRow) -> Result<Vec<String>> {
196    match data_row {
197        DataRow::Vec(values) => Ok(values.into_iter().map(String::from).collect()),
198        DataRow::Map(_) => Err(CsvStorageError::UnreachableMapTypeDataRowFound.into()),
199    }
200}
201
202struct SortMerge<T: Iterator<Item = Result<(Key, DataRow)>>> {
203    left_rows: Peekable<T>,
204    right_rows: Peekable<IntoIter<(Key, DataRow)>>,
205}
206
207impl<T> SortMerge<T>
208where
209    T: Iterator<Item = Result<(Key, DataRow)>>,
210{
211    fn new(left_rows: T, right_rows: IntoIter<(Key, DataRow)>) -> Self {
212        let left_rows = left_rows.peekable();
213        let right_rows = right_rows.peekable();
214
215        Self {
216            left_rows,
217            right_rows,
218        }
219    }
220}
221impl<T> Iterator for SortMerge<T>
222where
223    T: Iterator<Item = Result<(Key, DataRow)>>,
224{
225    type Item = Result<DataRow>;
226
227    fn next(&mut self) -> Option<Self::Item> {
228        let left = self.left_rows.peek();
229        let right = self.right_rows.peek();
230
231        match (left, right) {
232            (Some(Ok((left_key, _))), Some((right_key, _))) => match left_key.cmp(right_key) {
233                Ordering::Less => self.left_rows.next(),
234                Ordering::Greater => self.right_rows.next().map(Ok),
235                Ordering::Equal => {
236                    self.left_rows.next();
237                    self.right_rows.next().map(Ok)
238                }
239            }
240            .map(|item| Ok(item?.1)),
241            (Some(_), _) => self.left_rows.next().map(|item| Ok(item?.1)),
242            (None, Some(_)) => self.right_rows.next().map(|item| Ok(item.1)),
243            (None, None) => None,
244        }
245    }
246}