// dinoco/execution/relation.rs
use std::collections::HashMap;
use std::future::Future;

use dinoco_engine::{
    DeleteStatement, DinocoAdapter, DinocoClient, DinocoResult, Expression, InsertStatement, QueryBuilder,
    UpdateStatement,
};

use crate::{ConnectionUpdatePlan, RelationLinkPlan, RelationWriteAction, RelationWritePlan};

use super::{lookup::query_ids, lookup::query_pairs};
12
13pub fn execute_insert_relation_links<'a, A>(
14 relation_links: Vec<RelationLinkPlan>,
15 client: &'a DinocoClient<A>,
16) -> impl Future<Output = DinocoResult<()>> + 'a
17where
18 A: DinocoAdapter,
19{
20 async move {
21 if relation_links.is_empty() {
22 return Ok(());
23 }
24
25 let adapter = client.primary();
26
27 for link_group in group_relation_links(relation_links) {
28 let statement =
29 InsertStatement::new().into(link_group.table_name).columns(link_group.columns).values(link_group.rows);
30 let (sql, params) = adapter.dialect().build_insert(&statement);
31 adapter.execute(&sql, ¶ms).await?;
32 }
33
34 Ok(())
35 }
36}
37
38pub fn execute_connection_updates<'a, A>(
39 connection_updates: Vec<ConnectionUpdatePlan>,
40 client: &'a DinocoClient<A>,
41) -> impl Future<Output = DinocoResult<()>> + 'a
42where
43 A: DinocoAdapter,
44{
45 async move {
46 if connection_updates.is_empty() {
47 return Ok(());
48 }
49
50 let adapter = client.primary();
51
52 for update in connection_updates {
53 let mut statement = UpdateStatement::new().table(update.table_name);
54
55 for (column, value) in update.columns.iter().copied().zip(update.row.into_iter()) {
56 statement = statement.set(column, value);
57 }
58
59 for condition in update.conditions {
60 statement = statement.condition(condition);
61 }
62
63 let (sql, params) = adapter.dialect().build_update(&statement);
64 adapter.execute(&sql, ¶ms).await?;
65 }
66
67 Ok(())
68 }
69}
70
71pub fn execute_relation_writes<'a, A>(
72 table_name: &'static str,
73 conditions: Vec<Expression>,
74 writes: Vec<(RelationWriteAction, RelationWritePlan)>,
75 client: &'a DinocoClient<A>,
76) -> impl Future<Output = DinocoResult<()>> + 'a
77where
78 A: DinocoAdapter,
79{
80 async move {
81 if writes.is_empty() {
82 return Ok(());
83 }
84
85 let adapter = client.primary();
86 let source_key_column = writes[0].1.source_key_column;
87 let source_ids = query_ids(adapter, table_name, Some(source_key_column), conditions).await?;
88
89 if source_ids.is_empty() {
90 return Ok(());
91 }
92
93 for (action, plan) in writes {
94 let target_ids =
95 query_ids(adapter, plan.target_table_name, Some(plan.target_key_column), vec![plan.target_expression])
96 .await?;
97
98 if target_ids.is_empty() {
99 continue;
100 }
101
102 match action {
103 RelationWriteAction::Connect => {
104 let existing_rows = query_pairs(
105 adapter,
106 plan.join_table_name,
107 plan.source_join_column,
108 plan.target_join_column,
109 source_ids.clone(),
110 target_ids.clone(),
111 )
112 .await?;
113 let rows = build_missing_relation_rows(source_ids.clone(), target_ids, existing_rows);
114
115 if rows.is_empty() {
116 continue;
117 }
118
119 let statement = InsertStatement::new()
120 .into(plan.join_table_name)
121 .columns(&[plan.source_join_column, plan.target_join_column])
122 .values(rows);
123 let (sql, params) = adapter.dialect().build_insert(&statement);
124
125 adapter.execute(&sql, ¶ms).await?;
126 }
127 RelationWriteAction::Disconnect => {
128 let statement = DeleteStatement::new()
129 .from(plan.join_table_name)
130 .condition(
131 Expression::Column(plan.source_join_column.to_string()).in_values(source_ids.clone()),
132 )
133 .condition(Expression::Column(plan.target_join_column.to_string()).in_values(target_ids));
134 let (sql, params) = adapter.dialect().build_delete(&statement);
135
136 adapter.execute(&sql, ¶ms).await?;
137 }
138 }
139 }
140
141 Ok(())
142 }
143}
144
145pub fn execute_delete<'a, A>(
146 statement: DeleteStatement,
147 client: &'a DinocoClient<A>,
148) -> impl Future<Output = DinocoResult<()>> + 'a
149where
150 A: DinocoAdapter,
151{
152 async move {
153 let adapter = client.primary();
154 let (sql, params) = adapter.dialect().build_delete(&statement);
155
156 adapter.execute(&sql, ¶ms).await
157 }
158}
159
160fn group_relation_links(relation_links: Vec<RelationLinkPlan>) -> Vec<GroupedRelationLinks> {
161 let mut grouped = HashMap::<(&'static str, &'static [&'static str]), Vec<Vec<dinoco_engine::DinocoValue>>>::new();
162
163 for link in relation_links {
164 grouped.entry((link.table_name, link.columns)).or_default().push(link.row);
165 }
166
167 grouped
168 .into_iter()
169 .map(|((table_name, columns), rows)| GroupedRelationLinks { table_name, columns, rows })
170 .collect()
171}
172
173fn build_missing_relation_rows(
174 left_values: Vec<dinoco_engine::DinocoValue>,
175 right_values: Vec<dinoco_engine::DinocoValue>,
176 existing_rows: Vec<(dinoco_engine::DinocoValue, dinoco_engine::DinocoValue)>,
177) -> Vec<Vec<dinoco_engine::DinocoValue>> {
178 let mut rows = Vec::new();
179
180 for left in left_values {
181 for right in &right_values {
182 if existing_rows
183 .iter()
184 .any(|(existing_left, existing_right)| existing_left == &left && existing_right == right)
185 {
186 continue;
187 }
188
189 rows.push(vec![left.clone(), right.clone()]);
190 }
191 }
192
193 rows
194}
195
196struct GroupedRelationLinks {
197 table_name: &'static str,
198 columns: &'static [&'static str],
199 rows: Vec<Vec<dinoco_engine::DinocoValue>>,
200}