substreams_database_change/
tables.rs

1use crate::pb::database::{table_change::Operation, DatabaseChanges, Field, TableChange};
2use std::collections::{BTreeMap, HashMap};
3use substreams::{
4    scalar::{BigDecimal, BigInt},
5    Hex,
6};
7
/// Accumulates row-level changes, grouped by table, until they are turned into a
/// `DatabaseChanges` message by `to_database_changes`.
#[derive(Debug)]
pub struct Tables {
    /// Map from table name to the rows of that table, keyed by primary key
    pub tables: HashMap<String, Rows>,

    /// Ordinal is used to track the order of changes: every row remembers the value it
    /// was handed when it was first recorded so that, at the end, the changes can be
    /// sorted back into that order.
    ordinal: Ordinal,
}
17
18impl Tables {
19    pub fn new() -> Self {
20        Tables {
21            tables: HashMap::new(),
22            ordinal: Ordinal::new(),
23        }
24    }
25
26    /// Returns the number of rows in all tables.
27    pub fn all_row_count(&self) -> usize {
28        self.tables.values().map(|rows| rows.pks.len()).sum()
29    }
30
31    /// Create a new row in the table with the given primary key.
32    ///
33    /// ```
34    /// // With a Primary Key of type `Single`
35    /// use crate::substreams_database_change::tables::Tables;
36    /// let mut tables = Tables::new();
37    /// tables.create_row("myevent", "my_key",);
38    /// ```
39    ///
40    /// ```
41    /// // With a Primary Key of type `Composite`
42    /// use crate::substreams_database_change::tables::Tables;
43    /// let mut tables = Tables::new();
44    /// tables.create_row("myevent", [("evt_tx_hash", String::from("hello")), ("evt_index", String::from("world"))]);
45    /// ```
46    pub fn create_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
47        let rows: &mut Rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
48        let k = key.into();
49        let key_debug = format!("{:?}", k);
50        let row = rows
51            .pks
52            .entry(k)
53            .or_insert(Row::new_ordered(self.ordinal.next()));
54        match row.operation {
55            Operation::Unspecified => {
56                row.operation = Operation::Create;
57            }
58            Operation::Create => { /* Already the right operation */ }
59            Operation::Upsert => {
60                panic!(
61                    "cannot create a row after a scheduled upsert operation, create and upsert are exclusive - table: {} key: {}",
62                    table, key_debug,
63                )
64            }
65            Operation::Update => {
66                panic!("cannot create a row that was marked for update")
67            }
68            Operation::Delete => {
69                panic!(
70                    "cannot create a row after a scheduled delete operation - table: {} key: {}",
71                    table, key_debug,
72                )
73            }
74        }
75        row
76    }
77
78    /// Upsert (insert or update) a new row in the table with the given primary key.
79    ///
80    /// *Note* Ensure that the SQL sink driver you use supports upsert operations.
81    ///
82    /// ```
83    /// // With a Primary Key of type `Single`
84    /// use crate::substreams_database_change::tables::Tables;
85    /// let mut tables = Tables::new();
86    /// tables.upsert_row("myevent", "my_key",);
87    /// ```
88    ///
89    /// ```
90    /// // With a Primary Key of type `Composite`
91    /// use crate::substreams_database_change::tables::Tables;
92    /// let mut tables = Tables::new();
93    /// tables.upsert_row("myevent", [("evt_tx_hash", String::from("hello")), ("evt_index", String::from("world"))]);
94    /// ```
95    pub fn upsert_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
96        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
97        let k = key.into();
98        let key_debug = format!("{:?}", k);
99        let row = rows
100            .pks
101            .entry(k)
102            .or_insert(Row::new_ordered(self.ordinal.next()));
103        match row.operation {
104            Operation::Unspecified => {
105                row.operation = Operation::Upsert;
106            }
107            Operation::Create => {
108                panic!(
109                    "cannot upsert a row after a scheduled create operation, create and upsert are exclusive - table: {} key: {}",
110                    table, key_debug,
111                )
112            }
113            Operation::Upsert => { /* Already the right operation */ }
114            Operation::Update => {
115                panic!(
116                    "cannot upsert a row after a scheduled update operation, update and upsert are exclusive - table: {} key: {}",
117                    table, key_debug,
118                )
119            }
120            Operation::Delete => {
121                panic!(
122                    "cannot upsert a row after a scheduled delete operation - table: {} key: {}",
123                    table, key_debug,
124                )
125            }
126        }
127        row
128    }
129
130    pub fn update_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
131        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
132        let k = key.into();
133        let key_debug = format!("{:?}", k);
134        let row = rows
135            .pks
136            .entry(k)
137            .or_insert(Row::new_ordered(self.ordinal.next()));
138        match row.operation {
139            Operation::Unspecified => {
140                row.operation = Operation::Update;
141            }
142            Operation::Create => { /* Fine, updated columns will be part of Insert operation */ }
143            Operation::Upsert => { /* Fine, updated columns will be part of Upsert operation */ }
144            Operation::Update => { /* Already the right operation */ }
145            Operation::Delete => {
146                panic!(
147                    "cannot update a row after a scheduled delete operation - table: {} key: {}",
148                    table, key_debug,
149                )
150            }
151        }
152        row
153    }
154
155    pub fn delete_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
156        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
157        let row = rows
158            .pks
159            .entry(key.into())
160            .or_insert(Row::new_ordered(self.ordinal.next()));
161
162        row.columns = HashMap::new();
163        row.operation = match row.operation {
164            Operation::Unspecified => Operation::Delete,
165            Operation::Create => {
166                // We are creating the row in this block, there is no need to emit a DELETE statement,
167                // we specify Unspecified and the row will be skipped when comes the time to emit the
168                // changes.
169                Operation::Unspecified
170            }
171            Operation::Upsert => {
172                // We cannot know if the row was created within that block or already present
173                // in the database. As such, we must emit a DELETE statement in the sink
174                // for this. Worst case, the DELETE will hit no row and be a no-op.
175                Operation::Delete
176            }
177            Operation::Update => {
178                // The row must be deleted, emit the operation
179                Operation::Delete
180            }
181            Operation::Delete => {
182                // Already delete type, continue using that as the operation
183                Operation::Delete
184            }
185        };
186
187        row
188    }
189
190    // Convert Tables into an DatabaseChanges protobuf object
191    pub fn to_database_changes(self) -> DatabaseChanges {
192        let mut changes = DatabaseChanges::default();
193
194        for (table, rows) in self.tables.into_iter() {
195            for (pk, row) in rows.pks.into_iter() {
196                if row.operation == Operation::Unspecified {
197                    continue;
198                }
199
200                let mut change = match pk {
201                    PrimaryKey::Single(pk) => {
202                        TableChange::new(table.clone(), pk, row.ordinal, row.operation)
203                    }
204                    PrimaryKey::Composite(keys) => TableChange::new_composite(
205                        table.clone(),
206                        keys.into_iter().collect(),
207                        row.ordinal,
208                        row.operation,
209                    ),
210                };
211
212                for (field, value) in row.columns.into_iter() {
213                    change.fields.push(Field {
214                        name: field,
215                        new_value: value,
216                        old_value: "".to_string(),
217                    });
218                }
219
220                changes.table_changes.push(change);
221            }
222        }
223
224        changes.table_changes.sort_by_key(|change| change.ordinal);
225        changes
226    }
227}
228
/// Counter handing out increasing values, starting at zero.
#[derive(Debug, Default, Clone, Copy)]
pub struct Ordinal(u64);

impl Ordinal {
    /// Starts a counter whose first issued value is `0`.
    pub fn new() -> Self {
        Self(0)
    }

    /// Hands out the current value, then advances the counter by one.
    pub fn next(&mut self) -> u64 {
        let issued = self.0;
        self.0 = issued + 1;
        issued
    }
}
243
/// Key identifying a row within a table.
#[derive(Hash, Debug, Eq, PartialEq)]
pub enum PrimaryKey {
    /// Key made of a single value.
    Single(String),
    /// Key made of several named parts, stored sorted by part name.
    Composite(BTreeMap<String, String>),
}

impl From<&str> for PrimaryKey {
    fn from(key: &str) -> Self {
        PrimaryKey::Single(String::from(key))
    }
}

impl From<&String> for PrimaryKey {
    fn from(key: &String) -> Self {
        PrimaryKey::Single(String::from(key.as_str()))
    }
}

impl From<String> for PrimaryKey {
    fn from(key: String) -> Self {
        PrimaryKey::Single(key)
    }
}

impl<K: AsRef<str>, const N: usize> From<[(K, String); N]> for PrimaryKey {
    /// Builds a composite key from `(name, value)` pairs; an empty array yields an
    /// empty composite key.
    fn from(arr: [(K, String); N]) -> Self {
        let parts: BTreeMap<String, String> = IntoIterator::into_iter(arr)
            .map(|(name, value)| (name.as_ref().to_owned(), value))
            .collect();

        PrimaryKey::Composite(parts)
    }
}

impl<K: AsRef<str>, const N: usize> From<[(K, &str); N]> for PrimaryKey {
    /// Builds a composite key from `(name, value)` pairs; an empty array yields an
    /// empty composite key.
    fn from(arr: [(K, &str); N]) -> Self {
        let parts: BTreeMap<String, String> = IntoIterator::into_iter(arr)
            .map(|(name, value)| (name.as_ref().to_owned(), value.to_owned()))
            .collect();

        PrimaryKey::Composite(parts)
    }
}
289
290#[derive(Debug)]
291pub struct Rows {
292    // Map of primary keys within this table, to the fields within
293    pks: HashMap<PrimaryKey, Row>,
294}
295
296impl Rows {
297    pub fn new() -> Self {
298        Rows {
299            pks: HashMap::new(),
300        }
301    }
302}
303
/// A single pending change to one database row: the operation to emit and the column
/// values collected for it.
#[derive(Debug, Default)]
pub struct Row {
    /// Operation currently scheduled for this row. It is driven by the `Tables` methods
    /// (`create_row`, `upsert_row`, `update_row`, `delete_row`), which reject
    /// incompatible sequences; a row left as `Operation::Unspecified` is skipped when
    /// the changes are emitted.
    pub operation: Operation,
    /// Map of field name to its last change
    pub columns: HashMap<String, String>,
    /// Finalized: Last update or delete
    #[deprecated(
        note = "The finalization state is now implicitly handled by the `operation` field."
    )]
    pub finalized: bool,

    /// Value used to sort the emitted changes, assigned when the row is first recorded.
    ordinal: u64,
}
318
319impl Row {
320    /// **Do not use** Now broken, use the `Tables` API instead like `create_row`, `upsert_row`, `update_row`, or `delete_row`.
321    /// Kept for code compilation but it's expected that this was never used in practice.
322    #[deprecated(
323        note = "Do now create a new row manually, use the `Tables` API instead like `create_row`, `upsert_row`, `update_row`, or `delete_row`"
324    )]
325    pub fn new() -> Self {
326        Row {
327            operation: Operation::Unspecified,
328            columns: HashMap::new(),
329            ..Default::default()
330        }
331    }
332
333    pub(crate) fn new_ordered(ordinal: u64) -> Self {
334        Row {
335            operation: Operation::Unspecified,
336            columns: HashMap::new(),
337            ordinal,
338            ..Default::default()
339        }
340    }
341
342    /// Set a field to a value, this is the standard method for setting fields in a row.
343    ///
344    /// This method ensures that the value is converted to a database-compatible format
345    /// using the `ToDatabaseValue` trait. It is the primary way to set fields in a row
346    /// for most use cases.
347    ///
348    /// The `ToDatabaseValue` trait is implemented for various types, including primitive
349    /// types, strings, and custom types. This allows you to set fields with different
350    /// types of values without worrying about the underlying conversion. Check example
351    /// for more details.
352    ///
353    /// Check [ToDatabaseValue] for implemented automatic conversions.
354    ///
355    /// # Panics
356    ///
357    /// This method will panic if called on a row marked for deletion.
358    ///
359    /// # Example
360    ///
361    /// ```
362    /// use substreams::scalar::{BigInt, BigDecimal};
363    /// use crate::substreams_database_change::tables::Tables;
364    /// let mut tables = Tables::new();
365    /// let row = tables.create_row("myevent", "my_key");
366    /// row.set("name", "asset name");
367    /// row.set("decimals", 42);
368    /// row.set("count", BigDecimal::from(42));
369    /// row.set("value", BigInt::from(42));
370    /// ```
371    pub fn set<T: ToDatabaseValue>(&mut self, name: &str, value: T) -> &mut Self {
372        if self.operation == Operation::Delete {
373            panic!("cannot set fields on a delete operation")
374        }
375        self.columns.insert(name.to_string(), value.to_value());
376        self
377    }
378
379    /// Set a field to a raw value, this is useful for setting values that are not
380    /// normalized across all databases. In there, you can put the raw value as you
381    /// would in a SQL statement of the database you are targeting.
382    ///
383    /// This will be pass as a string to the database which will interpret it itself.
384    pub fn set_raw(&mut self, name: &str, value: String) -> &mut Self {
385        self.columns.insert(name.to_string(), value);
386        self
387    }
388
389    /// Set a field to an array of values compatible with PostgresSQL database,
390    /// this method is currently experimental and hidden as we plan to support
391    /// array natively in the model.
392    ///
393    /// For now, this method should be used with great care as it ties the model
394    /// to the database implementation.
395    #[doc(hidden)]
396    pub fn set_psql_array<T: ToDatabaseValue>(&mut self, name: &str, value: Vec<T>) -> &mut Row {
397        if self.operation == Operation::Delete {
398            panic!("cannot set fields on a delete operation")
399        }
400
401        let values = value
402            .into_iter()
403            .map(|x| x.to_value())
404            .collect::<Vec<_>>()
405            .join(",");
406
407        self.columns
408            .insert(name.to_string(), format!("'{{{}}}'", values));
409        self
410    }
411
412    /// Set a field to an array of values compatible with Clickhouse database,
413    /// this method is currently experimental and hidden as we plan to support
414    /// array natively in the model.
415    ///
416    /// For now, this method should be used with great care as it ties the model
417    /// to the database implementation.
418    #[doc(hidden)]
419    pub fn set_clickhouse_array<T: ToDatabaseValue>(
420        &mut self,
421        name: &str,
422        value: Vec<T>,
423    ) -> &mut Row {
424        if self.operation == Operation::Delete {
425            panic!("cannot set fields on a delete operation")
426        }
427
428        let values = value
429            .into_iter()
430            .map(|x| x.to_value())
431            .collect::<Vec<_>>()
432            .join(",");
433
434        self.columns
435            .insert(name.to_string(), format!("[{}]", values));
436        self
437    }
438}
439
// Implements `ToDatabaseValue` for an owned type by borrowing the value and delegating
// to the implementation that exists for a reference to that type.
macro_rules! impl_to_database_value_proxy_to_ref {
    ($owned:ty) => {
        impl ToDatabaseValue for $owned {
            fn to_value(self) -> String {
                <&$owned as ToDatabaseValue>::to_value(&self)
            }
        }
    };
}
449
// Implements `ToDatabaseValue` for a type by returning what its `ToString`
// implementation produces.
macro_rules! impl_to_database_value_proxy_to_string {
    ($target:ty) => {
        impl ToDatabaseValue for $target {
            fn to_value(self) -> String {
                <$target as ToString>::to_string(&self)
            }
        }
    };
}
459
/// Conversion of a value into the string stored for a column of a row.
///
/// `Row::set` and the array setters call `to_value` on what they are given and store
/// the returned string.
pub trait ToDatabaseValue {
    /// Consumes the value and returns its string form.
    fn to_value(self) -> String;
}
463
464impl_to_database_value_proxy_to_string!(i8);
465impl_to_database_value_proxy_to_string!(i16);
466impl_to_database_value_proxy_to_string!(i32);
467impl_to_database_value_proxy_to_string!(i64);
468impl_to_database_value_proxy_to_string!(u8);
469impl_to_database_value_proxy_to_string!(u16);
470impl_to_database_value_proxy_to_string!(u32);
471impl_to_database_value_proxy_to_string!(u64);
472impl_to_database_value_proxy_to_string!(bool);
473impl_to_database_value_proxy_to_string!(::prost_types::Timestamp);
474impl_to_database_value_proxy_to_string!(&::prost_types::Timestamp);
475impl_to_database_value_proxy_to_string!(&str);
476impl_to_database_value_proxy_to_string!(BigDecimal);
477impl_to_database_value_proxy_to_string!(&BigDecimal);
478impl_to_database_value_proxy_to_string!(BigInt);
479impl_to_database_value_proxy_to_string!(&BigInt);
480
481impl_to_database_value_proxy_to_ref!(Vec<u8>);
482
483impl ToDatabaseValue for &String {
484    fn to_value(self) -> String {
485        self.clone()
486    }
487}
488
489impl ToDatabaseValue for String {
490    fn to_value(self) -> String {
491        self
492    }
493}
494
495impl ToDatabaseValue for &Vec<u8> {
496    fn to_value(self) -> String {
497        Hex::encode(self)
498    }
499}
500
501impl<T: AsRef<[u8]>> ToDatabaseValue for Hex<T> {
502    fn to_value(self) -> String {
503        ToString::to_string(&self)
504    }
505}
506
507impl<T: AsRef<[u8]>> ToDatabaseValue for &Hex<T> {
508    fn to_value(self) -> String {
509        ToString::to_string(self)
510    }
511}
512
#[cfg(test)]
mod test {
    use crate::pb::database::table_change::Operation;
    use crate::pb::database::table_change::PrimaryKey as PrimaryKeyProto;
    use crate::pb::database::CompositePrimaryKey as CompositePrimaryKeyProto;
    use crate::pb::database::{DatabaseChanges, TableChange};
    use crate::tables::PrimaryKey;
    use crate::tables::Tables;
    use crate::tables::ToDatabaseValue;
    use pretty_assertions::assert_eq;

    #[test]
    fn to_database_value_proto_timestamp() {
        assert_eq!(
            ToDatabaseValue::to_value(::prost_types::Timestamp {
                seconds: 60 * 60 + 60 + 1,
                nanos: 1
            }),
            "1970-01-01T01:01:01.000000001Z"
        );
    }

    #[test]
    fn create_row_single_pk_direct() {
        let mut tables = Tables::new();
        tables.create_row("myevent", PrimaryKey::Single("myhash".to_string()));

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![change("myevent", "myhash", 0)],
            }
        );
    }

    #[test]
    fn create_row_single_pk() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![change("myevent", "myhash", 0)],
            }
        );
    }

    #[test]
    fn create_row_composite_pk() {
        let mut tables = Tables::new();
        tables.create_row(
            "myevent",
            [("evt_tx_hash", "hello"), ("evt_index", "world")],
        );

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![change(
                    "myevent",
                    [("evt_tx_hash", "hello"), ("evt_index", "world")],
                    0
                )],
            }
        );
    }

    #[test]
    fn row_ordering() {
        let mut tables = Tables::new();
        tables.create_row("tableA", "one");
        tables.create_row("tableC", "two");
        tables.create_row("tableA", "three");
        tables.create_row("tableD", "four");
        tables.create_row("tableE", "five");
        tables.create_row("tableC", "six");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![
                    change("tableA", "one", 0),
                    change("tableC", "two", 1),
                    change("tableA", "three", 2),
                    change("tableD", "four", 3),
                    change("tableE", "five", 4),
                    change("tableC", "six", 5)
                ],
            }
        );
    }

    #[test]
    fn update_after_create_keeps_create_operation() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");
        tables.update_row("myevent", "myhash");

        // The row keeps both its original operation and its original ordinal.
        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![change("myevent", "myhash", 0)],
            }
        );
    }

    #[test]
    fn delete_after_create_emits_nothing() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");
        tables.delete_row("myevent", "myhash");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![],
            }
        );
    }

    #[test]
    #[should_panic(expected = "cannot create a row after a scheduled upsert operation")]
    fn create_after_upsert_panics() {
        let mut tables = Tables::new();
        tables.upsert_row("myevent", "myhash");
        tables.create_row("myevent", "myhash");
    }

    /// Builds the `TableChange` expected for a freshly created row without any field.
    fn change<K: Into<PrimaryKey>>(name: &str, key: K, ordinal: u64) -> TableChange {
        TableChange {
            table: name.to_string(),
            ordinal,
            operation: Operation::Create as i32,
            fields: Vec::new(),
            primary_key: Some(match key.into() {
                PrimaryKey::Single(pk) => PrimaryKeyProto::Pk(pk),
                PrimaryKey::Composite(keys) => {
                    PrimaryKeyProto::CompositePk(CompositePrimaryKeyProto {
                        keys: keys.into_iter().collect(),
                    })
                }
            }),
        }
    }
}
623}