substreams_database_change/
tables.rs1use crate::pb::database::{table_change::Operation, DatabaseChanges, Field, TableChange};
2use std::collections::{BTreeMap, HashMap};
3use substreams::{
4 scalar::{BigDecimal, BigInt},
5 Hex,
6};
7
8#[derive(Debug)]
9pub struct Tables {
10 pub tables: HashMap<String, Rows>,
12}
13
14impl Tables {
15 pub fn new() -> Self {
16 Tables {
17 tables: HashMap::new(),
18 }
19 }
20
21 pub fn create_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
37 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
38 let k = key.into();
39 let key_debug = format!("{:?}", k);
40 let row = rows.pks.entry(k).or_insert(Row::new());
41 match row.operation {
42 Operation::Unspecified => {
43 row.operation = Operation::Create;
44 }
45 Operation::Create => {}
46 Operation::Update => {
47 panic!("cannot create a row that was marked for update")
48 }
49 Operation::Delete => {
50 panic!(
51 "cannot create a row after a scheduled delete operation - table: {} key: {}",
52 table, key_debug,
53 )
54 }
55 }
56 row
57 }
58
59 pub fn update_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
60 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
61 let k = key.into();
62 let key_debug = format!("{:?}", k);
63 let row = rows.pks.entry(k).or_insert(Row::new());
64 match row.operation {
65 Operation::Unspecified => {
66 row.operation = Operation::Update;
67 }
68 Operation::Create => {}
69 Operation::Update => {}
70 Operation::Delete => {
71 panic!(
72 "cannot update a row after a scheduled delete operation - table: {} key: {}",
73 table, key_debug,
74 )
75 }
76 }
77 row
78 }
79
80 pub fn delete_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: PrimaryKey) -> &mut Row {
81 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
82 let row = rows.pks.entry(key.into()).or_insert(Row::new());
83 match row.operation {
84 Operation::Unspecified => {
85 row.operation = Operation::Delete;
86 }
87 Operation::Create => {
88 row.operation = Operation::Unspecified;
90 row.columns = HashMap::new();
91 }
92 Operation::Update => {
93 row.columns = HashMap::new();
94 }
95 Operation::Delete => {}
96 }
97 row.operation = Operation::Delete;
98 row.columns = HashMap::new();
99 row
100 }
101
102 pub fn to_database_changes(self) -> DatabaseChanges {
104 let mut changes = DatabaseChanges::default();
105
106 for (table, rows) in self.tables.into_iter() {
107 for (pk, row) in rows.pks.into_iter() {
108 if row.operation == Operation::Unspecified {
109 continue;
110 }
111
112 let mut change = match pk {
113 PrimaryKey::Single(pk) => TableChange::new(table.clone(), pk, 0, row.operation),
114 PrimaryKey::Composite(keys) => TableChange::new_composite(
115 table.clone(),
116 keys.into_iter().collect(),
117 0,
118 row.operation,
119 ),
120 };
121
122 for (field, value) in row.columns.into_iter() {
123 change.fields.push(Field {
124 name: field,
125 new_value: value,
126 old_value: "".to_string(),
127 });
128 }
129
130 changes.table_changes.push(change);
131 }
132 }
133
134 changes
135 }
136}
137
/// Identifies a row inside a table.
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum PrimaryKey {
    /// A primary key made of one value.
    Single(String),
    /// A primary key made of several named parts. The parts live in a `BTreeMap`,
    /// so equality and hashing do not depend on the order they were supplied in.
    Composite(BTreeMap<String, String>),
}
143
144impl From<&str> for PrimaryKey {
145 fn from(x: &str) -> Self {
146 Self::Single(x.to_string())
147 }
148}
149
150impl From<&String> for PrimaryKey {
151 fn from(x: &String) -> Self {
152 Self::Single(x.clone())
153 }
154}
155
156impl From<String> for PrimaryKey {
157 fn from(x: String) -> Self {
158 Self::Single(x)
159 }
160}
161
162impl<K: AsRef<str>, const N: usize> From<[(K, String); N]> for PrimaryKey {
163 fn from(arr: [(K, String); N]) -> Self {
164 if N == 0 {
165 return Self::Composite(BTreeMap::new());
166 }
167
168 let string_arr = arr.map(|(k, v)| (k.as_ref().to_string(), v));
169 Self::Composite(BTreeMap::from(string_arr))
170 }
171}
172
173#[derive(Debug)]
174pub struct Rows {
175 pks: HashMap<PrimaryKey, Row>,
177}
178
179impl Rows {
180 pub fn new() -> Self {
181 Rows {
182 pks: HashMap::new(),
183 }
184 }
185}
186
187#[derive(Debug)]
188pub struct Row {
189 pub operation: Operation,
191 pub columns: HashMap<String, String>,
193 pub finalized: bool,
195}
196
197impl Row {
198 pub fn new() -> Self {
199 Row {
200 operation: Operation::Unspecified,
201 columns: HashMap::new(),
202 finalized: false,
203 }
204 }
205
206 pub fn set<T: ToDatabaseValue>(&mut self, name: &str, value: T) -> &mut Self {
207 if self.operation == Operation::Delete {
208 panic!("cannot set fields on a delete operation")
209 }
210 self.columns.insert(name.to_string(), value.to_value());
211 self
212 }
213
214 pub fn set_raw(&mut self, name: &str, value: String) -> &mut Self {
215 self.columns.insert(name.to_string(), value);
216 self
217 }
218
219 #[doc(hidden)]
226 pub fn set_psql_array<T: ToDatabaseValue>(&mut self, name: &str, value: Vec<T>) -> &mut Row {
227 if self.operation == Operation::Delete {
228 panic!("cannot set fields on a delete operation")
229 }
230
231 let values = value
232 .into_iter()
233 .map(|x| x.to_value())
234 .collect::<Vec<_>>()
235 .join(",");
236
237 self.columns
238 .insert(name.to_string(), format!("'{{{}}}'", values));
239 self
240 }
241
242 #[doc(hidden)]
249 pub fn set_clickhouse_array<T: ToDatabaseValue>(
250 &mut self,
251 name: &str,
252 value: Vec<T>,
253 ) -> &mut Row {
254 if self.operation == Operation::Delete {
255 panic!("cannot set fields on a delete operation")
256 }
257
258 let values = value
259 .into_iter()
260 .map(|x| x.to_value())
261 .collect::<Vec<_>>()
262 .join(",");
263
264 self.columns
265 .insert(name.to_string(), format!("[{}]", values));
266 self
267 }
268}
269
/// Implements `ToDatabaseValue` for an owned type by forwarding to the
/// implementation that exists for a reference to that type.
macro_rules! impl_to_database_value_proxy_to_ref {
    ($ty:ty) => {
        impl ToDatabaseValue for $ty {
            fn to_value(self) -> String {
                <&$ty as ToDatabaseValue>::to_value(&self)
            }
        }
    };
}
279
/// Implements `ToDatabaseValue` for a type by returning what its `ToString`
/// implementation produces.
macro_rules! impl_to_database_value_proxy_to_string {
    ($ty:ty) => {
        impl ToDatabaseValue for $ty {
            fn to_value(self) -> String {
                <$ty as ToString>::to_string(&self)
            }
        }
    };
}
289
/// Conversion of a value into the string that is stored as a column value.
pub trait ToDatabaseValue {
    /// Consumes the value and returns its string representation.
    fn to_value(self) -> String;
}
293
294impl_to_database_value_proxy_to_string!(i8);
295impl_to_database_value_proxy_to_string!(i16);
296impl_to_database_value_proxy_to_string!(i32);
297impl_to_database_value_proxy_to_string!(i64);
298impl_to_database_value_proxy_to_string!(u8);
299impl_to_database_value_proxy_to_string!(u16);
300impl_to_database_value_proxy_to_string!(u32);
301impl_to_database_value_proxy_to_string!(u64);
302impl_to_database_value_proxy_to_string!(bool);
303impl_to_database_value_proxy_to_string!(::prost_types::Timestamp);
304impl_to_database_value_proxy_to_string!(&::prost_types::Timestamp);
305impl_to_database_value_proxy_to_string!(&str);
306impl_to_database_value_proxy_to_string!(BigDecimal);
307impl_to_database_value_proxy_to_string!(&BigDecimal);
308impl_to_database_value_proxy_to_string!(BigInt);
309impl_to_database_value_proxy_to_string!(&BigInt);
310
311impl_to_database_value_proxy_to_ref!(Vec<u8>);
312
313impl ToDatabaseValue for &String {
314 fn to_value(self) -> String {
315 self.clone()
316 }
317}
318
319impl ToDatabaseValue for String {
320 fn to_value(self) -> String {
321 self
322 }
323}
324
325impl ToDatabaseValue for &Vec<u8> {
326 fn to_value(self) -> String {
327 Hex::encode(self)
328 }
329}
330
331impl<T: AsRef<[u8]>> ToDatabaseValue for Hex<T> {
332 fn to_value(self) -> String {
333 ToString::to_string(&self)
334 }
335}
336
337impl<T: AsRef<[u8]>> ToDatabaseValue for &Hex<T> {
338 fn to_value(self) -> String {
339 ToString::to_string(self)
340 }
341}
342
#[cfg(test)]
mod test {
    use crate::pb::database::{
        table_change::PrimaryKey, CompositePrimaryKey, DatabaseChanges, TableChange,
    };
    use crate::tables::{PrimaryKey as TablesPrimaryKey, Tables, ToDatabaseValue};
    use std::collections::HashMap;

    /// Expected output for a single field-less create (`operation: 1`) on table
    /// `myevent` with the given primary key.
    fn single_create_on_myevent(primary_key: PrimaryKey) -> DatabaseChanges {
        DatabaseChanges {
            table_changes: vec![TableChange {
                table: "myevent".to_string(),
                ordinal: 0,
                operation: 1,
                fields: Vec::new(),
                primary_key: Some(primary_key),
            }],
        }
    }

    #[test]
    fn to_database_value_proto_timestamp() {
        // 1 hour + 1 minute + 1 second after the epoch, plus one nanosecond.
        let timestamp = ::prost_types::Timestamp {
            seconds: 60 * 60 + 60 + 1,
            nanos: 1,
        };

        assert_eq!(
            ToDatabaseValue::to_value(timestamp),
            "1970-01-01T01:01:01.000000001Z"
        );
    }

    #[test]
    fn create_row_single_pk_direct() {
        let mut tables = Tables::new();
        tables.create_row("myevent", TablesPrimaryKey::Single("myhash".to_string()));

        let expected = single_create_on_myevent(PrimaryKey::Pk("myhash".to_string()));
        assert_eq!(tables.to_database_changes(), expected);
    }

    #[test]
    fn create_row_single_pk() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");

        let expected = single_create_on_myevent(PrimaryKey::Pk("myhash".to_string()));
        assert_eq!(tables.to_database_changes(), expected);
    }

    #[test]
    fn create_row_composite_pk() {
        let mut tables = Tables::new();
        tables.create_row(
            "myevent",
            [
                ("evt_tx_hash", "hello".to_string()),
                ("evt_index", "world".to_string()),
            ],
        );

        let keys = HashMap::from([
            ("evt_tx_hash".to_string(), "hello".to_string()),
            ("evt_index".to_string(), "world".to_string()),
        ]);
        let expected =
            single_create_on_myevent(PrimaryKey::CompositePk(CompositePrimaryKey { keys }));
        assert_eq!(tables.to_database_changes(), expected);
    }
}