1use async_trait::async_trait;
12use ciborium::Value as CborValue;
13use indexmap::IndexMap;
14use serde_json::{Value as JsonValue, json};
15
16use vantage_core::error;
17use vantage_dataset::traits::Result;
18use vantage_expressions::{
19 Expression, Expressive, expr_any,
20 traits::associated_expressions::AssociatedExpression,
21 traits::datasource::ExprDataSource,
22 traits::expressive::{DeferredFn, ExpressiveEnum},
23};
24use vantage_table::column::core::{Column, ColumnType};
25use vantage_table::table::Table;
26use vantage_table::traits::table_source::TableSource;
27use vantage_types::{Entity, Record};
28
29use crate::cmd::Cmd;
30use crate::condition::CmdCondition;
31use crate::rhai_engine::{QueryContext, eval_rows};
32use crate::types::{cbor_to_string, json_to_cbor};
33
34#[async_trait]
35impl TableSource for Cmd {
36 type Column<Type>
37 = Column<Type>
38 where
39 Type: ColumnType;
40 type AnyType = CborValue;
41 type Value = CborValue;
42 type Id = String;
43 type Condition = CmdCondition;
44 type Source = String;
45
46 fn eq_condition(field: &str, value: &str) -> Result<Self::Condition> {
47 Ok(CmdCondition::eq(field.to_string(), value.to_string()))
48 }
49
50 fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
51 Ok(CmdCondition::eq(field.to_string(), value))
52 }
53
54 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
55 Column::new(name)
56 }
57
58 fn to_any_column<Type: ColumnType>(
59 &self,
60 column: Self::Column<Type>,
61 ) -> Self::Column<Self::AnyType> {
62 Column::from_column(column)
63 }
64
65 fn convert_any_column<Type: ColumnType>(
66 &self,
67 any_column: Self::Column<Self::AnyType>,
68 ) -> Option<Self::Column<Type>> {
69 Some(Column::from_column(any_column))
70 }
71
72 fn expr(
73 &self,
74 template: impl Into<String>,
75 parameters: Vec<ExpressiveEnum<Self::Value>>,
76 ) -> Expression<Self::Value> {
77 Expression::new(template, parameters)
78 }
79
80 fn search_table_condition<E>(
81 &self,
82 _table: &Table<Self, E>,
83 search_value: &str,
84 ) -> Self::Condition
85 where
86 E: Entity<Self::Value>,
87 {
88 CmdCondition::eq("__search__", json!(search_value).to_string())
92 }
93
94 async fn list_table_values<E>(
95 &self,
96 table: &Table<Self, E>,
97 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
98 where
99 E: Entity<Self::Value>,
100 Self: Sized,
101 {
102 let table_name = table.table_name().to_string();
103 let spec = self.spec_for(&table_name)?.clone();
104 let command = self.effective_command(&spec);
105 let env = self.effective_env(&spec);
106 let pass_path = self.pass_path();
107 let script = spec.script.clone();
108
109 let id_field = table.id_field().map(|c| c.name().to_string());
110
111 let mut conditions: Vec<CmdCondition> = Vec::new();
114 for cond in table.conditions().cloned() {
115 match cond {
116 CmdCondition::Deferred { field, source } => {
117 let resolved = ExprDataSource::execute(self, &source).await?;
118 match resolved {
119 CborValue::Array(values) => {
120 conditions.push(CmdCondition::In { field, values })
121 }
122 other => conditions.push(CmdCondition::Eq {
123 field,
124 value: other,
125 }),
126 }
127 }
128 other => conditions.push(other),
129 }
130 }
131
132 let columns: Vec<String> = table.columns().keys().cloned().collect();
133 let (limit, offset) = match table.pagination() {
134 Some(p) => (Some(p.limit()), Some(p.skip())),
135 None => (None, None),
136 };
137
138 let ctx = QueryContext {
139 conditions: conditions.clone(),
140 columns,
141 limit,
142 offset,
143 id_column: id_field.clone(),
144 };
145
146 let rows: Vec<JsonValue> =
147 tokio::task::spawn_blocking(move || eval_rows(command, env, pass_path, &script, ctx))
148 .await
149 .map_err(|e| error!("command task failed to join", detail = e.to_string()))??;
150
151 let mut records: IndexMap<String, Record<CborValue>> = IndexMap::new();
152 for (idx, row) in rows.into_iter().enumerate() {
153 let mut record = Record::new();
154 match row {
155 JsonValue::Object(map) => {
156 for (k, v) in map {
157 record.insert(k, json_to_cbor(&v));
158 }
159 }
160 other => {
161 record.insert("value".to_string(), json_to_cbor(&other));
162 }
163 }
164 let id = id_field
165 .as_ref()
166 .and_then(|f| record.get(f))
167 .map(cbor_to_string)
168 .filter(|s| !s.is_empty())
169 .unwrap_or_else(|| idx.to_string());
170 records.insert(id, record);
171 }
172
173 records.retain(|_id, record| {
177 conditions.iter().all(|c| match c {
178 CmdCondition::Eq { field, value } => match record.get(field) {
179 Some(rec_val) => rec_val == value,
180 None => true,
181 },
182 _ => true,
183 })
184 });
185
186 Ok(records)
187 }
188
189 async fn get_table_value<E>(
190 &self,
191 table: &Table<Self, E>,
192 id: &Self::Id,
193 ) -> Result<Option<Record<Self::Value>>>
194 where
195 E: Entity<Self::Value>,
196 Self: Sized,
197 {
198 let mut all = self.list_table_values(table).await?;
199 Ok(all.shift_remove(id))
200 }
201
202 async fn get_table_some_value<E>(
203 &self,
204 table: &Table<Self, E>,
205 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
206 where
207 E: Entity<Self::Value>,
208 Self: Sized,
209 {
210 let all = self.list_table_values(table).await?;
211 Ok(all.into_iter().next())
212 }
213
214 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
215 where
216 E: Entity<Self::Value>,
217 Self: Sized,
218 {
219 let all = self.list_table_values(table).await?;
220 Ok(all.len() as i64)
221 }
222
223 async fn get_table_sum<E>(
224 &self,
225 _table: &Table<Self, E>,
226 _column: &Self::Column<Self::AnyType>,
227 ) -> Result<Self::Value>
228 where
229 E: Entity<Self::Value>,
230 Self: Sized,
231 {
232 Err(error!("Aggregations not supported by vantage-cmd"))
233 }
234
235 async fn get_table_max<E>(
236 &self,
237 _table: &Table<Self, E>,
238 _column: &Self::Column<Self::AnyType>,
239 ) -> Result<Self::Value>
240 where
241 E: Entity<Self::Value>,
242 Self: Sized,
243 {
244 Err(error!("Aggregations not supported by vantage-cmd"))
245 }
246
247 async fn get_table_min<E>(
248 &self,
249 _table: &Table<Self, E>,
250 _column: &Self::Column<Self::AnyType>,
251 ) -> Result<Self::Value>
252 where
253 E: Entity<Self::Value>,
254 Self: Sized,
255 {
256 Err(error!("Aggregations not supported by vantage-cmd"))
257 }
258
259 async fn insert_table_value<E>(
260 &self,
261 _table: &Table<Self, E>,
262 _id: &Self::Id,
263 _record: &Record<Self::Value>,
264 ) -> Result<Record<Self::Value>>
265 where
266 E: Entity<Self::Value>,
267 Self: Sized,
268 {
269 Err(error!("vantage-cmd is read-only"))
270 }
271
272 async fn replace_table_value<E>(
273 &self,
274 _table: &Table<Self, E>,
275 _id: &Self::Id,
276 _record: &Record<Self::Value>,
277 ) -> Result<Record<Self::Value>>
278 where
279 E: Entity<Self::Value>,
280 Self: Sized,
281 {
282 Err(error!("vantage-cmd is read-only"))
283 }
284
285 async fn patch_table_value<E>(
286 &self,
287 _table: &Table<Self, E>,
288 _id: &Self::Id,
289 _partial: &Record<Self::Value>,
290 ) -> Result<Record<Self::Value>>
291 where
292 E: Entity<Self::Value>,
293 Self: Sized,
294 {
295 Err(error!("vantage-cmd is read-only"))
296 }
297
298 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
299 where
300 E: Entity<Self::Value>,
301 Self: Sized,
302 {
303 Err(error!("vantage-cmd is read-only"))
304 }
305
306 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
307 where
308 E: Entity<Self::Value>,
309 Self: Sized,
310 {
311 Err(error!("vantage-cmd is read-only"))
312 }
313
314 async fn insert_table_return_id_value<E>(
315 &self,
316 _table: &Table<Self, E>,
317 _record: &Record<Self::Value>,
318 ) -> Result<Self::Id>
319 where
320 E: Entity<Self::Value>,
321 Self: Sized,
322 {
323 Err(error!("vantage-cmd is read-only"))
324 }
325
326 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
327 &self,
328 target_field: &str,
329 source_table: &Table<Self, SourceE>,
330 source_column: &str,
331 ) -> Self::Condition
332 where
333 Self: Sized,
334 {
335 let src_col = self.create_column::<Self::AnyType>(source_column);
338 let values_expr = self.column_table_values_expr(source_table, &src_col);
339 CmdCondition::Deferred {
340 field: target_field.to_string(),
341 source: values_expr.expr(),
342 }
343 }
344
345 fn column_table_values_expr<'a, E, Type: ColumnType>(
346 &'a self,
347 table: &Table<Self, E>,
348 column: &Self::Column<Type>,
349 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
350 where
351 E: Entity<Self::Value> + 'static,
352 Self: Sized,
353 {
354 let table_clone = table.clone();
355 let col = column.name().to_string();
356 let cmd = self.clone();
357
358 let inner = expr_any!("{}", {
359 DeferredFn::new(move || {
360 let cmd = cmd.clone();
361 let table = table_clone.clone();
362 let col = col.clone();
363 Box::pin(async move {
364 let records = cmd.list_table_values(&table).await?;
365 let values: Vec<CborValue> = records
366 .values()
367 .filter_map(|r| r.get(&col).cloned())
368 .collect();
369 Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
370 })
371 })
372 });
373
374 let expr = expr_any!("{}", { self.defer(inner) });
375 AssociatedExpression::new(expr, self)
376 }
377}