1use {
2 crate::{
3 error::{CsvStorageError, ResultExt},
4 CsvStorage,
5 },
6 async_trait::async_trait,
7 csv::Writer,
8 gluesql_core::{
9 data::{Key, Schema},
10 error::Result,
11 store::{DataRow, StoreMut},
12 },
13 std::{
14 cmp::Ordering,
15 collections::BTreeSet,
16 fs::{remove_file, rename, File, OpenOptions},
17 io::Write,
18 iter::Peekable,
19 vec::IntoIter,
20 },
21};
22
23#[async_trait(?Send)]
24impl StoreMut for CsvStorage {
25 async fn insert_schema(&mut self, schema: &Schema) -> Result<()> {
26 let schema_path = self.schema_path(schema.table_name.as_str());
27 let ddl = schema.to_ddl();
28 let mut file = File::create(schema_path).map_storage_err()?;
29 file.write_all(ddl.as_bytes()).map_storage_err()?;
30
31 let column_defs = match &schema.column_defs {
32 Some(column_defs) => column_defs,
33 None => {
34 return Ok(());
35 }
36 };
37
38 let columns = column_defs
39 .iter()
40 .map(|column_def| column_def.name.as_str())
41 .collect::<Vec<&str>>();
42 let data_path = self.data_path(schema.table_name.as_str());
43
44 File::create(data_path)
45 .map_storage_err()
46 .map(Writer::from_writer)?
47 .write_record(&columns)
48 .map_storage_err()
49 }
50
51 async fn delete_schema(&mut self, table_name: &str) -> Result<()> {
52 let data_path = self.data_path(table_name);
53 if data_path.exists() {
54 remove_file(data_path).map_storage_err()?;
55 }
56
57 let types_path = self.types_path(table_name);
58 if types_path.exists() {
59 remove_file(types_path).map_storage_err()?;
60 }
61
62 let schema_path = self.schema_path(table_name);
63 if schema_path.exists() {
64 remove_file(schema_path).map_storage_err()?;
65 }
66
67 Ok(())
68 }
69
70 async fn append_data(&mut self, table_name: &str, rows: Vec<DataRow>) -> Result<()> {
71 let (columns, prev_rows) = self.scan_data(table_name)?;
72
73 if columns.is_some() {
74 let data_path = self.data_path(table_name);
75 let mut wtr = OpenOptions::new()
76 .append(true)
77 .open(data_path)
78 .map_storage_err()
79 .map(Writer::from_writer)?;
80
81 for row in rows {
82 let row = convert(row)?;
83
84 wtr.write_record(&row).map_storage_err()?;
85 }
86
87 Ok(())
88 } else {
89 let rows = prev_rows
90 .map(|item| item.map(|(_, row)| row))
91 .chain(rows.into_iter().map(Ok));
92
93 self.write(table_name, columns, rows)
94 }
95 }
96
97 async fn insert_data(&mut self, table_name: &str, mut rows: Vec<(Key, DataRow)>) -> Result<()> {
98 let (columns, prev_rows) = self.scan_data(table_name)?;
99
100 rows.sort_by(|(key_a, _), (key_b, _)| key_a.cmp(key_b));
101
102 let merged = SortMerge::new(prev_rows, rows.into_iter());
103
104 self.write(table_name, columns, merged)
105 }
106
107 async fn delete_data(&mut self, table_name: &str, keys: Vec<Key>) -> Result<()> {
108 let (columns, prev_rows) = self.scan_data(table_name)?;
109 let rows = prev_rows.filter_map(|item| {
110 let (key, data_row) = match item {
111 Ok(item) => item,
112 Err(e) => return Some(Err(e)),
113 };
114
115 keys.iter()
116 .all(|target_key| target_key != &key)
117 .then_some(Ok(data_row))
118 });
119
120 self.write(table_name, columns, rows)
121 }
122}
123
124impl CsvStorage {
125 fn write<T: Iterator<Item = Result<DataRow>>>(
126 &self,
127 table_name: &str,
128 columns: Option<Vec<String>>,
129 rows: T,
130 ) -> Result<()> {
131 let tmp_data_path = self.tmp_data_path(table_name);
132 let mut data_wtr = File::create(&tmp_data_path)
133 .map_storage_err()
134 .map(Writer::from_writer)?;
135
136 if let Some(columns) = columns {
137 data_wtr.write_record(&columns).map_storage_err()?;
138
139 for row in rows {
140 let row = convert(row?)?;
141
142 data_wtr.write_record(&row).map_storage_err()?;
143 }
144 } else {
145 let tmp_types_path = self.tmp_types_path(table_name);
146 let mut types_wtr = File::create(&tmp_types_path)
147 .map(Writer::from_writer)
148 .map_storage_err()?;
149
150 let mut columns = BTreeSet::new();
151 let rows = rows
152 .map(|row| match row? {
153 DataRow::Vec(_) => {
154 Err(CsvStorageError::UnreachableVecTypeDataRowTypeFound.into())
155 }
156 DataRow::Map(values) => Ok(values),
157 })
158 .collect::<Result<Vec<_>>>()?;
159
160 for row in &rows {
161 columns.extend(row.keys());
162 }
163
164 data_wtr.write_record(&columns).map_storage_err()?;
165 types_wtr.write_record(&columns).map_storage_err()?;
166
167 for row in &rows {
168 let (row, data_types): (Vec<_>, Vec<_>) = columns
169 .iter()
170 .map(|key| {
171 row.get(key.as_str())
172 .map(|value| {
173 let data_type = value
174 .get_type()
175 .map(|t| t.to_string())
176 .unwrap_or("NULL".to_owned());
177
178 (String::from(value), data_type)
179 })
180 .unwrap_or(("NULL".to_owned(), "".to_owned()))
181 })
182 .unzip();
183
184 data_wtr.write_record(&row).map_storage_err()?;
185 types_wtr.write_record(&data_types).map_storage_err()?;
186 }
187
188 rename(tmp_types_path, self.types_path(table_name)).map_storage_err()?
189 }
190
191 rename(tmp_data_path, self.data_path(table_name)).map_storage_err()
192 }
193}
194
195fn convert(data_row: DataRow) -> Result<Vec<String>> {
196 match data_row {
197 DataRow::Vec(values) => Ok(values.into_iter().map(String::from).collect()),
198 DataRow::Map(_) => Err(CsvStorageError::UnreachableMapTypeDataRowFound.into()),
199 }
200}
201
202struct SortMerge<T: Iterator<Item = Result<(Key, DataRow)>>> {
203 left_rows: Peekable<T>,
204 right_rows: Peekable<IntoIter<(Key, DataRow)>>,
205}
206
207impl<T> SortMerge<T>
208where
209 T: Iterator<Item = Result<(Key, DataRow)>>,
210{
211 fn new(left_rows: T, right_rows: IntoIter<(Key, DataRow)>) -> Self {
212 let left_rows = left_rows.peekable();
213 let right_rows = right_rows.peekable();
214
215 Self {
216 left_rows,
217 right_rows,
218 }
219 }
220}
221impl<T> Iterator for SortMerge<T>
222where
223 T: Iterator<Item = Result<(Key, DataRow)>>,
224{
225 type Item = Result<DataRow>;
226
227 fn next(&mut self) -> Option<Self::Item> {
228 let left = self.left_rows.peek();
229 let right = self.right_rows.peek();
230
231 match (left, right) {
232 (Some(Ok((left_key, _))), Some((right_key, _))) => match left_key.cmp(right_key) {
233 Ordering::Less => self.left_rows.next(),
234 Ordering::Greater => self.right_rows.next().map(Ok),
235 Ordering::Equal => {
236 self.left_rows.next();
237 self.right_rows.next().map(Ok)
238 }
239 }
240 .map(|item| Ok(item?.1)),
241 (Some(_), _) => self.left_rows.next().map(|item| Ok(item?.1)),
242 (None, Some(_)) => self.right_rows.next().map(|item| Ok(item.1)),
243 (None, None) => None,
244 }
245 }
246}