1use crate::operation::CsvOperation;
2use async_trait::async_trait;
3use indexmap::IndexMap;
4use vantage_core::error;
5use vantage_dataset::traits::Result;
6use vantage_expressions::Expression;
7use vantage_expressions::Expressive;
8use vantage_expressions::traits::associated_expressions::AssociatedExpression;
9use vantage_expressions::traits::datasource::DataSource;
10use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
11use vantage_table::column::core::{Column, ColumnType};
12use vantage_table::table::Table;
13use vantage_table::traits::table_source::TableSource;
14use vantage_types::{Entity, Record};
15
16use crate::Csv;
17use crate::condition::apply_condition;
18use crate::type_system::AnyCsvType;
19
// Marks `Csv` as a `DataSource`. The impl body is empty: no trait items are
// defined or overridden here.
impl DataSource for Csv {}
21
22#[async_trait]
23impl TableSource for Csv {
24 type Column<Type>
25 = Column<Type>
26 where
27 Type: ColumnType;
28 type AnyType = AnyCsvType;
29 type Value = AnyCsvType;
30 type Id = String;
31 type Condition = vantage_expressions::Expression<Self::Value>;
32
33 fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
34 let column: Column<AnyCsvType> = Column::new(field);
35 Ok(CsvOperation::eq(&column, value))
36 }
37
38 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
39 Column::new(name)
40 }
41
42 fn to_any_column<Type: ColumnType>(
43 &self,
44 column: Self::Column<Type>,
45 ) -> Self::Column<Self::AnyType> {
46 Column::from_column(column)
47 }
48
49 fn convert_any_column<Type: ColumnType>(
50 &self,
51 any_column: Self::Column<Self::AnyType>,
52 ) -> Option<Self::Column<Type>> {
53 Some(Column::from_column(any_column))
54 }
55
56 fn expr(
57 &self,
58 template: impl Into<String>,
59 parameters: Vec<ExpressiveEnum<Self::Value>>,
60 ) -> Expression<Self::Value> {
61 Expression::new(template, parameters)
62 }
63
64 fn search_table_condition<E>(
65 &self,
66 _table: &Table<Self, E>,
67 search_value: &str,
68 ) -> Expression<Self::Value>
69 where
70 E: Entity<Self::Value>,
71 {
72 Expression::new(format!("SEARCH '{}'", search_value), vec![])
73 }
74
75 async fn list_table_values<E>(
76 &self,
77 table: &Table<Self, E>,
78 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
79 where
80 E: Entity<Self::Value>,
81 Self: Sized,
82 {
83 let mut records = self.read_csv(table.table_name(), table.columns())?;
84
85 for condition in table.conditions() {
86 records = apply_condition(records, condition).await?;
87 }
88
89 Ok(records)
90 }
91
92 async fn get_table_value<E>(
93 &self,
94 table: &Table<Self, E>,
95 id: &Self::Id,
96 ) -> Result<Option<Record<Self::Value>>>
97 where
98 E: Entity<Self::Value>,
99 Self: Sized,
100 {
101 let records = self.read_csv(table.table_name(), table.columns())?;
102 Ok(records.get(id).cloned())
103 }
104
105 async fn get_table_some_value<E>(
106 &self,
107 table: &Table<Self, E>,
108 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
109 where
110 E: Entity<Self::Value>,
111 Self: Sized,
112 {
113 let records = self.read_csv(table.table_name(), table.columns())?;
114 Ok(records.into_iter().next())
115 }
116
117 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
118 where
119 E: Entity<Self::Value>,
120 Self: Sized,
121 {
122 let records = self.read_csv(table.table_name(), table.columns())?;
123 Ok(records.len() as i64)
124 }
125
126 async fn get_table_sum<E>(
127 &self,
128 _table: &Table<Self, E>,
129 _column: &Self::Column<Self::AnyType>,
130 ) -> Result<Self::Value>
131 where
132 E: Entity<Self::Value>,
133 Self: Sized,
134 {
135 Err(error!("Sum not implemented for CSV backend"))
136 }
137
138 async fn get_table_max<E>(
139 &self,
140 _table: &Table<Self, E>,
141 _column: &Self::Column<Self::AnyType>,
142 ) -> Result<Self::Value>
143 where
144 E: Entity<Self::Value>,
145 Self: Sized,
146 {
147 Err(error!("Max not implemented for CSV backend"))
148 }
149
150 async fn get_table_min<E>(
151 &self,
152 _table: &Table<Self, E>,
153 _column: &Self::Column<Self::AnyType>,
154 ) -> Result<Self::Value>
155 where
156 E: Entity<Self::Value>,
157 Self: Sized,
158 {
159 Err(error!("Min not implemented for CSV backend"))
160 }
161
162 async fn insert_table_value<E>(
163 &self,
164 _table: &Table<Self, E>,
165 _id: &Self::Id,
166 _record: &Record<Self::Value>,
167 ) -> Result<Record<Self::Value>>
168 where
169 E: Entity<Self::Value>,
170 Self: Sized,
171 {
172 Err(error!("CSV is a read-only data source"))
173 }
174
175 async fn replace_table_value<E>(
176 &self,
177 _table: &Table<Self, E>,
178 _id: &Self::Id,
179 _record: &Record<Self::Value>,
180 ) -> Result<Record<Self::Value>>
181 where
182 E: Entity<Self::Value>,
183 Self: Sized,
184 {
185 Err(error!("CSV is a read-only data source"))
186 }
187
188 async fn patch_table_value<E>(
189 &self,
190 _table: &Table<Self, E>,
191 _id: &Self::Id,
192 _partial: &Record<Self::Value>,
193 ) -> Result<Record<Self::Value>>
194 where
195 E: Entity<Self::Value>,
196 Self: Sized,
197 {
198 Err(error!("CSV is a read-only data source"))
199 }
200
201 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
202 where
203 E: Entity<Self::Value>,
204 Self: Sized,
205 {
206 Err(error!("CSV is a read-only data source"))
207 }
208
209 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
210 where
211 E: Entity<Self::Value>,
212 Self: Sized,
213 {
214 Err(error!("CSV is a read-only data source"))
215 }
216
217 async fn insert_table_return_id_value<E>(
218 &self,
219 _table: &Table<Self, E>,
220 _record: &Record<Self::Value>,
221 ) -> Result<Self::Id>
222 where
223 E: Entity<Self::Value>,
224 Self: Sized,
225 {
226 Err(error!("CSV is a read-only data source"))
227 }
228
229 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
230 &self,
231 target_field: &str,
232 source_table: &Table<Self, SourceE>,
233 source_column: &str,
234 ) -> Self::Condition
235 where
236 Self: Sized,
237 {
238 let src_col = self.create_column::<Self::AnyType>(source_column);
239 let fk_values = self.column_table_values_expr(source_table, &src_col);
240 let tgt_col = self.create_column::<Self::AnyType>(target_field);
241 tgt_col.in_(fk_values.expr())
242 }
243
244 fn column_table_values_expr<'a, E, Type: ColumnType>(
245 &'a self,
246 table: &Table<Self, E>,
247 column: &Self::Column<Type>,
248 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
249 where
250 E: Entity<Self::Value> + 'static,
251 Self: Sized,
252 {
253 use vantage_expressions::{
254 expr_any,
255 traits::{associated_expressions::AssociatedExpression, datasource::ExprDataSource},
256 };
257
258 let table_clone = table.clone();
259 let col = column.name().to_string();
260 let csv = self.clone();
261
262 let inner = expr_any!("{}", {
263 DeferredFn::new(move || {
264 let csv = csv.clone();
265 let table = table_clone.clone();
266 let col = col.clone();
267 Box::pin(async move {
268 let records = csv.list_table_values(&table).await?;
269 let values: Vec<AnyCsvType> = records
270 .values()
271 .filter_map(|r| r.get(&col).cloned())
272 .collect();
273 Ok(ExpressiveEnum::Scalar(AnyCsvType::new(values)))
274 })
275 })
276 });
277
278 let expr = expr_any!("{}", { self.defer(inner) });
279 AssociatedExpression::new(expr, self)
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::type_system::CsvTypeVariants;
287 use vantage_dataset::prelude::{ReadableValueSet, WritableValueSet};
288 use vantage_types::EmptyEntity;
289
290 fn test_csv() -> Csv {
291 Csv::new(format!("{}/data", env!("CARGO_MANIFEST_DIR")))
292 }
293
294 #[tokio::test]
295 async fn test_list_bakery() {
296 let csv = test_csv();
297 let table = Table::<Csv, EmptyEntity>::new("bakery", csv)
298 .with_column_of::<String>("name")
299 .with_column_of::<i64>("profit_margin");
300
301 let values = table.list_values().await.unwrap();
302 assert_eq!(values.len(), 1);
303 assert!(values.contains_key("hill_valley"));
304
305 let bakery = &values["hill_valley"];
306 let name = bakery["name"].try_get::<String>().unwrap();
307 assert_eq!(name, "Hill Valley Bakery");
308
309 let profit = bakery["profit_margin"].try_get::<i64>().unwrap();
310 assert_eq!(profit, 15);
311 }
312
313 #[tokio::test]
314 async fn test_list_clients() {
315 let csv = test_csv();
316 let table = Table::<Csv, EmptyEntity>::new("client", csv)
317 .with_column_of::<String>("name")
318 .with_column_of::<String>("email")
319 .with_column_of::<bool>("is_paying_client")
320 .with_column_of::<serde_json::Value>("metadata");
321
322 let values = table.list_values().await.unwrap();
323 assert_eq!(values.len(), 3);
324
325 let marty = &values["marty"];
326 assert_eq!(marty["name"].try_get::<String>().unwrap(), "Marty McFly");
327 assert!(marty["is_paying_client"].try_get::<bool>().unwrap());
328
329 let biff = &values["biff"];
330 assert!(!biff["is_paying_client"].try_get::<bool>().unwrap());
331 assert_eq!(biff["metadata"].type_variant(), Some(CsvTypeVariants::Json));
332 }
333
334 #[tokio::test]
335 async fn test_list_products_typed() {
336 let csv = test_csv();
337 let table = Table::<Csv, EmptyEntity>::new("product", csv)
338 .with_column_of::<String>("name")
339 .with_column_of::<i64>("calories")
340 .with_column_of::<i64>("price")
341 .with_column_of::<bool>("is_deleted")
342 .with_column_of::<serde_json::Value>("inventory");
343
344 let values = table.list_values().await.unwrap();
345 assert_eq!(values.len(), 5);
346
347 let cupcake = &values["flux_cupcake"];
348 assert_eq!(
349 cupcake["name"].try_get::<String>().unwrap(),
350 "Flux Capacitor Cupcake"
351 );
352 assert_eq!(cupcake["calories"].try_get::<i64>().unwrap(), 300);
353 assert_eq!(cupcake["price"].try_get::<i64>().unwrap(), 120);
354 assert!(!cupcake["is_deleted"].try_get::<bool>().unwrap());
355
356 let inv = cupcake["inventory"].try_get::<serde_json::Value>().unwrap();
357 assert_eq!(inv["stock"], serde_json::json!(50));
358 }
359
360 #[tokio::test]
361 async fn test_untyped_columns_stay_string() {
362 let csv = test_csv();
363 let table = Table::<Csv, EmptyEntity>::new("product", csv);
364
365 let values = table.list_values().await.unwrap();
366 let cupcake = &values["flux_cupcake"];
367 assert_eq!(
368 cupcake["calories"].type_variant(),
369 Some(CsvTypeVariants::String)
370 );
371 assert_eq!(cupcake["calories"].try_get::<String>().unwrap(), "300");
372 }
373
374 #[tokio::test]
375 async fn test_get_value_by_id() {
376 let csv = test_csv();
377 let table = Table::<Csv, EmptyEntity>::new("client", csv)
378 .with_column_of::<String>("name")
379 .with_column_of::<String>("email");
380
381 let record = table
382 .get_value(&"doc".to_string())
383 .await
384 .unwrap()
385 .expect("doc exists");
386 assert_eq!(record["name"].try_get::<String>().unwrap(), "Doc Brown");
387 assert_eq!(
388 record["email"].try_get::<String>().unwrap(),
389 "doc@brown.com"
390 );
391 }
392
393 #[tokio::test]
394 async fn test_get_value_not_found() {
395 let csv = test_csv();
396 let table = Table::<Csv, EmptyEntity>::new("client", csv);
397
398 let result = table.get_value(&"nonexistent".to_string()).await.unwrap();
399 assert!(result.is_none());
400 }
401
402 #[tokio::test]
403 async fn test_get_some_value() {
404 let csv = test_csv();
405 let table = Table::<Csv, EmptyEntity>::new("bakery", csv).with_column_of::<String>("name");
406
407 let result = table.get_some_value().await.unwrap();
408 assert!(result.is_some());
409 let (id, record) = result.unwrap();
410 assert_eq!(id, "hill_valley");
411 assert_eq!(
412 record["name"].try_get::<String>().unwrap(),
413 "Hill Valley Bakery"
414 );
415 }
416
417 #[tokio::test]
418 async fn test_get_count() {
419 let csv = test_csv();
420 let table = Table::<Csv, EmptyEntity>::new("product", csv);
421
422 let count = table.data_source().get_table_count(&table).await.unwrap();
423 assert_eq!(count, 5);
424 }
425
426 #[tokio::test]
427 async fn test_write_operations_fail() {
428 let csv = test_csv();
429 let table = Table::<Csv, EmptyEntity>::new("bakery", csv);
430
431 let record = Record::new();
432 assert!(
433 WritableValueSet::insert_value(&table, &"test".to_string(), &record)
434 .await
435 .is_err()
436 );
437 assert!(
438 WritableValueSet::delete(&table, &"test".to_string())
439 .await
440 .is_err()
441 );
442 assert!(WritableValueSet::delete_all(&table).await.is_err());
443 }
444
445 #[tokio::test]
446 async fn test_missing_file() {
447 let csv = test_csv();
448 let table = Table::<Csv, EmptyEntity>::new("nonexistent", csv);
449
450 let result = table.list_values().await;
451 assert!(result.is_err());
452 }
453}