1use crate::operation::CsvOperation;
2use async_trait::async_trait;
3use indexmap::IndexMap;
4use vantage_core::error;
5use vantage_dataset::traits::Result;
6use vantage_expressions::Expression;
7use vantage_expressions::Expressive;
8use vantage_expressions::traits::associated_expressions::AssociatedExpression;
9use vantage_expressions::traits::datasource::DataSource;
10use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
11use vantage_table::column::core::{Column, ColumnType};
12use vantage_table::table::Table;
13use vantage_table::traits::table_source::TableSource;
14use vantage_types::{Entity, Record};
15
16use crate::Csv;
17use crate::condition::apply_condition;
18use crate::type_system::AnyCsvType;
19
20impl DataSource for Csv {}
21
22#[async_trait]
23impl TableSource for Csv {
24 type Column<Type>
25 = Column<Type>
26 where
27 Type: ColumnType;
28 type AnyType = AnyCsvType;
29 type Value = AnyCsvType;
30 type Id = String;
31 type Condition = vantage_expressions::Expression<Self::Value>;
32
33 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
34 Column::new(name)
35 }
36
37 fn to_any_column<Type: ColumnType>(
38 &self,
39 column: Self::Column<Type>,
40 ) -> Self::Column<Self::AnyType> {
41 Column::from_column(column)
42 }
43
44 fn convert_any_column<Type: ColumnType>(
45 &self,
46 any_column: Self::Column<Self::AnyType>,
47 ) -> Option<Self::Column<Type>> {
48 Some(Column::from_column(any_column))
49 }
50
51 fn expr(
52 &self,
53 template: impl Into<String>,
54 parameters: Vec<ExpressiveEnum<Self::Value>>,
55 ) -> Expression<Self::Value> {
56 Expression::new(template, parameters)
57 }
58
59 fn search_table_condition<E>(
60 &self,
61 _table: &Table<Self, E>,
62 search_value: &str,
63 ) -> Expression<Self::Value>
64 where
65 E: Entity<Self::Value>,
66 {
67 Expression::new(format!("SEARCH '{}'", search_value), vec![])
68 }
69
70 async fn list_table_values<E>(
71 &self,
72 table: &Table<Self, E>,
73 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
74 where
75 E: Entity<Self::Value>,
76 Self: Sized,
77 {
78 let mut records = self.read_csv(table.table_name(), table.columns())?;
79
80 for condition in table.conditions() {
81 records = apply_condition(records, condition).await?;
82 }
83
84 Ok(records)
85 }
86
87 async fn get_table_value<E>(
88 &self,
89 table: &Table<Self, E>,
90 id: &Self::Id,
91 ) -> Result<Option<Record<Self::Value>>>
92 where
93 E: Entity<Self::Value>,
94 Self: Sized,
95 {
96 let records = self.read_csv(table.table_name(), table.columns())?;
97 Ok(records.get(id).cloned())
98 }
99
100 async fn get_table_some_value<E>(
101 &self,
102 table: &Table<Self, E>,
103 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
104 where
105 E: Entity<Self::Value>,
106 Self: Sized,
107 {
108 let records = self.read_csv(table.table_name(), table.columns())?;
109 Ok(records.into_iter().next())
110 }
111
112 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
113 where
114 E: Entity<Self::Value>,
115 Self: Sized,
116 {
117 let records = self.read_csv(table.table_name(), table.columns())?;
118 Ok(records.len() as i64)
119 }
120
121 async fn get_table_sum<E>(
122 &self,
123 _table: &Table<Self, E>,
124 _column: &Self::Column<Self::AnyType>,
125 ) -> Result<Self::Value>
126 where
127 E: Entity<Self::Value>,
128 Self: Sized,
129 {
130 Err(error!("Sum not implemented for CSV backend"))
131 }
132
133 async fn get_table_max<E>(
134 &self,
135 _table: &Table<Self, E>,
136 _column: &Self::Column<Self::AnyType>,
137 ) -> Result<Self::Value>
138 where
139 E: Entity<Self::Value>,
140 Self: Sized,
141 {
142 Err(error!("Max not implemented for CSV backend"))
143 }
144
145 async fn get_table_min<E>(
146 &self,
147 _table: &Table<Self, E>,
148 _column: &Self::Column<Self::AnyType>,
149 ) -> Result<Self::Value>
150 where
151 E: Entity<Self::Value>,
152 Self: Sized,
153 {
154 Err(error!("Min not implemented for CSV backend"))
155 }
156
157 async fn insert_table_value<E>(
158 &self,
159 _table: &Table<Self, E>,
160 _id: &Self::Id,
161 _record: &Record<Self::Value>,
162 ) -> Result<Record<Self::Value>>
163 where
164 E: Entity<Self::Value>,
165 Self: Sized,
166 {
167 Err(error!("CSV is a read-only data source"))
168 }
169
170 async fn replace_table_value<E>(
171 &self,
172 _table: &Table<Self, E>,
173 _id: &Self::Id,
174 _record: &Record<Self::Value>,
175 ) -> Result<Record<Self::Value>>
176 where
177 E: Entity<Self::Value>,
178 Self: Sized,
179 {
180 Err(error!("CSV is a read-only data source"))
181 }
182
183 async fn patch_table_value<E>(
184 &self,
185 _table: &Table<Self, E>,
186 _id: &Self::Id,
187 _partial: &Record<Self::Value>,
188 ) -> Result<Record<Self::Value>>
189 where
190 E: Entity<Self::Value>,
191 Self: Sized,
192 {
193 Err(error!("CSV is a read-only data source"))
194 }
195
196 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
197 where
198 E: Entity<Self::Value>,
199 Self: Sized,
200 {
201 Err(error!("CSV is a read-only data source"))
202 }
203
204 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
205 where
206 E: Entity<Self::Value>,
207 Self: Sized,
208 {
209 Err(error!("CSV is a read-only data source"))
210 }
211
212 async fn insert_table_return_id_value<E>(
213 &self,
214 _table: &Table<Self, E>,
215 _record: &Record<Self::Value>,
216 ) -> Result<Self::Id>
217 where
218 E: Entity<Self::Value>,
219 Self: Sized,
220 {
221 Err(error!("CSV is a read-only data source"))
222 }
223
224 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
225 &self,
226 target_field: &str,
227 source_table: &Table<Self, SourceE>,
228 source_column: &str,
229 ) -> Self::Condition
230 where
231 Self: Sized,
232 {
233 let src_col = self.create_column::<Self::AnyType>(source_column);
234 let fk_values = self.column_table_values_expr(source_table, &src_col);
235 let tgt_col = self.create_column::<Self::AnyType>(target_field);
236 tgt_col.in_(fk_values.expr())
237 }
238
239 fn column_table_values_expr<'a, E, Type: ColumnType>(
240 &'a self,
241 table: &Table<Self, E>,
242 column: &Self::Column<Type>,
243 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
244 where
245 E: Entity<Self::Value> + 'static,
246 Self: Sized,
247 {
248 use vantage_expressions::{
249 expr_any,
250 traits::{associated_expressions::AssociatedExpression, datasource::ExprDataSource},
251 };
252
253 let table_clone = table.clone();
254 let col = column.name().to_string();
255 let csv = self.clone();
256
257 let inner = expr_any!("{}", {
258 DeferredFn::new(move || {
259 let csv = csv.clone();
260 let table = table_clone.clone();
261 let col = col.clone();
262 Box::pin(async move {
263 let records = csv.list_table_values(&table).await?;
264 let values: Vec<AnyCsvType> = records
265 .values()
266 .filter_map(|r| r.get(&col).cloned())
267 .collect();
268 Ok(ExpressiveEnum::Scalar(AnyCsvType::new(values)))
269 })
270 })
271 });
272
273 let expr = expr_any!("{}", { self.defer(inner) });
274 AssociatedExpression::new(expr, self)
275 }
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281 use crate::type_system::CsvTypeVariants;
282 use vantage_dataset::prelude::{ReadableValueSet, WritableValueSet};
283 use vantage_types::EmptyEntity;
284
285 fn test_csv() -> Csv {
286 Csv::new(format!("{}/data", env!("CARGO_MANIFEST_DIR")))
287 }
288
289 #[tokio::test]
290 async fn test_list_bakery() {
291 let csv = test_csv();
292 let table = Table::<Csv, EmptyEntity>::new("bakery", csv)
293 .with_column_of::<String>("name")
294 .with_column_of::<i64>("profit_margin");
295
296 let values = table.list_values().await.unwrap();
297 assert_eq!(values.len(), 1);
298 assert!(values.contains_key("hill_valley"));
299
300 let bakery = &values["hill_valley"];
301 let name = bakery["name"].try_get::<String>().unwrap();
302 assert_eq!(name, "Hill Valley Bakery");
303
304 let profit = bakery["profit_margin"].try_get::<i64>().unwrap();
305 assert_eq!(profit, 15);
306 }
307
308 #[tokio::test]
309 async fn test_list_clients() {
310 let csv = test_csv();
311 let table = Table::<Csv, EmptyEntity>::new("client", csv)
312 .with_column_of::<String>("name")
313 .with_column_of::<String>("email")
314 .with_column_of::<bool>("is_paying_client")
315 .with_column_of::<serde_json::Value>("metadata");
316
317 let values = table.list_values().await.unwrap();
318 assert_eq!(values.len(), 3);
319
320 let marty = &values["marty"];
321 assert_eq!(marty["name"].try_get::<String>().unwrap(), "Marty McFly");
322 assert!(marty["is_paying_client"].try_get::<bool>().unwrap());
323
324 let biff = &values["biff"];
325 assert!(!biff["is_paying_client"].try_get::<bool>().unwrap());
326 assert_eq!(biff["metadata"].type_variant(), Some(CsvTypeVariants::Json));
327 }
328
329 #[tokio::test]
330 async fn test_list_products_typed() {
331 let csv = test_csv();
332 let table = Table::<Csv, EmptyEntity>::new("product", csv)
333 .with_column_of::<String>("name")
334 .with_column_of::<i64>("calories")
335 .with_column_of::<i64>("price")
336 .with_column_of::<bool>("is_deleted")
337 .with_column_of::<serde_json::Value>("inventory");
338
339 let values = table.list_values().await.unwrap();
340 assert_eq!(values.len(), 5);
341
342 let cupcake = &values["flux_cupcake"];
343 assert_eq!(
344 cupcake["name"].try_get::<String>().unwrap(),
345 "Flux Capacitor Cupcake"
346 );
347 assert_eq!(cupcake["calories"].try_get::<i64>().unwrap(), 300);
348 assert_eq!(cupcake["price"].try_get::<i64>().unwrap(), 120);
349 assert!(!cupcake["is_deleted"].try_get::<bool>().unwrap());
350
351 let inv = cupcake["inventory"].try_get::<serde_json::Value>().unwrap();
352 assert_eq!(inv["stock"], serde_json::json!(50));
353 }
354
355 #[tokio::test]
356 async fn test_untyped_columns_stay_string() {
357 let csv = test_csv();
358 let table = Table::<Csv, EmptyEntity>::new("product", csv);
359
360 let values = table.list_values().await.unwrap();
361 let cupcake = &values["flux_cupcake"];
362 assert_eq!(
363 cupcake["calories"].type_variant(),
364 Some(CsvTypeVariants::String)
365 );
366 assert_eq!(cupcake["calories"].try_get::<String>().unwrap(), "300");
367 }
368
369 #[tokio::test]
370 async fn test_get_value_by_id() {
371 let csv = test_csv();
372 let table = Table::<Csv, EmptyEntity>::new("client", csv)
373 .with_column_of::<String>("name")
374 .with_column_of::<String>("email");
375
376 let record = table
377 .get_value(&"doc".to_string())
378 .await
379 .unwrap()
380 .expect("doc exists");
381 assert_eq!(record["name"].try_get::<String>().unwrap(), "Doc Brown");
382 assert_eq!(
383 record["email"].try_get::<String>().unwrap(),
384 "doc@brown.com"
385 );
386 }
387
388 #[tokio::test]
389 async fn test_get_value_not_found() {
390 let csv = test_csv();
391 let table = Table::<Csv, EmptyEntity>::new("client", csv);
392
393 let result = table.get_value(&"nonexistent".to_string()).await.unwrap();
394 assert!(result.is_none());
395 }
396
397 #[tokio::test]
398 async fn test_get_some_value() {
399 let csv = test_csv();
400 let table = Table::<Csv, EmptyEntity>::new("bakery", csv).with_column_of::<String>("name");
401
402 let result = table.get_some_value().await.unwrap();
403 assert!(result.is_some());
404 let (id, record) = result.unwrap();
405 assert_eq!(id, "hill_valley");
406 assert_eq!(
407 record["name"].try_get::<String>().unwrap(),
408 "Hill Valley Bakery"
409 );
410 }
411
412 #[tokio::test]
413 async fn test_get_count() {
414 let csv = test_csv();
415 let table = Table::<Csv, EmptyEntity>::new("product", csv);
416
417 let count = table.data_source().get_table_count(&table).await.unwrap();
418 assert_eq!(count, 5);
419 }
420
421 #[tokio::test]
422 async fn test_write_operations_fail() {
423 let csv = test_csv();
424 let table = Table::<Csv, EmptyEntity>::new("bakery", csv);
425
426 let record = Record::new();
427 assert!(
428 WritableValueSet::insert_value(&table, &"test".to_string(), &record)
429 .await
430 .is_err()
431 );
432 assert!(
433 WritableValueSet::delete(&table, &"test".to_string())
434 .await
435 .is_err()
436 );
437 assert!(WritableValueSet::delete_all(&table).await.is_err());
438 }
439
440 #[tokio::test]
441 async fn test_missing_file() {
442 let csv = test_csv();
443 let table = Table::<Csv, EmptyEntity>::new("nonexistent", csv);
444
445 let result = table.list_values().await;
446 assert!(result.is_err());
447 }
448}