1use async_trait::async_trait;
14use ciborium::Value as CborValue;
15use indexmap::IndexMap;
16use serde_json::json;
17
18use vantage_core::error;
19use vantage_dataset::traits::Result as DatasetResult;
20use vantage_expressions::{
21 Expression, Expressive, expr_any,
22 traits::associated_expressions::AssociatedExpression,
23 traits::datasource::ExprDataSource,
24 traits::expressive::{DeferredFn, ExpressiveEnum},
25};
26use vantage_table::column::core::{Column, ColumnType};
27use vantage_table::table::Table;
28use vantage_table::traits::table_source::TableSource;
29use vantage_types::{Entity, Record};
30
31use crate::account::AwsAccount;
32use crate::condition::AwsCondition;
33
34#[async_trait]
35impl TableSource for AwsAccount {
36 type Column<Type>
37 = Column<Type>
38 where
39 Type: ColumnType;
40 type AnyType = CborValue;
41 type Value = CborValue;
42 type Id = String;
43 type Condition = AwsCondition;
44
45 fn eq_condition(field: &str, value: &str) -> DatasetResult<Self::Condition> {
46 Ok(AwsCondition::eq(field.to_string(), value.to_string()))
47 }
48
49 fn eq_value_condition(
55 &self,
56 field: &str,
57 value: Self::Value,
58 ) -> DatasetResult<Self::Condition> {
59 Ok(AwsCondition::eq(field.to_string(), value))
60 }
61
62 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
63 Column::new(name)
64 }
65
66 fn to_any_column<Type: ColumnType>(
67 &self,
68 column: Self::Column<Type>,
69 ) -> Self::Column<Self::AnyType> {
70 Column::from_column(column)
71 }
72
73 fn convert_any_column<Type: ColumnType>(
74 &self,
75 any_column: Self::Column<Self::AnyType>,
76 ) -> Option<Self::Column<Type>> {
77 Some(Column::from_column(any_column))
78 }
79
80 fn expr(
81 &self,
82 template: impl Into<String>,
83 parameters: Vec<ExpressiveEnum<Self::Value>>,
84 ) -> Expression<Self::Value> {
85 Expression::new(template, parameters)
86 }
87
88 fn search_table_condition<E>(
89 &self,
90 _table: &Table<Self, E>,
91 search_value: &str,
92 ) -> Self::Condition
93 where
94 E: Entity<Self::Value>,
95 {
96 AwsCondition::eq("__search__", json!(search_value).to_string())
100 }
101
102 async fn list_table_values<E>(
103 &self,
104 table: &Table<Self, E>,
105 ) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
106 where
107 E: Entity<Self::Value>,
108 Self: Sized,
109 {
110 let id_field = table.id_field().map(|c| c.name().to_string());
111 let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
112 let resp = self.execute_rpc(table.table_name(), &conditions).await?;
113 let mut records = self.parse_records(table.table_name(), resp, id_field.as_deref())?;
114
115 records.retain(|_id, record| {
123 conditions.iter().all(|c| match c {
124 AwsCondition::Eq { field, value } => match record.get(field) {
125 Some(rec_val) => rec_val == value,
126 None => true,
127 },
128 _ => true,
129 })
130 });
131
132 Ok(records)
133 }
134
135 async fn get_table_value<E>(
136 &self,
137 table: &Table<Self, E>,
138 id: &Self::Id,
139 ) -> DatasetResult<Option<Record<Self::Value>>>
140 where
141 E: Entity<Self::Value>,
142 Self: Sized,
143 {
144 let mut all = self.list_table_values(table).await?;
148 Ok(all.shift_remove(id))
149 }
150
151 async fn get_table_some_value<E>(
152 &self,
153 table: &Table<Self, E>,
154 ) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
155 where
156 E: Entity<Self::Value>,
157 Self: Sized,
158 {
159 let all = self.list_table_values(table).await?;
160 Ok(all.into_iter().next())
161 }
162
163 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
164 where
165 E: Entity<Self::Value>,
166 Self: Sized,
167 {
168 let all = self.list_table_values(table).await?;
169 Ok(all.len() as i64)
170 }
171
172 async fn get_table_sum<E>(
173 &self,
174 _table: &Table<Self, E>,
175 _column: &Self::Column<Self::AnyType>,
176 ) -> DatasetResult<Self::Value>
177 where
178 E: Entity<Self::Value>,
179 Self: Sized,
180 {
181 Err(error!("Aggregations not supported by vantage-aws"))
182 }
183
184 async fn get_table_max<E>(
185 &self,
186 _table: &Table<Self, E>,
187 _column: &Self::Column<Self::AnyType>,
188 ) -> DatasetResult<Self::Value>
189 where
190 E: Entity<Self::Value>,
191 Self: Sized,
192 {
193 Err(error!("Aggregations not supported by vantage-aws"))
194 }
195
196 async fn get_table_min<E>(
197 &self,
198 _table: &Table<Self, E>,
199 _column: &Self::Column<Self::AnyType>,
200 ) -> DatasetResult<Self::Value>
201 where
202 E: Entity<Self::Value>,
203 Self: Sized,
204 {
205 Err(error!("Aggregations not supported by vantage-aws"))
206 }
207
208 async fn insert_table_value<E>(
209 &self,
210 _table: &Table<Self, E>,
211 _id: &Self::Id,
212 _record: &Record<Self::Value>,
213 ) -> DatasetResult<Record<Self::Value>>
214 where
215 E: Entity<Self::Value>,
216 Self: Sized,
217 {
218 Err(error!("vantage-aws is read-only in v0"))
219 }
220
221 async fn replace_table_value<E>(
222 &self,
223 _table: &Table<Self, E>,
224 _id: &Self::Id,
225 _record: &Record<Self::Value>,
226 ) -> DatasetResult<Record<Self::Value>>
227 where
228 E: Entity<Self::Value>,
229 Self: Sized,
230 {
231 Err(error!("vantage-aws is read-only in v0"))
232 }
233
234 async fn patch_table_value<E>(
235 &self,
236 _table: &Table<Self, E>,
237 _id: &Self::Id,
238 _partial: &Record<Self::Value>,
239 ) -> DatasetResult<Record<Self::Value>>
240 where
241 E: Entity<Self::Value>,
242 Self: Sized,
243 {
244 Err(error!("vantage-aws is read-only in v0"))
245 }
246
247 async fn delete_table_value<E>(
248 &self,
249 _table: &Table<Self, E>,
250 _id: &Self::Id,
251 ) -> DatasetResult<()>
252 where
253 E: Entity<Self::Value>,
254 Self: Sized,
255 {
256 Err(error!("vantage-aws is read-only in v0"))
257 }
258
259 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
260 where
261 E: Entity<Self::Value>,
262 Self: Sized,
263 {
264 Err(error!("vantage-aws is read-only in v0"))
265 }
266
267 async fn insert_table_return_id_value<E>(
268 &self,
269 _table: &Table<Self, E>,
270 _record: &Record<Self::Value>,
271 ) -> DatasetResult<Self::Id>
272 where
273 E: Entity<Self::Value>,
274 Self: Sized,
275 {
276 Err(error!("vantage-aws is read-only in v0"))
277 }
278
279 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
280 &self,
281 target_field: &str,
282 source_table: &Table<Self, SourceE>,
283 source_column: &str,
284 ) -> Self::Condition
285 where
286 Self: Sized,
287 {
288 let src_col = self.create_column::<Self::AnyType>(source_column);
295 let values_expr = self.column_table_values_expr(source_table, &src_col);
296 AwsCondition::Deferred {
297 field: target_field.to_string(),
298 source: values_expr.expr(),
299 }
300 }
301
302 fn column_table_values_expr<'a, E, Type: ColumnType>(
303 &'a self,
304 table: &Table<Self, E>,
305 column: &Self::Column<Type>,
306 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
307 where
308 E: Entity<Self::Value> + 'static,
309 Self: Sized,
310 {
311 let table_clone = table.clone();
317 let col = column.name().to_string();
318 let aws = self.clone();
319
320 let inner = expr_any!("{}", {
321 DeferredFn::new(move || {
322 let aws = aws.clone();
323 let table = table_clone.clone();
324 let col = col.clone();
325 Box::pin(async move {
326 let records = aws.list_table_values(&table).await?;
327 let values: Vec<CborValue> = records
328 .values()
329 .filter_map(|r| r.get(&col).cloned())
330 .collect();
331 Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
332 })
333 })
334 });
335
336 let expr = expr_any!("{}", { self.defer(inner) });
337 AssociatedExpression::new(expr, self)
338 }
339}