1use async_trait::async_trait;
12use ciborium::Value as CborValue;
13use indexmap::IndexMap;
14use serde_json::{Value as JsonValue, json};
15
16use vantage_core::error;
17use vantage_dataset::traits::Result;
18use vantage_expressions::{
19 Expression, Expressive, expr_any,
20 traits::associated_expressions::AssociatedExpression,
21 traits::datasource::ExprDataSource,
22 traits::expressive::{DeferredFn, ExpressiveEnum},
23};
24use vantage_table::column::core::{Column, ColumnType};
25use vantage_table::table::Table;
26use vantage_table::traits::table_source::TableSource;
27use vantage_types::{Entity, Record};
28
29use crate::cmd::Cmd;
30use crate::condition::CmdCondition;
31use crate::rhai_engine::{QueryContext, eval_rows};
32use crate::types::{cbor_to_string, json_to_cbor};
33
34#[async_trait]
35impl TableSource for Cmd {
36 type Column<Type>
37 = Column<Type>
38 where
39 Type: ColumnType;
40 type AnyType = CborValue;
41 type Value = CborValue;
42 type Id = String;
43 type Condition = CmdCondition;
44 type Source = String;
45
46 fn eq_condition(field: &str, value: &str) -> Result<Self::Condition> {
47 Ok(CmdCondition::eq(field.to_string(), value.to_string()))
48 }
49
50 fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
51 Ok(CmdCondition::eq(field.to_string(), value))
52 }
53
54 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
55 Column::new(name)
56 }
57
58 fn to_any_column<Type: ColumnType>(
59 &self,
60 column: Self::Column<Type>,
61 ) -> Self::Column<Self::AnyType> {
62 Column::from_column(column)
63 }
64
65 fn convert_any_column<Type: ColumnType>(
66 &self,
67 any_column: Self::Column<Self::AnyType>,
68 ) -> Option<Self::Column<Type>> {
69 Some(Column::from_column(any_column))
70 }
71
72 fn expr(
73 &self,
74 template: impl Into<String>,
75 parameters: Vec<ExpressiveEnum<Self::Value>>,
76 ) -> Expression<Self::Value> {
77 Expression::new(template, parameters)
78 }
79
80 fn search_table_condition<E>(
81 &self,
82 _table: &Table<Self, E>,
83 search_value: &str,
84 ) -> Self::Condition
85 where
86 E: Entity<Self::Value>,
87 {
88 CmdCondition::eq("__search__", json!(search_value).to_string())
92 }
93
94 async fn list_table_values<E>(
95 &self,
96 table: &Table<Self, E>,
97 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
98 where
99 E: Entity<Self::Value>,
100 Self: Sized,
101 {
102 let table_name = table.table_name().to_string();
103 let spec = self.spec_for(&table_name)?.clone();
104 let command = self.effective_command(&spec);
105 let env = self.effective_env(&spec);
106 let pass_path = self.pass_path();
107 let base_dir = self.base_dir();
108 let script = spec.script.clone();
109
110 let id_field = table.id_field().map(|c| c.name().to_string());
111
112 let mut conditions: Vec<CmdCondition> = Vec::new();
115 for cond in table.conditions().cloned() {
116 match cond {
117 CmdCondition::Deferred { field, source } => {
118 let resolved = ExprDataSource::execute(self, &source).await?;
119 match resolved {
120 CborValue::Array(values) => {
121 conditions.push(CmdCondition::In { field, values })
122 }
123 other => conditions.push(CmdCondition::Eq {
124 field,
125 value: other,
126 }),
127 }
128 }
129 other => conditions.push(other),
130 }
131 }
132
133 let columns: Vec<String> = table.columns().keys().cloned().collect();
134 let (limit, offset) = match table.pagination() {
135 Some(p) => (Some(p.limit()), Some(p.skip())),
136 None => (None, None),
137 };
138
139 let ctx = QueryContext {
140 conditions: conditions.clone(),
141 columns,
142 limit,
143 offset,
144 id_column: id_field.clone(),
145 };
146
147 let rows: Vec<JsonValue> = tokio::task::spawn_blocking(move || {
148 eval_rows(command, env, pass_path, base_dir, &script, ctx)
149 })
150 .await
151 .map_err(|e| error!("command task failed to join", detail = e.to_string()))??;
152
153 let mut records: IndexMap<String, Record<CborValue>> = IndexMap::new();
154 for (idx, row) in rows.into_iter().enumerate() {
155 let mut record = Record::new();
156 match row {
157 JsonValue::Object(map) => {
158 for (k, v) in map {
159 record.insert(k, json_to_cbor(&v));
160 }
161 }
162 other => {
163 record.insert("value".to_string(), json_to_cbor(&other));
164 }
165 }
166 let id = id_field
167 .as_ref()
168 .and_then(|f| record.get(f))
169 .map(cbor_to_string)
170 .filter(|s| !s.is_empty())
171 .unwrap_or_else(|| idx.to_string());
172 records.insert(id, record);
173 }
174
175 records.retain(|_id, record| {
179 conditions.iter().all(|c| match c {
180 CmdCondition::Eq { field, value } => match record.get(field) {
181 Some(rec_val) => rec_val == value,
182 None => true,
183 },
184 _ => true,
185 })
186 });
187
188 Ok(records)
189 }
190
191 async fn get_table_value<E>(
192 &self,
193 table: &Table<Self, E>,
194 id: &Self::Id,
195 ) -> Result<Option<Record<Self::Value>>>
196 where
197 E: Entity<Self::Value>,
198 Self: Sized,
199 {
200 let mut all = self.list_table_values(table).await?;
201 Ok(all.shift_remove(id))
202 }
203
204 async fn get_table_some_value<E>(
205 &self,
206 table: &Table<Self, E>,
207 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
208 where
209 E: Entity<Self::Value>,
210 Self: Sized,
211 {
212 let all = self.list_table_values(table).await?;
213 Ok(all.into_iter().next())
214 }
215
216 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
217 where
218 E: Entity<Self::Value>,
219 Self: Sized,
220 {
221 let all = self.list_table_values(table).await?;
222 Ok(all.len() as i64)
223 }
224
225 async fn get_table_sum<E>(
226 &self,
227 _table: &Table<Self, E>,
228 _column: &Self::Column<Self::AnyType>,
229 ) -> Result<Self::Value>
230 where
231 E: Entity<Self::Value>,
232 Self: Sized,
233 {
234 Err(error!("Aggregations not supported by vantage-cmd"))
235 }
236
237 async fn get_table_max<E>(
238 &self,
239 _table: &Table<Self, E>,
240 _column: &Self::Column<Self::AnyType>,
241 ) -> Result<Self::Value>
242 where
243 E: Entity<Self::Value>,
244 Self: Sized,
245 {
246 Err(error!("Aggregations not supported by vantage-cmd"))
247 }
248
249 async fn get_table_min<E>(
250 &self,
251 _table: &Table<Self, E>,
252 _column: &Self::Column<Self::AnyType>,
253 ) -> Result<Self::Value>
254 where
255 E: Entity<Self::Value>,
256 Self: Sized,
257 {
258 Err(error!("Aggregations not supported by vantage-cmd"))
259 }
260
261 async fn insert_table_value<E>(
262 &self,
263 _table: &Table<Self, E>,
264 _id: &Self::Id,
265 _record: &Record<Self::Value>,
266 ) -> Result<Record<Self::Value>>
267 where
268 E: Entity<Self::Value>,
269 Self: Sized,
270 {
271 Err(error!("vantage-cmd is read-only"))
272 }
273
274 async fn replace_table_value<E>(
275 &self,
276 _table: &Table<Self, E>,
277 _id: &Self::Id,
278 _record: &Record<Self::Value>,
279 ) -> Result<Record<Self::Value>>
280 where
281 E: Entity<Self::Value>,
282 Self: Sized,
283 {
284 Err(error!("vantage-cmd is read-only"))
285 }
286
287 async fn patch_table_value<E>(
288 &self,
289 _table: &Table<Self, E>,
290 _id: &Self::Id,
291 _partial: &Record<Self::Value>,
292 ) -> Result<Record<Self::Value>>
293 where
294 E: Entity<Self::Value>,
295 Self: Sized,
296 {
297 Err(error!("vantage-cmd is read-only"))
298 }
299
300 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
301 where
302 E: Entity<Self::Value>,
303 Self: Sized,
304 {
305 Err(error!("vantage-cmd is read-only"))
306 }
307
308 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
309 where
310 E: Entity<Self::Value>,
311 Self: Sized,
312 {
313 Err(error!("vantage-cmd is read-only"))
314 }
315
316 async fn insert_table_return_id_value<E>(
317 &self,
318 _table: &Table<Self, E>,
319 _record: &Record<Self::Value>,
320 ) -> Result<Self::Id>
321 where
322 E: Entity<Self::Value>,
323 Self: Sized,
324 {
325 Err(error!("vantage-cmd is read-only"))
326 }
327
328 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
329 &self,
330 target_field: &str,
331 source_table: &Table<Self, SourceE>,
332 source_column: &str,
333 ) -> Self::Condition
334 where
335 Self: Sized,
336 {
337 let src_col = self.create_column::<Self::AnyType>(source_column);
340 let values_expr = self.column_table_values_expr(source_table, &src_col);
341 CmdCondition::Deferred {
342 field: target_field.to_string(),
343 source: values_expr.expr(),
344 }
345 }
346
347 fn column_table_values_expr<'a, E, Type: ColumnType>(
348 &'a self,
349 table: &Table<Self, E>,
350 column: &Self::Column<Type>,
351 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
352 where
353 E: Entity<Self::Value> + 'static,
354 Self: Sized,
355 {
356 let table_clone = table.clone();
357 let col = column.name().to_string();
358 let cmd = self.clone();
359
360 let inner = expr_any!("{}", {
361 DeferredFn::new(move || {
362 let cmd = cmd.clone();
363 let table = table_clone.clone();
364 let col = col.clone();
365 Box::pin(async move {
366 let records = cmd.list_table_values(&table).await?;
367 let values: Vec<CborValue> = records
368 .values()
369 .filter_map(|r| r.get(&col).cloned())
370 .collect();
371 Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
372 })
373 })
374 });
375
376 let expr = expr_any!("{}", { self.defer(inner) });
377 AssociatedExpression::new(expr, self)
378 }
379}