1use crate::operation::CsvOperation;
2use async_trait::async_trait;
3use indexmap::IndexMap;
4use vantage_core::error;
5use vantage_dataset::traits::Result;
6use vantage_expressions::Expression;
7use vantage_expressions::Expressive;
8use vantage_expressions::traits::associated_expressions::AssociatedExpression;
9use vantage_expressions::traits::datasource::DataSource;
10use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
11use vantage_table::column::core::{Column, ColumnType};
12use vantage_table::table::Table;
13use vantage_table::traits::table_source::TableSource;
14use vantage_types::{Entity, Record};
15
16use crate::Csv;
17use crate::condition::apply_condition;
18use crate::type_system::AnyCsvType;
19
20impl DataSource for Csv {}
21
22#[async_trait]
23impl TableSource for Csv {
24 type Column<Type>
25 = Column<Type>
26 where
27 Type: ColumnType;
28 type AnyType = AnyCsvType;
29 type Value = AnyCsvType;
30 type Id = String;
31 type Condition = vantage_expressions::Expression<Self::Value>;
32
33 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
34 Column::new(name)
35 }
36
37 fn to_any_column<Type: ColumnType>(
38 &self,
39 column: Self::Column<Type>,
40 ) -> Self::Column<Self::AnyType> {
41 Column::from_column(column)
42 }
43
44 fn convert_any_column<Type: ColumnType>(
45 &self,
46 any_column: Self::Column<Self::AnyType>,
47 ) -> Option<Self::Column<Type>> {
48 Some(Column::from_column(any_column))
49 }
50
51 fn expr(
52 &self,
53 template: impl Into<String>,
54 parameters: Vec<ExpressiveEnum<Self::Value>>,
55 ) -> Expression<Self::Value> {
56 Expression::new(template, parameters)
57 }
58
59 fn search_table_condition<E>(
60 &self,
61 _table: &Table<Self, E>,
62 search_value: &str,
63 ) -> Expression<Self::Value>
64 where
65 E: Entity<Self::Value>,
66 {
67 Expression::new(format!("SEARCH '{}'", search_value), vec![])
68 }
69
70 async fn list_table_values<E>(
71 &self,
72 table: &Table<Self, E>,
73 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
74 where
75 E: Entity<Self::Value>,
76 Self: Sized,
77 {
78 let mut records = self.read_csv(table.table_name(), table.columns())?;
79
80 for condition in table.conditions() {
81 records = apply_condition(records, condition).await?;
82 }
83
84 Ok(records)
85 }
86
87 async fn get_table_value<E>(
88 &self,
89 table: &Table<Self, E>,
90 id: &Self::Id,
91 ) -> Result<Record<Self::Value>>
92 where
93 E: Entity<Self::Value>,
94 Self: Sized,
95 {
96 let records = self.read_csv(table.table_name(), table.columns())?;
97 records
98 .get(id)
99 .cloned()
100 .ok_or_else(|| error!("Record not found", id = id))
101 }
102
103 async fn get_table_some_value<E>(
104 &self,
105 table: &Table<Self, E>,
106 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
107 where
108 E: Entity<Self::Value>,
109 Self: Sized,
110 {
111 let records = self.read_csv(table.table_name(), table.columns())?;
112 Ok(records.into_iter().next())
113 }
114
115 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
116 where
117 E: Entity<Self::Value>,
118 Self: Sized,
119 {
120 let records = self.read_csv(table.table_name(), table.columns())?;
121 Ok(records.len() as i64)
122 }
123
124 async fn get_table_sum<E>(
125 &self,
126 _table: &Table<Self, E>,
127 _column: &Self::Column<Self::AnyType>,
128 ) -> Result<Self::Value>
129 where
130 E: Entity<Self::Value>,
131 Self: Sized,
132 {
133 Err(error!("Sum not implemented for CSV backend"))
134 }
135
136 async fn get_table_max<E>(
137 &self,
138 _table: &Table<Self, E>,
139 _column: &Self::Column<Self::AnyType>,
140 ) -> Result<Self::Value>
141 where
142 E: Entity<Self::Value>,
143 Self: Sized,
144 {
145 Err(error!("Max not implemented for CSV backend"))
146 }
147
148 async fn get_table_min<E>(
149 &self,
150 _table: &Table<Self, E>,
151 _column: &Self::Column<Self::AnyType>,
152 ) -> Result<Self::Value>
153 where
154 E: Entity<Self::Value>,
155 Self: Sized,
156 {
157 Err(error!("Min not implemented for CSV backend"))
158 }
159
160 async fn insert_table_value<E>(
161 &self,
162 _table: &Table<Self, E>,
163 _id: &Self::Id,
164 _record: &Record<Self::Value>,
165 ) -> Result<Record<Self::Value>>
166 where
167 E: Entity<Self::Value>,
168 Self: Sized,
169 {
170 Err(error!("CSV is a read-only data source"))
171 }
172
173 async fn replace_table_value<E>(
174 &self,
175 _table: &Table<Self, E>,
176 _id: &Self::Id,
177 _record: &Record<Self::Value>,
178 ) -> Result<Record<Self::Value>>
179 where
180 E: Entity<Self::Value>,
181 Self: Sized,
182 {
183 Err(error!("CSV is a read-only data source"))
184 }
185
186 async fn patch_table_value<E>(
187 &self,
188 _table: &Table<Self, E>,
189 _id: &Self::Id,
190 _partial: &Record<Self::Value>,
191 ) -> Result<Record<Self::Value>>
192 where
193 E: Entity<Self::Value>,
194 Self: Sized,
195 {
196 Err(error!("CSV is a read-only data source"))
197 }
198
199 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
200 where
201 E: Entity<Self::Value>,
202 Self: Sized,
203 {
204 Err(error!("CSV is a read-only data source"))
205 }
206
207 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
208 where
209 E: Entity<Self::Value>,
210 Self: Sized,
211 {
212 Err(error!("CSV is a read-only data source"))
213 }
214
215 async fn insert_table_return_id_value<E>(
216 &self,
217 _table: &Table<Self, E>,
218 _record: &Record<Self::Value>,
219 ) -> Result<Self::Id>
220 where
221 E: Entity<Self::Value>,
222 Self: Sized,
223 {
224 Err(error!("CSV is a read-only data source"))
225 }
226
227 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
228 &self,
229 target_field: &str,
230 source_table: &Table<Self, SourceE>,
231 source_column: &str,
232 ) -> Self::Condition
233 where
234 Self: Sized,
235 {
236 let src_col = self.create_column::<Self::AnyType>(source_column);
237 let fk_values = self.column_table_values_expr(source_table, &src_col);
238 let tgt_col = self.create_column::<Self::AnyType>(target_field);
239 tgt_col.in_(fk_values.expr())
240 }
241
242 fn column_table_values_expr<'a, E, Type: ColumnType>(
243 &'a self,
244 table: &Table<Self, E>,
245 column: &Self::Column<Type>,
246 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
247 where
248 E: Entity<Self::Value> + 'static,
249 Self: Sized,
250 {
251 use vantage_expressions::{
252 expr_any,
253 traits::{associated_expressions::AssociatedExpression, datasource::ExprDataSource},
254 };
255
256 let table_clone = table.clone();
257 let col = column.name().to_string();
258 let csv = self.clone();
259
260 let inner = expr_any!("{}", {
261 DeferredFn::new(move || {
262 let csv = csv.clone();
263 let table = table_clone.clone();
264 let col = col.clone();
265 Box::pin(async move {
266 let records = csv.list_table_values(&table).await?;
267 let values: Vec<AnyCsvType> = records
268 .values()
269 .filter_map(|r| r.get(&col).cloned())
270 .collect();
271 Ok(ExpressiveEnum::Scalar(AnyCsvType::new(values)))
272 })
273 })
274 });
275
276 let expr = expr_any!("{}", { self.defer(inner) });
277 AssociatedExpression::new(expr, self)
278 }
279}
280
281#[cfg(test)]
282mod tests {
283 use super::*;
284 use crate::type_system::CsvTypeVariants;
285 use vantage_dataset::prelude::{ReadableValueSet, WritableValueSet};
286 use vantage_types::EmptyEntity;
287
288 fn test_csv() -> Csv {
289 Csv::new(format!("{}/data", env!("CARGO_MANIFEST_DIR")))
290 }
291
292 #[tokio::test]
293 async fn test_list_bakery() {
294 let csv = test_csv();
295 let table = Table::<Csv, EmptyEntity>::new("bakery", csv)
296 .with_column_of::<String>("name")
297 .with_column_of::<i64>("profit_margin");
298
299 let values = table.list_values().await.unwrap();
300 assert_eq!(values.len(), 1);
301 assert!(values.contains_key("hill_valley"));
302
303 let bakery = &values["hill_valley"];
304 let name = bakery["name"].try_get::<String>().unwrap();
305 assert_eq!(name, "Hill Valley Bakery");
306
307 let profit = bakery["profit_margin"].try_get::<i64>().unwrap();
308 assert_eq!(profit, 15);
309 }
310
311 #[tokio::test]
312 async fn test_list_clients() {
313 let csv = test_csv();
314 let table = Table::<Csv, EmptyEntity>::new("client", csv)
315 .with_column_of::<String>("name")
316 .with_column_of::<String>("email")
317 .with_column_of::<bool>("is_paying_client")
318 .with_column_of::<serde_json::Value>("metadata");
319
320 let values = table.list_values().await.unwrap();
321 assert_eq!(values.len(), 3);
322
323 let marty = &values["marty"];
324 assert_eq!(marty["name"].try_get::<String>().unwrap(), "Marty McFly");
325 assert!(marty["is_paying_client"].try_get::<bool>().unwrap());
326
327 let biff = &values["biff"];
328 assert!(!biff["is_paying_client"].try_get::<bool>().unwrap());
329 assert_eq!(biff["metadata"].type_variant(), Some(CsvTypeVariants::Json));
330 }
331
332 #[tokio::test]
333 async fn test_list_products_typed() {
334 let csv = test_csv();
335 let table = Table::<Csv, EmptyEntity>::new("product", csv)
336 .with_column_of::<String>("name")
337 .with_column_of::<i64>("calories")
338 .with_column_of::<i64>("price")
339 .with_column_of::<bool>("is_deleted")
340 .with_column_of::<serde_json::Value>("inventory");
341
342 let values = table.list_values().await.unwrap();
343 assert_eq!(values.len(), 5);
344
345 let cupcake = &values["flux_cupcake"];
346 assert_eq!(
347 cupcake["name"].try_get::<String>().unwrap(),
348 "Flux Capacitor Cupcake"
349 );
350 assert_eq!(cupcake["calories"].try_get::<i64>().unwrap(), 300);
351 assert_eq!(cupcake["price"].try_get::<i64>().unwrap(), 120);
352 assert!(!cupcake["is_deleted"].try_get::<bool>().unwrap());
353
354 let inv = cupcake["inventory"].try_get::<serde_json::Value>().unwrap();
355 assert_eq!(inv["stock"], serde_json::json!(50));
356 }
357
358 #[tokio::test]
359 async fn test_untyped_columns_stay_string() {
360 let csv = test_csv();
361 let table = Table::<Csv, EmptyEntity>::new("product", csv);
362
363 let values = table.list_values().await.unwrap();
364 let cupcake = &values["flux_cupcake"];
365 assert_eq!(
366 cupcake["calories"].type_variant(),
367 Some(CsvTypeVariants::String)
368 );
369 assert_eq!(cupcake["calories"].try_get::<String>().unwrap(), "300");
370 }
371
372 #[tokio::test]
373 async fn test_get_value_by_id() {
374 let csv = test_csv();
375 let table = Table::<Csv, EmptyEntity>::new("client", csv)
376 .with_column_of::<String>("name")
377 .with_column_of::<String>("email");
378
379 let record = table.get_value(&"doc".to_string()).await.unwrap();
380 assert_eq!(record["name"].try_get::<String>().unwrap(), "Doc Brown");
381 assert_eq!(
382 record["email"].try_get::<String>().unwrap(),
383 "doc@brown.com"
384 );
385 }
386
387 #[tokio::test]
388 async fn test_get_value_not_found() {
389 let csv = test_csv();
390 let table = Table::<Csv, EmptyEntity>::new("client", csv);
391
392 let result = table.get_value(&"nonexistent".to_string()).await;
393 assert!(result.is_err());
394 }
395
396 #[tokio::test]
397 async fn test_get_some_value() {
398 let csv = test_csv();
399 let table = Table::<Csv, EmptyEntity>::new("bakery", csv).with_column_of::<String>("name");
400
401 let result = table.get_some_value().await.unwrap();
402 assert!(result.is_some());
403 let (id, record) = result.unwrap();
404 assert_eq!(id, "hill_valley");
405 assert_eq!(
406 record["name"].try_get::<String>().unwrap(),
407 "Hill Valley Bakery"
408 );
409 }
410
411 #[tokio::test]
412 async fn test_get_count() {
413 let csv = test_csv();
414 let table = Table::<Csv, EmptyEntity>::new("product", csv);
415
416 let count = table.data_source().get_table_count(&table).await.unwrap();
417 assert_eq!(count, 5);
418 }
419
420 #[tokio::test]
421 async fn test_write_operations_fail() {
422 let csv = test_csv();
423 let table = Table::<Csv, EmptyEntity>::new("bakery", csv);
424
425 let record = Record::new();
426 assert!(
427 WritableValueSet::insert_value(&table, &"test".to_string(), &record)
428 .await
429 .is_err()
430 );
431 assert!(
432 WritableValueSet::delete(&table, &"test".to_string())
433 .await
434 .is_err()
435 );
436 assert!(WritableValueSet::delete_all(&table).await.is_err());
437 }
438
439 #[tokio::test]
440 async fn test_missing_file() {
441 let csv = test_csv();
442 let table = Table::<Csv, EmptyEntity>::new("nonexistent", csv);
443
444 let result = table.list_values().await;
445 assert!(result.is_err());
446 }
447}