substreams_entity_change/
tables.rs

1/// Tables is a collection of rows, which is a collection of columns and make it easy to build
2/// up EntityChanges protobuf objects.
3///
4/// ```rust
5///  use substreams::{scalar::{BigInt, BigDecimal}, Hex};
6///  use substreams_entity_change::tables::Tables;
7///
8///  let mut tables = Tables::new();
9///  let bigint0 = BigInt::from(0);
10///  let bigdecimal0 = BigDecimal::from(0);
11///
12///  tables
13///    .create_row("Factory", "0x0000000")
14///    .set("poolCount", &bigint0)
15///    .set("txCount", &bigint0)
16///    .set("totalVolumeUSD", &bigdecimal0)
17///    .set("totalVolumeETH", &bigdecimal0)
18///    .set("totalFeesUSD", &bigdecimal0)
19///    .set("totalFeesETH", &bigdecimal0)
20///    .set("untrackedVolumeUSD", &bigdecimal0)
21///    .set("totalValueLockedUSD", &bigdecimal0)
22///    .set("totalValueLockedETH", &bigdecimal0)
23///    .set("totalValueLockedUSDUntracked", &bigdecimal0)
24///    .set("totalValueLockedETHUntracked", &bigdecimal0)
25///    .set("owner", "0x0000000000000000000000000000000000000000");
26/// ```
27///
28/// In the code above, we create a new row in the `Factory` table with the primary key `0x0000000`
29/// and set the fields for the entity.
30///
31/// Later to update the row, we can do:
32///
33/// ```rust
34///  use substreams::{scalar::{BigInt, BigDecimal}, Hex};
35///  use substreams_entity_change::tables::Tables;
36///
37///  let mut tables = Tables::new();
38///  let new_count = BigInt::from(1);
39///
40///  tables
41///    .update_row("Factory", "0x0000000")
42///    .set("txCount", new_count);
43/// ```
44///
45/// When you have populated the table changes, you can convert them into an EntityChanges protobuf
46/// so that the Graph Node Substreams Sink can ingest your data correctly:
47///
48/// ```rust
49///  use substreams_entity_change::tables::Tables;
50///
51/// let mut tables = Tables::new();
52/// // ... populate tables
53/// let changes = tables.to_entity_changes();
54/// ```
55///
56use crate::pb::entity::entity_change::Operation;
57use crate::pb::entity::value::Typed;
58use crate::pb::entity::{Array, EntityChange, EntityChanges, Field, Value};
59use std::collections::HashMap;
60use substreams::scalar::{BigDecimal, BigInt};
61
62#[derive(Debug)]
63pub struct Tables {
64    // Map from table name to the primary keys within that table
65    pub tables: HashMap<String, Rows>,
66}
67
68impl Tables {
69    pub fn new() -> Self {
70        Tables {
71            tables: HashMap::new(),
72        }
73    }
74
75    pub fn create_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
76        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
77        let row = rows
78            .pks
79            .entry(key.as_ref().to_string())
80            .or_insert(Row::new());
81        match row.operation {
82            Operation::Unspecified => {
83                row.operation = Operation::Create;
84            }
85            Operation::Create => {}
86            Operation::Update => {
87                panic!("cannot create a row that was marked for update")
88            }
89            Operation::Delete => {
90                panic!(
91                    "cannot create a row after a scheduled delete operation - table: {} key: {}",
92                    table,
93                    key.as_ref().to_string()
94                )
95            }
96            Operation::Final => {}
97        }
98        row
99    }
100
101    pub fn update_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
102        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
103        let row = rows
104            .pks
105            .entry(key.as_ref().to_string())
106            .or_insert(Row::new());
107        match row.operation {
108            Operation::Unspecified => {
109                row.operation = Operation::Update;
110            }
111            Operation::Create => {}
112            Operation::Update => {}
113            Operation::Delete => {
114                panic!(
115                    "cannot update a row after a scheduled delete operation - table: {} key: {}",
116                    table,
117                    key.as_ref().to_string()
118                )
119            }
120            Operation::Final => {
121                panic!("impossible state")
122            }
123        }
124        row
125    }
126
127    pub fn delete_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
128        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
129        let row = rows
130            .pks
131            .entry(key.as_ref().to_string())
132            .or_insert(Row::new());
133        match row.operation {
134            Operation::Unspecified => {
135                row.operation = Operation::Delete;
136            }
137            Operation::Create => {
138                // simply clear the thing
139                row.operation = Operation::Unspecified;
140                row.columns = HashMap::new();
141            }
142            Operation::Update => {
143                row.columns = HashMap::new();
144            }
145            Operation::Delete => {}
146            Operation::Final => {
147                panic!("impossible state");
148            }
149        }
150        row.operation = Operation::Delete;
151        row.columns = HashMap::new();
152        row
153    }
154
155    // Convert Tables into an EntityChanges protobuf object
156    pub fn to_entity_changes(self) -> EntityChanges {
157        let mut entities = EntityChanges::default();
158        for (table, rows) in self.tables.into_iter() {
159            for (pk, row) in rows.pks.into_iter() {
160                if row.operation == Operation::Unspecified {
161                    continue;
162                }
163
164                // Doing it right now removes the need to perform a `pk.clone()` when creating the real change
165                // below. We assume finalized row happen much less often than standard, so we save a bunch of
166                // clone by eagerly creating the finalized row here.
167                let mut pk_finalized: Option<EntityChange> = None;
168                if row.finalized {
169                    pk_finalized = Some(EntityChange::new(
170                        table.clone(),
171                        pk.clone(),
172                        0,
173                        Operation::Final,
174                    ))
175                }
176
177                // Map the row.operation into an EntityChange.Operation
178                let mut change = EntityChange::new(table.clone(), pk, 0, row.operation);
179                for (field, value) in row.columns.into_iter() {
180                    #[allow(deprecated)]
181                    change.fields.push(Field {
182                        name: field,
183                        new_value: Some(value),
184                        old_value: None,
185                    });
186                }
187
188                entities.entity_changes.push(change);
189
190                if let Some(finalized_row) = pk_finalized {
191                    entities.entity_changes.push(finalized_row);
192                }
193            }
194        }
195        entities
196    }
197}
198
199#[derive(Debug)]
200pub struct Rows {
201    // Map of primary keys within this table, to the fields within
202    pub pks: HashMap<String, Row>,
203}
204
205impl Rows {
206    pub fn new() -> Self {
207        Rows {
208            pks: HashMap::new(),
209        }
210    }
211}
212
213#[derive(Debug)]
214pub struct Row {
215    // Verify that we don't try to delete the same row as we're creating it
216    pub operation: Operation,
217    // Map of field name to its last change
218    pub columns: HashMap<String, Value>,
219    // Finalized: Last update or delete
220    pub finalized: bool,
221}
222
223impl Row {
224    pub fn new() -> Self {
225        Row {
226            operation: Operation::Unspecified,
227            columns: HashMap::new(),
228            finalized: false,
229        }
230    }
231
232    pub fn set<T: ToValue>(&mut self, name: &str, value: T) -> &mut Self {
233        if self.operation == Operation::Delete {
234            panic!("cannot set fields on a delete operation")
235        }
236        self.columns.insert(name.to_string(), value.to_value());
237        self
238    }
239
240    pub fn set_bigint(&mut self, name: &str, value: &String) -> &mut Self {
241        self.columns.insert(
242            name.to_string(),
243            Value {
244                typed: Some(Typed::Bigint(value.clone())),
245            },
246        );
247        self
248    }
249
250    pub fn set_bigdecimal(&mut self, name: &str, value: &String) -> &mut Self {
251        self.columns.insert(
252            name.to_string(),
253            Value {
254                typed: Some(Typed::Bigdecimal(value.clone())),
255            },
256        );
257        self
258    }
259
260    pub fn set_bigint_or_zero(&mut self, name: &str, value: &String) -> &mut Self {
261        if value.len() == 0 {
262            self.set_bigint(name, &"0".to_string())
263        } else {
264            self.set_bigint(name, value)
265        }
266    }
267
268    pub fn _mark_final(&mut self) -> &mut Self {
269        self.finalized = true;
270        self
271    }
272}
273
274pub trait ToValue {
275    fn to_value(self) -> Value;
276}
277
278impl ToValue for &bool {
279    fn to_value(self) -> Value {
280        Value {
281            typed: Some(Typed::Bool(*self)),
282        }
283    }
284}
285
286impl ToValue for &BigDecimal {
287    fn to_value(self) -> Value {
288        Value {
289            typed: Some(Typed::Bigdecimal(self.to_string())),
290        }
291    }
292}
293
294impl ToValue for &str {
295    fn to_value(self) -> Value {
296        Value {
297            typed: Some(Typed::String(self.to_string())),
298        }
299    }
300}
301
302impl ToValue for &String {
303    fn to_value(self) -> Value {
304        Value {
305            typed: Some(Typed::String(self.clone())),
306        }
307    }
308}
309
310impl ToValue for String {
311    fn to_value(self) -> Value {
312        Value {
313            typed: Some(Typed::String(self)),
314        }
315    }
316}
317
318impl ToValue for &Vec<u8> {
319    fn to_value(self) -> Value {
320        Value {
321            typed: Some(Typed::Bytes(base64::encode(self))),
322        }
323    }
324}
325
326impl ToValue for &Vec<Vec<u8>> {
327    fn to_value(self) -> Value {
328        Value {
329            typed: Some(Typed::Array(Array {
330                value: self.iter().map(ToValue::to_value).collect(),
331            })),
332        }
333    }
334}
335
336impl ToValue for Vec<Vec<u8>> {
337    fn to_value(self) -> Value {
338        Value {
339            typed: Some(Typed::Array(Array {
340                value: self.into_iter().map(ToValue::to_value).collect(),
341            })),
342        }
343    }
344}
345
346impl ToValue for Vec<String> {
347    fn to_value(self) -> Value {
348        Value {
349            typed: Some(Typed::Array(Array {
350                value: self.into_iter().map(ToValue::to_value).collect(),
351            })),
352        }
353    }
354}
355
356impl ToValue for &Vec<String> {
357    fn to_value(self) -> Value {
358        Value {
359            typed: Some(Typed::Array(Array {
360                value: self.iter().map(ToValue::to_value).collect(),
361            })),
362        }
363    }
364}
365
366impl ToValue for &Vec<BigInt> {
367    fn to_value(self) -> Value {
368        Value {
369            typed: Some(Typed::Array(Array {
370                value: self.iter().map(ToValue::to_value).collect(),
371            })),
372        }
373    }
374}
375
376impl ToValue for Vec<BigInt> {
377    fn to_value(self) -> Value {
378        Value {
379            typed: Some(Typed::Array(Array {
380                value: self.into_iter().map(ToValue::to_value).collect(),
381            })),
382        }
383    }
384}
385
386impl ToValue for &Vec<BigDecimal> {
387    fn to_value(self) -> Value {
388        Value {
389            typed: Some(Typed::Array(Array {
390                value: self.iter().map(ToValue::to_value).collect(),
391            })),
392        }
393    }
394}
395
396impl ToValue for Vec<BigDecimal> {
397    fn to_value(self) -> Value {
398        Value {
399            typed: Some(Typed::Array(Array {
400                value: self.into_iter().map(ToValue::to_value).collect(),
401            })),
402        }
403    }
404}
405
406impl ToValue for &::prost_types::Timestamp {
407    fn to_value(self) -> Value {
408        Value {
409            typed: Some(Typed::Timestamp(
410                self.seconds * 1_000_000 + self.nanos as i64 / 1000,
411            )),
412        }
413    }
414}
415
416macro_rules! impl_to_database_value_proxy_to_ref {
417    ($name:ty) => {
418        impl ToValue for $name {
419            fn to_value(self) -> Value {
420                ToValue::to_value(&self)
421            }
422        }
423    };
424}
425
426// Those owns the received value (so once called, the value is dropped)
427impl_to_database_value_proxy_to_ref!(bool);
428impl_to_database_value_proxy_to_ref!(BigDecimal);
429impl_to_database_value_proxy_to_ref!(Vec<u8>);
430impl_to_database_value_proxy_to_ref!(::prost_types::Timestamp);
431
432macro_rules! impl_to_bigint_value_proxy_to_string {
433    ($name:ty) => {
434        impl ToValue for $name {
435            fn to_value(self) -> Value {
436                Value {
437                    typed: Some(Typed::Bigint(self.to_string())),
438                }
439            }
440        }
441    };
442}
443
444impl_to_bigint_value_proxy_to_string!(i64);
445impl_to_bigint_value_proxy_to_string!(u8);
446impl_to_bigint_value_proxy_to_string!(u16);
447impl_to_bigint_value_proxy_to_string!(u32);
448impl_to_bigint_value_proxy_to_string!(u64);
449impl_to_bigint_value_proxy_to_string!(BigInt);
450impl_to_bigint_value_proxy_to_string!(&BigInt);
451
452macro_rules! impl_to_int32_value {
453    ($name:ty) => {
454        impl ToValue for $name {
455            fn to_value(self) -> Value {
456                Value {
457                    typed: Some(Typed::Int32(self as i32)),
458                }
459            }
460        }
461    };
462}
463
464impl_to_int32_value!(i8);
465impl_to_int32_value!(i16);
466impl_to_int32_value!(i32);
467
468#[cfg(test)]
469mod test {
470    use crate::pb::entity::{
471        entity_change::Operation, value::Typed, Array, EntityChange, Field, Value,
472    };
473
474    use super::Tables;
475
476    #[test]
477    fn test_timestamp() {
478        let mut tables = Tables::new();
479        tables.create_row("table", "1").set(
480            "field",
481            &::prost_types::Timestamp {
482                seconds: 123,
483                nanos: 456789000,
484            },
485        );
486
487        let changes = tables.to_entity_changes();
488        assert_eq!(changes.entity_changes.len(), 1);
489        assert_eq!(
490            changes.entity_changes[0],
491            EntityChange {
492                entity: "table".to_string(),
493                id: "1".to_string(),
494                operation: Operation::Create as i32,
495                fields: vec![Field {
496                    name: "field".to_string(),
497                    new_value: Some(Value {
498                        typed: Some(Typed::Timestamp(123456789)),
499                    }),
500                    ..Default::default()
501                }],
502                ..Default::default()
503            }
504        );
505    }
506
507    #[test]
508    fn test_vec_vec_u8() {
509        let mut tables = Tables::new();
510        tables
511            .create_row("table", "1")
512            .set("field", &vec![vec![1, 2, 3]]);
513
514        let changes = tables.to_entity_changes();
515        assert_eq!(changes.entity_changes.len(), 1);
516        assert_eq!(
517            changes.entity_changes[0],
518            EntityChange {
519                entity: "table".to_string(),
520                id: "1".to_string(),
521                operation: Operation::Create as i32,
522                fields: vec![Field {
523                    name: "field".to_string(),
524                    new_value: Some(Value {
525                        typed: Some(Typed::Array(Array {
526                            value: vec![Value {
527                                typed: Some(Typed::Bytes("AQID".to_string())),
528                            }],
529                        })),
530                    }),
531                    ..Default::default()
532                }],
533                ..Default::default()
534            }
535        );
536    }
537}