Skip to main content

vantage_cmd/
table_source.rs

1//! `TableSource` impl for `Cmd`.
2//!
3//! The one interesting method is `list_table_values`: it resolves any
4//! deferred (relation) conditions, hands the read context to the Rhai
5//! script on a blocking thread (which builds the argv, runs the locked
6//! command, and parses the output), then keys the resulting rows by their
7//! id field and applies a client-side `Eq` filter as a safety net.
8//!
9//! Read-only: writes and aggregations error, exactly like `vantage-aws`.
10
11use async_trait::async_trait;
12use ciborium::Value as CborValue;
13use indexmap::IndexMap;
14use serde_json::{Value as JsonValue, json};
15
16use vantage_core::error;
17use vantage_dataset::traits::Result;
18use vantage_expressions::{
19    Expression, Expressive, expr_any,
20    traits::associated_expressions::AssociatedExpression,
21    traits::datasource::ExprDataSource,
22    traits::expressive::{DeferredFn, ExpressiveEnum},
23};
24use vantage_table::column::core::{Column, ColumnType};
25use vantage_table::table::Table;
26use vantage_table::traits::table_source::TableSource;
27use vantage_types::{Entity, Record};
28
29use crate::cmd::Cmd;
30use crate::condition::CmdCondition;
31use crate::rhai_engine::{QueryContext, eval_rows};
32use crate::types::{cbor_to_string, json_to_cbor};
33
34#[async_trait]
35impl TableSource for Cmd {
36    type Column<Type>
37        = Column<Type>
38    where
39        Type: ColumnType;
40    type AnyType = CborValue;
41    type Value = CborValue;
42    type Id = String;
43    type Condition = CmdCondition;
44    type Source = String;
45
46    fn eq_condition(field: &str, value: &str) -> Result<Self::Condition> {
47        Ok(CmdCondition::eq(field.to_string(), value.to_string()))
48    }
49
50    fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
51        Ok(CmdCondition::eq(field.to_string(), value))
52    }
53
54    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
55        Column::new(name)
56    }
57
58    fn to_any_column<Type: ColumnType>(
59        &self,
60        column: Self::Column<Type>,
61    ) -> Self::Column<Self::AnyType> {
62        Column::from_column(column)
63    }
64
65    fn convert_any_column<Type: ColumnType>(
66        &self,
67        any_column: Self::Column<Self::AnyType>,
68    ) -> Option<Self::Column<Type>> {
69        Some(Column::from_column(any_column))
70    }
71
72    fn expr(
73        &self,
74        template: impl Into<String>,
75        parameters: Vec<ExpressiveEnum<Self::Value>>,
76    ) -> Expression<Self::Value> {
77        Expression::new(template, parameters)
78    }
79
80    fn search_table_condition<E>(
81        &self,
82        _table: &Table<Self, E>,
83        search_value: &str,
84    ) -> Self::Condition
85    where
86        E: Entity<Self::Value>,
87    {
88        // The script decides how to use a `__search__` condition (e.g.
89        // CloudWatch's `--filter-pattern`); there's no generic field set
90        // at this layer.
91        CmdCondition::eq("__search__", json!(search_value).to_string())
92    }
93
94    async fn list_table_values<E>(
95        &self,
96        table: &Table<Self, E>,
97    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
98    where
99        E: Entity<Self::Value>,
100        Self: Sized,
101    {
102        let table_name = table.table_name().to_string();
103        let spec = self.spec_for(&table_name)?.clone();
104        let command = self.effective_command(&spec);
105        let env = self.effective_env(&spec);
106        let pass_path = self.pass_path();
107        let script = spec.script.clone();
108
109        let id_field = table.id_field().map(|c| c.name().to_string());
110
111        // Resolve Deferred (relation) conditions to concrete Eq/In before
112        // anything reaches the script — the command never sees a subquery.
113        let mut conditions: Vec<CmdCondition> = Vec::new();
114        for cond in table.conditions().cloned() {
115            match cond {
116                CmdCondition::Deferred { field, source } => {
117                    let resolved = ExprDataSource::execute(self, &source).await?;
118                    match resolved {
119                        CborValue::Array(values) => {
120                            conditions.push(CmdCondition::In { field, values })
121                        }
122                        other => conditions.push(CmdCondition::Eq {
123                            field,
124                            value: other,
125                        }),
126                    }
127                }
128                other => conditions.push(other),
129            }
130        }
131
132        let columns: Vec<String> = table.columns().keys().cloned().collect();
133        let (limit, offset) = match table.pagination() {
134            Some(p) => (Some(p.limit()), Some(p.skip())),
135            None => (None, None),
136        };
137
138        let ctx = QueryContext {
139            conditions: conditions.clone(),
140            columns,
141            limit,
142            offset,
143            id_column: id_field.clone(),
144        };
145
146        let rows: Vec<JsonValue> =
147            tokio::task::spawn_blocking(move || eval_rows(command, env, pass_path, &script, ctx))
148                .await
149                .map_err(|e| error!("command task failed to join", detail = e.to_string()))??;
150
151        let mut records: IndexMap<String, Record<CborValue>> = IndexMap::new();
152        for (idx, row) in rows.into_iter().enumerate() {
153            let mut record = Record::new();
154            match row {
155                JsonValue::Object(map) => {
156                    for (k, v) in map {
157                        record.insert(k, json_to_cbor(&v));
158                    }
159                }
160                other => {
161                    record.insert("value".to_string(), json_to_cbor(&other));
162                }
163            }
164            let id = id_field
165                .as_ref()
166                .and_then(|f| record.get(f))
167                .map(cbor_to_string)
168                .filter(|s| !s.is_empty())
169                .unwrap_or_else(|| idx.to_string());
170            records.insert(id, record);
171        }
172
173        // Client-side safety net: re-apply `Eq` conditions naming real
174        // record fields. Fields the script consumed as request flags won't
175        // appear on the rows and are left alone.
176        records.retain(|_id, record| {
177            conditions.iter().all(|c| match c {
178                CmdCondition::Eq { field, value } => match record.get(field) {
179                    Some(rec_val) => rec_val == value,
180                    None => true,
181                },
182                _ => true,
183            })
184        });
185
186        Ok(records)
187    }
188
189    async fn get_table_value<E>(
190        &self,
191        table: &Table<Self, E>,
192        id: &Self::Id,
193    ) -> Result<Option<Record<Self::Value>>>
194    where
195        E: Entity<Self::Value>,
196        Self: Sized,
197    {
198        let mut all = self.list_table_values(table).await?;
199        Ok(all.shift_remove(id))
200    }
201
202    async fn get_table_some_value<E>(
203        &self,
204        table: &Table<Self, E>,
205    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
206    where
207        E: Entity<Self::Value>,
208        Self: Sized,
209    {
210        let all = self.list_table_values(table).await?;
211        Ok(all.into_iter().next())
212    }
213
214    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
215    where
216        E: Entity<Self::Value>,
217        Self: Sized,
218    {
219        let all = self.list_table_values(table).await?;
220        Ok(all.len() as i64)
221    }
222
223    async fn get_table_sum<E>(
224        &self,
225        _table: &Table<Self, E>,
226        _column: &Self::Column<Self::AnyType>,
227    ) -> Result<Self::Value>
228    where
229        E: Entity<Self::Value>,
230        Self: Sized,
231    {
232        Err(error!("Aggregations not supported by vantage-cmd"))
233    }
234
235    async fn get_table_max<E>(
236        &self,
237        _table: &Table<Self, E>,
238        _column: &Self::Column<Self::AnyType>,
239    ) -> Result<Self::Value>
240    where
241        E: Entity<Self::Value>,
242        Self: Sized,
243    {
244        Err(error!("Aggregations not supported by vantage-cmd"))
245    }
246
247    async fn get_table_min<E>(
248        &self,
249        _table: &Table<Self, E>,
250        _column: &Self::Column<Self::AnyType>,
251    ) -> Result<Self::Value>
252    where
253        E: Entity<Self::Value>,
254        Self: Sized,
255    {
256        Err(error!("Aggregations not supported by vantage-cmd"))
257    }
258
259    async fn insert_table_value<E>(
260        &self,
261        _table: &Table<Self, E>,
262        _id: &Self::Id,
263        _record: &Record<Self::Value>,
264    ) -> Result<Record<Self::Value>>
265    where
266        E: Entity<Self::Value>,
267        Self: Sized,
268    {
269        Err(error!("vantage-cmd is read-only"))
270    }
271
272    async fn replace_table_value<E>(
273        &self,
274        _table: &Table<Self, E>,
275        _id: &Self::Id,
276        _record: &Record<Self::Value>,
277    ) -> Result<Record<Self::Value>>
278    where
279        E: Entity<Self::Value>,
280        Self: Sized,
281    {
282        Err(error!("vantage-cmd is read-only"))
283    }
284
285    async fn patch_table_value<E>(
286        &self,
287        _table: &Table<Self, E>,
288        _id: &Self::Id,
289        _partial: &Record<Self::Value>,
290    ) -> Result<Record<Self::Value>>
291    where
292        E: Entity<Self::Value>,
293        Self: Sized,
294    {
295        Err(error!("vantage-cmd is read-only"))
296    }
297
298    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
299    where
300        E: Entity<Self::Value>,
301        Self: Sized,
302    {
303        Err(error!("vantage-cmd is read-only"))
304    }
305
306    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
307    where
308        E: Entity<Self::Value>,
309        Self: Sized,
310    {
311        Err(error!("vantage-cmd is read-only"))
312    }
313
314    async fn insert_table_return_id_value<E>(
315        &self,
316        _table: &Table<Self, E>,
317        _record: &Record<Self::Value>,
318    ) -> Result<Self::Id>
319    where
320        E: Entity<Self::Value>,
321        Self: Sized,
322    {
323        Err(error!("vantage-cmd is read-only"))
324    }
325
326    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
327        &self,
328        target_field: &str,
329        source_table: &Table<Self, SourceE>,
330        source_column: &str,
331    ) -> Self::Condition
332    where
333        Self: Sized,
334    {
335        // "target_field IN (subquery)" as a Deferred condition, resolved
336        // at read time in `list_table_values`. Same shape as vantage-aws.
337        let src_col = self.create_column::<Self::AnyType>(source_column);
338        let values_expr = self.column_table_values_expr(source_table, &src_col);
339        CmdCondition::Deferred {
340            field: target_field.to_string(),
341            source: values_expr.expr(),
342        }
343    }
344
345    fn column_table_values_expr<'a, E, Type: ColumnType>(
346        &'a self,
347        table: &Table<Self, E>,
348        column: &Self::Column<Type>,
349    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
350    where
351        E: Entity<Self::Value> + 'static,
352        Self: Sized,
353    {
354        let table_clone = table.clone();
355        let col = column.name().to_string();
356        let cmd = self.clone();
357
358        let inner = expr_any!("{}", {
359            DeferredFn::new(move || {
360                let cmd = cmd.clone();
361                let table = table_clone.clone();
362                let col = col.clone();
363                Box::pin(async move {
364                    let records = cmd.list_table_values(&table).await?;
365                    let values: Vec<CborValue> = records
366                        .values()
367                        .filter_map(|r| r.get(&col).cloned())
368                        .collect();
369                    Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
370                })
371            })
372        });
373
374        let expr = expr_any!("{}", { self.defer(inner) });
375        AssociatedExpression::new(expr, self)
376    }
377}