Skip to main content

vantage_cmd/
table_source.rs

1//! `TableSource` impl for `Cmd`.
2//!
3//! The one interesting method is `list_table_values`: it resolves any
4//! deferred (relation) conditions, hands the read context to the Rhai
5//! script on a blocking thread (which builds the argv, runs the locked
6//! command, and parses the output), then keys the resulting rows by their
7//! id field and applies a client-side `Eq` filter as a safety net.
8//!
9//! Read-only: writes and aggregations error, exactly like `vantage-aws`.
10
11use async_trait::async_trait;
12use ciborium::Value as CborValue;
13use indexmap::IndexMap;
14use serde_json::{Value as JsonValue, json};
15
16use vantage_core::error;
17use vantage_dataset::traits::Result;
18use vantage_expressions::{
19    Expression, Expressive, expr_any,
20    traits::associated_expressions::AssociatedExpression,
21    traits::datasource::ExprDataSource,
22    traits::expressive::{DeferredFn, ExpressiveEnum},
23};
24use vantage_table::column::core::{Column, ColumnType};
25use vantage_table::table::Table;
26use vantage_table::traits::table_source::TableSource;
27use vantage_types::{Entity, Record};
28
29use crate::cmd::Cmd;
30use crate::condition::CmdCondition;
31use crate::rhai_engine::{QueryContext, eval_rows};
32use crate::types::{cbor_to_string, json_to_cbor};
33
34#[async_trait]
35impl TableSource for Cmd {
36    type Column<Type>
37        = Column<Type>
38    where
39        Type: ColumnType;
40    type AnyType = CborValue;
41    type Value = CborValue;
42    type Id = String;
43    type Condition = CmdCondition;
44    type Source = String;
45
46    fn eq_condition(field: &str, value: &str) -> Result<Self::Condition> {
47        Ok(CmdCondition::eq(field.to_string(), value.to_string()))
48    }
49
50    fn eq_value_condition(&self, field: &str, value: Self::Value) -> Result<Self::Condition> {
51        Ok(CmdCondition::eq(field.to_string(), value))
52    }
53
54    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
55        Column::new(name)
56    }
57
58    fn to_any_column<Type: ColumnType>(
59        &self,
60        column: Self::Column<Type>,
61    ) -> Self::Column<Self::AnyType> {
62        Column::from_column(column)
63    }
64
65    fn convert_any_column<Type: ColumnType>(
66        &self,
67        any_column: Self::Column<Self::AnyType>,
68    ) -> Option<Self::Column<Type>> {
69        Some(Column::from_column(any_column))
70    }
71
72    fn expr(
73        &self,
74        template: impl Into<String>,
75        parameters: Vec<ExpressiveEnum<Self::Value>>,
76    ) -> Expression<Self::Value> {
77        Expression::new(template, parameters)
78    }
79
80    fn search_table_condition<E>(
81        &self,
82        _table: &Table<Self, E>,
83        search_value: &str,
84    ) -> Self::Condition
85    where
86        E: Entity<Self::Value>,
87    {
88        // The script decides how to use a `__search__` condition (e.g.
89        // CloudWatch's `--filter-pattern`); there's no generic field set
90        // at this layer.
91        CmdCondition::eq("__search__", json!(search_value).to_string())
92    }
93
94    async fn list_table_values<E>(
95        &self,
96        table: &Table<Self, E>,
97    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
98    where
99        E: Entity<Self::Value>,
100        Self: Sized,
101    {
102        let table_name = table.table_name().to_string();
103        let spec = self.spec_for(&table_name)?.clone();
104        let command = self.effective_command(&spec);
105        let env = self.effective_env(&spec);
106        let pass_path = self.pass_path();
107        let base_dir = self.base_dir();
108        let script = spec.script.clone();
109
110        let id_field = table.id_field().map(|c| c.name().to_string());
111
112        // Resolve Deferred (relation) conditions to concrete Eq/In before
113        // anything reaches the script — the command never sees a subquery.
114        let mut conditions: Vec<CmdCondition> = Vec::new();
115        for cond in table.conditions().cloned() {
116            match cond {
117                CmdCondition::Deferred { field, source } => {
118                    let resolved = ExprDataSource::execute(self, &source).await?;
119                    match resolved {
120                        CborValue::Array(values) => {
121                            conditions.push(CmdCondition::In { field, values })
122                        }
123                        other => conditions.push(CmdCondition::Eq {
124                            field,
125                            value: other,
126                        }),
127                    }
128                }
129                other => conditions.push(other),
130            }
131        }
132
133        let columns: Vec<String> = table.columns().keys().cloned().collect();
134        let (limit, offset) = match table.pagination() {
135            Some(p) => (Some(p.limit()), Some(p.skip())),
136            None => (None, None),
137        };
138
139        let ctx = QueryContext {
140            conditions: conditions.clone(),
141            columns,
142            limit,
143            offset,
144            id_column: id_field.clone(),
145        };
146
147        let rows: Vec<JsonValue> = tokio::task::spawn_blocking(move || {
148            eval_rows(command, env, pass_path, base_dir, &script, ctx)
149        })
150        .await
151        .map_err(|e| error!("command task failed to join", detail = e.to_string()))??;
152
153        let mut records: IndexMap<String, Record<CborValue>> = IndexMap::new();
154        for (idx, row) in rows.into_iter().enumerate() {
155            let mut record = Record::new();
156            match row {
157                JsonValue::Object(map) => {
158                    for (k, v) in map {
159                        record.insert(k, json_to_cbor(&v));
160                    }
161                }
162                other => {
163                    record.insert("value".to_string(), json_to_cbor(&other));
164                }
165            }
166            let id = id_field
167                .as_ref()
168                .and_then(|f| record.get(f))
169                .map(cbor_to_string)
170                .filter(|s| !s.is_empty())
171                .unwrap_or_else(|| idx.to_string());
172            records.insert(id, record);
173        }
174
175        // Client-side safety net: re-apply `Eq` conditions naming real
176        // record fields. Fields the script consumed as request flags won't
177        // appear on the rows and are left alone.
178        records.retain(|_id, record| {
179            conditions.iter().all(|c| match c {
180                CmdCondition::Eq { field, value } => match record.get(field) {
181                    Some(rec_val) => rec_val == value,
182                    None => true,
183                },
184                _ => true,
185            })
186        });
187
188        Ok(records)
189    }
190
191    async fn get_table_value<E>(
192        &self,
193        table: &Table<Self, E>,
194        id: &Self::Id,
195    ) -> Result<Option<Record<Self::Value>>>
196    where
197        E: Entity<Self::Value>,
198        Self: Sized,
199    {
200        let mut all = self.list_table_values(table).await?;
201        Ok(all.shift_remove(id))
202    }
203
204    async fn get_table_some_value<E>(
205        &self,
206        table: &Table<Self, E>,
207    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
208    where
209        E: Entity<Self::Value>,
210        Self: Sized,
211    {
212        let all = self.list_table_values(table).await?;
213        Ok(all.into_iter().next())
214    }
215
216    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
217    where
218        E: Entity<Self::Value>,
219        Self: Sized,
220    {
221        let all = self.list_table_values(table).await?;
222        Ok(all.len() as i64)
223    }
224
225    async fn get_table_sum<E>(
226        &self,
227        _table: &Table<Self, E>,
228        _column: &Self::Column<Self::AnyType>,
229    ) -> Result<Self::Value>
230    where
231        E: Entity<Self::Value>,
232        Self: Sized,
233    {
234        Err(error!("Aggregations not supported by vantage-cmd"))
235    }
236
237    async fn get_table_max<E>(
238        &self,
239        _table: &Table<Self, E>,
240        _column: &Self::Column<Self::AnyType>,
241    ) -> Result<Self::Value>
242    where
243        E: Entity<Self::Value>,
244        Self: Sized,
245    {
246        Err(error!("Aggregations not supported by vantage-cmd"))
247    }
248
249    async fn get_table_min<E>(
250        &self,
251        _table: &Table<Self, E>,
252        _column: &Self::Column<Self::AnyType>,
253    ) -> Result<Self::Value>
254    where
255        E: Entity<Self::Value>,
256        Self: Sized,
257    {
258        Err(error!("Aggregations not supported by vantage-cmd"))
259    }
260
261    async fn insert_table_value<E>(
262        &self,
263        _table: &Table<Self, E>,
264        _id: &Self::Id,
265        _record: &Record<Self::Value>,
266    ) -> Result<Record<Self::Value>>
267    where
268        E: Entity<Self::Value>,
269        Self: Sized,
270    {
271        Err(error!("vantage-cmd is read-only"))
272    }
273
274    async fn replace_table_value<E>(
275        &self,
276        _table: &Table<Self, E>,
277        _id: &Self::Id,
278        _record: &Record<Self::Value>,
279    ) -> Result<Record<Self::Value>>
280    where
281        E: Entity<Self::Value>,
282        Self: Sized,
283    {
284        Err(error!("vantage-cmd is read-only"))
285    }
286
287    async fn patch_table_value<E>(
288        &self,
289        _table: &Table<Self, E>,
290        _id: &Self::Id,
291        _partial: &Record<Self::Value>,
292    ) -> Result<Record<Self::Value>>
293    where
294        E: Entity<Self::Value>,
295        Self: Sized,
296    {
297        Err(error!("vantage-cmd is read-only"))
298    }
299
300    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
301    where
302        E: Entity<Self::Value>,
303        Self: Sized,
304    {
305        Err(error!("vantage-cmd is read-only"))
306    }
307
308    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
309    where
310        E: Entity<Self::Value>,
311        Self: Sized,
312    {
313        Err(error!("vantage-cmd is read-only"))
314    }
315
316    async fn insert_table_return_id_value<E>(
317        &self,
318        _table: &Table<Self, E>,
319        _record: &Record<Self::Value>,
320    ) -> Result<Self::Id>
321    where
322        E: Entity<Self::Value>,
323        Self: Sized,
324    {
325        Err(error!("vantage-cmd is read-only"))
326    }
327
328    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
329        &self,
330        target_field: &str,
331        source_table: &Table<Self, SourceE>,
332        source_column: &str,
333    ) -> Self::Condition
334    where
335        Self: Sized,
336    {
337        // "target_field IN (subquery)" as a Deferred condition, resolved
338        // at read time in `list_table_values`. Same shape as vantage-aws.
339        let src_col = self.create_column::<Self::AnyType>(source_column);
340        let values_expr = self.column_table_values_expr(source_table, &src_col);
341        CmdCondition::Deferred {
342            field: target_field.to_string(),
343            source: values_expr.expr(),
344        }
345    }
346
347    fn column_table_values_expr<'a, E, Type: ColumnType>(
348        &'a self,
349        table: &Table<Self, E>,
350        column: &Self::Column<Type>,
351    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
352    where
353        E: Entity<Self::Value> + 'static,
354        Self: Sized,
355    {
356        let table_clone = table.clone();
357        let col = column.name().to_string();
358        let cmd = self.clone();
359
360        let inner = expr_any!("{}", {
361            DeferredFn::new(move || {
362                let cmd = cmd.clone();
363                let table = table_clone.clone();
364                let col = col.clone();
365                Box::pin(async move {
366                    let records = cmd.list_table_values(&table).await?;
367                    let values: Vec<CborValue> = records
368                        .values()
369                        .filter_map(|r| r.get(&col).cloned())
370                        .collect();
371                    Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
372                })
373            })
374        });
375
376        let expr = expr_any!("{}", { self.defer(inner) });
377        AssociatedExpression::new(expr, self)
378    }
379}