Skip to main content

dinoco/execution/
relation.rs

1use std::collections::HashMap;
2use std::future::Future;
3
4use dinoco_engine::{
5    DeleteStatement, DinocoAdapter, DinocoClient, DinocoResult, Expression, InsertStatement, QueryBuilder,
6    UpdateStatement,
7};
8
9use crate::{ConnectionUpdatePlan, RelationLinkPlan, RelationWriteAction, RelationWritePlan};
10
11use super::{lookup::query_ids, lookup::query_pairs};
12
13pub fn execute_insert_relation_links<'a, A>(
14    relation_links: Vec<RelationLinkPlan>,
15    client: &'a DinocoClient<A>,
16) -> impl Future<Output = DinocoResult<()>> + 'a
17where
18    A: DinocoAdapter,
19{
20    async move {
21        if relation_links.is_empty() {
22            return Ok(());
23        }
24
25        let adapter = client.primary();
26
27        for link_group in group_relation_links(relation_links) {
28            let statement =
29                InsertStatement::new().into(link_group.table_name).columns(link_group.columns).values(link_group.rows);
30            let (sql, params) = adapter.dialect().build_insert(&statement);
31            adapter.execute(&sql, &params).await?;
32        }
33
34        Ok(())
35    }
36}
37
38pub fn execute_connection_updates<'a, A>(
39    connection_updates: Vec<ConnectionUpdatePlan>,
40    client: &'a DinocoClient<A>,
41) -> impl Future<Output = DinocoResult<()>> + 'a
42where
43    A: DinocoAdapter,
44{
45    async move {
46        if connection_updates.is_empty() {
47            return Ok(());
48        }
49
50        let adapter = client.primary();
51
52        for update in connection_updates {
53            let mut statement = UpdateStatement::new().table(update.table_name);
54
55            for (column, value) in update.columns.iter().copied().zip(update.row.into_iter()) {
56                statement = statement.set(column, value);
57            }
58
59            for condition in update.conditions {
60                statement = statement.condition(condition);
61            }
62
63            let (sql, params) = adapter.dialect().build_update(&statement);
64            adapter.execute(&sql, &params).await?;
65        }
66
67        Ok(())
68    }
69}
70
71pub fn execute_relation_writes<'a, A>(
72    table_name: &'static str,
73    conditions: Vec<Expression>,
74    writes: Vec<(RelationWriteAction, RelationWritePlan)>,
75    client: &'a DinocoClient<A>,
76) -> impl Future<Output = DinocoResult<()>> + 'a
77where
78    A: DinocoAdapter,
79{
80    async move {
81        if writes.is_empty() {
82            return Ok(());
83        }
84
85        let adapter = client.primary();
86        let source_key_column = writes[0].1.source_key_column;
87        let source_ids = query_ids(adapter, table_name, Some(source_key_column), conditions).await?;
88
89        if source_ids.is_empty() {
90            return Ok(());
91        }
92
93        for (action, plan) in writes {
94            let target_ids =
95                query_ids(adapter, plan.target_table_name, Some(plan.target_key_column), vec![plan.target_expression])
96                    .await?;
97
98            if target_ids.is_empty() {
99                continue;
100            }
101
102            match action {
103                RelationWriteAction::Connect => {
104                    let existing_rows = query_pairs(
105                        adapter,
106                        plan.join_table_name,
107                        plan.source_join_column,
108                        plan.target_join_column,
109                        source_ids.clone(),
110                        target_ids.clone(),
111                    )
112                    .await?;
113                    let rows = build_missing_relation_rows(source_ids.clone(), target_ids, existing_rows);
114
115                    if rows.is_empty() {
116                        continue;
117                    }
118
119                    let statement = InsertStatement::new()
120                        .into(plan.join_table_name)
121                        .columns(&[plan.source_join_column, plan.target_join_column])
122                        .values(rows);
123                    let (sql, params) = adapter.dialect().build_insert(&statement);
124
125                    adapter.execute(&sql, &params).await?;
126                }
127                RelationWriteAction::Disconnect => {
128                    let statement = DeleteStatement::new()
129                        .from(plan.join_table_name)
130                        .condition(
131                            Expression::Column(plan.source_join_column.to_string()).in_values(source_ids.clone()),
132                        )
133                        .condition(Expression::Column(plan.target_join_column.to_string()).in_values(target_ids));
134                    let (sql, params) = adapter.dialect().build_delete(&statement);
135
136                    adapter.execute(&sql, &params).await?;
137                }
138            }
139        }
140
141        Ok(())
142    }
143}
144
145pub fn execute_delete<'a, A>(
146    statement: DeleteStatement,
147    client: &'a DinocoClient<A>,
148) -> impl Future<Output = DinocoResult<()>> + 'a
149where
150    A: DinocoAdapter,
151{
152    async move {
153        let adapter = client.primary();
154        let (sql, params) = adapter.dialect().build_delete(&statement);
155
156        adapter.execute(&sql, &params).await
157    }
158}
159
160fn group_relation_links(relation_links: Vec<RelationLinkPlan>) -> Vec<GroupedRelationLinks> {
161    let mut grouped = HashMap::<(&'static str, &'static [&'static str]), Vec<Vec<dinoco_engine::DinocoValue>>>::new();
162
163    for link in relation_links {
164        grouped.entry((link.table_name, link.columns)).or_default().push(link.row);
165    }
166
167    grouped
168        .into_iter()
169        .map(|((table_name, columns), rows)| GroupedRelationLinks { table_name, columns, rows })
170        .collect()
171}
172
173fn build_missing_relation_rows(
174    left_values: Vec<dinoco_engine::DinocoValue>,
175    right_values: Vec<dinoco_engine::DinocoValue>,
176    existing_rows: Vec<(dinoco_engine::DinocoValue, dinoco_engine::DinocoValue)>,
177) -> Vec<Vec<dinoco_engine::DinocoValue>> {
178    let mut rows = Vec::new();
179
180    for left in left_values {
181        for right in &right_values {
182            if existing_rows
183                .iter()
184                .any(|(existing_left, existing_right)| existing_left == &left && existing_right == right)
185            {
186                continue;
187            }
188
189            rows.push(vec![left.clone(), right.clone()]);
190        }
191    }
192
193    rows
194}
195
196struct GroupedRelationLinks {
197    table_name: &'static str,
198    columns: &'static [&'static str],
199    rows: Vec<Vec<dinoco_engine::DinocoValue>>,
200}