gluesql_csv_storage/
store.rs

1use {
2    crate::{
3        CsvStorage,
4        error::{CsvStorageError, OptionExt, ResultExt},
5    },
6    async_trait::async_trait,
7    futures::stream::iter,
8    gluesql_core::{
9        data::{Key, Schema},
10        error::Result,
11        store::{DataRow, RowIter, Store},
12    },
13    std::{ffi::OsStr, fs},
14};
15
16#[async_trait]
17impl Store for CsvStorage {
18    async fn fetch_schema(&self, table_name: &str) -> Result<Option<Schema>> {
19        self.fetch_schema(table_name)
20            .map(|schema| schema.map(|(schema, _)| schema))
21    }
22
23    async fn fetch_all_schemas(&self) -> Result<Vec<Schema>> {
24        let paths = fs::read_dir(&self.path).map_storage_err()?;
25        let mut schemas = paths
26            .map(|result| {
27                let path = result.map_storage_err()?.path();
28                let extension = path.extension().and_then(OsStr::to_str);
29                if extension != Some("csv") || path.to_string_lossy().ends_with(".types.csv") {
30                    return Ok(None);
31                }
32
33                let table_name = path
34                    .file_stem()
35                    .and_then(OsStr::to_str)
36                    .map_storage_err(CsvStorageError::FileNotFound)?;
37
38                self.fetch_schema(table_name)?
39                    .map(|(schema, _)| schema)
40                    .map_storage_err(CsvStorageError::TableDoesNotExist)
41                    .map(Some)
42            })
43            .filter_map(Result::transpose)
44            .collect::<Result<Vec<Schema>>>()?;
45
46        schemas.sort_by(|a, b| a.table_name.cmp(&b.table_name));
47
48        Ok(schemas)
49    }
50
51    async fn fetch_data(&self, table_name: &str, target: &Key) -> Result<Option<DataRow>> {
52        let (_, rows) = self.scan_data(table_name)?;
53
54        for item in rows {
55            let (key, row) = item?;
56
57            if &key == target {
58                return Ok(Some(row));
59            }
60        }
61
62        Ok(None)
63    }
64
65    async fn scan_data<'a>(&'a self, table_name: &str) -> Result<RowIter<'a>> {
66        let rows = self.scan_data(table_name).map(|(_, rows)| rows)?;
67
68        Ok(Box::pin(iter(rows)))
69    }
70}