use std::collections::{HashMap, HashSet};
use schema_core::{
ColumnName, DatabaseSchema, GenericValue, IndexSchema, Relation, RelationKey, TableName,
};
use sources_core::{Result, RowKey, SourceError};
use super::fields::relation_target;
use super::{PgDocumentBuilder, query, query_err, value};
impl PgDocumentBuilder {
/// Maps a changed row on a related table back to key values on the index's
/// own table by walking `path` in reverse.
///
/// The walk starts with `change_key` on `changed_table` and visits `path`
/// from its last relation to its first. For each relation the "parent" table
/// is the target of the preceding relation in `path`, or `schema.table` for
/// the first relation. Every current key is passed through `reverse_hop`,
/// and the returned values are re-keyed under the parent table's primary-key
/// column, de-duplicated per hop in first-seen order.
///
/// Returns the first value of each key left after the final hop. With an
/// empty `path` that is just the first value of `change_key`.
///
/// # Errors
///
/// * `SourceError::Unsupported` when the first relation is reached and
///   `schema.primary_key` is `None`.
/// * Any error propagated from `table_primary_key` or `reverse_hop`.
pub(super) async fn resolve_path(
    &self,
    schema: &IndexSchema,
    changed_table: &TableName,
    change_key: &RowKey,
    path: &[&Relation],
) -> Result<Vec<GenericValue>> {
    let mut current_keys = vec![change_key.clone()];
    let mut current_table = changed_table.clone();
    // Walk from the relation closest to the changed table back to the root.
    for (depth, &relation) in path.iter().enumerate().rev() {
        let parent_table = if depth == 0 {
            schema.table.clone()
        } else {
            let prev = *path
                .get(depth - 1)
                .ok_or_else(|| SourceError::Query("internal: path index".into()))?;
            relation_target(prev).0.clone()
        };
        let parent_pk = if depth == 0 {
            schema.primary_key.clone().ok_or_else(|| {
                SourceError::Unsupported(
                    "index without primary_key cannot resolve relations".into(),
                )
            })?
        } else {
            self.table_primary_key(&schema.db_schema, &parent_table)
                .await?
        };
        let mut next = Vec::new();
        // Several current keys may lead to the same parent value; keep one.
        let mut seen = HashSet::new();
        for key in &current_keys {
            for value in self
                .reverse_hop(
                    &schema.db_schema,
                    relation,
                    &current_table,
                    &parent_table,
                    &parent_pk,
                    key,
                )
                .await?
            {
                if seen.insert(value.clone()) {
                    next.push(RowKey(vec![(parent_pk.clone(), value)]));
                }
            }
        }
        current_keys = next;
        current_table = parent_table;
    }
    Ok(current_keys
        .into_iter()
        .filter_map(|key| key.0.into_iter().next().map(|(_, value)| value))
        .collect())
}
/// Performs one reverse hop across `relation`, choosing the strategy from
/// the kind of key the relation uses (as reported by `relation_target`).
///
/// * `Direct` — delegates to `reverse_direct` against the relation's target
///   table and its foreign-key column.
/// * `Local` — delegates to `reverse_local` against `parent_table`.
/// * `Through` — when `current_table` is the junction table itself,
///   delegates to `reverse_through_junction`; otherwise to
///   `reverse_through_far`.
///
/// Returns whatever values the chosen helper yields and propagates its errors.
async fn reverse_hop(
    &self,
    schema: &DatabaseSchema,
    relation: &Relation,
    current_table: &TableName,
    parent_table: &TableName,
    parent_pk: &ColumnName,
    key: &RowKey,
) -> Result<Vec<GenericValue>> {
    let (related_table, link) = relation_target(relation);
    match link {
        RelationKey::Direct(foreign_key) => {
            self.reverse_direct(schema, related_table, foreign_key, key)
                .await
        }
        RelationKey::Local(column) => {
            self.reverse_local(schema, relation, parent_table, parent_pk, column, key)
                .await
        }
        RelationKey::Through(through) => {
            // The change may have happened on the junction table itself or
            // on the table at the far side of it.
            if *current_table == through.table {
                self.reverse_through_junction(schema, &through.table, &through.left_key, key)
                    .await
            } else {
                self.reverse_through_far(
                    schema,
                    &through.table,
                    &through.left_key,
                    &through.right_key,
                    key,
                )
                .await
            }
        }
    }
}
/// Reverse hop for a relation whose key column (`column`) lives on
/// `parent_table`.
///
/// Takes the value picked out of `target_key` by `local_target_value`, pairs
/// it with `column` as a one-entry lookup key, and runs the statement built
/// by `query::reverse_query` for `parent_table` / `parent_pk`. The returned
/// values are decoded from the result column named after `parent_pk`.
///
/// # Errors
///
/// Propagates errors from `local_target_value`, `key_types`,
/// `query::reverse_query` and `run_reverse`.
async fn reverse_local(
    &self,
    schema: &DatabaseSchema,
    relation: &Relation,
    parent_table: &TableName,
    parent_pk: &ColumnName,
    column: &ColumnName,
    target_key: &RowKey,
) -> Result<Vec<GenericValue>> {
    let target_value = local_target_value(relation, target_key)?.clone();
    let lookup = [(column.clone(), target_value)];
    let column_types = self.key_types(schema, parent_table, &lookup).await?;
    let (statement, bindings) =
        query::reverse_query(schema, parent_table, parent_pk, &lookup, &column_types)?;
    self.run_reverse(statement, bindings, parent_pk.as_ref())
        .await
}
/// Reverse hop for a relation whose foreign key sits on the `child` table.
///
/// Uses the full `child_key` as the lookup key on `child`, runs the
/// statement built by `query::reverse_query`, and returns the values decoded
/// from the result column named after `foreign_key`.
///
/// # Errors
///
/// Propagates errors from `key_types`, `query::reverse_query` and
/// `run_reverse`.
async fn reverse_direct(
    &self,
    schema: &DatabaseSchema,
    child: &TableName,
    foreign_key: &ColumnName,
    child_key: &RowKey,
) -> Result<Vec<GenericValue>> {
    let lookup = &child_key.0;
    let column_types = self.key_types(schema, child, lookup).await?;
    let (statement, bindings) =
        query::reverse_query(schema, child, foreign_key, lookup, &column_types)?;
    self.run_reverse(statement, bindings, foreign_key.as_ref())
        .await
}
/// Reverse hop through a junction table when the change happened on the far
/// table.
///
/// Requires `far_key` to have exactly one column (see `single_far_key`).
/// That value is paired with `right_key` as the lookup key on `junction`,
/// and the values decoded from the result column named after `left_key` are
/// returned.
///
/// # Errors
///
/// `SourceError::Unsupported` when `far_key` is not a single-column key;
/// otherwise propagates errors from `key_types`, `query::reverse_query` and
/// `run_reverse`.
async fn reverse_through_far(
    &self,
    schema: &DatabaseSchema,
    junction: &TableName,
    left_key: &ColumnName,
    right_key: &ColumnName,
    far_key: &RowKey,
) -> Result<Vec<GenericValue>> {
    let far_value = single_far_key(far_key)?.clone();
    let lookup = [(right_key.clone(), far_value)];
    let column_types = self.key_types(schema, junction, &lookup).await?;
    let (statement, bindings) =
        query::reverse_query(schema, junction, left_key, &lookup, &column_types)?;
    self.run_reverse(statement, bindings, left_key.as_ref())
        .await
}
/// Reverse hop through a junction table when the change happened on the
/// junction table itself.
///
/// If `junction_key` already carries a value for `left_key`, that value is
/// returned directly without querying. Otherwise the junction row is looked
/// up by its full key, and the values decoded from the result column named
/// after `left_key` are returned.
///
/// # Errors
///
/// Propagates errors from `key_types`, `query::reverse_query` and
/// `run_reverse`.
async fn reverse_through_junction(
    &self,
    schema: &DatabaseSchema,
    junction: &TableName,
    left_key: &ColumnName,
    junction_key: &RowKey,
) -> Result<Vec<GenericValue>> {
    let carried = junction_key
        .0
        .iter()
        .find(|(column, _)| column == left_key);
    match carried {
        // The key itself names the left side; no round trip needed.
        Some((_, value)) => Ok(vec![value.clone()]),
        None => {
            let column_types = self.key_types(schema, junction, &junction_key.0).await?;
            let (statement, bindings) = query::reverse_query(
                schema,
                junction,
                left_key,
                &junction_key.0,
                &column_types,
            )?;
            self.run_reverse(statement, bindings, left_key.as_ref())
                .await
        }
    }
}
/// Collects the column type of every column named in `key`.
///
/// The result maps `(table, column)` — both rendered with `to_string` — to
/// the value returned by `self.column_type`. A column that appears more than
/// once in `key` is looked up only the first time.
///
/// # Errors
///
/// Propagates the first error returned by `self.column_type`.
async fn key_types(
    &self,
    db: &DatabaseSchema,
    table: &TableName,
    key: &[(ColumnName, GenericValue)],
) -> Result<HashMap<(String, String), String>> {
    let mut resolved = HashMap::new();
    for (column, _) in key {
        let slot = resolved.entry((table.to_string(), column.to_string()));
        // Only hit `column_type` for columns not resolved yet.
        if let std::collections::hash_map::Entry::Vacant(vacant) = slot {
            vacant.insert(self.column_type(db, table, column).await?);
        }
    }
    Ok(resolved)
}
/// Executes a reverse-lookup statement and collects one column of its result.
///
/// Binds `params` to `sql` in order via `query::bind_param`, fetches all rows
/// from `self.pool`, and decodes `result_column` from each row. `Null` values
/// are skipped and duplicates are dropped, keeping first-seen order.
///
/// # Errors
///
/// Propagates errors from `query::bind_param`, and maps a failed fetch
/// through `query_err`.
async fn run_reverse(
    &self,
    sql: query::SqlString,
    params: Vec<GenericValue>,
    result_column: &str,
) -> Result<Vec<GenericValue>> {
    // Named `statement` so the local does not shadow the `query` module.
    let mut statement = sqlx::query(sql);
    for param in &params {
        statement = query::bind_param(statement, param)?;
    }
    let rows = statement.fetch_all(&self.pool).await.map_err(query_err)?;
    let mut seen = HashSet::new();
    let mut roots = Vec::new();
    for row in &rows {
        let value = value::decode_named_column(row, result_column);
        if !matches!(value, GenericValue::Null) && seen.insert(value.clone()) {
            roots.push(value);
        }
    }
    Ok(roots)
}
}
/// Picks the value of `key` that identifies the target row of `relation`.
///
/// For a `Relation::Join`, the entry whose column equals the join's
/// `primary_key` is preferred. Failing that (or for any other relation
/// kind), a key with exactly one entry yields that entry's value.
///
/// # Errors
///
/// `SourceError::Unsupported` when neither rule produces a value.
fn local_target_value<'a>(relation: &Relation, key: &'a RowKey) -> Result<&'a GenericValue> {
    let by_primary_key = if let Relation::Join(join) = relation {
        key.0
            .iter()
            .find(|(column, _)| *column == join.primary_key)
    } else {
        None
    };
    if let Some((_, value)) = by_primary_key {
        return Ok(value);
    }
    // Fall back to a single-column key, whatever its column is called.
    if let [(_, value)] = key.0.as_slice() {
        return Ok(value);
    }
    Err(SourceError::Unsupported(
        "belongs_to relations require the changed row's key to carry its primary key".into(),
    ))
}
/// Returns the sole value of a single-column `key`.
///
/// # Errors
///
/// `SourceError::Unsupported` when `key` has zero entries or more than one.
fn single_far_key(key: &RowKey) -> Result<&GenericValue> {
    let [(_, value)] = key.0.as_slice() else {
        return Err(SourceError::Unsupported(
            "many-to-many relations require a single-column key on the far/junction table".into(),
        ));
    };
    Ok(value)
}