use async_trait::async_trait;
use ciborium::Value as CborValue;
use indexmap::IndexMap;
use serde_json::json;
use vantage_core::error;
use vantage_dataset::traits::Result as DatasetResult;
use vantage_expressions::{
Expression, Expressive, expr_any,
traits::associated_expressions::AssociatedExpression,
traits::datasource::ExprDataSource,
traits::expressive::{DeferredFn, ExpressiveEnum},
};
use vantage_table::column::core::{Column, ColumnType};
use vantage_table::table::Table;
use vantage_table::traits::table_source::TableSource;
use vantage_types::{Entity, Record};
use crate::account::AwsAccount;
use crate::condition::AwsCondition;
#[async_trait]
impl TableSource for AwsAccount {
type Column<Type>
= Column<Type>
where
Type: ColumnType;
type AnyType = CborValue;
type Value = CborValue;
type Id = String;
type Condition = AwsCondition;
fn eq_condition(field: &str, value: &str) -> DatasetResult<Self::Condition> {
Ok(AwsCondition::eq(field.to_string(), value.to_string()))
}
fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
Column::new(name)
}
fn to_any_column<Type: ColumnType>(
&self,
column: Self::Column<Type>,
) -> Self::Column<Self::AnyType> {
Column::from_column(column)
}
fn convert_any_column<Type: ColumnType>(
&self,
any_column: Self::Column<Self::AnyType>,
) -> Option<Self::Column<Type>> {
Some(Column::from_column(any_column))
}
fn expr(
&self,
template: impl Into<String>,
parameters: Vec<ExpressiveEnum<Self::Value>>,
) -> Expression<Self::Value> {
Expression::new(template, parameters)
}
fn search_table_condition<E>(
&self,
_table: &Table<Self, E>,
search_value: &str,
) -> Self::Condition
where
E: Entity<Self::Value>,
{
AwsCondition::eq("__search__", json!(search_value).to_string())
}
async fn list_table_values<E>(
&self,
table: &Table<Self, E>,
) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
where
E: Entity<Self::Value>,
Self: Sized,
{
let id_field = table.id_field().map(|c| c.name().to_string());
let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
let resp = self.execute_rpc(table.table_name(), &conditions).await?;
let mut records = self.parse_records(table.table_name(), resp, id_field.as_deref())?;
records.retain(|_id, record| {
conditions.iter().all(|c| match c {
AwsCondition::Eq { field, value } => match record.get(field) {
Some(rec_val) => rec_val == value,
None => true,
},
_ => true,
})
});
Ok(records)
}
async fn get_table_value<E>(
&self,
table: &Table<Self, E>,
id: &Self::Id,
) -> DatasetResult<Option<Record<Self::Value>>>
where
E: Entity<Self::Value>,
Self: Sized,
{
let mut all = self.list_table_values(table).await?;
Ok(all.shift_remove(id))
}
async fn get_table_some_value<E>(
&self,
table: &Table<Self, E>,
) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
where
E: Entity<Self::Value>,
Self: Sized,
{
let all = self.list_table_values(table).await?;
Ok(all.into_iter().next())
}
async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
where
E: Entity<Self::Value>,
Self: Sized,
{
let all = self.list_table_values(table).await?;
Ok(all.len() as i64)
}
async fn get_table_sum<E>(
&self,
_table: &Table<Self, E>,
_column: &Self::Column<Self::AnyType>,
) -> DatasetResult<Self::Value>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("Aggregations not supported by vantage-aws"))
}
async fn get_table_max<E>(
&self,
_table: &Table<Self, E>,
_column: &Self::Column<Self::AnyType>,
) -> DatasetResult<Self::Value>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("Aggregations not supported by vantage-aws"))
}
async fn get_table_min<E>(
&self,
_table: &Table<Self, E>,
_column: &Self::Column<Self::AnyType>,
) -> DatasetResult<Self::Value>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("Aggregations not supported by vantage-aws"))
}
async fn insert_table_value<E>(
&self,
_table: &Table<Self, E>,
_id: &Self::Id,
_record: &Record<Self::Value>,
) -> DatasetResult<Record<Self::Value>>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("vantage-aws is read-only in v0"))
}
async fn replace_table_value<E>(
&self,
_table: &Table<Self, E>,
_id: &Self::Id,
_record: &Record<Self::Value>,
) -> DatasetResult<Record<Self::Value>>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("vantage-aws is read-only in v0"))
}
async fn patch_table_value<E>(
&self,
_table: &Table<Self, E>,
_id: &Self::Id,
_partial: &Record<Self::Value>,
) -> DatasetResult<Record<Self::Value>>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("vantage-aws is read-only in v0"))
}
async fn delete_table_value<E>(
&self,
_table: &Table<Self, E>,
_id: &Self::Id,
) -> DatasetResult<()>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("vantage-aws is read-only in v0"))
}
async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("vantage-aws is read-only in v0"))
}
async fn insert_table_return_id_value<E>(
&self,
_table: &Table<Self, E>,
_record: &Record<Self::Value>,
) -> DatasetResult<Self::Id>
where
E: Entity<Self::Value>,
Self: Sized,
{
Err(error!("vantage-aws is read-only in v0"))
}
fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
&self,
target_field: &str,
source_table: &Table<Self, SourceE>,
source_column: &str,
) -> Self::Condition
where
Self: Sized,
{
let src_col = self.create_column::<Self::AnyType>(source_column);
let values_expr = self.column_table_values_expr(source_table, &src_col);
AwsCondition::Deferred {
field: target_field.to_string(),
source: values_expr.expr(),
}
}
fn column_table_values_expr<'a, E, Type: ColumnType>(
&'a self,
table: &Table<Self, E>,
column: &Self::Column<Type>,
) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
where
E: Entity<Self::Value> + 'static,
Self: Sized,
{
let table_clone = table.clone();
let col = column.name().to_string();
let aws = self.clone();
let inner = expr_any!("{}", {
DeferredFn::new(move || {
let aws = aws.clone();
let table = table_clone.clone();
let col = col.clone();
Box::pin(async move {
let records = aws.list_table_values(&table).await?;
let values: Vec<CborValue> = records
.values()
.filter_map(|r| r.get(&col).cloned())
.collect();
Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
})
})
});
let expr = expr_any!("{}", { self.defer(inner) });
AssociatedExpression::new(expr, self)
}
}