substreams_database_change/
tables.rs

1use crate::pb::database::{table_change::Operation, DatabaseChanges, Field, TableChange};
2use std::collections::{BTreeMap, HashMap};
3use substreams::{
4    scalar::{BigDecimal, BigInt},
5    Hex,
6};
7
8#[derive(Debug)]
9pub struct Tables {
10    // Map from table name to the primary keys within that table
11    pub tables: HashMap<String, Rows>,
12}
13
14impl Tables {
15    pub fn new() -> Self {
16        Tables {
17            tables: HashMap::new(),
18        }
19    }
20
21    /// Create a new row in the table with the given primary key.
22    ///
23    /// ```
24    /// // With a Primary Key of type `Single`
25    /// use crate::substreams_database_change::tables::Tables;
26    /// let mut tables = Tables::new();
27    /// tables.create_row("myevent", "my_key",);
28    /// ```
29    ///
30    /// ```
31    /// // With a Primary Key of type `Composite`
32    /// use crate::substreams_database_change::tables::Tables;
33    /// let mut tables = Tables::new();
34    /// tables.create_row("myevent", [("evt_tx_hash", String::from("hello")), ("evt_index", String::from("world"))]);
35    /// ```
36    pub fn create_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
37        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
38        let k = key.into();
39        let key_debug = format!("{:?}", k);
40        let row = rows.pks.entry(k).or_insert(Row::new());
41        match row.operation {
42            Operation::Unspecified => {
43                row.operation = Operation::Create;
44            }
45            Operation::Create => {}
46            Operation::Update => {
47                panic!("cannot create a row that was marked for update")
48            }
49            Operation::Delete => {
50                panic!(
51                    "cannot create a row after a scheduled delete operation - table: {} key: {}",
52                    table, key_debug,
53                )
54            }
55        }
56        row
57    }
58
59    pub fn update_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
60        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
61        let k = key.into();
62        let key_debug = format!("{:?}", k);
63        let row = rows.pks.entry(k).or_insert(Row::new());
64        match row.operation {
65            Operation::Unspecified => {
66                row.operation = Operation::Update;
67            }
68            Operation::Create => {}
69            Operation::Update => {}
70            Operation::Delete => {
71                panic!(
72                    "cannot update a row after a scheduled delete operation - table: {} key: {}",
73                    table, key_debug,
74                )
75            }
76        }
77        row
78    }
79
80    pub fn delete_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: PrimaryKey) -> &mut Row {
81        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
82        let row = rows.pks.entry(key.into()).or_insert(Row::new());
83        match row.operation {
84            Operation::Unspecified => {
85                row.operation = Operation::Delete;
86            }
87            Operation::Create => {
88                // simply clear the thing
89                row.operation = Operation::Unspecified;
90                row.columns = HashMap::new();
91            }
92            Operation::Update => {
93                row.columns = HashMap::new();
94            }
95            Operation::Delete => {}
96        }
97        row.operation = Operation::Delete;
98        row.columns = HashMap::new();
99        row
100    }
101
102    // Convert Tables into an DatabaseChanges protobuf object
103    pub fn to_database_changes(self) -> DatabaseChanges {
104        let mut changes = DatabaseChanges::default();
105
106        for (table, rows) in self.tables.into_iter() {
107            for (pk, row) in rows.pks.into_iter() {
108                if row.operation == Operation::Unspecified {
109                    continue;
110                }
111
112                let mut change = match pk {
113                    PrimaryKey::Single(pk) => TableChange::new(table.clone(), pk, 0, row.operation),
114                    PrimaryKey::Composite(keys) => TableChange::new_composite(
115                        table.clone(),
116                        keys.into_iter().collect(),
117                        0,
118                        row.operation,
119                    ),
120                };
121
122                for (field, value) in row.columns.into_iter() {
123                    change.fields.push(Field {
124                        name: field,
125                        new_value: value,
126                        old_value: "".to_string(),
127                    });
128                }
129
130                changes.table_changes.push(change);
131            }
132        }
133
134        changes
135    }
136}
137
/// Primary key identifying a row within a table.
#[derive(Hash, Debug, Clone, Eq, PartialEq)]
pub enum PrimaryKey {
    /// A single-column key, given as its string value.
    Single(String),
    /// A multi-column key mapping column name to value. A `BTreeMap` (unlike `HashMap`)
    /// implements `Hash`, so the key can be used in a hashed collection, and equality does not
    /// depend on the order the columns were inserted in.
    Composite(BTreeMap<String, String>),
}
143
144impl From<&str> for PrimaryKey {
145    fn from(x: &str) -> Self {
146        Self::Single(x.to_string())
147    }
148}
149
150impl From<&String> for PrimaryKey {
151    fn from(x: &String) -> Self {
152        Self::Single(x.clone())
153    }
154}
155
156impl From<String> for PrimaryKey {
157    fn from(x: String) -> Self {
158        Self::Single(x)
159    }
160}
161
162impl<K: AsRef<str>, const N: usize> From<[(K, String); N]> for PrimaryKey {
163    fn from(arr: [(K, String); N]) -> Self {
164        if N == 0 {
165            return Self::Composite(BTreeMap::new());
166        }
167
168        let string_arr = arr.map(|(k, v)| (k.as_ref().to_string(), v));
169        Self::Composite(BTreeMap::from(string_arr))
170    }
171}
172
173#[derive(Debug)]
174pub struct Rows {
175    // Map of primary keys within this table, to the fields within
176    pks: HashMap<PrimaryKey, Row>,
177}
178
179impl Rows {
180    pub fn new() -> Self {
181        Rows {
182            pks: HashMap::new(),
183        }
184    }
185}
186
187#[derive(Debug)]
188pub struct Row {
189    // Verify that we don't try to delete the same row as we're creating it
190    pub operation: Operation,
191    // Map of field name to its last change
192    pub columns: HashMap<String, String>,
193    // Finalized: Last update or delete
194    pub finalized: bool,
195}
196
197impl Row {
198    pub fn new() -> Self {
199        Row {
200            operation: Operation::Unspecified,
201            columns: HashMap::new(),
202            finalized: false,
203        }
204    }
205
206    pub fn set<T: ToDatabaseValue>(&mut self, name: &str, value: T) -> &mut Self {
207        if self.operation == Operation::Delete {
208            panic!("cannot set fields on a delete operation")
209        }
210        self.columns.insert(name.to_string(), value.to_value());
211        self
212    }
213
214    pub fn set_raw(&mut self, name: &str, value: String) -> &mut Self {
215        self.columns.insert(name.to_string(), value);
216        self
217    }
218
219    /// Set a field to an array of values compatible with PostgresSQL database,
220    /// this method is currently experimental and hidden as we plan to support
221    /// array natively in the model.
222    ///
223    /// For now, this method should be used with great care as it ties the model
224    /// to the database implementation.
225    #[doc(hidden)]
226    pub fn set_psql_array<T: ToDatabaseValue>(&mut self, name: &str, value: Vec<T>) -> &mut Row {
227        if self.operation == Operation::Delete {
228            panic!("cannot set fields on a delete operation")
229        }
230
231        let values = value
232            .into_iter()
233            .map(|x| x.to_value())
234            .collect::<Vec<_>>()
235            .join(",");
236
237        self.columns
238            .insert(name.to_string(), format!("'{{{}}}'", values));
239        self
240    }
241
242    /// Set a field to an array of values compatible with Clickhouse database,
243    /// this method is currently experimental and hidden as we plan to support
244    /// array natively in the model.
245    ///
246    /// For now, this method should be used with great care as it ties the model
247    /// to the database implementation.
248    #[doc(hidden)]
249    pub fn set_clickhouse_array<T: ToDatabaseValue>(
250        &mut self,
251        name: &str,
252        value: Vec<T>,
253    ) -> &mut Row {
254        if self.operation == Operation::Delete {
255            panic!("cannot set fields on a delete operation")
256        }
257
258        let values = value
259            .into_iter()
260            .map(|x| x.to_value())
261            .collect::<Vec<_>>()
262            .join(",");
263
264        self.columns
265            .insert(name.to_string(), format!("[{}]", values));
266        self
267    }
268}
269
// Implements `ToDatabaseValue` for an owned type by borrowing the value and delegating to the
// implementation for `&$target`, so `T` and `&T` convert the same way.
macro_rules! impl_to_database_value_proxy_to_ref {
    ($target:ty) => {
        impl ToDatabaseValue for $target {
            fn to_value(self) -> String {
                let borrowed: &$target = &self;
                ToDatabaseValue::to_value(borrowed)
            }
        }
    };
}

// Implements `ToDatabaseValue` for a type by rendering it through `ToString`.
macro_rules! impl_to_database_value_proxy_to_string {
    ($target:ty) => {
        impl ToDatabaseValue for $target {
            fn to_value(self) -> String {
                ToString::to_string(&self)
            }
        }
    };
}
289
/// Conversion of a Rust value into the string stored in a row column.
///
/// `Row::set` and the array setters use it to turn typed values into column strings.
pub trait ToDatabaseValue {
    /// Consumes the value and returns its column representation.
    fn to_value(self) -> String;
}
293
294impl_to_database_value_proxy_to_string!(i8);
295impl_to_database_value_proxy_to_string!(i16);
296impl_to_database_value_proxy_to_string!(i32);
297impl_to_database_value_proxy_to_string!(i64);
298impl_to_database_value_proxy_to_string!(u8);
299impl_to_database_value_proxy_to_string!(u16);
300impl_to_database_value_proxy_to_string!(u32);
301impl_to_database_value_proxy_to_string!(u64);
302impl_to_database_value_proxy_to_string!(bool);
303impl_to_database_value_proxy_to_string!(::prost_types::Timestamp);
304impl_to_database_value_proxy_to_string!(&::prost_types::Timestamp);
305impl_to_database_value_proxy_to_string!(&str);
306impl_to_database_value_proxy_to_string!(BigDecimal);
307impl_to_database_value_proxy_to_string!(&BigDecimal);
308impl_to_database_value_proxy_to_string!(BigInt);
309impl_to_database_value_proxy_to_string!(&BigInt);
310
311impl_to_database_value_proxy_to_ref!(Vec<u8>);
312
313impl ToDatabaseValue for &String {
314    fn to_value(self) -> String {
315        self.clone()
316    }
317}
318
319impl ToDatabaseValue for String {
320    fn to_value(self) -> String {
321        self
322    }
323}
324
325impl ToDatabaseValue for &Vec<u8> {
326    fn to_value(self) -> String {
327        Hex::encode(self)
328    }
329}
330
331impl<T: AsRef<[u8]>> ToDatabaseValue for Hex<T> {
332    fn to_value(self) -> String {
333        ToString::to_string(&self)
334    }
335}
336
337impl<T: AsRef<[u8]>> ToDatabaseValue for &Hex<T> {
338    fn to_value(self) -> String {
339        ToString::to_string(self)
340    }
341}
342
#[cfg(test)]
mod test {
    use crate::pb::database::table_change::{Operation, PrimaryKey};
    use crate::pb::database::CompositePrimaryKey;
    use crate::pb::database::{DatabaseChanges, Field, TableChange};
    use crate::tables::PrimaryKey as TablesPrimaryKey;
    use crate::tables::Tables;
    use crate::tables::ToDatabaseValue;
    use std::collections::HashMap;

    #[test]
    fn to_database_value_proto_timestamp() {
        assert_eq!(
            ToDatabaseValue::to_value(::prost_types::Timestamp {
                seconds: 60 * 60 + 60 + 1,
                nanos: 1
            }),
            "1970-01-01T01:01:01.000000001Z"
        );
    }

    #[test]
    fn create_row_single_pk_direct() {
        let mut tables = Tables::new();
        tables.create_row("myevent", TablesPrimaryKey::Single("myhash".to_string()));

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: [TableChange {
                    table: "myevent".to_string(),
                    ordinal: 0,
                    operation: 1,
                    fields: [].into(),
                    primary_key: Some(PrimaryKey::Pk("myhash".to_string())),
                }]
                .to_vec(),
            }
        );
    }

    #[test]
    fn create_row_single_pk() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: [TableChange {
                    table: "myevent".to_string(),
                    ordinal: 0,
                    operation: 1,
                    fields: [].into(),
                    primary_key: Some(PrimaryKey::Pk("myhash".to_string())),
                }]
                .to_vec(),
            }
        );
    }

    #[test]
    fn create_row_composite_pk() {
        let mut tables = Tables::new();
        tables.create_row(
            "myevent",
            [
                ("evt_tx_hash", "hello".to_string()),
                ("evt_index", "world".to_string()),
            ],
        );

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: [TableChange {
                    table: "myevent".to_string(),
                    ordinal: 0,
                    operation: 1,
                    fields: [].into(),
                    primary_key: Some(PrimaryKey::CompositePk(CompositePrimaryKey {
                        keys: HashMap::from([
                            ("evt_tx_hash".to_string(), "hello".to_string()),
                            ("evt_index".to_string(), "world".to_string())
                        ])
                    }))
                }]
                .to_vec(),
            }
        );
    }

    #[test]
    fn update_row_after_create_keeps_create() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");
        tables.update_row("myevent", "myhash").set("name", "value");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: [TableChange {
                    table: "myevent".to_string(),
                    ordinal: 0,
                    operation: Operation::Create as i32,
                    fields: [Field {
                        name: "name".to_string(),
                        new_value: "value".to_string(),
                        old_value: "".to_string(),
                    }]
                    .into(),
                    primary_key: Some(PrimaryKey::Pk("myhash".to_string())),
                }]
                .to_vec(),
            }
        );
    }

    #[test]
    fn delete_row_after_create_schedules_delete_without_fields() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash").set("name", "value");
        // Explicit `K` so this compiles with both the old and the generic `key: K` signature.
        tables.delete_row::<TablesPrimaryKey>(
            "myevent",
            TablesPrimaryKey::Single("myhash".to_string()),
        );

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: [TableChange {
                    table: "myevent".to_string(),
                    ordinal: 0,
                    operation: Operation::Delete as i32,
                    fields: [].into(),
                    primary_key: Some(PrimaryKey::Pk("myhash".to_string())),
                }]
                .to_vec(),
            }
        );
    }

    #[test]
    #[should_panic(expected = "cannot create a row that was marked for update")]
    fn create_row_after_update_panics() {
        let mut tables = Tables::new();
        tables.update_row("myevent", "myhash");
        tables.create_row("myevent", "myhash");
    }

    #[test]
    #[should_panic(expected = "cannot create a row after a scheduled delete operation")]
    fn create_row_after_delete_panics() {
        let mut tables = Tables::new();
        tables.delete_row::<TablesPrimaryKey>(
            "myevent",
            TablesPrimaryKey::Single("myhash".to_string()),
        );
        tables.create_row("myevent", "myhash");
    }

    #[test]
    #[should_panic(expected = "cannot update a row after a scheduled delete operation")]
    fn update_row_after_delete_panics() {
        let mut tables = Tables::new();
        tables.delete_row::<TablesPrimaryKey>(
            "myevent",
            TablesPrimaryKey::Single("myhash".to_string()),
        );
        tables.update_row("myevent", "myhash");
    }

    #[test]
    #[should_panic(expected = "cannot set fields on a delete operation")]
    fn set_on_deleted_row_panics() {
        let mut tables = Tables::new();
        tables
            .delete_row::<TablesPrimaryKey>(
                "myevent",
                TablesPrimaryKey::Single("myhash".to_string()),
            )
            .set("name", "value");
    }
}