Skip to main content

vantage_cmd/
table_source.rs

1//! `TableSource` impl for `Cmd`.
2//!
3//! The one interesting method is `list_table_values`: it resolves any
4//! deferred (relation) conditions, hands the read context to the Rhai
5//! script on a blocking thread (which builds the argv, runs the locked
6//! command, and parses the output), then keys the resulting rows by their
7//! id field and applies a client-side `Eq` filter as a safety net.
8//!
9//! Read-only: writes and aggregations error, exactly like `vantage-aws`.
10
11use async_trait::async_trait;
12use ciborium::Value as CborValue;
13use indexmap::IndexMap;
14use serde_json::{Value as JsonValue, json};
15
16use vantage_core::error;
17use vantage_dataset::traits::Result;
18use vantage_expressions::{
19    Expression, Expressive, expr_any,
20    traits::associated_expressions::AssociatedExpression,
21    traits::datasource::ExprDataSource,
22    traits::expressive::{DeferredFn, ExpressiveEnum},
23};
24use vantage_table::column::core::{Column, ColumnType};
25use vantage_table::table::Table;
26use vantage_table::traits::table_source::TableSource;
27use vantage_types::{Entity, Record};
28
29use crate::cmd::Cmd;
30use crate::condition::CmdCondition;
31use crate::rhai_engine::QueryContext;
32use crate::types::{cbor_to_string, json_to_cbor};
33
34/// Convert script-produced JSON rows into id-keyed records. The id comes
35/// from the `id_field` value on each row, falling back to the row index.
36fn rows_to_records(
37    rows: Vec<JsonValue>,
38    id_field: Option<&str>,
39) -> IndexMap<String, Record<CborValue>> {
40    let mut records: IndexMap<String, Record<CborValue>> = IndexMap::new();
41    for (idx, row) in rows.into_iter().enumerate() {
42        let mut record = Record::new();
43        match row {
44            JsonValue::Object(map) => {
45                for (k, v) in map {
46                    record.insert(k, json_to_cbor(&v));
47                }
48            }
49            other => {
50                record.insert("value".to_string(), json_to_cbor(&other));
51            }
52        }
53        let id = id_field
54            .and_then(|f| record.get(f))
55            .map(cbor_to_string)
56            .filter(|s| !s.is_empty())
57            .unwrap_or_else(|| idx.to_string());
58        records.insert(id, record);
59    }
60    records
61}
62
63impl Cmd {
64    /// Detail-script hydration for one id, with the existing list-pass `row`
65    /// injected into the script scope as `row`. Falls back to the normal
66    /// list-and-pick path when the table has no detail script.
67    pub async fn get_table_value_with_row<E>(
68        &self,
69        table: &Table<Self, E>,
70        id: &String,
71        row: &Record<CborValue>,
72    ) -> Result<Option<Record<CborValue>>>
73    where
74        E: Entity<CborValue>,
75        Self: Sized,
76    {
77        let table_name = table.table_name().to_string();
78        if !self.has_detail_script(&table_name) {
79            let mut all = self.list_table_values(table).await?;
80            return Ok(all.shift_remove(id));
81        }
82
83        let id_field = table.id_field().map(|c| c.name().to_string());
84        let columns: Vec<String> = table.columns().keys().cloned().collect();
85        let row_map = ciborium::value::Value::Map(
86            row.iter()
87                .map(|(k, v)| (ciborium::value::Value::Text(k.clone()), v.clone()))
88                .collect(),
89        );
90        let ctx = QueryContext {
91            conditions: Vec::new(),
92            columns,
93            limit: None,
94            offset: None,
95            id_column: id_field.clone(),
96            id: Some(id.clone()),
97            row: row_map,
98        };
99        let cmd = self.clone();
100        let name = table_name.clone();
101        let rows: Vec<JsonValue> = tokio::task::spawn_blocking(move || {
102            let compiled = cmd
103                .compiled_detail_script(&name)?
104                .ok_or_else(|| error!("detail script vanished"))?;
105            compiled.eval(ctx)
106        })
107        .await
108        .map_err(|e| error!("command task failed to join", detail = e.to_string()))??;
109
110        let mut records = rows_to_records(rows, id_field.as_deref());
111        // Prefer the row matching the requested id; else the first row.
112        Ok(records
113            .shift_remove(id)
114            .or_else(|| records.into_iter().next().map(|(_, r)| r)))
115    }
116}
117
118#[async_trait]
119impl TableSource for Cmd {
120    type Column<Type>
121        = Column<Type>
122    where
123        Type: ColumnType;
124    type AnyType = CborValue;
125    type Value = CborValue;
126    type Id = String;
127    type Condition = CmdCondition;
128    type Source = String;
129
130    fn eq_condition(field: &str, value: &str) -> Result<Self::Condition> {
131        Ok(CmdCondition::eq(field.to_string(), value.to_string()))
132    }
133
134    fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
135        Ok(CmdCondition::eq(field.to_string(), value))
136    }
137
138    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
139        Column::new(name)
140    }
141
142    fn to_any_column<Type: ColumnType>(
143        &self,
144        column: Self::Column<Type>,
145    ) -> Self::Column<Self::AnyType> {
146        Column::from_column(column)
147    }
148
149    fn convert_any_column<Type: ColumnType>(
150        &self,
151        any_column: Self::Column<Self::AnyType>,
152    ) -> Option<Self::Column<Type>> {
153        Some(Column::from_column(any_column))
154    }
155
156    fn expr(
157        &self,
158        template: impl Into<String>,
159        parameters: Vec<ExpressiveEnum<Self::Value>>,
160    ) -> Expression<Self::Value> {
161        Expression::new(template, parameters)
162    }
163
164    fn search_table_condition<E>(
165        &self,
166        _table: &Table<Self, E>,
167        search_value: &str,
168    ) -> Self::Condition
169    where
170        E: Entity<Self::Value>,
171    {
172        // The script decides how to use a `__search__` condition (e.g.
173        // CloudWatch's `--filter-pattern`); there's no generic field set
174        // at this layer.
175        CmdCondition::eq("__search__", json!(search_value).to_string())
176    }
177
178    async fn list_table_values<E>(
179        &self,
180        table: &Table<Self, E>,
181    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
182    where
183        E: Entity<Self::Value>,
184        Self: Sized,
185    {
186        let table_name = table.table_name().to_string();
187
188        let id_field = table.id_field().map(|c| c.name().to_string());
189
190        // Resolve Deferred (relation) conditions to concrete Eq/In before
191        // anything reaches the script — the command never sees a subquery.
192        let mut conditions: Vec<CmdCondition> = Vec::new();
193        for cond in table.conditions().cloned() {
194            match cond {
195                CmdCondition::Deferred { field, source } => {
196                    let resolved = ExprDataSource::execute(self, &source).await?;
197                    match resolved {
198                        CborValue::Array(values) => {
199                            conditions.push(CmdCondition::In { field, values })
200                        }
201                        other => conditions.push(CmdCondition::Eq {
202                            field,
203                            value: other,
204                        }),
205                    }
206                }
207                other => conditions.push(other),
208            }
209        }
210
211        let columns: Vec<String> = table.columns().keys().cloned().collect();
212        let (limit, offset) = match table.pagination() {
213            Some(p) => (Some(p.limit()), Some(p.skip())),
214            None => (None, None),
215        };
216
217        let ctx = QueryContext {
218            conditions: conditions.clone(),
219            columns,
220            limit,
221            offset,
222            id_column: id_field.clone(),
223            id: None,
224            row: ciborium::value::Value::Map(vec![]),
225        };
226
227        let cmd = self.clone();
228        let rows: Vec<JsonValue> = tokio::task::spawn_blocking(move || {
229            let compiled = cmd.compiled_list_script(&table_name)?;
230            compiled.eval(ctx)
231        })
232        .await
233        .map_err(|e| error!("command task failed to join", detail = e.to_string()))??;
234
235        let mut records = rows_to_records(rows, id_field.as_deref());
236
237        // Client-side safety net: re-apply `Eq` conditions naming real
238        // record fields. Fields the script consumed as request flags won't
239        // appear on the rows and are left alone.
240        records.retain(|_id, record| {
241            conditions.iter().all(|c| match c {
242                CmdCondition::Eq { field, value } => match record.get(field) {
243                    Some(rec_val) => rec_val == value,
244                    None => true,
245                },
246                _ => true,
247            })
248        });
249
250        Ok(records)
251    }
252
253    async fn get_table_value<E>(
254        &self,
255        table: &Table<Self, E>,
256        id: &Self::Id,
257    ) -> Result<Option<Record<Self::Value>>>
258    where
259        E: Entity<Self::Value>,
260        Self: Sized,
261    {
262        // Detail hydration with no caller-supplied row (id-only path).
263        let empty: Record<CborValue> = Record::new();
264        self.get_table_value_with_row(table, id, &empty).await
265    }
266
267    async fn get_table_some_value<E>(
268        &self,
269        table: &Table<Self, E>,
270    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
271    where
272        E: Entity<Self::Value>,
273        Self: Sized,
274    {
275        let all = self.list_table_values(table).await?;
276        Ok(all.into_iter().next())
277    }
278
279    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
280    where
281        E: Entity<Self::Value>,
282        Self: Sized,
283    {
284        let all = self.list_table_values(table).await?;
285        Ok(all.len() as i64)
286    }
287
288    async fn get_table_sum<E>(
289        &self,
290        _table: &Table<Self, E>,
291        _column: &Self::Column<Self::AnyType>,
292    ) -> Result<Self::Value>
293    where
294        E: Entity<Self::Value>,
295        Self: Sized,
296    {
297        Err(error!("Aggregations not supported by vantage-cmd"))
298    }
299
300    async fn get_table_max<E>(
301        &self,
302        _table: &Table<Self, E>,
303        _column: &Self::Column<Self::AnyType>,
304    ) -> Result<Self::Value>
305    where
306        E: Entity<Self::Value>,
307        Self: Sized,
308    {
309        Err(error!("Aggregations not supported by vantage-cmd"))
310    }
311
312    async fn get_table_min<E>(
313        &self,
314        _table: &Table<Self, E>,
315        _column: &Self::Column<Self::AnyType>,
316    ) -> Result<Self::Value>
317    where
318        E: Entity<Self::Value>,
319        Self: Sized,
320    {
321        Err(error!("Aggregations not supported by vantage-cmd"))
322    }
323
324    async fn insert_table_value<E>(
325        &self,
326        _table: &Table<Self, E>,
327        _id: &Self::Id,
328        _record: &Record<Self::Value>,
329    ) -> Result<Record<Self::Value>>
330    where
331        E: Entity<Self::Value>,
332        Self: Sized,
333    {
334        Err(error!("vantage-cmd is read-only"))
335    }
336
337    async fn replace_table_value<E>(
338        &self,
339        _table: &Table<Self, E>,
340        _id: &Self::Id,
341        _record: &Record<Self::Value>,
342    ) -> Result<Record<Self::Value>>
343    where
344        E: Entity<Self::Value>,
345        Self: Sized,
346    {
347        Err(error!("vantage-cmd is read-only"))
348    }
349
350    async fn patch_table_value<E>(
351        &self,
352        _table: &Table<Self, E>,
353        _id: &Self::Id,
354        _partial: &Record<Self::Value>,
355    ) -> Result<Record<Self::Value>>
356    where
357        E: Entity<Self::Value>,
358        Self: Sized,
359    {
360        Err(error!("vantage-cmd is read-only"))
361    }
362
363    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
364    where
365        E: Entity<Self::Value>,
366        Self: Sized,
367    {
368        Err(error!("vantage-cmd is read-only"))
369    }
370
371    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
372    where
373        E: Entity<Self::Value>,
374        Self: Sized,
375    {
376        Err(error!("vantage-cmd is read-only"))
377    }
378
379    async fn insert_table_return_id_value<E>(
380        &self,
381        _table: &Table<Self, E>,
382        _record: &Record<Self::Value>,
383    ) -> Result<Self::Id>
384    where
385        E: Entity<Self::Value>,
386        Self: Sized,
387    {
388        Err(error!("vantage-cmd is read-only"))
389    }
390
391    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
392        &self,
393        target_field: &str,
394        source_table: &Table<Self, SourceE>,
395        source_column: &str,
396    ) -> Self::Condition
397    where
398        Self: Sized,
399    {
400        // "target_field IN (subquery)" as a Deferred condition, resolved
401        // at read time in `list_table_values`. Same shape as vantage-aws.
402        let src_col = self.create_column::<Self::AnyType>(source_column);
403        let values_expr = self.column_table_values_expr(source_table, &src_col);
404        CmdCondition::Deferred {
405            field: target_field.to_string(),
406            source: values_expr.expr(),
407        }
408    }
409
410    fn column_table_values_expr<'a, E, Type: ColumnType>(
411        &'a self,
412        table: &Table<Self, E>,
413        column: &Self::Column<Type>,
414    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
415    where
416        E: Entity<Self::Value> + 'static,
417        Self: Sized,
418    {
419        let table_clone = table.clone();
420        let col = column.name().to_string();
421        let cmd = self.clone();
422
423        let inner = expr_any!("{}", {
424            DeferredFn::new(move || {
425                let cmd = cmd.clone();
426                let table = table_clone.clone();
427                let col = col.clone();
428                Box::pin(async move {
429                    let records = cmd.list_table_values(&table).await?;
430                    let values: Vec<CborValue> = records
431                        .values()
432                        .filter_map(|r| r.get(&col).cloned())
433                        .collect();
434                    Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
435                })
436            })
437        });
438
439        let expr = expr_any!("{}", { self.defer(inner) });
440        AssociatedExpression::new(expr, self)
441    }
442}