gluesql_csv_storage/
store.rs1use {
2 crate::{
3 CsvStorage,
4 error::{CsvStorageError, OptionExt, ResultExt},
5 },
6 async_trait::async_trait,
7 futures::stream::iter,
8 gluesql_core::{
9 data::{Key, Schema},
10 error::Result,
11 store::{DataRow, RowIter, Store},
12 },
13 std::{ffi::OsStr, fs},
14};
15
16#[async_trait]
17impl Store for CsvStorage {
18 async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
19 self.fetch_schema(table_name)
20 .map(|schema| schema.map(|(schema, _)| schema))
21 }
22
23 async fn fetch_all_schemas(&self) -> Result<Vec<Schema>> {
24 let paths = fs::read_dir(&self.path).map_storage_err()?;
25 let mut schemas = paths
26 .map(|result| {
27 let path = result.map_storage_err()?.path();
28 let extension = path.extension().and_then(OsStr::to_str);
29 if extension != Some("csv") || path.to_string_lossy().ends_with(".types.csv") {
30 return Ok(None);
31 }
32
33 let table_name = path
34 .file_stem()
35 .and_then(OsStr::to_str)
36 .map_storage_err(CsvStorageError::FileNotFound)?;
37
38 self.fetch_schema(table_name)?
39 .map(|(schema, _)| schema)
40 .map_storage_err(CsvStorageError::TableDoesNotExist)
41 .map(Some)
42 })
43 .filter_map(Result::transpose)
44 .collect::<Result<Vec<Schema>>>()?;
45
46 schemas.sort_by(|a, b| a.table_name.cmp(&b.table_name));
47
48 Ok(schemas)
49 }
50
51 async fn fetch_data(&self, table_name: &str, target: &Key) -> Result<Option<DataRow>> {
52 let (_, rows) = self.scan_data(table_name)?;
53
54 for item in rows {
55 let (key, row) = item?;
56
57 if &key == target {
58 return Ok(Some(row));
59 }
60 }
61
62 Ok(None)
63 }
64
65 async fn scan_data<'a>(&'a self, table_name: &str) -> Result<RowIter<'a>> {
66 let rows = self.scan_data(table_name).map(|(_, rows)| rows)?;
67
68 Ok(Box::pin(iter(rows)))
69 }
70}