Skip to main content

vantage_csv/
table_source.rs

1use crate::operation::CsvOperation;
2use async_trait::async_trait;
3use indexmap::IndexMap;
4use vantage_core::error;
5use vantage_dataset::traits::Result;
6use vantage_expressions::Expression;
7use vantage_expressions::Expressive;
8use vantage_expressions::traits::associated_expressions::AssociatedExpression;
9use vantage_expressions::traits::datasource::DataSource;
10use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
11use vantage_table::column::core::{Column, ColumnType};
12use vantage_table::table::Table;
13use vantage_table::traits::table_source::TableSource;
14use vantage_types::{Entity, Record};
15
16use crate::Csv;
17use crate::condition::apply_condition;
18use crate::type_system::AnyCsvType;
19
20impl DataSource for Csv {}
21
22#[async_trait]
23impl TableSource for Csv {
24    type Column<Type>
25        = Column<Type>
26    where
27        Type: ColumnType;
28    type AnyType = AnyCsvType;
29    type Value = AnyCsvType;
30    type Id = String;
31    type Condition = vantage_expressions::Expression<Self::Value>;
32
33    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
34        Column::new(name)
35    }
36
37    fn to_any_column<Type: ColumnType>(
38        &self,
39        column: Self::Column<Type>,
40    ) -> Self::Column<Self::AnyType> {
41        Column::from_column(column)
42    }
43
44    fn convert_any_column<Type: ColumnType>(
45        &self,
46        any_column: Self::Column<Self::AnyType>,
47    ) -> Option<Self::Column<Type>> {
48        Some(Column::from_column(any_column))
49    }
50
51    fn expr(
52        &self,
53        template: impl Into<String>,
54        parameters: Vec<ExpressiveEnum<Self::Value>>,
55    ) -> Expression<Self::Value> {
56        Expression::new(template, parameters)
57    }
58
59    fn search_table_condition<E>(
60        &self,
61        _table: &Table<Self, E>,
62        search_value: &str,
63    ) -> Expression<Self::Value>
64    where
65        E: Entity<Self::Value>,
66    {
67        Expression::new(format!("SEARCH '{}'", search_value), vec![])
68    }
69
70    async fn list_table_values<E>(
71        &self,
72        table: &Table<Self, E>,
73    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
74    where
75        E: Entity<Self::Value>,
76        Self: Sized,
77    {
78        let mut records = self.read_csv(table.table_name(), table.columns())?;
79
80        for condition in table.conditions() {
81            records = apply_condition(records, condition).await?;
82        }
83
84        Ok(records)
85    }
86
87    async fn get_table_value<E>(
88        &self,
89        table: &Table<Self, E>,
90        id: &Self::Id,
91    ) -> Result<Option<Record<Self::Value>>>
92    where
93        E: Entity<Self::Value>,
94        Self: Sized,
95    {
96        let records = self.read_csv(table.table_name(), table.columns())?;
97        Ok(records.get(id).cloned())
98    }
99
100    async fn get_table_some_value<E>(
101        &self,
102        table: &Table<Self, E>,
103    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
104    where
105        E: Entity<Self::Value>,
106        Self: Sized,
107    {
108        let records = self.read_csv(table.table_name(), table.columns())?;
109        Ok(records.into_iter().next())
110    }
111
112    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
113    where
114        E: Entity<Self::Value>,
115        Self: Sized,
116    {
117        let records = self.read_csv(table.table_name(), table.columns())?;
118        Ok(records.len() as i64)
119    }
120
121    async fn get_table_sum<E>(
122        &self,
123        _table: &Table<Self, E>,
124        _column: &Self::Column<Self::AnyType>,
125    ) -> Result<Self::Value>
126    where
127        E: Entity<Self::Value>,
128        Self: Sized,
129    {
130        Err(error!("Sum not implemented for CSV backend"))
131    }
132
133    async fn get_table_max<E>(
134        &self,
135        _table: &Table<Self, E>,
136        _column: &Self::Column<Self::AnyType>,
137    ) -> Result<Self::Value>
138    where
139        E: Entity<Self::Value>,
140        Self: Sized,
141    {
142        Err(error!("Max not implemented for CSV backend"))
143    }
144
145    async fn get_table_min<E>(
146        &self,
147        _table: &Table<Self, E>,
148        _column: &Self::Column<Self::AnyType>,
149    ) -> Result<Self::Value>
150    where
151        E: Entity<Self::Value>,
152        Self: Sized,
153    {
154        Err(error!("Min not implemented for CSV backend"))
155    }
156
157    async fn insert_table_value<E>(
158        &self,
159        _table: &Table<Self, E>,
160        _id: &Self::Id,
161        _record: &Record<Self::Value>,
162    ) -> Result<Record<Self::Value>>
163    where
164        E: Entity<Self::Value>,
165        Self: Sized,
166    {
167        Err(error!("CSV is a read-only data source"))
168    }
169
170    async fn replace_table_value<E>(
171        &self,
172        _table: &Table<Self, E>,
173        _id: &Self::Id,
174        _record: &Record<Self::Value>,
175    ) -> Result<Record<Self::Value>>
176    where
177        E: Entity<Self::Value>,
178        Self: Sized,
179    {
180        Err(error!("CSV is a read-only data source"))
181    }
182
183    async fn patch_table_value<E>(
184        &self,
185        _table: &Table<Self, E>,
186        _id: &Self::Id,
187        _partial: &Record<Self::Value>,
188    ) -> Result<Record<Self::Value>>
189    where
190        E: Entity<Self::Value>,
191        Self: Sized,
192    {
193        Err(error!("CSV is a read-only data source"))
194    }
195
196    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
197    where
198        E: Entity<Self::Value>,
199        Self: Sized,
200    {
201        Err(error!("CSV is a read-only data source"))
202    }
203
204    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
205    where
206        E: Entity<Self::Value>,
207        Self: Sized,
208    {
209        Err(error!("CSV is a read-only data source"))
210    }
211
212    async fn insert_table_return_id_value<E>(
213        &self,
214        _table: &Table<Self, E>,
215        _record: &Record<Self::Value>,
216    ) -> Result<Self::Id>
217    where
218        E: Entity<Self::Value>,
219        Self: Sized,
220    {
221        Err(error!("CSV is a read-only data source"))
222    }
223
224    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
225        &self,
226        target_field: &str,
227        source_table: &Table<Self, SourceE>,
228        source_column: &str,
229    ) -> Self::Condition
230    where
231        Self: Sized,
232    {
233        let src_col = self.create_column::<Self::AnyType>(source_column);
234        let fk_values = self.column_table_values_expr(source_table, &src_col);
235        let tgt_col = self.create_column::<Self::AnyType>(target_field);
236        tgt_col.in_(fk_values.expr())
237    }
238
239    fn column_table_values_expr<'a, E, Type: ColumnType>(
240        &'a self,
241        table: &Table<Self, E>,
242        column: &Self::Column<Type>,
243    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
244    where
245        E: Entity<Self::Value> + 'static,
246        Self: Sized,
247    {
248        use vantage_expressions::{
249            expr_any,
250            traits::{associated_expressions::AssociatedExpression, datasource::ExprDataSource},
251        };
252
253        let table_clone = table.clone();
254        let col = column.name().to_string();
255        let csv = self.clone();
256
257        let inner = expr_any!("{}", {
258            DeferredFn::new(move || {
259                let csv = csv.clone();
260                let table = table_clone.clone();
261                let col = col.clone();
262                Box::pin(async move {
263                    let records = csv.list_table_values(&table).await?;
264                    let values: Vec<AnyCsvType> = records
265                        .values()
266                        .filter_map(|r| r.get(&col).cloned())
267                        .collect();
268                    Ok(ExpressiveEnum::Scalar(AnyCsvType::new(values)))
269                })
270            })
271        });
272
273        let expr = expr_any!("{}", { self.defer(inner) });
274        AssociatedExpression::new(expr, self)
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281    use crate::type_system::CsvTypeVariants;
282    use vantage_dataset::prelude::{ReadableValueSet, WritableValueSet};
283    use vantage_types::EmptyEntity;
284
285    fn test_csv() -> Csv {
286        Csv::new(format!("{}/data", env!("CARGO_MANIFEST_DIR")))
287    }
288
289    #[tokio::test]
290    async fn test_list_bakery() {
291        let csv = test_csv();
292        let table = Table::<Csv, EmptyEntity>::new("bakery", csv)
293            .with_column_of::<String>("name")
294            .with_column_of::<i64>("profit_margin");
295
296        let values = table.list_values().await.unwrap();
297        assert_eq!(values.len(), 1);
298        assert!(values.contains_key("hill_valley"));
299
300        let bakery = &values["hill_valley"];
301        let name = bakery["name"].try_get::<String>().unwrap();
302        assert_eq!(name, "Hill Valley Bakery");
303
304        let profit = bakery["profit_margin"].try_get::<i64>().unwrap();
305        assert_eq!(profit, 15);
306    }
307
308    #[tokio::test]
309    async fn test_list_clients() {
310        let csv = test_csv();
311        let table = Table::<Csv, EmptyEntity>::new("client", csv)
312            .with_column_of::<String>("name")
313            .with_column_of::<String>("email")
314            .with_column_of::<bool>("is_paying_client")
315            .with_column_of::<serde_json::Value>("metadata");
316
317        let values = table.list_values().await.unwrap();
318        assert_eq!(values.len(), 3);
319
320        let marty = &values["marty"];
321        assert_eq!(marty["name"].try_get::<String>().unwrap(), "Marty McFly");
322        assert!(marty["is_paying_client"].try_get::<bool>().unwrap());
323
324        let biff = &values["biff"];
325        assert!(!biff["is_paying_client"].try_get::<bool>().unwrap());
326        assert_eq!(biff["metadata"].type_variant(), Some(CsvTypeVariants::Json));
327    }
328
329    #[tokio::test]
330    async fn test_list_products_typed() {
331        let csv = test_csv();
332        let table = Table::<Csv, EmptyEntity>::new("product", csv)
333            .with_column_of::<String>("name")
334            .with_column_of::<i64>("calories")
335            .with_column_of::<i64>("price")
336            .with_column_of::<bool>("is_deleted")
337            .with_column_of::<serde_json::Value>("inventory");
338
339        let values = table.list_values().await.unwrap();
340        assert_eq!(values.len(), 5);
341
342        let cupcake = &values["flux_cupcake"];
343        assert_eq!(
344            cupcake["name"].try_get::<String>().unwrap(),
345            "Flux Capacitor Cupcake"
346        );
347        assert_eq!(cupcake["calories"].try_get::<i64>().unwrap(), 300);
348        assert_eq!(cupcake["price"].try_get::<i64>().unwrap(), 120);
349        assert!(!cupcake["is_deleted"].try_get::<bool>().unwrap());
350
351        let inv = cupcake["inventory"].try_get::<serde_json::Value>().unwrap();
352        assert_eq!(inv["stock"], serde_json::json!(50));
353    }
354
355    #[tokio::test]
356    async fn test_untyped_columns_stay_string() {
357        let csv = test_csv();
358        let table = Table::<Csv, EmptyEntity>::new("product", csv);
359
360        let values = table.list_values().await.unwrap();
361        let cupcake = &values["flux_cupcake"];
362        assert_eq!(
363            cupcake["calories"].type_variant(),
364            Some(CsvTypeVariants::String)
365        );
366        assert_eq!(cupcake["calories"].try_get::<String>().unwrap(), "300");
367    }
368
369    #[tokio::test]
370    async fn test_get_value_by_id() {
371        let csv = test_csv();
372        let table = Table::<Csv, EmptyEntity>::new("client", csv)
373            .with_column_of::<String>("name")
374            .with_column_of::<String>("email");
375
376        let record = table
377            .get_value(&"doc".to_string())
378            .await
379            .unwrap()
380            .expect("doc exists");
381        assert_eq!(record["name"].try_get::<String>().unwrap(), "Doc Brown");
382        assert_eq!(
383            record["email"].try_get::<String>().unwrap(),
384            "doc@brown.com"
385        );
386    }
387
388    #[tokio::test]
389    async fn test_get_value_not_found() {
390        let csv = test_csv();
391        let table = Table::<Csv, EmptyEntity>::new("client", csv);
392
393        let result = table.get_value(&"nonexistent".to_string()).await.unwrap();
394        assert!(result.is_none());
395    }
396
397    #[tokio::test]
398    async fn test_get_some_value() {
399        let csv = test_csv();
400        let table = Table::<Csv, EmptyEntity>::new("bakery", csv).with_column_of::<String>("name");
401
402        let result = table.get_some_value().await.unwrap();
403        assert!(result.is_some());
404        let (id, record) = result.unwrap();
405        assert_eq!(id, "hill_valley");
406        assert_eq!(
407            record["name"].try_get::<String>().unwrap(),
408            "Hill Valley Bakery"
409        );
410    }
411
412    #[tokio::test]
413    async fn test_get_count() {
414        let csv = test_csv();
415        let table = Table::<Csv, EmptyEntity>::new("product", csv);
416
417        let count = table.data_source().get_table_count(&table).await.unwrap();
418        assert_eq!(count, 5);
419    }
420
421    #[tokio::test]
422    async fn test_write_operations_fail() {
423        let csv = test_csv();
424        let table = Table::<Csv, EmptyEntity>::new("bakery", csv);
425
426        let record = Record::new();
427        assert!(
428            WritableValueSet::insert_value(&table, &"test".to_string(), &record)
429                .await
430                .is_err()
431        );
432        assert!(
433            WritableValueSet::delete(&table, &"test".to_string())
434                .await
435                .is_err()
436        );
437        assert!(WritableValueSet::delete_all(&table).await.is_err());
438    }
439
440    #[tokio::test]
441    async fn test_missing_file() {
442        let csv = test_csv();
443        let table = Table::<Csv, EmptyEntity>::new("nonexistent", csv);
444
445        let result = table.list_values().await;
446        assert!(result.is_err());
447    }
448}