Skip to main content

substreams_database_change/
change.rs

1use crate::pb::sf::substreams::sink::database::v1::field::UpdateOp;
2use crate::pb::sf::substreams::sink::database::v1::Field;
3use std::str;
4use substreams::pb::substreams::store_delta::Operation;
5use substreams::scalar::{BigDecimal, BigInt};
6use substreams::store::{
7    DeltaBigDecimal, DeltaBigInt, DeltaBool, DeltaBytes, DeltaInt32, DeltaInt64, DeltaString,
8};
9use substreams::Hex;
10
11pub trait ToField {
12    fn to_field<N: AsRef<str>>(self, name: N) -> Field;
13}
14
15/// We require to use a custom trait because we need to customize some of the string
16/// transformation and we need to control the String conversion ourself.
17pub trait AsString {
18    fn as_string(self) -> String;
19}
20
21macro_rules! impl_as_string_via_to_string {
22    ($name:ty) => {
23        impl AsString for $name {
24            fn as_string(self) -> String {
25                self.to_string()
26            }
27        }
28    };
29}
30
31impl_as_string_via_to_string!(i8);
32impl_as_string_via_to_string!(i16);
33impl_as_string_via_to_string!(i32);
34impl_as_string_via_to_string!(i64);
35impl_as_string_via_to_string!(u8);
36impl_as_string_via_to_string!(u16);
37impl_as_string_via_to_string!(u32);
38impl_as_string_via_to_string!(u64);
39impl_as_string_via_to_string!(String);
40impl_as_string_via_to_string!(&String);
41impl_as_string_via_to_string!(&str);
42impl_as_string_via_to_string!(bool);
43impl_as_string_via_to_string!(BigDecimal);
44impl_as_string_via_to_string!(&BigDecimal);
45impl_as_string_via_to_string!(BigInt);
46impl_as_string_via_to_string!(&BigInt);
47impl_as_string_via_to_string!(::prost_types::Timestamp);
48impl_as_string_via_to_string!(&::prost_types::Timestamp);
49
50impl<T: AsRef<[u8]>> AsString for Hex<T> {
51    fn as_string(self) -> String {
52        self.to_string()
53    }
54}
55
56impl<T: AsRef<[u8]>> AsString for &Hex<T> {
57    fn as_string(self) -> String {
58        self.to_string()
59    }
60}
61
62impl AsString for Vec<u8> {
63    fn as_string(self) -> String {
64        Hex::encode(self)
65    }
66}
67
68impl AsString for &Vec<u8> {
69    fn as_string(self) -> String {
70        Hex::encode(self)
71    }
72}
73
74impl<T: AsString> ToField for (T, T) {
75    fn to_field<N: AsRef<str>>(self, name: N) -> Field {
76        Field {
77            name: name.as_ref().to_string(),
78            value: self.1.as_string(),
79            update_op: UpdateOp::Set as i32,
80        }
81    }
82}
83
84impl<T: AsString> ToField for (Option<T>, T) {
85    fn to_field<N: AsRef<str>>(self, name: N) -> Field {
86        match self {
87            (Some(old), new) => ToField::to_field((old, new), name),
88            (None, new) => Field {
89                name: name.as_ref().to_string(),
90                value: new.as_string(),
91                update_op: UpdateOp::Set as i32,
92            },
93        }
94    }
95}
96
97impl<T: AsString> ToField for (T, Option<T>) {
98    fn to_field<N: AsRef<str>>(self, name: N) -> Field {
99        match self {
100            (old, Some(new)) => ToField::to_field((old, new), name),
101            (_old, None) => Field {
102                name: name.as_ref().to_string(),
103                value: "".to_string(),
104                update_op: UpdateOp::Set as i32,
105            },
106        }
107    }
108}
109
110fn delta_to_field<T: AsString>(
111    name: &str,
112    operation: Operation,
113    old_value: T,
114    new_value: T,
115) -> Field {
116    match Operation::from(operation) {
117        Operation::Update => ToField::to_field((old_value, new_value), name),
118        Operation::Create => ToField::to_field((None, new_value), name),
119        Operation::Delete => ToField::to_field((None, new_value), name),
120        Operation::Unset => panic!("unsupported operation {:?}", Operation::from(operation)),
121    }
122}
123
124macro_rules! impl_to_field_from_delta_via_move {
125    ($type:ty) => {
126        impl ToField for $type {
127            fn to_field<N: AsRef<str>>(self, name: N) -> Field {
128                delta_to_field(
129                    name.as_ref(),
130                    self.operation,
131                    self.old_value,
132                    self.new_value,
133                )
134            }
135        }
136    };
137}
138
139macro_rules! impl_to_field_from_delta_via_ref {
140    ($type:ty) => {
141        impl ToField for $type {
142            fn to_field<N: AsRef<str>>(self, name: N) -> Field {
143                delta_to_field(
144                    name.as_ref(),
145                    self.operation,
146                    &self.old_value,
147                    &self.new_value,
148                )
149            }
150        }
151    };
152}
153
154impl_to_field_from_delta_via_move!(&DeltaInt32);
155impl_to_field_from_delta_via_move!(&DeltaInt64);
156impl_to_field_from_delta_via_ref!(&DeltaBigDecimal);
157impl_to_field_from_delta_via_ref!(&DeltaBigInt);
158impl_to_field_from_delta_via_move!(&DeltaBool);
159impl_to_field_from_delta_via_ref!(&DeltaBytes);
160impl_to_field_from_delta_via_ref!(&DeltaString);
161
162#[cfg(test)]
163mod test {
164    use crate::change::ToField;
165    use crate::pb::sf::substreams::sink::database::v1::{field::UpdateOp, Field};
166    use substreams::pb::substreams::store_delta::Operation;
167    use substreams::scalar::{BigDecimal, BigInt};
168    use substreams::store::{DeltaBigDecimal, DeltaBigInt, DeltaBool, DeltaBytes, DeltaString};
169
170    const FIELD_NAME: &str = "field.name.1";
171
172    #[test]
173    fn i32_change() {
174        let i32_change = (None, 1i32);
175        assert_eq!(
176            create_expected_field(FIELD_NAME, Some("1".to_string())),
177            i32_change.to_field(FIELD_NAME)
178        );
179    }
180
181    #[test]
182    fn big_decimal_change() {
183        let bd_change = (None, BigDecimal::from(1 as i32));
184        assert_eq!(
185            create_expected_field(FIELD_NAME, Some("1".to_string())),
186            bd_change.to_field(FIELD_NAME)
187        );
188    }
189
190    #[test]
191    fn delta_big_decimal_change() {
192        let delta = DeltaBigDecimal {
193            operation: Operation::Update,
194            ordinal: 0,
195            key: "change".to_string(),
196            old_value: BigDecimal::from(10),
197            new_value: BigDecimal::from(20),
198        };
199
200        assert_eq!(
201            create_expected_field(FIELD_NAME, Some("20".to_string())),
202            delta.to_field(FIELD_NAME)
203        );
204    }
205
206    #[test]
207    fn big_int_change() {
208        let bi_change = (None, BigInt::from(1 as i32));
209        assert_eq!(
210            create_expected_field(FIELD_NAME, Some("1".to_string())),
211            bi_change.to_field(FIELD_NAME)
212        );
213    }
214
215    #[test]
216    fn delta_big_int_change() {
217        let delta = DeltaBigInt {
218            operation: Operation::Update,
219            ordinal: 0,
220            key: "change".to_string(),
221            old_value: BigInt::from(10),
222            new_value: BigInt::from(20),
223        };
224
225        assert_eq!(
226            create_expected_field(FIELD_NAME, Some("20".to_string())),
227            delta.to_field(FIELD_NAME)
228        );
229    }
230
231    #[test]
232    fn string_change() {
233        let string_change = (None, String::from("string"));
234        assert_eq!(
235            create_expected_field(FIELD_NAME, Some("string".to_string())),
236            string_change.to_field(FIELD_NAME)
237        );
238    }
239
240    #[test]
241    fn delta_string_change() {
242        let delta = DeltaString {
243            operation: Operation::Update,
244            ordinal: 0,
245            key: "change".to_string(),
246            old_value: String::from("string1"),
247            new_value: String::from("string2"),
248        };
249
250        assert_eq!(
251            create_expected_field(FIELD_NAME, Some("string2".to_string())),
252            delta.to_field(FIELD_NAME)
253        );
254    }
255
256    #[test]
257    fn bytes_change() {
258        let bytes_change: (Option<Vec<u8>>, Vec<u8>) = (None, Vec::from("bytes"));
259        assert_eq!(
260            create_expected_field(FIELD_NAME, Some("6279746573".to_string())),
261            bytes_change.to_field(FIELD_NAME)
262        );
263    }
264
265    #[test]
266    fn delta_bytes_change() {
267        let delta = DeltaBytes {
268            operation: Operation::Update,
269            ordinal: 0,
270            key: "change".to_string(),
271            old_value: Vec::from("bytes1"),
272            new_value: Vec::from("bytes2"),
273        };
274
275        assert_eq!(
276            create_expected_field(FIELD_NAME, Some("627974657332".to_string())),
277            delta.to_field(FIELD_NAME)
278        );
279    }
280
281    #[test]
282    fn bool_change() {
283        let bool_change = (None, true);
284        assert_eq!(
285            create_expected_field(FIELD_NAME, Some("true".to_string())),
286            bool_change.to_field(FIELD_NAME)
287        );
288    }
289
290    #[test]
291    fn delta_bool_change() {
292        let delta = DeltaBool {
293            operation: Operation::Update,
294            ordinal: 0,
295            key: "change".to_string(),
296            old_value: true,
297            new_value: false,
298        };
299
300        assert_eq!(
301            create_expected_field(FIELD_NAME, Some(false.to_string()),),
302            delta.to_field(FIELD_NAME)
303        );
304    }
305
306    fn create_expected_field<N: AsRef<str>>(name: N, value: Option<String>) -> Field {
307        let mut field = Field {
308            name: name.as_ref().to_string(),
309            update_op: UpdateOp::Set as i32,
310            value: "".to_string(),
311        };
312        if value.is_some() {
313            field.value = value.unwrap()
314        }
315        field
316    }
317}