1use crate::pb::database::{table_change::Operation, DatabaseChanges, Field, TableChange};
2use std::collections::{BTreeMap, HashMap};
3use substreams::{
4 scalar::{BigDecimal, BigInt},
5 Hex,
6};
7
8#[derive(Debug)]
9pub struct Tables {
10 pub tables: HashMap<String, Rows>,
12
13 ordinal: Ordinal,
16}
17
18impl Tables {
19 pub fn new() -> Self {
20 Tables {
21 tables: HashMap::new(),
22 ordinal: Ordinal::new(),
23 }
24 }
25
26 pub fn all_row_count(&self) -> usize {
28 self.tables.values().map(|rows| rows.pks.len()).sum()
29 }
30
31 pub fn create_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
47 let rows: &mut Rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
48 let k = key.into();
49 let key_debug = format!("{:?}", k);
50 let row = rows
51 .pks
52 .entry(k)
53 .or_insert(Row::new_ordered(self.ordinal.next()));
54 match row.operation {
55 Operation::Unspecified => {
56 row.operation = Operation::Create;
57 }
58 Operation::Create => { }
59 Operation::Upsert => {
60 panic!(
61 "cannot create a row after a scheduled upsert operation, create and upsert are exclusive - table: {} key: {}",
62 table, key_debug,
63 )
64 }
65 Operation::Update => {
66 panic!("cannot create a row that was marked for update")
67 }
68 Operation::Delete => {
69 panic!(
70 "cannot create a row after a scheduled delete operation - table: {} key: {}",
71 table, key_debug,
72 )
73 }
74 }
75 row
76 }
77
78 pub fn upsert_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
96 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
97 let k = key.into();
98 let key_debug = format!("{:?}", k);
99 let row = rows
100 .pks
101 .entry(k)
102 .or_insert(Row::new_ordered(self.ordinal.next()));
103 match row.operation {
104 Operation::Unspecified => {
105 row.operation = Operation::Upsert;
106 }
107 Operation::Create => {
108 panic!(
109 "cannot upsert a row after a scheduled create operation, create and upsert are exclusive - table: {} key: {}",
110 table, key_debug,
111 )
112 }
113 Operation::Upsert => { }
114 Operation::Update => {
115 panic!(
116 "cannot upsert a row after a scheduled update operation, update and upsert are exclusive - table: {} key: {}",
117 table, key_debug,
118 )
119 }
120 Operation::Delete => {
121 panic!(
122 "cannot upsert a row after a scheduled delete operation - table: {} key: {}",
123 table, key_debug,
124 )
125 }
126 }
127 row
128 }
129
130 pub fn update_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
131 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
132 let k = key.into();
133 let key_debug = format!("{:?}", k);
134 let row = rows
135 .pks
136 .entry(k)
137 .or_insert(Row::new_ordered(self.ordinal.next()));
138 match row.operation {
139 Operation::Unspecified => {
140 row.operation = Operation::Update;
141 }
142 Operation::Create => { }
143 Operation::Upsert => { }
144 Operation::Update => { }
145 Operation::Delete => {
146 panic!(
147 "cannot update a row after a scheduled delete operation - table: {} key: {}",
148 table, key_debug,
149 )
150 }
151 }
152 row
153 }
154
155 pub fn delete_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
156 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
157 let row = rows
158 .pks
159 .entry(key.into())
160 .or_insert(Row::new_ordered(self.ordinal.next()));
161
162 row.columns = HashMap::new();
163 row.operation = match row.operation {
164 Operation::Unspecified => Operation::Delete,
165 Operation::Create => {
166 Operation::Unspecified
170 }
171 Operation::Upsert => {
172 Operation::Delete
176 }
177 Operation::Update => {
178 Operation::Delete
180 }
181 Operation::Delete => {
182 Operation::Delete
184 }
185 };
186
187 row
188 }
189
190 pub fn to_database_changes(self) -> DatabaseChanges {
192 let mut changes = DatabaseChanges::default();
193
194 for (table, rows) in self.tables.into_iter() {
195 for (pk, row) in rows.pks.into_iter() {
196 if row.operation == Operation::Unspecified {
197 continue;
198 }
199
200 let mut change = match pk {
201 PrimaryKey::Single(pk) => {
202 TableChange::new(table.clone(), pk, row.ordinal, row.operation)
203 }
204 PrimaryKey::Composite(keys) => TableChange::new_composite(
205 table.clone(),
206 keys.into_iter().collect(),
207 row.ordinal,
208 row.operation,
209 ),
210 };
211
212 for (field, value) in row.columns.into_iter() {
213 change.fields.push(Field {
214 name: field,
215 new_value: value,
216 old_value: "".to_string(),
217 });
218 }
219
220 changes.table_changes.push(change);
221 }
222 }
223
224 changes.table_changes.sort_by_key(|change| change.ordinal);
225 changes
226 }
227}
228
/// Counter handing out increasing ordinals, starting at 0.
#[derive(Debug, Default, Clone, Copy)]
pub struct Ordinal(u64);

impl Ordinal {
    /// Creates a counter whose first issued value is 0.
    pub fn new() -> Self {
        Self(0)
    }

    /// Returns the current value, then advances the counter by one.
    pub fn next(&mut self) -> u64 {
        let issued = self.0;
        self.0 = issued + 1;
        issued
    }
}
243
/// Identifies a row within a table, either by a single value or by a set of
/// named column values.
#[derive(Hash, Debug, Eq, PartialEq)]
pub enum PrimaryKey {
    /// Primary key made of a single value.
    Single(String),
    /// Composite primary key, mapping column name to value.
    Composite(BTreeMap<String, String>),
}

/// A string slice becomes a single-value key.
impl From<&str> for PrimaryKey {
    fn from(value: &str) -> Self {
        PrimaryKey::Single(value.to_owned())
    }
}

/// A borrowed `String` becomes a single-value key (the string is copied).
impl From<&String> for PrimaryKey {
    fn from(value: &String) -> Self {
        PrimaryKey::Single(value.to_owned())
    }
}

/// An owned `String` becomes a single-value key without copying.
impl From<String> for PrimaryKey {
    fn from(value: String) -> Self {
        PrimaryKey::Single(value)
    }
}

/// An array of `(column, value)` pairs with owned values becomes a composite
/// key; an empty array yields an empty composite key.
impl<K: AsRef<str>, const N: usize> From<[(K, String); N]> for PrimaryKey {
    fn from(pairs: [(K, String); N]) -> Self {
        let columns = IntoIterator::into_iter(pairs)
            .map(|(column, value)| (column.as_ref().to_owned(), value))
            .collect::<BTreeMap<String, String>>();

        PrimaryKey::Composite(columns)
    }
}

/// An array of `(column, value)` pairs with borrowed values becomes a
/// composite key; an empty array yields an empty composite key.
impl<K: AsRef<str>, const N: usize> From<[(K, &str); N]> for PrimaryKey {
    fn from(pairs: [(K, &str); N]) -> Self {
        let columns = IntoIterator::into_iter(pairs)
            .map(|(column, value)| (column.as_ref().to_owned(), value.to_owned()))
            .collect::<BTreeMap<String, String>>();

        PrimaryKey::Composite(columns)
    }
}
289
290#[derive(Debug)]
291pub struct Rows {
292 pks: HashMap<PrimaryKey, Row>,
294}
295
296impl Rows {
297 pub fn new() -> Self {
298 Rows {
299 pks: HashMap::new(),
300 }
301 }
302}
303
304#[derive(Debug, Default)]
305pub struct Row {
306 pub operation: Operation,
308 pub columns: HashMap<String, String>,
310 #[deprecated(
312 note = "The finalization state is now implicitly handled by the `operation` field."
313 )]
314 pub finalized: bool,
315
316 ordinal: u64,
317}
318
319impl Row {
320 #[deprecated(
323 note = "Do now create a new row manually, use the `Tables` API instead like `create_row`, `upsert_row`, `update_row`, or `delete_row`"
324 )]
325 pub fn new() -> Self {
326 Row {
327 operation: Operation::Unspecified,
328 columns: HashMap::new(),
329 ..Default::default()
330 }
331 }
332
333 pub(crate) fn new_ordered(ordinal: u64) -> Self {
334 Row {
335 operation: Operation::Unspecified,
336 columns: HashMap::new(),
337 ordinal,
338 ..Default::default()
339 }
340 }
341
342 pub fn set<T: ToDatabaseValue>(&mut self, name: &str, value: T) -> &mut Self {
372 if self.operation == Operation::Delete {
373 panic!("cannot set fields on a delete operation")
374 }
375 self.columns.insert(name.to_string(), value.to_value());
376 self
377 }
378
379 pub fn set_raw(&mut self, name: &str, value: String) -> &mut Self {
385 self.columns.insert(name.to_string(), value);
386 self
387 }
388
389 #[doc(hidden)]
396 pub fn set_psql_array<T: ToDatabaseValue>(&mut self, name: &str, value: Vec<T>) -> &mut Row {
397 if self.operation == Operation::Delete {
398 panic!("cannot set fields on a delete operation")
399 }
400
401 let values = value
402 .into_iter()
403 .map(|x| x.to_value())
404 .collect::<Vec<_>>()
405 .join(",");
406
407 self.columns
408 .insert(name.to_string(), format!("'{{{}}}'", values));
409 self
410 }
411
412 #[doc(hidden)]
419 pub fn set_clickhouse_array<T: ToDatabaseValue>(
420 &mut self,
421 name: &str,
422 value: Vec<T>,
423 ) -> &mut Row {
424 if self.operation == Operation::Delete {
425 panic!("cannot set fields on a delete operation")
426 }
427
428 let values = value
429 .into_iter()
430 .map(|x| x.to_value())
431 .collect::<Vec<_>>()
432 .join(",");
433
434 self.columns
435 .insert(name.to_string(), format!("[{}]", values));
436 self
437 }
438}
439
/// Implements `ToDatabaseValue` for the owned type `$target` by forwarding to
/// the implementation that exists for `&$target`.
macro_rules! impl_to_database_value_proxy_to_ref {
    ($target:ty) => {
        impl ToDatabaseValue for $target {
            fn to_value(self) -> String {
                <&$target as ToDatabaseValue>::to_value(&self)
            }
        }
    };
}
449
/// Implements `ToDatabaseValue` for `$target` using its `ToString`
/// implementation.
macro_rules! impl_to_database_value_proxy_to_string {
    ($target:ty) => {
        impl ToDatabaseValue for $target {
            fn to_value(self) -> String {
                <$target as ToString>::to_string(&self)
            }
        }
    };
}
459
/// Conversion of a value into the string stored in a row's column.
pub trait ToDatabaseValue {
    /// Consumes the value and returns its string representation.
    fn to_value(self) -> String;
}
463
464impl_to_database_value_proxy_to_string!(i8);
465impl_to_database_value_proxy_to_string!(i16);
466impl_to_database_value_proxy_to_string!(i32);
467impl_to_database_value_proxy_to_string!(i64);
468impl_to_database_value_proxy_to_string!(u8);
469impl_to_database_value_proxy_to_string!(u16);
470impl_to_database_value_proxy_to_string!(u32);
471impl_to_database_value_proxy_to_string!(u64);
472impl_to_database_value_proxy_to_string!(bool);
473impl_to_database_value_proxy_to_string!(::prost_types::Timestamp);
474impl_to_database_value_proxy_to_string!(&::prost_types::Timestamp);
475impl_to_database_value_proxy_to_string!(&str);
476impl_to_database_value_proxy_to_string!(BigDecimal);
477impl_to_database_value_proxy_to_string!(&BigDecimal);
478impl_to_database_value_proxy_to_string!(BigInt);
479impl_to_database_value_proxy_to_string!(&BigInt);
480
481impl_to_database_value_proxy_to_ref!(Vec<u8>);
482
483impl ToDatabaseValue for &String {
484 fn to_value(self) -> String {
485 self.clone()
486 }
487}
488
489impl ToDatabaseValue for String {
490 fn to_value(self) -> String {
491 self
492 }
493}
494
495impl ToDatabaseValue for &Vec<u8> {
496 fn to_value(self) -> String {
497 Hex::encode(self)
498 }
499}
500
501impl<T: AsRef<[u8]>> ToDatabaseValue for Hex<T> {
502 fn to_value(self) -> String {
503 ToString::to_string(&self)
504 }
505}
506
507impl<T: AsRef<[u8]>> ToDatabaseValue for &Hex<T> {
508 fn to_value(self) -> String {
509 ToString::to_string(self)
510 }
511}
512
#[cfg(test)]
mod test {
    use crate::pb::database::table_change::PrimaryKey as PrimaryKeyProto;
    use crate::pb::database::CompositePrimaryKey as CompositePrimaryKeyProto;
    use crate::pb::database::{DatabaseChanges, TableChange};
    use crate::tables::PrimaryKey;
    use crate::tables::Tables;
    use crate::tables::ToDatabaseValue;
    use pretty_assertions::assert_eq;

    #[test]
    fn to_database_value_proto_timestamp() {
        assert_eq!(
            ToDatabaseValue::to_value(::prost_types::Timestamp {
                seconds: 60 * 60 + 60 + 1,
                nanos: 1
            }),
            "1970-01-01T01:01:01.000000001Z"
        );
    }

    #[test]
    fn create_row_single_pk_direct() {
        let mut tables = Tables::new();
        tables.create_row("myevent", PrimaryKey::Single("myhash".to_string()));

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![change("myevent", "myhash", 0)],
            }
        );
    }

    #[test]
    fn create_row_single_pk() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![change("myevent", "myhash", 0)],
            }
        );
    }

    #[test]
    fn create_row_composite_pk() {
        let mut tables = Tables::new();
        tables.create_row(
            "myevent",
            [("evt_tx_hash", "hello"), ("evt_index", "world")],
        );

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![change(
                    "myevent",
                    [("evt_tx_hash", "hello"), ("evt_index", "world")],
                    0
                )],
            }
        );
    }

    #[test]
    fn row_ordering() {
        let mut tables = Tables::new();
        tables.create_row("tableA", "one");
        tables.create_row("tableC", "two");
        tables.create_row("tableA", "three");
        tables.create_row("tableD", "four");
        tables.create_row("tableE", "five");
        tables.create_row("tableC", "six");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![
                    change("tableA", "one", 0),
                    change("tableC", "two", 1),
                    change("tableA", "three", 2),
                    change("tableD", "four", 3),
                    change("tableE", "five", 4),
                    change("tableC", "six", 5),
                ],
            }
        );
    }

    // A create followed by a delete of the same row cancels out: the row is
    // reset to `Unspecified` and is not emitted.
    #[test]
    fn create_then_delete_emits_nothing() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");
        tables.delete_row("myevent", "myhash");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![],
            }
        );
    }

    // Updating a row already scheduled for creation keeps the create
    // operation and the ordinal assigned when the row was first scheduled.
    #[test]
    fn update_after_create_keeps_create() {
        let mut tables = Tables::new();
        tables.create_row("myevent", "myhash");
        tables.update_row("myevent", "myhash");

        assert_eq!(
            tables.to_database_changes(),
            DatabaseChanges {
                table_changes: vec![change("myevent", "myhash", 0)],
            }
        );
    }

    #[test]
    fn all_row_count_counts_distinct_keys_per_table() {
        let mut tables = Tables::new();
        assert_eq!(tables.all_row_count(), 0);

        tables.create_row("tableA", "one");
        tables.create_row("tableA", "one");
        tables.create_row("tableA", "two");
        tables.create_row("tableB", "one");

        assert_eq!(tables.all_row_count(), 3);
    }

    #[test]
    #[should_panic(expected = "create and upsert are exclusive")]
    fn create_after_upsert_panics() {
        let mut tables = Tables::new();
        tables.upsert_row("myevent", "myhash");
        tables.create_row("myevent", "myhash");
    }

    #[test]
    #[should_panic(expected = "cannot create a row that was marked for update")]
    fn create_after_update_panics() {
        let mut tables = Tables::new();
        tables.update_row("myevent", "myhash");
        tables.create_row("myevent", "myhash");
    }

    #[test]
    #[should_panic(expected = "cannot update a row after a scheduled delete operation")]
    fn update_after_delete_panics() {
        let mut tables = Tables::new();
        tables.delete_row("myevent", "myhash");
        tables.update_row("myevent", "myhash");
    }

    #[test]
    #[should_panic(expected = "cannot set fields on a delete operation")]
    fn set_on_deleted_row_panics() {
        let mut tables = Tables::new();
        tables.delete_row("myevent", "myhash").set("name", "value");
    }

    /// Builds the `TableChange` expected for a row created with `key` in
    /// table `name`, with no fields set.
    fn change<K: Into<PrimaryKey>>(name: &str, key: K, ordinal: u64) -> TableChange {
        TableChange {
            table: name.to_string(),
            ordinal,
            // Numeric operation value emitted for rows scheduled with `create_row`.
            operation: 1,
            fields: Vec::new(),
            primary_key: Some(match key.into() {
                PrimaryKey::Single(pk) => PrimaryKeyProto::Pk(pk),
                PrimaryKey::Composite(keys) => {
                    PrimaryKeyProto::CompositePk(CompositePrimaryKeyProto {
                        keys: keys.into_iter().collect(),
                    })
                }
            }),
        }
    }
}