1use crate::operation::CsvOperation;
2use async_trait::async_trait;
3use indexmap::IndexMap;
4use vantage_core::error;
5use vantage_dataset::traits::Result;
6use vantage_expressions::Expression;
7use vantage_expressions::Expressive;
8use vantage_expressions::traits::associated_expressions::AssociatedExpression;
9use vantage_expressions::traits::datasource::DataSource;
10use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
11use vantage_table::column::core::{Column, ColumnType};
12use vantage_table::table::Table;
13use vantage_table::traits::table_source::TableSource;
14use vantage_types::{Entity, Record};
15
16use crate::Csv;
17use crate::condition::apply_condition;
18use crate::type_system::AnyCsvType;
19
20impl DataSource for Csv {}
21
22#[async_trait]
23impl TableSource for Csv {
24 type Column<Type>
25 = Column<Type>
26 where
27 Type: ColumnType;
28 type AnyType = AnyCsvType;
29 type Value = AnyCsvType;
30 type Id = String;
31 type Condition = vantage_expressions::Expression<Self::Value>;
32 type Source = String;
33
34 fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
35 let column: Column<AnyCsvType> = Column::new(field);
36 Ok(CsvOperation::eq(&column, value))
37 }
38
39 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
40 Column::new(name)
41 }
42
43 fn to_any_column<Type: ColumnType>(
44 &self,
45 column: Self::Column<Type>,
46 ) -> Self::Column<Self::AnyType> {
47 Column::from_column(column)
48 }
49
50 fn convert_any_column<Type: ColumnType>(
51 &self,
52 any_column: Self::Column<Self::AnyType>,
53 ) -> Option<Self::Column<Type>> {
54 Some(Column::from_column(any_column))
55 }
56
57 fn expr(
58 &self,
59 template: impl Into<String>,
60 parameters: Vec<ExpressiveEnum<Self::Value>>,
61 ) -> Expression<Self::Value> {
62 Expression::new(template, parameters)
63 }
64
65 fn search_table_condition<E>(
66 &self,
67 _table: &Table<Self, E>,
68 search_value: &str,
69 ) -> Expression<Self::Value>
70 where
71 E: Entity<Self::Value>,
72 {
73 Expression::new(format!("SEARCH '{}'", search_value), vec![])
74 }
75
76 async fn list_table_values<E>(
77 &self,
78 table: &Table<Self, E>,
79 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
80 where
81 E: Entity<Self::Value>,
82 Self: Sized,
83 {
84 let mut records = self.read_csv(table.table_name(), table.columns())?;
85
86 for condition in table.conditions() {
87 records = apply_condition(records, condition).await?;
88 }
89
90 Ok(records)
91 }
92
93 async fn get_table_value<E>(
94 &self,
95 table: &Table<Self, E>,
96 id: &Self::Id,
97 ) -> Result<Option<Record<Self::Value>>>
98 where
99 E: Entity<Self::Value>,
100 Self: Sized,
101 {
102 let records = self.read_csv(table.table_name(), table.columns())?;
103 Ok(records.get(id).cloned())
104 }
105
106 async fn get_table_some_value<E>(
107 &self,
108 table: &Table<Self, E>,
109 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
110 where
111 E: Entity<Self::Value>,
112 Self: Sized,
113 {
114 let records = self.read_csv(table.table_name(), table.columns())?;
115 Ok(records.into_iter().next())
116 }
117
118 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
119 where
120 E: Entity<Self::Value>,
121 Self: Sized,
122 {
123 let records = self.read_csv(table.table_name(), table.columns())?;
124 Ok(records.len() as i64)
125 }
126
127 async fn get_table_sum<E>(
128 &self,
129 _table: &Table<Self, E>,
130 _column: &Self::Column<Self::AnyType>,
131 ) -> Result<Self::Value>
132 where
133 E: Entity<Self::Value>,
134 Self: Sized,
135 {
136 Err(error!("Sum not implemented for CSV backend"))
137 }
138
139 async fn get_table_max<E>(
140 &self,
141 _table: &Table<Self, E>,
142 _column: &Self::Column<Self::AnyType>,
143 ) -> Result<Self::Value>
144 where
145 E: Entity<Self::Value>,
146 Self: Sized,
147 {
148 Err(error!("Max not implemented for CSV backend"))
149 }
150
151 async fn get_table_min<E>(
152 &self,
153 _table: &Table<Self, E>,
154 _column: &Self::Column<Self::AnyType>,
155 ) -> Result<Self::Value>
156 where
157 E: Entity<Self::Value>,
158 Self: Sized,
159 {
160 Err(error!("Min not implemented for CSV backend"))
161 }
162
163 async fn insert_table_value<E>(
164 &self,
165 _table: &Table<Self, E>,
166 _id: &Self::Id,
167 _record: &Record<Self::Value>,
168 ) -> Result<Record<Self::Value>>
169 where
170 E: Entity<Self::Value>,
171 Self: Sized,
172 {
173 Err(error!("CSV is a read-only data source"))
174 }
175
176 async fn replace_table_value<E>(
177 &self,
178 _table: &Table<Self, E>,
179 _id: &Self::Id,
180 _record: &Record<Self::Value>,
181 ) -> Result<Record<Self::Value>>
182 where
183 E: Entity<Self::Value>,
184 Self: Sized,
185 {
186 Err(error!("CSV is a read-only data source"))
187 }
188
189 async fn patch_table_value<E>(
190 &self,
191 _table: &Table<Self, E>,
192 _id: &Self::Id,
193 _partial: &Record<Self::Value>,
194 ) -> Result<Record<Self::Value>>
195 where
196 E: Entity<Self::Value>,
197 Self: Sized,
198 {
199 Err(error!("CSV is a read-only data source"))
200 }
201
202 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
203 where
204 E: Entity<Self::Value>,
205 Self: Sized,
206 {
207 Err(error!("CSV is a read-only data source"))
208 }
209
210 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
211 where
212 E: Entity<Self::Value>,
213 Self: Sized,
214 {
215 Err(error!("CSV is a read-only data source"))
216 }
217
218 async fn insert_table_return_id_value<E>(
219 &self,
220 _table: &Table<Self, E>,
221 _record: &Record<Self::Value>,
222 ) -> Result<Self::Id>
223 where
224 E: Entity<Self::Value>,
225 Self: Sized,
226 {
227 Err(error!("CSV is a read-only data source"))
228 }
229
230 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
231 &self,
232 target_field: &str,
233 source_table: &Table<Self, SourceE>,
234 source_column: &str,
235 ) -> Self::Condition
236 where
237 Self: Sized,
238 {
239 let src_col = self.create_column::<Self::AnyType>(source_column);
240 let fk_values = self.column_table_values_expr(source_table, &src_col);
241 let tgt_col = self.create_column::<Self::AnyType>(target_field);
242 tgt_col.in_(fk_values.expr())
243 }
244
245 fn column_table_values_expr<'a, E, Type: ColumnType>(
246 &'a self,
247 table: &Table<Self, E>,
248 column: &Self::Column<Type>,
249 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
250 where
251 E: Entity<Self::Value> + 'static,
252 Self: Sized,
253 {
254 use vantage_expressions::{
255 expr_any,
256 traits::{associated_expressions::AssociatedExpression, datasource::ExprDataSource},
257 };
258
259 let table_clone = table.clone();
260 let col = column.name().to_string();
261 let csv = self.clone();
262
263 let inner = expr_any!("{}", {
264 DeferredFn::new(move || {
265 let csv = csv.clone();
266 let table = table_clone.clone();
267 let col = col.clone();
268 Box::pin(async move {
269 let records = csv.list_table_values(&table).await?;
270 let values: Vec<AnyCsvType> = records
271 .values()
272 .filter_map(|r| r.get(&col).cloned())
273 .collect();
274 Ok(ExpressiveEnum::Scalar(AnyCsvType::new(values)))
275 })
276 })
277 });
278
279 let expr = expr_any!("{}", { self.defer(inner) });
280 AssociatedExpression::new(expr, self)
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::type_system::CsvTypeVariants;
    use vantage_dataset::prelude::{ReadableValueSet, WritableValueSet};
    use vantage_types::EmptyEntity;

    /// Data source rooted at the `data` directory under the crate manifest directory.
    fn test_csv() -> Csv {
        let root = format!("{}/data", env!("CARGO_MANIFEST_DIR"));
        Csv::new(root)
    }

    /// Listing `bakery` yields a single typed row keyed by `hill_valley`.
    #[tokio::test]
    async fn test_list_bakery() {
        let table = Table::<Csv, EmptyEntity>::new("bakery", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<i64>("profit_margin");

        let rows = table.list_values().await.unwrap();
        assert_eq!(rows.len(), 1);
        assert!(rows.contains_key("hill_valley"));

        let row = &rows["hill_valley"];
        assert_eq!(
            row["name"].try_get::<String>().unwrap(),
            "Hill Valley Bakery"
        );
        assert_eq!(row["profit_margin"].try_get::<i64>().unwrap(), 15);
    }

    /// Listing `client` yields three rows with bool and JSON columns typed as declared.
    #[tokio::test]
    async fn test_list_clients() {
        let table = Table::<Csv, EmptyEntity>::new("client", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<String>("email")
            .with_column_of::<bool>("is_paying_client")
            .with_column_of::<serde_json::Value>("metadata");

        let rows = table.list_values().await.unwrap();
        assert_eq!(rows.len(), 3);

        let marty = &rows["marty"];
        let marty_name = marty["name"].try_get::<String>().unwrap();
        assert_eq!(marty_name, "Marty McFly");
        let marty_pays = marty["is_paying_client"].try_get::<bool>().unwrap();
        assert!(marty_pays);

        let biff = &rows["biff"];
        let biff_pays = biff["is_paying_client"].try_get::<bool>().unwrap();
        assert!(!biff_pays);
        assert_eq!(biff["metadata"].type_variant(), Some(CsvTypeVariants::Json));
    }

    /// Listing `product` with declared column types yields typed values.
    #[tokio::test]
    async fn test_list_products_typed() {
        let table = Table::<Csv, EmptyEntity>::new("product", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<i64>("calories")
            .with_column_of::<i64>("price")
            .with_column_of::<bool>("is_deleted")
            .with_column_of::<serde_json::Value>("inventory");

        let rows = table.list_values().await.unwrap();
        assert_eq!(rows.len(), 5);

        let item = &rows["flux_cupcake"];
        let item_name = item["name"].try_get::<String>().unwrap();
        assert_eq!(item_name, "Flux Capacitor Cupcake");
        assert_eq!(item["calories"].try_get::<i64>().unwrap(), 300);
        assert_eq!(item["price"].try_get::<i64>().unwrap(), 120);
        assert!(!item["is_deleted"].try_get::<bool>().unwrap());

        let inventory = item["inventory"].try_get::<serde_json::Value>().unwrap();
        assert_eq!(inventory["stock"], serde_json::json!(50));
    }

    /// Without declared columns, values are reported with the `String` variant.
    #[tokio::test]
    async fn test_untyped_columns_stay_string() {
        let table = Table::<Csv, EmptyEntity>::new("product", test_csv());

        let rows = table.list_values().await.unwrap();
        let calories = &rows["flux_cupcake"]["calories"];
        assert_eq!(calories.type_variant(), Some(CsvTypeVariants::String));
        assert_eq!(calories.try_get::<String>().unwrap(), "300");
    }

    /// Fetching an existing id returns its record.
    #[tokio::test]
    async fn test_get_value_by_id() {
        let table = Table::<Csv, EmptyEntity>::new("client", test_csv())
            .with_column_of::<String>("name")
            .with_column_of::<String>("email");

        let id = "doc".to_string();
        let looked_up = table.get_value(&id).await.unwrap();
        let record = looked_up.expect("doc exists");

        assert_eq!(record["name"].try_get::<String>().unwrap(), "Doc Brown");
        let email = record["email"].try_get::<String>().unwrap();
        assert_eq!(email, "doc@brown.com");
    }

    /// Fetching an unknown id returns `None` rather than an error.
    #[tokio::test]
    async fn test_get_value_not_found() {
        let table = Table::<Csv, EmptyEntity>::new("client", test_csv());

        let id = "nonexistent".to_string();
        let looked_up = table.get_value(&id).await.unwrap();
        assert!(looked_up.is_none());
    }

    /// `get_some_value` on the single-row `bakery` table returns that row.
    #[tokio::test]
    async fn test_get_some_value() {
        let table =
            Table::<Csv, EmptyEntity>::new("bakery", test_csv()).with_column_of::<String>("name");

        let found = table.get_some_value().await.unwrap();
        assert!(found.is_some());

        let (id, record) = found.unwrap();
        assert_eq!(id, "hill_valley");
        let name = record["name"].try_get::<String>().unwrap();
        assert_eq!(name, "Hill Valley Bakery");
    }

    /// Counting through the data source matches the number of product rows.
    #[tokio::test]
    async fn test_get_count() {
        let table = Table::<Csv, EmptyEntity>::new("product", test_csv());

        let source = table.data_source();
        let total = source.get_table_count(&table).await.unwrap();
        assert_eq!(total, 5);
    }

    /// Insert, delete and delete-all are all rejected.
    #[tokio::test]
    async fn test_write_operations_fail() {
        let table = Table::<Csv, EmptyEntity>::new("bakery", test_csv());

        let id = "test".to_string();
        let blank = Record::new();

        let inserted = WritableValueSet::insert_value(&table, &id, &blank).await;
        assert!(inserted.is_err());

        let deleted = WritableValueSet::delete(&table, &id).await;
        assert!(deleted.is_err());

        let wiped = WritableValueSet::delete_all(&table).await;
        assert!(wiped.is_err());
    }

    /// Listing a table with no backing file surfaces an error.
    #[tokio::test]
    async fn test_missing_file() {
        let table = Table::<Csv, EmptyEntity>::new("nonexistent", test_csv());

        let outcome = table.list_values().await;
        assert!(outcome.is_err());
    }
}