Skip to main content

vantage_csv/
table_source.rs

1use crate::operation::CsvOperation;
2use async_trait::async_trait;
3use indexmap::IndexMap;
4use vantage_core::error;
5use vantage_dataset::traits::Result;
6use vantage_expressions::Expression;
7use vantage_expressions::Expressive;
8use vantage_expressions::traits::associated_expressions::AssociatedExpression;
9use vantage_expressions::traits::datasource::DataSource;
10use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
11use vantage_table::column::core::{Column, ColumnType};
12use vantage_table::table::Table;
13use vantage_table::traits::table_source::TableSource;
14use vantage_types::{Entity, Record};
15
16use crate::Csv;
17use crate::condition::apply_condition;
18use crate::type_system::AnyCsvType;
19
20impl DataSource for Csv {}
21
22#[async_trait]
23impl TableSource for Csv {
24    type Column<Type>
25        = Column<Type>
26    where
27        Type: ColumnType;
28    type AnyType = AnyCsvType;
29    type Value = AnyCsvType;
30    type Id = String;
31    type Condition = vantage_expressions::Expression<Self::Value>;
32    type Source = String;
33
34    fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
35        let column: Column<AnyCsvType> = Column::new(field);
36        Ok(CsvOperation::eq(&column, value))
37    }
38
39    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
40        Column::new(name)
41    }
42
43    fn to_any_column<Type: ColumnType>(
44        &self,
45        column: Self::Column<Type>,
46    ) -> Self::Column<Self::AnyType> {
47        Column::from_column(column)
48    }
49
50    fn convert_any_column<Type: ColumnType>(
51        &self,
52        any_column: Self::Column<Self::AnyType>,
53    ) -> Option<Self::Column<Type>> {
54        Some(Column::from_column(any_column))
55    }
56
57    fn expr(
58        &self,
59        template: impl Into<String>,
60        parameters: Vec<ExpressiveEnum<Self::Value>>,
61    ) -> Expression<Self::Value> {
62        Expression::new(template, parameters)
63    }
64
65    fn search_table_condition<E>(
66        &self,
67        _table: &Table<Self, E>,
68        search_value: &str,
69    ) -> Expression<Self::Value>
70    where
71        E: Entity<Self::Value>,
72    {
73        Expression::new(format!("SEARCH '{}'", search_value), vec![])
74    }
75
76    async fn list_table_values<E>(
77        &self,
78        table: &Table<Self, E>,
79    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
80    where
81        E: Entity<Self::Value>,
82        Self: Sized,
83    {
84        let mut records = self.read_csv(table.table_name(), table.columns())?;
85
86        for condition in table.conditions() {
87            records = apply_condition(records, condition).await?;
88        }
89
90        Ok(records)
91    }
92
93    async fn get_table_value<E>(
94        &self,
95        table: &Table<Self, E>,
96        id: &Self::Id,
97    ) -> Result<Option<Record<Self::Value>>>
98    where
99        E: Entity<Self::Value>,
100        Self: Sized,
101    {
102        let records = self.read_csv(table.table_name(), table.columns())?;
103        Ok(records.get(id).cloned())
104    }
105
106    async fn get_table_some_value<E>(
107        &self,
108        table: &Table<Self, E>,
109    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
110    where
111        E: Entity<Self::Value>,
112        Self: Sized,
113    {
114        let records = self.read_csv(table.table_name(), table.columns())?;
115        Ok(records.into_iter().next())
116    }
117
118    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
119    where
120        E: Entity<Self::Value>,
121        Self: Sized,
122    {
123        let records = self.read_csv(table.table_name(), table.columns())?;
124        Ok(records.len() as i64)
125    }
126
127    async fn get_table_sum<E>(
128        &self,
129        _table: &Table<Self, E>,
130        _column: &Self::Column<Self::AnyType>,
131    ) -> Result<Self::Value>
132    where
133        E: Entity<Self::Value>,
134        Self: Sized,
135    {
136        Err(error!("Sum not implemented for CSV backend"))
137    }
138
139    async fn get_table_max<E>(
140        &self,
141        _table: &Table<Self, E>,
142        _column: &Self::Column<Self::AnyType>,
143    ) -> Result<Self::Value>
144    where
145        E: Entity<Self::Value>,
146        Self: Sized,
147    {
148        Err(error!("Max not implemented for CSV backend"))
149    }
150
151    async fn get_table_min<E>(
152        &self,
153        _table: &Table<Self, E>,
154        _column: &Self::Column<Self::AnyType>,
155    ) -> Result<Self::Value>
156    where
157        E: Entity<Self::Value>,
158        Self: Sized,
159    {
160        Err(error!("Min not implemented for CSV backend"))
161    }
162
163    async fn insert_table_value<E>(
164        &self,
165        _table: &Table<Self, E>,
166        _id: &Self::Id,
167        _record: &Record<Self::Value>,
168    ) -> Result<Record<Self::Value>>
169    where
170        E: Entity<Self::Value>,
171        Self: Sized,
172    {
173        Err(error!("CSV is a read-only data source"))
174    }
175
176    async fn replace_table_value<E>(
177        &self,
178        _table: &Table<Self, E>,
179        _id: &Self::Id,
180        _record: &Record<Self::Value>,
181    ) -> Result<Record<Self::Value>>
182    where
183        E: Entity<Self::Value>,
184        Self: Sized,
185    {
186        Err(error!("CSV is a read-only data source"))
187    }
188
189    async fn patch_table_value<E>(
190        &self,
191        _table: &Table<Self, E>,
192        _id: &Self::Id,
193        _partial: &Record<Self::Value>,
194    ) -> Result<Record<Self::Value>>
195    where
196        E: Entity<Self::Value>,
197        Self: Sized,
198    {
199        Err(error!("CSV is a read-only data source"))
200    }
201
202    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
203    where
204        E: Entity<Self::Value>,
205        Self: Sized,
206    {
207        Err(error!("CSV is a read-only data source"))
208    }
209
210    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
211    where
212        E: Entity<Self::Value>,
213        Self: Sized,
214    {
215        Err(error!("CSV is a read-only data source"))
216    }
217
218    async fn insert_table_return_id_value<E>(
219        &self,
220        _table: &Table<Self, E>,
221        _record: &Record<Self::Value>,
222    ) -> Result<Self::Id>
223    where
224        E: Entity<Self::Value>,
225        Self: Sized,
226    {
227        Err(error!("CSV is a read-only data source"))
228    }
229
230    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
231        &self,
232        target_field: &str,
233        source_table: &Table<Self, SourceE>,
234        source_column: &str,
235    ) -> Self::Condition
236    where
237        Self: Sized,
238    {
239        let src_col = self.create_column::<Self::AnyType>(source_column);
240        let fk_values = self.column_table_values_expr(source_table, &src_col);
241        let tgt_col = self.create_column::<Self::AnyType>(target_field);
242        tgt_col.in_(fk_values.expr())
243    }
244
245    fn column_table_values_expr<'a, E, Type: ColumnType>(
246        &'a self,
247        table: &Table<Self, E>,
248        column: &Self::Column<Type>,
249    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
250    where
251        E: Entity<Self::Value> + 'static,
252        Self: Sized,
253    {
254        use vantage_expressions::{
255            expr_any,
256            traits::{associated_expressions::AssociatedExpression, datasource::ExprDataSource},
257        };
258
259        let table_clone = table.clone();
260        let col = column.name().to_string();
261        let csv = self.clone();
262
263        let inner = expr_any!("{}", {
264            DeferredFn::new(move || {
265                let csv = csv.clone();
266                let table = table_clone.clone();
267                let col = col.clone();
268                Box::pin(async move {
269                    let records = csv.list_table_values(&table).await?;
270                    let values: Vec<AnyCsvType> = records
271                        .values()
272                        .filter_map(|r| r.get(&col).cloned())
273                        .collect();
274                    Ok(ExpressiveEnum::Scalar(AnyCsvType::new(values)))
275                })
276            })
277        });
278
279        let expr = expr_any!("{}", { self.defer(inner) });
280        AssociatedExpression::new(expr, self)
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::type_system::CsvTypeVariants;
    use vantage_dataset::prelude::{ReadableValueSet, WritableValueSet};
    use vantage_types::EmptyEntity;

    /// Data source rooted at the `data` directory inside the crate manifest dir.
    fn test_csv() -> Csv {
        let data_dir = format!("{}/data", env!("CARGO_MANIFEST_DIR"));
        Csv::new(data_dir)
    }

    /// The bakery table lists one row whose typed columns read back as declared.
    #[tokio::test]
    async fn test_list_bakery() {
        let bakeries = Table::<Csv, EmptyEntity>::new("bakery", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<i64>("profit_margin");

        let rows = bakeries.list_values().await.unwrap();
        assert_eq!(rows.len(), 1);
        assert!(rows.contains_key("hill_valley"));

        let row = &rows["hill_valley"];
        assert_eq!(row["name"].try_get::<String>().unwrap(), "Hill Valley Bakery");
        assert_eq!(row["profit_margin"].try_get::<i64>().unwrap(), 15);
    }

    /// Client rows expose string, bool and JSON columns.
    #[tokio::test]
    async fn test_list_clients() {
        let clients = Table::<Csv, EmptyEntity>::new("client", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<String>("email")
            .with_column_of::<bool>("is_paying_client")
            .with_column_of::<serde_json::Value>("metadata");

        let rows = clients.list_values().await.unwrap();
        assert_eq!(rows.len(), 3);

        let marty = &rows["marty"];
        let marty_name = marty["name"].try_get::<String>().unwrap();
        assert_eq!(marty_name, "Marty McFly");
        let marty_pays = marty["is_paying_client"].try_get::<bool>().unwrap();
        assert!(marty_pays);

        let biff = &rows["biff"];
        let biff_pays = biff["is_paying_client"].try_get::<bool>().unwrap();
        assert!(!biff_pays);
        assert_eq!(biff["metadata"].type_variant(), Some(CsvTypeVariants::Json));
    }

    /// Product rows read back with the declared column types, including JSON.
    #[tokio::test]
    async fn test_list_products_typed() {
        let products = Table::<Csv, EmptyEntity>::new("product", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<i64>("calories")
            .with_column_of::<i64>("price")
            .with_column_of::<bool>("is_deleted")
            .with_column_of::<serde_json::Value>("inventory");

        let rows = products.list_values().await.unwrap();
        assert_eq!(rows.len(), 5);

        let cupcake = &rows["flux_cupcake"];
        let name = cupcake["name"].try_get::<String>().unwrap();
        assert_eq!(name, "Flux Capacitor Cupcake");
        let calories = cupcake["calories"].try_get::<i64>().unwrap();
        assert_eq!(calories, 300);
        let price = cupcake["price"].try_get::<i64>().unwrap();
        assert_eq!(price, 120);
        let deleted = cupcake["is_deleted"].try_get::<bool>().unwrap();
        assert!(!deleted);

        let inventory = cupcake["inventory"].try_get::<serde_json::Value>().unwrap();
        assert_eq!(inventory["stock"], serde_json::json!(50));
    }

    /// Without column declarations, values are reported as strings.
    #[tokio::test]
    async fn test_untyped_columns_stay_string() {
        let products = Table::<Csv, EmptyEntity>::new("product", test_csv());

        let rows = products.list_values().await.unwrap();
        let calories = &rows["flux_cupcake"]["calories"];
        assert_eq!(calories.type_variant(), Some(CsvTypeVariants::String));
        assert_eq!(calories.try_get::<String>().unwrap(), "300");
    }

    /// Looking up an existing id returns its record.
    #[tokio::test]
    async fn test_get_value_by_id() {
        let clients = Table::<Csv, EmptyEntity>::new("client", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<String>("email");

        let doc_id = "doc".to_string();
        let found = clients.get_value(&doc_id).await.unwrap();
        let doc = found.expect("doc exists");

        assert_eq!(doc["name"].try_get::<String>().unwrap(), "Doc Brown");
        assert_eq!(doc["email"].try_get::<String>().unwrap(), "doc@brown.com");
    }

    /// Looking up an unknown id yields `None` rather than an error.
    #[tokio::test]
    async fn test_get_value_not_found() {
        let clients = Table::<Csv, EmptyEntity>::new("client", test_csv());

        let missing_id = "nonexistent".to_string();
        let lookup = clients.get_value(&missing_id).await.unwrap();
        assert!(lookup.is_none());
    }

    /// `get_some_value` returns the single bakery row together with its id.
    #[tokio::test]
    async fn test_get_some_value() {
        let bakeries =
            Table::<Csv, EmptyEntity>::new("bakery", test_csv()).with_column_of::<String>("name");

        let picked = bakeries.get_some_value().await.unwrap();
        assert!(picked.is_some());

        let (id, row) = picked.unwrap();
        assert_eq!(id, "hill_valley");
        assert_eq!(row["name"].try_get::<String>().unwrap(), "Hill Valley Bakery");
    }

    /// Counting the product table reports all five rows.
    #[tokio::test]
    async fn test_get_count() {
        let products = Table::<Csv, EmptyEntity>::new("product", test_csv());

        let source = products.data_source();
        let total = source.get_table_count(&products).await.unwrap();
        assert_eq!(total, 5);
    }

    /// Insert, delete and delete-all are all rejected.
    #[tokio::test]
    async fn test_write_operations_fail() {
        let bakeries = Table::<Csv, EmptyEntity>::new("bakery", test_csv());

        let key = "test".to_string();
        let blank = Record::new();

        let inserted = WritableValueSet::insert_value(&bakeries, &key, &blank).await;
        assert!(inserted.is_err());

        let deleted = WritableValueSet::delete(&bakeries, &key).await;
        assert!(deleted.is_err());

        let cleared = WritableValueSet::delete_all(&bakeries).await;
        assert!(cleared.is_err());
    }

    /// Listing a table with no backing file is an error.
    #[tokio::test]
    async fn test_missing_file() {
        let ghost = Table::<Csv, EmptyEntity>::new("nonexistent", test_csv());

        let listing = ghost.list_values().await;
        assert!(listing.is_err());
    }
}