1use async_trait::async_trait;
14use ciborium::Value as CborValue;
15use indexmap::IndexMap;
16use serde_json::json;
17
18use vantage_core::error;
19use vantage_dataset::traits::Result as DatasetResult;
20use vantage_expressions::{
21 Expression, Expressive, expr_any,
22 traits::associated_expressions::AssociatedExpression,
23 traits::datasource::ExprDataSource,
24 traits::expressive::{DeferredFn, ExpressiveEnum},
25};
26use vantage_table::column::core::{Column, ColumnType};
27use vantage_table::table::Table;
28use vantage_table::traits::table_source::TableSource;
29use vantage_types::{Entity, Record};
30
31use crate::account::AwsAccount;
32use crate::condition::AwsCondition;
33
34#[async_trait]
35impl TableSource for AwsAccount {
36 type Column<Type>
37 = Column<Type>
38 where
39 Type: ColumnType;
40 type AnyType = CborValue;
41 type Value = CborValue;
42 type Id = String;
43 type Condition = AwsCondition;
44 type Source = String;
45
46 fn eq_condition(field: &str, value: &str) -> DatasetResult<Self::Condition> {
47 Ok(AwsCondition::eq(field.to_string(), value.to_string()))
48 }
49
50 fn eq_value_condition(
56 &self,
57 field: &str,
58 value: Self::Value,
59 ) -> DatasetResult<Self::Condition> {
60 Ok(AwsCondition::eq(field.to_string(), value))
61 }
62
63 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
64 Column::new(name)
65 }
66
67 fn to_any_column<Type: ColumnType>(
68 &self,
69 column: Self::Column<Type>,
70 ) -> Self::Column<Self::AnyType> {
71 Column::from_column(column)
72 }
73
74 fn convert_any_column<Type: ColumnType>(
75 &self,
76 any_column: Self::Column<Self::AnyType>,
77 ) -> Option<Self::Column<Type>> {
78 Some(Column::from_column(any_column))
79 }
80
81 fn expr(
82 &self,
83 template: impl Into<String>,
84 parameters: Vec<ExpressiveEnum<Self::Value>>,
85 ) -> Expression<Self::Value> {
86 Expression::new(template, parameters)
87 }
88
89 fn search_table_condition<E>(
90 &self,
91 _table: &Table<Self, E>,
92 search_value: &str,
93 ) -> Self::Condition
94 where
95 E: Entity<Self::Value>,
96 {
97 AwsCondition::eq("__search__", json!(search_value).to_string())
101 }
102
103 async fn list_table_values<E>(
104 &self,
105 table: &Table<Self, E>,
106 ) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
107 where
108 E: Entity<Self::Value>,
109 Self: Sized,
110 {
111 let id_field = table.id_field().map(|c| c.name().to_string());
112 let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
113 let resp = self.execute_rpc(table.table_name(), &conditions).await?;
114 let mut records = self.parse_records(table.table_name(), resp, id_field.as_deref())?;
115
116 records.retain(|_id, record| {
124 conditions.iter().all(|c| match c {
125 AwsCondition::Eq { field, value } => match record.get(field) {
126 Some(rec_val) => rec_val == value,
127 None => true,
128 },
129 _ => true,
130 })
131 });
132
133 Ok(records)
134 }
135
136 async fn get_table_value<E>(
137 &self,
138 table: &Table<Self, E>,
139 id: &Self::Id,
140 ) -> DatasetResult<Option<Record<Self::Value>>>
141 where
142 E: Entity<Self::Value>,
143 Self: Sized,
144 {
145 let mut all = self.list_table_values(table).await?;
149 Ok(all.shift_remove(id))
150 }
151
152 async fn get_table_some_value<E>(
153 &self,
154 table: &Table<Self, E>,
155 ) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
156 where
157 E: Entity<Self::Value>,
158 Self: Sized,
159 {
160 let all = self.list_table_values(table).await?;
161 Ok(all.into_iter().next())
162 }
163
164 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
165 where
166 E: Entity<Self::Value>,
167 Self: Sized,
168 {
169 let all = self.list_table_values(table).await?;
170 Ok(all.len() as i64)
171 }
172
173 async fn get_table_sum<E>(
174 &self,
175 _table: &Table<Self, E>,
176 _column: &Self::Column<Self::AnyType>,
177 ) -> DatasetResult<Self::Value>
178 where
179 E: Entity<Self::Value>,
180 Self: Sized,
181 {
182 Err(error!("Aggregations not supported by vantage-aws"))
183 }
184
185 async fn get_table_max<E>(
186 &self,
187 _table: &Table<Self, E>,
188 _column: &Self::Column<Self::AnyType>,
189 ) -> DatasetResult<Self::Value>
190 where
191 E: Entity<Self::Value>,
192 Self: Sized,
193 {
194 Err(error!("Aggregations not supported by vantage-aws"))
195 }
196
197 async fn get_table_min<E>(
198 &self,
199 _table: &Table<Self, E>,
200 _column: &Self::Column<Self::AnyType>,
201 ) -> DatasetResult<Self::Value>
202 where
203 E: Entity<Self::Value>,
204 Self: Sized,
205 {
206 Err(error!("Aggregations not supported by vantage-aws"))
207 }
208
209 async fn insert_table_value<E>(
210 &self,
211 _table: &Table<Self, E>,
212 _id: &Self::Id,
213 _record: &Record<Self::Value>,
214 ) -> DatasetResult<Record<Self::Value>>
215 where
216 E: Entity<Self::Value>,
217 Self: Sized,
218 {
219 Err(error!("vantage-aws is read-only in v0"))
220 }
221
222 async fn replace_table_value<E>(
223 &self,
224 _table: &Table<Self, E>,
225 _id: &Self::Id,
226 _record: &Record<Self::Value>,
227 ) -> DatasetResult<Record<Self::Value>>
228 where
229 E: Entity<Self::Value>,
230 Self: Sized,
231 {
232 Err(error!("vantage-aws is read-only in v0"))
233 }
234
235 async fn patch_table_value<E>(
236 &self,
237 _table: &Table<Self, E>,
238 _id: &Self::Id,
239 _partial: &Record<Self::Value>,
240 ) -> DatasetResult<Record<Self::Value>>
241 where
242 E: Entity<Self::Value>,
243 Self: Sized,
244 {
245 Err(error!("vantage-aws is read-only in v0"))
246 }
247
248 async fn delete_table_value<E>(
249 &self,
250 _table: &Table<Self, E>,
251 _id: &Self::Id,
252 ) -> DatasetResult<()>
253 where
254 E: Entity<Self::Value>,
255 Self: Sized,
256 {
257 Err(error!("vantage-aws is read-only in v0"))
258 }
259
260 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
261 where
262 E: Entity<Self::Value>,
263 Self: Sized,
264 {
265 Err(error!("vantage-aws is read-only in v0"))
266 }
267
268 async fn insert_table_return_id_value<E>(
269 &self,
270 _table: &Table<Self, E>,
271 _record: &Record<Self::Value>,
272 ) -> DatasetResult<Self::Id>
273 where
274 E: Entity<Self::Value>,
275 Self: Sized,
276 {
277 Err(error!("vantage-aws is read-only in v0"))
278 }
279
280 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
281 &self,
282 target_field: &str,
283 source_table: &Table<Self, SourceE>,
284 source_column: &str,
285 ) -> Self::Condition
286 where
287 Self: Sized,
288 {
289 let src_col = self.create_column::<Self::AnyType>(source_column);
296 let values_expr = self.column_table_values_expr(source_table, &src_col);
297 AwsCondition::Deferred {
298 field: target_field.to_string(),
299 source: values_expr.expr(),
300 }
301 }
302
303 fn column_table_values_expr<'a, E, Type: ColumnType>(
304 &'a self,
305 table: &Table<Self, E>,
306 column: &Self::Column<Type>,
307 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
308 where
309 E: Entity<Self::Value> + 'static,
310 Self: Sized,
311 {
312 let table_clone = table.clone();
318 let col = column.name().to_string();
319 let aws = self.clone();
320
321 let inner = expr_any!("{}", {
322 DeferredFn::new(move || {
323 let aws = aws.clone();
324 let table = table_clone.clone();
325 let col = col.clone();
326 Box::pin(async move {
327 let records = aws.list_table_values(&table).await?;
328 let values: Vec<CborValue> = records
329 .values()
330 .filter_map(|r| r.get(&col).cloned())
331 .collect();
332 Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
333 })
334 })
335 });
336
337 let expr = expr_any!("{}", { self.defer(inner) });
338 AssociatedExpression::new(expr, self)
339 }
340}