substreams_entity_change/
tables.rs1use crate::pb::entity::entity_change::Operation;
57use crate::pb::entity::value::Typed;
58use crate::pb::entity::{Array, EntityChange, EntityChanges, Field, Value};
59use std::collections::HashMap;
60use substreams::scalar::{BigDecimal, BigInt};
61
62#[derive(Debug)]
63pub struct Tables {
64 pub tables: HashMap<String, Rows>,
66}
67
68impl Tables {
69 pub fn new() -> Self {
70 Tables {
71 tables: HashMap::new(),
72 }
73 }
74
75 pub fn create_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
76 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
77 let row = rows
78 .pks
79 .entry(key.as_ref().to_string())
80 .or_insert(Row::new());
81 match row.operation {
82 Operation::Unspecified => {
83 row.operation = Operation::Create;
84 }
85 Operation::Create => {}
86 Operation::Update => {
87 panic!("cannot create a row that was marked for update")
88 }
89 Operation::Delete => {
90 panic!(
91 "cannot create a row after a scheduled delete operation - table: {} key: {}",
92 table,
93 key.as_ref().to_string()
94 )
95 }
96 Operation::Final => {}
97 }
98 row
99 }
100
101 pub fn update_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
102 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
103 let row = rows
104 .pks
105 .entry(key.as_ref().to_string())
106 .or_insert(Row::new());
107 match row.operation {
108 Operation::Unspecified => {
109 row.operation = Operation::Update;
110 }
111 Operation::Create => {}
112 Operation::Update => {}
113 Operation::Delete => {
114 panic!(
115 "cannot update a row after a scheduled delete operation - table: {} key: {}",
116 table,
117 key.as_ref().to_string()
118 )
119 }
120 Operation::Final => {
121 panic!("impossible state")
122 }
123 }
124 row
125 }
126
127 pub fn delete_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
128 let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
129 let row = rows
130 .pks
131 .entry(key.as_ref().to_string())
132 .or_insert(Row::new());
133 match row.operation {
134 Operation::Unspecified => {
135 row.operation = Operation::Delete;
136 }
137 Operation::Create => {
138 row.operation = Operation::Unspecified;
140 row.columns = HashMap::new();
141 }
142 Operation::Update => {
143 row.columns = HashMap::new();
144 }
145 Operation::Delete => {}
146 Operation::Final => {
147 panic!("impossible state");
148 }
149 }
150 row.operation = Operation::Delete;
151 row.columns = HashMap::new();
152 row
153 }
154
155 pub fn to_entity_changes(self) -> EntityChanges {
157 let mut entities = EntityChanges::default();
158 for (table, rows) in self.tables.into_iter() {
159 for (pk, row) in rows.pks.into_iter() {
160 if row.operation == Operation::Unspecified {
161 continue;
162 }
163
164 let mut pk_finalized: Option<EntityChange> = None;
168 if row.finalized {
169 pk_finalized = Some(EntityChange::new(
170 table.clone(),
171 pk.clone(),
172 0,
173 Operation::Final,
174 ))
175 }
176
177 let mut change = EntityChange::new(table.clone(), pk, 0, row.operation);
179 for (field, value) in row.columns.into_iter() {
180 #[allow(deprecated)]
181 change.fields.push(Field {
182 name: field,
183 new_value: Some(value),
184 old_value: None,
185 });
186 }
187
188 entities.entity_changes.push(change);
189
190 if let Some(finalized_row) = pk_finalized {
191 entities.entity_changes.push(finalized_row);
192 }
193 }
194 }
195 entities
196 }
197}
198
199#[derive(Debug)]
200pub struct Rows {
201 pub pks: HashMap<String, Row>,
203}
204
205impl Rows {
206 pub fn new() -> Self {
207 Rows {
208 pks: HashMap::new(),
209 }
210 }
211}
212
213#[derive(Debug)]
214pub struct Row {
215 pub operation: Operation,
217 pub columns: HashMap<String, Value>,
219 pub finalized: bool,
221}
222
223impl Row {
224 pub fn new() -> Self {
225 Row {
226 operation: Operation::Unspecified,
227 columns: HashMap::new(),
228 finalized: false,
229 }
230 }
231
232 pub fn set<T: ToValue>(&mut self, name: &str, value: T) -> &mut Self {
233 if self.operation == Operation::Delete {
234 panic!("cannot set fields on a delete operation")
235 }
236 self.columns.insert(name.to_string(), value.to_value());
237 self
238 }
239
240 pub fn set_bigint(&mut self, name: &str, value: &String) -> &mut Self {
241 self.columns.insert(
242 name.to_string(),
243 Value {
244 typed: Some(Typed::Bigint(value.clone())),
245 },
246 );
247 self
248 }
249
250 pub fn set_bigdecimal(&mut self, name: &str, value: &String) -> &mut Self {
251 self.columns.insert(
252 name.to_string(),
253 Value {
254 typed: Some(Typed::Bigdecimal(value.clone())),
255 },
256 );
257 self
258 }
259
260 pub fn set_bigint_or_zero(&mut self, name: &str, value: &String) -> &mut Self {
261 if value.len() == 0 {
262 self.set_bigint(name, &"0".to_string())
263 } else {
264 self.set_bigint(name, value)
265 }
266 }
267
268 pub fn _mark_final(&mut self) -> &mut Self {
269 self.finalized = true;
270 self
271 }
272}
273
274pub trait ToValue {
275 fn to_value(self) -> Value;
276}
277
278impl ToValue for &bool {
279 fn to_value(self) -> Value {
280 Value {
281 typed: Some(Typed::Bool(*self)),
282 }
283 }
284}
285
286impl ToValue for &BigDecimal {
287 fn to_value(self) -> Value {
288 Value {
289 typed: Some(Typed::Bigdecimal(self.to_string())),
290 }
291 }
292}
293
294impl ToValue for &str {
295 fn to_value(self) -> Value {
296 Value {
297 typed: Some(Typed::String(self.to_string())),
298 }
299 }
300}
301
302impl ToValue for &String {
303 fn to_value(self) -> Value {
304 Value {
305 typed: Some(Typed::String(self.clone())),
306 }
307 }
308}
309
310impl ToValue for String {
311 fn to_value(self) -> Value {
312 Value {
313 typed: Some(Typed::String(self)),
314 }
315 }
316}
317
318impl ToValue for &Vec<u8> {
319 fn to_value(self) -> Value {
320 Value {
321 typed: Some(Typed::Bytes(base64::encode(self))),
322 }
323 }
324}
325
326impl ToValue for &Vec<Vec<u8>> {
327 fn to_value(self) -> Value {
328 Value {
329 typed: Some(Typed::Array(Array {
330 value: self.iter().map(ToValue::to_value).collect(),
331 })),
332 }
333 }
334}
335
336impl ToValue for Vec<Vec<u8>> {
337 fn to_value(self) -> Value {
338 Value {
339 typed: Some(Typed::Array(Array {
340 value: self.into_iter().map(ToValue::to_value).collect(),
341 })),
342 }
343 }
344}
345
346impl ToValue for Vec<String> {
347 fn to_value(self) -> Value {
348 Value {
349 typed: Some(Typed::Array(Array {
350 value: self.into_iter().map(ToValue::to_value).collect(),
351 })),
352 }
353 }
354}
355
356impl ToValue for &Vec<String> {
357 fn to_value(self) -> Value {
358 Value {
359 typed: Some(Typed::Array(Array {
360 value: self.iter().map(ToValue::to_value).collect(),
361 })),
362 }
363 }
364}
365
366impl ToValue for &Vec<BigInt> {
367 fn to_value(self) -> Value {
368 Value {
369 typed: Some(Typed::Array(Array {
370 value: self.iter().map(ToValue::to_value).collect(),
371 })),
372 }
373 }
374}
375
376impl ToValue for Vec<BigInt> {
377 fn to_value(self) -> Value {
378 Value {
379 typed: Some(Typed::Array(Array {
380 value: self.into_iter().map(ToValue::to_value).collect(),
381 })),
382 }
383 }
384}
385
386impl ToValue for &Vec<BigDecimal> {
387 fn to_value(self) -> Value {
388 Value {
389 typed: Some(Typed::Array(Array {
390 value: self.iter().map(ToValue::to_value).collect(),
391 })),
392 }
393 }
394}
395
396impl ToValue for Vec<BigDecimal> {
397 fn to_value(self) -> Value {
398 Value {
399 typed: Some(Typed::Array(Array {
400 value: self.into_iter().map(ToValue::to_value).collect(),
401 })),
402 }
403 }
404}
405
406impl ToValue for &::prost_types::Timestamp {
407 fn to_value(self) -> Value {
408 Value {
409 typed: Some(Typed::Timestamp(
410 self.seconds * 1_000_000 + self.nanos as i64 / 1000,
411 )),
412 }
413 }
414}
415
416macro_rules! impl_to_database_value_proxy_to_ref {
417 ($name:ty) => {
418 impl ToValue for $name {
419 fn to_value(self) -> Value {
420 ToValue::to_value(&self)
421 }
422 }
423 };
424}
425
426impl_to_database_value_proxy_to_ref!(bool);
428impl_to_database_value_proxy_to_ref!(BigDecimal);
429impl_to_database_value_proxy_to_ref!(Vec<u8>);
430impl_to_database_value_proxy_to_ref!(::prost_types::Timestamp);
431
432macro_rules! impl_to_bigint_value_proxy_to_string {
433 ($name:ty) => {
434 impl ToValue for $name {
435 fn to_value(self) -> Value {
436 Value {
437 typed: Some(Typed::Bigint(self.to_string())),
438 }
439 }
440 }
441 };
442}
443
444impl_to_bigint_value_proxy_to_string!(i64);
445impl_to_bigint_value_proxy_to_string!(u8);
446impl_to_bigint_value_proxy_to_string!(u16);
447impl_to_bigint_value_proxy_to_string!(u32);
448impl_to_bigint_value_proxy_to_string!(u64);
449impl_to_bigint_value_proxy_to_string!(BigInt);
450impl_to_bigint_value_proxy_to_string!(&BigInt);
451
452macro_rules! impl_to_int32_value {
453 ($name:ty) => {
454 impl ToValue for $name {
455 fn to_value(self) -> Value {
456 Value {
457 typed: Some(Typed::Int32(self as i32)),
458 }
459 }
460 }
461 };
462}
463
464impl_to_int32_value!(i8);
465impl_to_int32_value!(i16);
466impl_to_int32_value!(i32);
467
#[cfg(test)]
mod test {
    use super::Tables;
    use crate::pb::entity::{
        entity_change::Operation, value::Typed, Array, EntityChange, Field, Value,
    };

    /// Builds the change both tests expect: a `Create` of row "1" in "table"
    /// whose only field, "field", holds `typed`.
    fn expected_create(typed: Typed) -> EntityChange {
        let field = Field {
            name: "field".to_string(),
            new_value: Some(Value { typed: Some(typed) }),
            ..Default::default()
        };
        EntityChange {
            entity: "table".to_string(),
            id: "1".to_string(),
            operation: Operation::Create as i32,
            fields: vec![field],
            ..Default::default()
        }
    }

    /// A timestamp of 123 s + 456789000 ns is stored as 123456789.
    #[test]
    fn test_timestamp() {
        let stamp = ::prost_types::Timestamp {
            seconds: 123,
            nanos: 456789000,
        };
        let mut tables = Tables::new();
        tables.create_row("table", "1").set("field", &stamp);

        let changes = tables.to_entity_changes();
        assert_eq!(changes.entity_changes.len(), 1);
        assert_eq!(
            changes.entity_changes[0],
            expected_create(Typed::Timestamp(123456789))
        );
    }

    /// A list of byte strings becomes an array of base64-encoded `Bytes` values.
    #[test]
    fn test_vec_vec_u8() {
        let payload: Vec<Vec<u8>> = vec![vec![1, 2, 3]];
        let mut tables = Tables::new();
        tables.create_row("table", "1").set("field", &payload);

        let changes = tables.to_entity_changes();
        assert_eq!(changes.entity_changes.len(), 1);

        let encoded = Value {
            typed: Some(Typed::Bytes("AQID".to_string())),
        };
        assert_eq!(
            changes.entity_changes[0],
            expected_create(Typed::Array(Array {
                value: vec![encoded],
            }))
        );
    }
}
537}