Skip to main content

vantage_csv/
table_source.rs

1use crate::operation::CsvOperation;
2use async_trait::async_trait;
3use indexmap::IndexMap;
4use vantage_core::error;
5use vantage_dataset::traits::Result;
6use vantage_expressions::Expression;
7use vantage_expressions::Expressive;
8use vantage_expressions::traits::associated_expressions::AssociatedExpression;
9use vantage_expressions::traits::datasource::DataSource;
10use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
11use vantage_table::column::core::{Column, ColumnType};
12use vantage_table::table::Table;
13use vantage_table::traits::table_source::TableSource;
14use vantage_types::{Entity, Record};
15
16use crate::Csv;
17use crate::condition::apply_condition;
18use crate::type_system::AnyCsvType;
19
20impl DataSource for Csv {}
21
22#[async_trait]
23impl TableSource for Csv {
24    type Column<Type>
25        = Column<Type>
26    where
27        Type: ColumnType;
28    type AnyType = AnyCsvType;
29    type Value = AnyCsvType;
30    type Id = String;
31    type Condition = vantage_expressions::Expression<Self::Value>;
32
33    fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
34        let column: Column<AnyCsvType> = Column::new(field);
35        Ok(CsvOperation::eq(&column, value))
36    }
37
38    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
39        Column::new(name)
40    }
41
42    fn to_any_column<Type: ColumnType>(
43        &self,
44        column: Self::Column<Type>,
45    ) -> Self::Column<Self::AnyType> {
46        Column::from_column(column)
47    }
48
49    fn convert_any_column<Type: ColumnType>(
50        &self,
51        any_column: Self::Column<Self::AnyType>,
52    ) -> Option<Self::Column<Type>> {
53        Some(Column::from_column(any_column))
54    }
55
56    fn expr(
57        &self,
58        template: impl Into<String>,
59        parameters: Vec<ExpressiveEnum<Self::Value>>,
60    ) -> Expression<Self::Value> {
61        Expression::new(template, parameters)
62    }
63
64    fn search_table_condition<E>(
65        &self,
66        _table: &Table<Self, E>,
67        search_value: &str,
68    ) -> Expression<Self::Value>
69    where
70        E: Entity<Self::Value>,
71    {
72        Expression::new(format!("SEARCH '{}'", search_value), vec![])
73    }
74
75    async fn list_table_values<E>(
76        &self,
77        table: &Table<Self, E>,
78    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
79    where
80        E: Entity<Self::Value>,
81        Self: Sized,
82    {
83        let mut records = self.read_csv(table.table_name(), table.columns())?;
84
85        for condition in table.conditions() {
86            records = apply_condition(records, condition).await?;
87        }
88
89        Ok(records)
90    }
91
92    async fn get_table_value<E>(
93        &self,
94        table: &Table<Self, E>,
95        id: &Self::Id,
96    ) -> Result<Option<Record<Self::Value>>>
97    where
98        E: Entity<Self::Value>,
99        Self: Sized,
100    {
101        let records = self.read_csv(table.table_name(), table.columns())?;
102        Ok(records.get(id).cloned())
103    }
104
105    async fn get_table_some_value<E>(
106        &self,
107        table: &Table<Self, E>,
108    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
109    where
110        E: Entity<Self::Value>,
111        Self: Sized,
112    {
113        let records = self.read_csv(table.table_name(), table.columns())?;
114        Ok(records.into_iter().next())
115    }
116
117    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
118    where
119        E: Entity<Self::Value>,
120        Self: Sized,
121    {
122        let records = self.read_csv(table.table_name(), table.columns())?;
123        Ok(records.len() as i64)
124    }
125
126    async fn get_table_sum<E>(
127        &self,
128        _table: &Table<Self, E>,
129        _column: &Self::Column<Self::AnyType>,
130    ) -> Result<Self::Value>
131    where
132        E: Entity<Self::Value>,
133        Self: Sized,
134    {
135        Err(error!("Sum not implemented for CSV backend"))
136    }
137
138    async fn get_table_max<E>(
139        &self,
140        _table: &Table<Self, E>,
141        _column: &Self::Column<Self::AnyType>,
142    ) -> Result<Self::Value>
143    where
144        E: Entity<Self::Value>,
145        Self: Sized,
146    {
147        Err(error!("Max not implemented for CSV backend"))
148    }
149
150    async fn get_table_min<E>(
151        &self,
152        _table: &Table<Self, E>,
153        _column: &Self::Column<Self::AnyType>,
154    ) -> Result<Self::Value>
155    where
156        E: Entity<Self::Value>,
157        Self: Sized,
158    {
159        Err(error!("Min not implemented for CSV backend"))
160    }
161
162    async fn insert_table_value<E>(
163        &self,
164        _table: &Table<Self, E>,
165        _id: &Self::Id,
166        _record: &Record<Self::Value>,
167    ) -> Result<Record<Self::Value>>
168    where
169        E: Entity<Self::Value>,
170        Self: Sized,
171    {
172        Err(error!("CSV is a read-only data source"))
173    }
174
175    async fn replace_table_value<E>(
176        &self,
177        _table: &Table<Self, E>,
178        _id: &Self::Id,
179        _record: &Record<Self::Value>,
180    ) -> Result<Record<Self::Value>>
181    where
182        E: Entity<Self::Value>,
183        Self: Sized,
184    {
185        Err(error!("CSV is a read-only data source"))
186    }
187
188    async fn patch_table_value<E>(
189        &self,
190        _table: &Table<Self, E>,
191        _id: &Self::Id,
192        _partial: &Record<Self::Value>,
193    ) -> Result<Record<Self::Value>>
194    where
195        E: Entity<Self::Value>,
196        Self: Sized,
197    {
198        Err(error!("CSV is a read-only data source"))
199    }
200
201    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
202    where
203        E: Entity<Self::Value>,
204        Self: Sized,
205    {
206        Err(error!("CSV is a read-only data source"))
207    }
208
209    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
210    where
211        E: Entity<Self::Value>,
212        Self: Sized,
213    {
214        Err(error!("CSV is a read-only data source"))
215    }
216
217    async fn insert_table_return_id_value<E>(
218        &self,
219        _table: &Table<Self, E>,
220        _record: &Record<Self::Value>,
221    ) -> Result<Self::Id>
222    where
223        E: Entity<Self::Value>,
224        Self: Sized,
225    {
226        Err(error!("CSV is a read-only data source"))
227    }
228
229    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
230        &self,
231        target_field: &str,
232        source_table: &Table<Self, SourceE>,
233        source_column: &str,
234    ) -> Self::Condition
235    where
236        Self: Sized,
237    {
238        let src_col = self.create_column::<Self::AnyType>(source_column);
239        let fk_values = self.column_table_values_expr(source_table, &src_col);
240        let tgt_col = self.create_column::<Self::AnyType>(target_field);
241        tgt_col.in_(fk_values.expr())
242    }
243
244    fn column_table_values_expr<'a, E, Type: ColumnType>(
245        &'a self,
246        table: &Table<Self, E>,
247        column: &Self::Column<Type>,
248    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
249    where
250        E: Entity<Self::Value> + 'static,
251        Self: Sized,
252    {
253        use vantage_expressions::{
254            expr_any,
255            traits::{associated_expressions::AssociatedExpression, datasource::ExprDataSource},
256        };
257
258        let table_clone = table.clone();
259        let col = column.name().to_string();
260        let csv = self.clone();
261
262        let inner = expr_any!("{}", {
263            DeferredFn::new(move || {
264                let csv = csv.clone();
265                let table = table_clone.clone();
266                let col = col.clone();
267                Box::pin(async move {
268                    let records = csv.list_table_values(&table).await?;
269                    let values: Vec<AnyCsvType> = records
270                        .values()
271                        .filter_map(|r| r.get(&col).cloned())
272                        .collect();
273                    Ok(ExpressiveEnum::Scalar(AnyCsvType::new(values)))
274                })
275            })
276        });
277
278        let expr = expr_any!("{}", { self.defer(inner) });
279        AssociatedExpression::new(expr, self)
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use crate::type_system::CsvTypeVariants;
    use vantage_dataset::prelude::{ReadableValueSet, WritableValueSet};
    use vantage_types::EmptyEntity;

    /// Entity-less table over the CSV backend, as used by every test below.
    type CsvTable = Table<Csv, EmptyEntity>;

    /// Data source pointing at the crate's `data` fixture directory.
    fn test_csv() -> Csv {
        let fixture_dir = format!("{}/data", env!("CARGO_MANIFEST_DIR"));
        Csv::new(fixture_dir)
    }

    /// Typed columns of the bakery fixture are parsed into their declared types.
    #[tokio::test]
    async fn test_list_bakery() {
        let bakeries = CsvTable::new("bakery", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<i64>("profit_margin");

        let rows = bakeries.list_values().await.unwrap();
        assert_eq!(rows.len(), 1);
        assert!(rows.contains_key("hill_valley"));

        let hill_valley = &rows["hill_valley"];
        assert_eq!(
            hill_valley["name"].try_get::<String>().unwrap(),
            "Hill Valley Bakery"
        );
        assert_eq!(hill_valley["profit_margin"].try_get::<i64>().unwrap(), 15);
    }

    /// Boolean and JSON columns of the client fixture are recognised.
    #[tokio::test]
    async fn test_list_clients() {
        let clients = CsvTable::new("client", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<String>("email")
            .with_column_of::<bool>("is_paying_client")
            .with_column_of::<serde_json::Value>("metadata");

        let rows = clients.list_values().await.unwrap();
        assert_eq!(rows.len(), 3);

        let marty = &rows["marty"];
        let marty_name = marty["name"].try_get::<String>().unwrap();
        let marty_pays = marty["is_paying_client"].try_get::<bool>().unwrap();
        assert_eq!(marty_name, "Marty McFly");
        assert!(marty_pays);

        let biff = &rows["biff"];
        let biff_pays = biff["is_paying_client"].try_get::<bool>().unwrap();
        assert!(!biff_pays);
        assert_eq!(biff["metadata"].type_variant(), Some(CsvTypeVariants::Json));
    }

    /// Every declared column type of the product fixture round-trips.
    #[tokio::test]
    async fn test_list_products_typed() {
        let products = CsvTable::new("product", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<i64>("calories")
            .with_column_of::<i64>("price")
            .with_column_of::<bool>("is_deleted")
            .with_column_of::<serde_json::Value>("inventory");

        let rows = products.list_values().await.unwrap();
        assert_eq!(rows.len(), 5);

        let cupcake = &rows["flux_cupcake"];
        let name = cupcake["name"].try_get::<String>().unwrap();
        assert_eq!(name, "Flux Capacitor Cupcake");
        assert_eq!(cupcake["calories"].try_get::<i64>().unwrap(), 300);
        assert_eq!(cupcake["price"].try_get::<i64>().unwrap(), 120);
        assert!(!cupcake["is_deleted"].try_get::<bool>().unwrap());

        let inventory = cupcake["inventory"].try_get::<serde_json::Value>().unwrap();
        assert_eq!(inventory["stock"], serde_json::json!(50));
    }

    /// Without column declarations, values are kept as strings.
    #[tokio::test]
    async fn test_untyped_columns_stay_string() {
        let products = CsvTable::new("product", test_csv());

        let rows = products.list_values().await.unwrap();
        let calories = &rows["flux_cupcake"]["calories"];
        assert_eq!(calories.type_variant(), Some(CsvTypeVariants::String));
        assert_eq!(calories.try_get::<String>().unwrap(), "300");
    }

    /// Looking up an existing id returns its record.
    #[tokio::test]
    async fn test_get_value_by_id() {
        let clients = CsvTable::new("client", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<String>("email");

        let doc_id = "doc".to_string();
        let doc = clients
            .get_value(&doc_id)
            .await
            .unwrap()
            .expect("doc exists");

        assert_eq!(doc["name"].try_get::<String>().unwrap(), "Doc Brown");
        assert_eq!(doc["email"].try_get::<String>().unwrap(), "doc@brown.com");
    }

    /// Looking up an unknown id yields `None` rather than an error.
    #[tokio::test]
    async fn test_get_value_not_found() {
        let clients = CsvTable::new("client", test_csv());

        let missing_id = "nonexistent".to_string();
        let lookup = clients.get_value(&missing_id).await.unwrap();
        assert!(lookup.is_none());
    }

    /// `get_some_value` hands back the single bakery row.
    #[tokio::test]
    async fn test_get_some_value() {
        let bakeries = CsvTable::new("bakery", test_csv()).with_column_of::<String>("name");

        let (id, record) = bakeries
            .get_some_value()
            .await
            .unwrap()
            .expect("bakery fixture has at least one row");

        assert_eq!(id, "hill_valley");
        let name = record["name"].try_get::<String>().unwrap();
        assert_eq!(name, "Hill Valley Bakery");
    }

    /// The count matches the number of rows in the product fixture.
    #[tokio::test]
    async fn test_get_count() {
        let products = CsvTable::new("product", test_csv());

        let total = products
            .data_source()
            .get_table_count(&products)
            .await
            .unwrap();
        assert_eq!(total, 5);
    }

    /// Insert, delete and delete-all are all rejected by the backend.
    #[tokio::test]
    async fn test_write_operations_fail() {
        let bakeries = CsvTable::new("bakery", test_csv());
        let id = "test".to_string();
        let empty = Record::new();

        let inserted = WritableValueSet::insert_value(&bakeries, &id, &empty).await;
        assert!(inserted.is_err());

        let deleted = WritableValueSet::delete(&bakeries, &id).await;
        assert!(deleted.is_err());

        let cleared = WritableValueSet::delete_all(&bakeries).await;
        assert!(cleared.is_err());
    }

    /// Listing a table whose CSV file does not exist surfaces an error.
    #[tokio::test]
    async fn test_missing_file() {
        let ghost = CsvTable::new("nonexistent", test_csv());

        let listing = ghost.list_values().await;
        assert!(listing.is_err());
    }
}