Skip to main content

vantage_csv/
table_source.rs

1use crate::operation::CsvOperation;
2use async_trait::async_trait;
3use indexmap::IndexMap;
4use vantage_core::error;
5use vantage_dataset::traits::Result;
6use vantage_expressions::Expression;
7use vantage_expressions::Expressive;
8use vantage_expressions::traits::associated_expressions::AssociatedExpression;
9use vantage_expressions::traits::datasource::DataSource;
10use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
11use vantage_table::column::core::{Column, ColumnType};
12use vantage_table::table::Table;
13use vantage_table::traits::table_source::TableSource;
14use vantage_types::{Entity, Record};
15
16use crate::Csv;
17use crate::condition::apply_condition;
18use crate::type_system::AnyCsvType;
19
20impl DataSource for Csv {}
21
22#[async_trait]
23impl TableSource for Csv {
24    type Column<Type>
25        = Column<Type>
26    where
27        Type: ColumnType;
28    type AnyType = AnyCsvType;
29    type Value = AnyCsvType;
30    type Id = String;
31    type Condition = vantage_expressions::Expression<Self::Value>;
32
33    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
34        Column::new(name)
35    }
36
37    fn to_any_column<Type: ColumnType>(
38        &self,
39        column: Self::Column<Type>,
40    ) -> Self::Column<Self::AnyType> {
41        Column::from_column(column)
42    }
43
44    fn convert_any_column<Type: ColumnType>(
45        &self,
46        any_column: Self::Column<Self::AnyType>,
47    ) -> Option<Self::Column<Type>> {
48        Some(Column::from_column(any_column))
49    }
50
51    fn expr(
52        &self,
53        template: impl Into<String>,
54        parameters: Vec<ExpressiveEnum<Self::Value>>,
55    ) -> Expression<Self::Value> {
56        Expression::new(template, parameters)
57    }
58
59    fn search_table_condition<E>(
60        &self,
61        _table: &Table<Self, E>,
62        search_value: &str,
63    ) -> Expression<Self::Value>
64    where
65        E: Entity<Self::Value>,
66    {
67        Expression::new(format!("SEARCH '{}'", search_value), vec![])
68    }
69
70    async fn list_table_values<E>(
71        &self,
72        table: &Table<Self, E>,
73    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
74    where
75        E: Entity<Self::Value>,
76        Self: Sized,
77    {
78        let mut records = self.read_csv(table.table_name(), table.columns())?;
79
80        for condition in table.conditions() {
81            records = apply_condition(records, condition).await?;
82        }
83
84        Ok(records)
85    }
86
87    async fn get_table_value<E>(
88        &self,
89        table: &Table<Self, E>,
90        id: &Self::Id,
91    ) -> Result<Record<Self::Value>>
92    where
93        E: Entity<Self::Value>,
94        Self: Sized,
95    {
96        let records = self.read_csv(table.table_name(), table.columns())?;
97        records
98            .get(id)
99            .cloned()
100            .ok_or_else(|| error!("Record not found", id = id))
101    }
102
103    async fn get_table_some_value<E>(
104        &self,
105        table: &Table<Self, E>,
106    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
107    where
108        E: Entity<Self::Value>,
109        Self: Sized,
110    {
111        let records = self.read_csv(table.table_name(), table.columns())?;
112        Ok(records.into_iter().next())
113    }
114
115    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
116    where
117        E: Entity<Self::Value>,
118        Self: Sized,
119    {
120        let records = self.read_csv(table.table_name(), table.columns())?;
121        Ok(records.len() as i64)
122    }
123
124    async fn get_table_sum<E>(
125        &self,
126        _table: &Table<Self, E>,
127        _column: &Self::Column<Self::AnyType>,
128    ) -> Result<Self::Value>
129    where
130        E: Entity<Self::Value>,
131        Self: Sized,
132    {
133        Err(error!("Sum not implemented for CSV backend"))
134    }
135
136    async fn get_table_max<E>(
137        &self,
138        _table: &Table<Self, E>,
139        _column: &Self::Column<Self::AnyType>,
140    ) -> Result<Self::Value>
141    where
142        E: Entity<Self::Value>,
143        Self: Sized,
144    {
145        Err(error!("Max not implemented for CSV backend"))
146    }
147
148    async fn get_table_min<E>(
149        &self,
150        _table: &Table<Self, E>,
151        _column: &Self::Column<Self::AnyType>,
152    ) -> Result<Self::Value>
153    where
154        E: Entity<Self::Value>,
155        Self: Sized,
156    {
157        Err(error!("Min not implemented for CSV backend"))
158    }
159
160    async fn insert_table_value<E>(
161        &self,
162        _table: &Table<Self, E>,
163        _id: &Self::Id,
164        _record: &Record<Self::Value>,
165    ) -> Result<Record<Self::Value>>
166    where
167        E: Entity<Self::Value>,
168        Self: Sized,
169    {
170        Err(error!("CSV is a read-only data source"))
171    }
172
173    async fn replace_table_value<E>(
174        &self,
175        _table: &Table<Self, E>,
176        _id: &Self::Id,
177        _record: &Record<Self::Value>,
178    ) -> Result<Record<Self::Value>>
179    where
180        E: Entity<Self::Value>,
181        Self: Sized,
182    {
183        Err(error!("CSV is a read-only data source"))
184    }
185
186    async fn patch_table_value<E>(
187        &self,
188        _table: &Table<Self, E>,
189        _id: &Self::Id,
190        _partial: &Record<Self::Value>,
191    ) -> Result<Record<Self::Value>>
192    where
193        E: Entity<Self::Value>,
194        Self: Sized,
195    {
196        Err(error!("CSV is a read-only data source"))
197    }
198
199    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
200    where
201        E: Entity<Self::Value>,
202        Self: Sized,
203    {
204        Err(error!("CSV is a read-only data source"))
205    }
206
207    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
208    where
209        E: Entity<Self::Value>,
210        Self: Sized,
211    {
212        Err(error!("CSV is a read-only data source"))
213    }
214
215    async fn insert_table_return_id_value<E>(
216        &self,
217        _table: &Table<Self, E>,
218        _record: &Record<Self::Value>,
219    ) -> Result<Self::Id>
220    where
221        E: Entity<Self::Value>,
222        Self: Sized,
223    {
224        Err(error!("CSV is a read-only data source"))
225    }
226
227    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
228        &self,
229        target_field: &str,
230        source_table: &Table<Self, SourceE>,
231        source_column: &str,
232    ) -> Self::Condition
233    where
234        Self: Sized,
235    {
236        let src_col = self.create_column::<Self::AnyType>(source_column);
237        let fk_values = self.column_table_values_expr(source_table, &src_col);
238        let tgt_col = self.create_column::<Self::AnyType>(target_field);
239        tgt_col.in_(fk_values.expr())
240    }
241
242    fn column_table_values_expr<'a, E, Type: ColumnType>(
243        &'a self,
244        table: &Table<Self, E>,
245        column: &Self::Column<Type>,
246    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
247    where
248        E: Entity<Self::Value> + 'static,
249        Self: Sized,
250    {
251        use vantage_expressions::{
252            expr_any,
253            traits::{associated_expressions::AssociatedExpression, datasource::ExprDataSource},
254        };
255
256        let table_clone = table.clone();
257        let col = column.name().to_string();
258        let csv = self.clone();
259
260        let inner = expr_any!("{}", {
261            DeferredFn::new(move || {
262                let csv = csv.clone();
263                let table = table_clone.clone();
264                let col = col.clone();
265                Box::pin(async move {
266                    let records = csv.list_table_values(&table).await?;
267                    let values: Vec<AnyCsvType> = records
268                        .values()
269                        .filter_map(|r| r.get(&col).cloned())
270                        .collect();
271                    Ok(ExpressiveEnum::Scalar(AnyCsvType::new(values)))
272                })
273            })
274        });
275
276        let expr = expr_any!("{}", { self.defer(inner) });
277        AssociatedExpression::new(expr, self)
278    }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::type_system::CsvTypeVariants;
    use vantage_dataset::prelude::{ReadableValueSet, WritableValueSet};
    use vantage_types::EmptyEntity;

    /// Builds a `Csv` source rooted at this crate's `data` directory.
    fn test_csv() -> Csv {
        let data_dir = format!("{}/data", env!("CARGO_MANIFEST_DIR"));
        Csv::new(data_dir)
    }

    /// Listing `bakery` yields the single `hill_valley` row with typed columns.
    #[tokio::test]
    async fn test_list_bakery() {
        let table = Table::<Csv, EmptyEntity>::new("bakery", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<i64>("profit_margin");

        let rows = table.list_values().await.unwrap();
        assert_eq!(rows.len(), 1);
        assert!(rows.contains_key("hill_valley"));

        let bakery = &rows["hill_valley"];
        assert_eq!(
            bakery["name"].try_get::<String>().unwrap(),
            "Hill Valley Bakery"
        );
        assert_eq!(bakery["profit_margin"].try_get::<i64>().unwrap(), 15);
    }

    /// Listing `client` yields three rows with bool and JSON columns typed as declared.
    #[tokio::test]
    async fn test_list_clients() {
        let table = Table::<Csv, EmptyEntity>::new("client", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<String>("email")
            .with_column_of::<bool>("is_paying_client")
            .with_column_of::<serde_json::Value>("metadata");

        let rows = table.list_values().await.unwrap();
        assert_eq!(rows.len(), 3);

        let marty = &rows["marty"];
        let marty_name = marty["name"].try_get::<String>().unwrap();
        assert_eq!(marty_name, "Marty McFly");
        assert!(marty["is_paying_client"].try_get::<bool>().unwrap());

        let biff = &rows["biff"];
        let biff_pays = biff["is_paying_client"].try_get::<bool>().unwrap();
        assert!(!biff_pays);
        assert_eq!(biff["metadata"].type_variant(), Some(CsvTypeVariants::Json));
    }

    /// Listing `product` with declared column types yields typed values.
    #[tokio::test]
    async fn test_list_products_typed() {
        let table = Table::<Csv, EmptyEntity>::new("product", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<i64>("calories")
            .with_column_of::<i64>("price")
            .with_column_of::<bool>("is_deleted")
            .with_column_of::<serde_json::Value>("inventory");

        let rows = table.list_values().await.unwrap();
        assert_eq!(rows.len(), 5);

        let cupcake = &rows["flux_cupcake"];
        let cupcake_name = cupcake["name"].try_get::<String>().unwrap();
        assert_eq!(cupcake_name, "Flux Capacitor Cupcake");
        assert_eq!(cupcake["calories"].try_get::<i64>().unwrap(), 300);
        assert_eq!(cupcake["price"].try_get::<i64>().unwrap(), 120);
        assert!(!cupcake["is_deleted"].try_get::<bool>().unwrap());

        let inventory = cupcake["inventory"].try_get::<serde_json::Value>().unwrap();
        assert_eq!(inventory["stock"], serde_json::json!(50));
    }

    /// Columns that were never declared on the table come back as strings.
    #[tokio::test]
    async fn test_untyped_columns_stay_string() {
        let table = Table::<Csv, EmptyEntity>::new("product", test_csv());

        let rows = table.list_values().await.unwrap();
        let calories = &rows["flux_cupcake"]["calories"];
        assert_eq!(calories.type_variant(), Some(CsvTypeVariants::String));
        assert_eq!(calories.try_get::<String>().unwrap(), "300");
    }

    /// Fetching an existing id returns that record.
    #[tokio::test]
    async fn test_get_value_by_id() {
        let table = Table::<Csv, EmptyEntity>::new("client", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<String>("email");

        let id = "doc".to_string();
        let doc = table.get_value(&id).await.unwrap();
        assert_eq!(doc["name"].try_get::<String>().unwrap(), "Doc Brown");
        assert_eq!(doc["email"].try_get::<String>().unwrap(), "doc@brown.com");
    }

    /// Fetching an unknown id is an error.
    #[tokio::test]
    async fn test_get_value_not_found() {
        let table = Table::<Csv, EmptyEntity>::new("client", test_csv());

        let missing_id = "nonexistent".to_string();
        let outcome = table.get_value(&missing_id).await;
        assert!(outcome.is_err());
    }

    /// `get_some_value` returns the only bakery row together with its id.
    #[tokio::test]
    async fn test_get_some_value() {
        let table =
            Table::<Csv, EmptyEntity>::new("bakery", test_csv()).with_column_of::<String>("name");

        let found = table.get_some_value().await.unwrap();
        assert!(found.is_some());

        let (id, record) = found.unwrap();
        assert_eq!(id, "hill_valley");
        let bakery_name = record["name"].try_get::<String>().unwrap();
        assert_eq!(bakery_name, "Hill Valley Bakery");
    }

    /// Counting `product` through the data source reports five rows.
    #[tokio::test]
    async fn test_get_count() {
        let table = Table::<Csv, EmptyEntity>::new("product", test_csv());

        let source = table.data_source();
        let total = source.get_table_count(&table).await.unwrap();
        assert_eq!(total, 5);
    }

    /// Insert, delete and delete-all are all rejected.
    #[tokio::test]
    async fn test_write_operations_fail() {
        let table = Table::<Csv, EmptyEntity>::new("bakery", test_csv());
        let id = "test".to_string();
        let record = Record::new();

        let inserted = WritableValueSet::insert_value(&table, &id, &record).await;
        assert!(inserted.is_err());

        let deleted = WritableValueSet::delete(&table, &id).await;
        assert!(deleted.is_err());

        let cleared = WritableValueSet::delete_all(&table).await;
        assert!(cleared.is_err());
    }

    /// Listing a table with no backing CSV file is an error.
    #[tokio::test]
    async fn test_missing_file() {
        let table = Table::<Csv, EmptyEntity>::new("nonexistent", test_csv());

        let outcome = table.list_values().await;
        assert!(outcome.is_err());
    }
}