Skip to main content

vantage_aws/impls/
table_source.rs

1//! `TableSource` impl for `AwsAccount`.
2//!
3//! Protocol-agnostic — `execute_rpc` and `parse_records` (in
4//! `crate::dispatch`) pick the wire protocol from the table-name
5//! prefix. Read-only in v0. Writes are stubbed to error. Aggregations
6//! (sum/min/max) likewise. The two interesting methods are
7//! `list_table_values` (folds conditions into a request body and
8//! parses the response) and `column_table_values_expr` (returns a
9//! deferred expression over the column's values — same shape as
10//! `vantage-csv`). `related_in_condition` builds on top of that to
11//! make `with_one` / `with_many` work for cross-resource navigation.
12
13use async_trait::async_trait;
14use ciborium::Value as CborValue;
15use indexmap::IndexMap;
16use serde_json::json;
17
18use vantage_core::error;
19use vantage_dataset::traits::Result as DatasetResult;
20use vantage_expressions::{
21    Expression, Expressive, expr_any,
22    traits::associated_expressions::AssociatedExpression,
23    traits::datasource::ExprDataSource,
24    traits::expressive::{DeferredFn, ExpressiveEnum},
25};
26use vantage_table::column::core::{Column, ColumnType};
27use vantage_table::table::Table;
28use vantage_table::traits::table_source::TableSource;
29use vantage_types::{Entity, Record};
30
31use crate::account::AwsAccount;
32use crate::condition::AwsCondition;
33
34#[async_trait]
35impl TableSource for AwsAccount {
36    type Column<Type>
37        = Column<Type>
38    where
39        Type: ColumnType;
40    type AnyType = CborValue;
41    type Value = CborValue;
42    type Id = String;
43    type Condition = AwsCondition;
44
45    fn eq_condition(field: &str, value: &str) -> DatasetResult<Self::Condition> {
46        Ok(AwsCondition::eq(field.to_string(), value.to_string()))
47    }
48
49    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
50        Column::new(name)
51    }
52
53    fn to_any_column<Type: ColumnType>(
54        &self,
55        column: Self::Column<Type>,
56    ) -> Self::Column<Self::AnyType> {
57        Column::from_column(column)
58    }
59
60    fn convert_any_column<Type: ColumnType>(
61        &self,
62        any_column: Self::Column<Self::AnyType>,
63    ) -> Option<Self::Column<Type>> {
64        Some(Column::from_column(any_column))
65    }
66
67    fn expr(
68        &self,
69        template: impl Into<String>,
70        parameters: Vec<ExpressiveEnum<Self::Value>>,
71    ) -> Expression<Self::Value> {
72        Expression::new(template, parameters)
73    }
74
75    fn search_table_condition<E>(
76        &self,
77        _table: &Table<Self, E>,
78        search_value: &str,
79    ) -> Self::Condition
80    where
81        E: Entity<Self::Value>,
82    {
83        // No notion of which field is "searchable" at this layer.
84        // Models that want full-text search add their own Eq on the
85        // service-specific field (e.g. CloudWatch's filterPattern).
86        AwsCondition::eq("__search__", json!(search_value).to_string())
87    }
88
89    async fn list_table_values<E>(
90        &self,
91        table: &Table<Self, E>,
92    ) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
93    where
94        E: Entity<Self::Value>,
95        Self: Sized,
96    {
97        let id_field = table.id_field().map(|c| c.name().to_string());
98        let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
99        let resp = self.execute_rpc(table.table_name(), &conditions).await?;
100        let mut records = self.parse_records(table.table_name(), resp, id_field.as_deref())?;
101
102        // AWS APIs only push down their own request-param filters
103        // (e.g. `PathPrefix`, `logGroupNamePrefix`, `cluster`). Eq
104        // conditions naming actual record fields (`UserName`, `Path`,
105        // `clusterArn`) are still useful — apply them post-hoc so
106        // narrowing works regardless of what the API filters
107        // server-side. Field names that don't appear on any record are
108        // assumed to be request params and skipped.
109        records.retain(|_id, record| {
110            conditions.iter().all(|c| match c {
111                AwsCondition::Eq { field, value } => match record.get(field) {
112                    Some(rec_val) => rec_val == value,
113                    None => true,
114                },
115                _ => true,
116            })
117        });
118
119        Ok(records)
120    }
121
122    async fn get_table_value<E>(
123        &self,
124        table: &Table<Self, E>,
125        id: &Self::Id,
126    ) -> DatasetResult<Option<Record<Self::Value>>>
127    where
128        E: Entity<Self::Value>,
129        Self: Sized,
130    {
131        // No native point-get for most JSON-1.1 APIs — list with the
132        // table's conditions and pluck. Same honest cost as Mongo's
133        // get-by-id without an index.
134        let mut all = self.list_table_values(table).await?;
135        Ok(all.shift_remove(id))
136    }
137
138    async fn get_table_some_value<E>(
139        &self,
140        table: &Table<Self, E>,
141    ) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
142    where
143        E: Entity<Self::Value>,
144        Self: Sized,
145    {
146        let all = self.list_table_values(table).await?;
147        Ok(all.into_iter().next())
148    }
149
150    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
151    where
152        E: Entity<Self::Value>,
153        Self: Sized,
154    {
155        let all = self.list_table_values(table).await?;
156        Ok(all.len() as i64)
157    }
158
159    async fn get_table_sum<E>(
160        &self,
161        _table: &Table<Self, E>,
162        _column: &Self::Column<Self::AnyType>,
163    ) -> DatasetResult<Self::Value>
164    where
165        E: Entity<Self::Value>,
166        Self: Sized,
167    {
168        Err(error!("Aggregations not supported by vantage-aws"))
169    }
170
171    async fn get_table_max<E>(
172        &self,
173        _table: &Table<Self, E>,
174        _column: &Self::Column<Self::AnyType>,
175    ) -> DatasetResult<Self::Value>
176    where
177        E: Entity<Self::Value>,
178        Self: Sized,
179    {
180        Err(error!("Aggregations not supported by vantage-aws"))
181    }
182
183    async fn get_table_min<E>(
184        &self,
185        _table: &Table<Self, E>,
186        _column: &Self::Column<Self::AnyType>,
187    ) -> DatasetResult<Self::Value>
188    where
189        E: Entity<Self::Value>,
190        Self: Sized,
191    {
192        Err(error!("Aggregations not supported by vantage-aws"))
193    }
194
195    async fn insert_table_value<E>(
196        &self,
197        _table: &Table<Self, E>,
198        _id: &Self::Id,
199        _record: &Record<Self::Value>,
200    ) -> DatasetResult<Record<Self::Value>>
201    where
202        E: Entity<Self::Value>,
203        Self: Sized,
204    {
205        Err(error!("vantage-aws is read-only in v0"))
206    }
207
208    async fn replace_table_value<E>(
209        &self,
210        _table: &Table<Self, E>,
211        _id: &Self::Id,
212        _record: &Record<Self::Value>,
213    ) -> DatasetResult<Record<Self::Value>>
214    where
215        E: Entity<Self::Value>,
216        Self: Sized,
217    {
218        Err(error!("vantage-aws is read-only in v0"))
219    }
220
221    async fn patch_table_value<E>(
222        &self,
223        _table: &Table<Self, E>,
224        _id: &Self::Id,
225        _partial: &Record<Self::Value>,
226    ) -> DatasetResult<Record<Self::Value>>
227    where
228        E: Entity<Self::Value>,
229        Self: Sized,
230    {
231        Err(error!("vantage-aws is read-only in v0"))
232    }
233
234    async fn delete_table_value<E>(
235        &self,
236        _table: &Table<Self, E>,
237        _id: &Self::Id,
238    ) -> DatasetResult<()>
239    where
240        E: Entity<Self::Value>,
241        Self: Sized,
242    {
243        Err(error!("vantage-aws is read-only in v0"))
244    }
245
246    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
247    where
248        E: Entity<Self::Value>,
249        Self: Sized,
250    {
251        Err(error!("vantage-aws is read-only in v0"))
252    }
253
254    async fn insert_table_return_id_value<E>(
255        &self,
256        _table: &Table<Self, E>,
257        _record: &Record<Self::Value>,
258    ) -> DatasetResult<Self::Id>
259    where
260        E: Entity<Self::Value>,
261        Self: Sized,
262    {
263        Err(error!("vantage-aws is read-only in v0"))
264    }
265
266    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
267        &self,
268        target_field: &str,
269        source_table: &Table<Self, SourceE>,
270        source_column: &str,
271    ) -> Self::Condition
272    where
273        Self: Sized,
274    {
275        // Build "target_field IN (subquery)" as a Deferred condition:
276        // at execute time, the embedded expression runs the source
277        // query, projects `source_column`, and we apply the same
278        // take-1-or-error rule as any other Deferred. AWS doesn't
279        // accept multi-value filters, so traversal is implicitly
280        // single-parent — multi-row sources error loudly.
281        let src_col = self.create_column::<Self::AnyType>(source_column);
282        let values_expr = self.column_table_values_expr(source_table, &src_col);
283        AwsCondition::Deferred {
284            field: target_field.to_string(),
285            source: values_expr.expr(),
286        }
287    }
288
289    fn column_table_values_expr<'a, E, Type: ColumnType>(
290        &'a self,
291        table: &Table<Self, E>,
292        column: &Self::Column<Type>,
293    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
294    where
295        E: Entity<Self::Value> + 'static,
296        Self: Sized,
297    {
298        // Same shape as `vantage-csv`'s impl: wrap a `DeferredFn` that
299        // runs `list_table_values` and projects the named column at
300        // execute time. Caller goes through `AssociatedExpression::get`
301        // for direct execution, or `.expr()` to embed in a bigger
302        // expression (e.g. a deferred condition on another table).
303        let table_clone = table.clone();
304        let col = column.name().to_string();
305        let aws = self.clone();
306
307        let inner = expr_any!("{}", {
308            DeferredFn::new(move || {
309                let aws = aws.clone();
310                let table = table_clone.clone();
311                let col = col.clone();
312                Box::pin(async move {
313                    let records = aws.list_table_values(&table).await?;
314                    let values: Vec<CborValue> = records
315                        .values()
316                        .filter_map(|r| r.get(&col).cloned())
317                        .collect();
318                    Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
319                })
320            })
321        });
322
323        let expr = expr_any!("{}", { self.defer(inner) });
324        AssociatedExpression::new(expr, self)
325    }
326}