1use async_trait::async_trait;
12use ciborium::Value as CborValue;
13use indexmap::IndexMap;
14use serde_json::{Value as JsonValue, json};
15
16use vantage_core::error;
17use vantage_dataset::traits::Result;
18use vantage_expressions::{
19 Expression, Expressive, expr_any,
20 traits::associated_expressions::AssociatedExpression,
21 traits::datasource::ExprDataSource,
22 traits::expressive::{DeferredFn, ExpressiveEnum},
23};
24use vantage_table::column::core::{Column, ColumnType};
25use vantage_table::table::Table;
26use vantage_table::traits::table_source::TableSource;
27use vantage_types::{Entity, Record};
28
29use crate::cmd::Cmd;
30use crate::condition::CmdCondition;
31use crate::rhai_engine::QueryContext;
32use crate::types::{cbor_to_string, json_to_cbor};
33
34fn rows_to_records(
37 rows: Vec<JsonValue>,
38 id_field: Option<&str>,
39) -> IndexMap<String, Record<CborValue>> {
40 let mut records: IndexMap<String, Record<CborValue>> = IndexMap::new();
41 for (idx, row) in rows.into_iter().enumerate() {
42 let mut record = Record::new();
43 match row {
44 JsonValue::Object(map) => {
45 for (k, v) in map {
46 record.insert(k, json_to_cbor(&v));
47 }
48 }
49 other => {
50 record.insert("value".to_string(), json_to_cbor(&other));
51 }
52 }
53 let id = id_field
54 .and_then(|f| record.get(f))
55 .map(cbor_to_string)
56 .filter(|s| !s.is_empty())
57 .unwrap_or_else(|| idx.to_string());
58 records.insert(id, record);
59 }
60 records
61}
62
63impl Cmd {
64 pub async fn get_table_value_with_row<E>(
68 &self,
69 table: &Table<Self, E>,
70 id: &String,
71 row: &Record<CborValue>,
72 ) -> Result<Option<Record<CborValue>>>
73 where
74 E: Entity<CborValue>,
75 Self: Sized,
76 {
77 let table_name = table.table_name().to_string();
78 if !self.has_detail_script(&table_name) {
79 let mut all = self.list_table_values(table).await?;
80 return Ok(all.shift_remove(id));
81 }
82
83 let id_field = table.id_field().map(|c| c.name().to_string());
84 let columns: Vec<String> = table.columns().keys().cloned().collect();
85 let row_map = ciborium::value::Value::Map(
86 row.iter()
87 .map(|(k, v)| (ciborium::value::Value::Text(k.clone()), v.clone()))
88 .collect(),
89 );
90 let ctx = QueryContext {
91 conditions: Vec::new(),
92 columns,
93 limit: None,
94 offset: None,
95 id_column: id_field.clone(),
96 id: Some(id.clone()),
97 row: row_map,
98 };
99 let cmd = self.clone();
100 let name = table_name.clone();
101 let rows: Vec<JsonValue> = tokio::task::spawn_blocking(move || {
102 let compiled = cmd
103 .compiled_detail_script(&name)?
104 .ok_or_else(|| error!("detail script vanished"))?;
105 compiled.eval(ctx)
106 })
107 .await
108 .map_err(|e| error!("command task failed to join", detail = e.to_string()))??;
109
110 let mut records = rows_to_records(rows, id_field.as_deref());
111 Ok(records
113 .shift_remove(id)
114 .or_else(|| records.into_iter().next().map(|(_, r)| r)))
115 }
116}
117
118#[async_trait]
119impl TableSource for Cmd {
120 type Column<Type>
121 = Column<Type>
122 where
123 Type: ColumnType;
124 type AnyType = CborValue;
125 type Value = CborValue;
126 type Id = String;
127 type Condition = CmdCondition;
128 type Source = String;
129
130 fn eq_condition(field: &str, value: &str) -> Result<Self::Condition> {
131 Ok(CmdCondition::eq(field.to_string(), value.to_string()))
132 }
133
134 fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
135 Ok(CmdCondition::eq(field.to_string(), value))
136 }
137
138 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
139 Column::new(name)
140 }
141
142 fn to_any_column<Type: ColumnType>(
143 &self,
144 column: Self::Column<Type>,
145 ) -> Self::Column<Self::AnyType> {
146 Column::from_column(column)
147 }
148
149 fn convert_any_column<Type: ColumnType>(
150 &self,
151 any_column: Self::Column<Self::AnyType>,
152 ) -> Option<Self::Column<Type>> {
153 Some(Column::from_column(any_column))
154 }
155
156 fn expr(
157 &self,
158 template: impl Into<String>,
159 parameters: Vec<ExpressiveEnum<Self::Value>>,
160 ) -> Expression<Self::Value> {
161 Expression::new(template, parameters)
162 }
163
164 fn search_table_condition<E>(
165 &self,
166 _table: &Table<Self, E>,
167 search_value: &str,
168 ) -> Self::Condition
169 where
170 E: Entity<Self::Value>,
171 {
172 CmdCondition::eq("__search__", json!(search_value).to_string())
176 }
177
178 async fn list_table_values<E>(
179 &self,
180 table: &Table<Self, E>,
181 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
182 where
183 E: Entity<Self::Value>,
184 Self: Sized,
185 {
186 let table_name = table.table_name().to_string();
187
188 let id_field = table.id_field().map(|c| c.name().to_string());
189
190 let mut conditions: Vec<CmdCondition> = Vec::new();
193 for cond in table.conditions().cloned() {
194 match cond {
195 CmdCondition::Deferred { field, source } => {
196 let resolved = ExprDataSource::execute(self, &source).await?;
197 match resolved {
198 CborValue::Array(values) => {
199 conditions.push(CmdCondition::In { field, values })
200 }
201 other => conditions.push(CmdCondition::Eq {
202 field,
203 value: other,
204 }),
205 }
206 }
207 other => conditions.push(other),
208 }
209 }
210
211 let columns: Vec<String> = table.columns().keys().cloned().collect();
212 let (limit, offset) = match table.pagination() {
213 Some(p) => (Some(p.limit()), Some(p.skip())),
214 None => (None, None),
215 };
216
217 let ctx = QueryContext {
218 conditions: conditions.clone(),
219 columns,
220 limit,
221 offset,
222 id_column: id_field.clone(),
223 id: None,
224 row: ciborium::value::Value::Map(vec![]),
225 };
226
227 let cmd = self.clone();
228 let rows: Vec<JsonValue> = tokio::task::spawn_blocking(move || {
229 let compiled = cmd.compiled_list_script(&table_name)?;
230 compiled.eval(ctx)
231 })
232 .await
233 .map_err(|e| error!("command task failed to join", detail = e.to_string()))??;
234
235 let mut records = rows_to_records(rows, id_field.as_deref());
236
237 records.retain(|_id, record| {
241 conditions.iter().all(|c| match c {
242 CmdCondition::Eq { field, value } => match record.get(field) {
243 Some(rec_val) => rec_val == value,
244 None => true,
245 },
246 _ => true,
247 })
248 });
249
250 Ok(records)
251 }
252
253 async fn get_table_value<E>(
254 &self,
255 table: &Table<Self, E>,
256 id: &Self::Id,
257 ) -> Result<Option<Record<Self::Value>>>
258 where
259 E: Entity<Self::Value>,
260 Self: Sized,
261 {
262 let empty: Record<CborValue> = Record::new();
264 self.get_table_value_with_row(table, id, &empty).await
265 }
266
267 async fn get_table_some_value<E>(
268 &self,
269 table: &Table<Self, E>,
270 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
271 where
272 E: Entity<Self::Value>,
273 Self: Sized,
274 {
275 let all = self.list_table_values(table).await?;
276 Ok(all.into_iter().next())
277 }
278
279 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
280 where
281 E: Entity<Self::Value>,
282 Self: Sized,
283 {
284 let all = self.list_table_values(table).await?;
285 Ok(all.len() as i64)
286 }
287
288 async fn get_table_sum<E>(
289 &self,
290 _table: &Table<Self, E>,
291 _column: &Self::Column<Self::AnyType>,
292 ) -> Result<Self::Value>
293 where
294 E: Entity<Self::Value>,
295 Self: Sized,
296 {
297 Err(error!("Aggregations not supported by vantage-cmd"))
298 }
299
300 async fn get_table_max<E>(
301 &self,
302 _table: &Table<Self, E>,
303 _column: &Self::Column<Self::AnyType>,
304 ) -> Result<Self::Value>
305 where
306 E: Entity<Self::Value>,
307 Self: Sized,
308 {
309 Err(error!("Aggregations not supported by vantage-cmd"))
310 }
311
312 async fn get_table_min<E>(
313 &self,
314 _table: &Table<Self, E>,
315 _column: &Self::Column<Self::AnyType>,
316 ) -> Result<Self::Value>
317 where
318 E: Entity<Self::Value>,
319 Self: Sized,
320 {
321 Err(error!("Aggregations not supported by vantage-cmd"))
322 }
323
324 async fn insert_table_value<E>(
325 &self,
326 _table: &Table<Self, E>,
327 _id: &Self::Id,
328 _record: &Record<Self::Value>,
329 ) -> Result<Record<Self::Value>>
330 where
331 E: Entity<Self::Value>,
332 Self: Sized,
333 {
334 Err(error!("vantage-cmd is read-only"))
335 }
336
337 async fn replace_table_value<E>(
338 &self,
339 _table: &Table<Self, E>,
340 _id: &Self::Id,
341 _record: &Record<Self::Value>,
342 ) -> Result<Record<Self::Value>>
343 where
344 E: Entity<Self::Value>,
345 Self: Sized,
346 {
347 Err(error!("vantage-cmd is read-only"))
348 }
349
350 async fn patch_table_value<E>(
351 &self,
352 _table: &Table<Self, E>,
353 _id: &Self::Id,
354 _partial: &Record<Self::Value>,
355 ) -> Result<Record<Self::Value>>
356 where
357 E: Entity<Self::Value>,
358 Self: Sized,
359 {
360 Err(error!("vantage-cmd is read-only"))
361 }
362
363 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
364 where
365 E: Entity<Self::Value>,
366 Self: Sized,
367 {
368 Err(error!("vantage-cmd is read-only"))
369 }
370
371 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
372 where
373 E: Entity<Self::Value>,
374 Self: Sized,
375 {
376 Err(error!("vantage-cmd is read-only"))
377 }
378
379 async fn insert_table_return_id_value<E>(
380 &self,
381 _table: &Table<Self, E>,
382 _record: &Record<Self::Value>,
383 ) -> Result<Self::Id>
384 where
385 E: Entity<Self::Value>,
386 Self: Sized,
387 {
388 Err(error!("vantage-cmd is read-only"))
389 }
390
391 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
392 &self,
393 target_field: &str,
394 source_table: &Table<Self, SourceE>,
395 source_column: &str,
396 ) -> Self::Condition
397 where
398 Self: Sized,
399 {
400 let src_col = self.create_column::<Self::AnyType>(source_column);
403 let values_expr = self.column_table_values_expr(source_table, &src_col);
404 CmdCondition::Deferred {
405 field: target_field.to_string(),
406 source: values_expr.expr(),
407 }
408 }
409
410 fn column_table_values_expr<'a, E, Type: ColumnType>(
411 &'a self,
412 table: &Table<Self, E>,
413 column: &Self::Column<Type>,
414 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
415 where
416 E: Entity<Self::Value> + 'static,
417 Self: Sized,
418 {
419 let table_clone = table.clone();
420 let col = column.name().to_string();
421 let cmd = self.clone();
422
423 let inner = expr_any!("{}", {
424 DeferredFn::new(move || {
425 let cmd = cmd.clone();
426 let table = table_clone.clone();
427 let col = col.clone();
428 Box::pin(async move {
429 let records = cmd.list_table_values(&table).await?;
430 let values: Vec<CborValue> = records
431 .values()
432 .filter_map(|r| r.get(&col).cloned())
433 .collect();
434 Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
435 })
436 })
437 });
438
439 let expr = expr_any!("{}", { self.defer(inner) });
440 AssociatedExpression::new(expr, self)
441 }
442}