substreams_database_change/
change.rs1use crate::pb::sf::substreams::sink::database::v1::field::UpdateOp;
2use crate::pb::sf::substreams::sink::database::v1::Field;
3use std::str;
4use substreams::pb::substreams::store_delta::Operation;
5use substreams::scalar::{BigDecimal, BigInt};
6use substreams::store::{
7 DeltaBigDecimal, DeltaBigInt, DeltaBool, DeltaBytes, DeltaInt32, DeltaInt64, DeltaString,
8};
9use substreams::Hex;
10
11pub trait ToField {
12 fn to_field<N: AsRef<str>>(self, name: N) -> Field;
13}
14
/// Conversion of a value into the `String` stored as a field's value.
///
/// The implementations in this file render numbers, booleans and text through
/// `to_string()` and raw bytes through `Hex::encode`.
pub trait AsString {
    /// Consumes `self` and returns its textual representation.
    fn as_string(self) -> String;
}
20
/// Implements [`AsString`] for each listed type by delegating to
/// `to_string()`.
///
/// Accepts one type or a comma-separated list of types (a trailing comma is
/// allowed), so related types can be registered in a single invocation:
///
/// ```ignore
/// impl_as_string_via_to_string!(i32);
/// impl_as_string_via_to_string!(u8, u16, u32);
/// ```
macro_rules! impl_as_string_via_to_string {
    ($($name:ty),+ $(,)?) => {
        $(
            impl AsString for $name {
                fn as_string(self) -> String {
                    self.to_string()
                }
            }
        )+
    };
}
30
31impl_as_string_via_to_string!(i8);
32impl_as_string_via_to_string!(i16);
33impl_as_string_via_to_string!(i32);
34impl_as_string_via_to_string!(i64);
35impl_as_string_via_to_string!(u8);
36impl_as_string_via_to_string!(u16);
37impl_as_string_via_to_string!(u32);
38impl_as_string_via_to_string!(u64);
39impl_as_string_via_to_string!(String);
40impl_as_string_via_to_string!(&String);
41impl_as_string_via_to_string!(&str);
42impl_as_string_via_to_string!(bool);
43impl_as_string_via_to_string!(BigDecimal);
44impl_as_string_via_to_string!(&BigDecimal);
45impl_as_string_via_to_string!(BigInt);
46impl_as_string_via_to_string!(&BigInt);
47impl_as_string_via_to_string!(::prost_types::Timestamp);
48impl_as_string_via_to_string!(&::prost_types::Timestamp);
49
50impl<T: AsRef<[u8]>> AsString for Hex<T> {
51 fn as_string(self) -> String {
52 self.to_string()
53 }
54}
55
56impl<T: AsRef<[u8]>> AsString for &Hex<T> {
57 fn as_string(self) -> String {
58 self.to_string()
59 }
60}
61
62impl AsString for Vec<u8> {
63 fn as_string(self) -> String {
64 Hex::encode(self)
65 }
66}
67
68impl AsString for &Vec<u8> {
69 fn as_string(self) -> String {
70 Hex::encode(self)
71 }
72}
73
74impl<T: AsString> ToField for (T, T) {
75 fn to_field<N: AsRef<str>>(self, name: N) -> Field {
76 Field {
77 name: name.as_ref().to_string(),
78 value: self.1.as_string(),
79 update_op: UpdateOp::Set as i32,
80 }
81 }
82}
83
84impl<T: AsString> ToField for (Option<T>, T) {
85 fn to_field<N: AsRef<str>>(self, name: N) -> Field {
86 match self {
87 (Some(old), new) => ToField::to_field((old, new), name),
88 (None, new) => Field {
89 name: name.as_ref().to_string(),
90 value: new.as_string(),
91 update_op: UpdateOp::Set as i32,
92 },
93 }
94 }
95}
96
97impl<T: AsString> ToField for (T, Option<T>) {
98 fn to_field<N: AsRef<str>>(self, name: N) -> Field {
99 match self {
100 (old, Some(new)) => ToField::to_field((old, new), name),
101 (_old, None) => Field {
102 name: name.as_ref().to_string(),
103 value: "".to_string(),
104 update_op: UpdateOp::Set as i32,
105 },
106 }
107 }
108}
109
110fn delta_to_field<T: AsString>(
111 name: &str,
112 operation: Operation,
113 old_value: T,
114 new_value: T,
115) -> Field {
116 match Operation::from(operation) {
117 Operation::Update => ToField::to_field((old_value, new_value), name),
118 Operation::Create => ToField::to_field((None, new_value), name),
119 Operation::Delete => ToField::to_field((None, new_value), name),
120 Operation::Unset => panic!("unsupported operation {:?}", Operation::from(operation)),
121 }
122}
123
/// Implements [`ToField`] for a delta type whose `old_value` and `new_value`
/// are handed to `delta_to_field` by value.
macro_rules! impl_to_field_from_delta_via_move {
    ($delta:ty) => {
        impl ToField for $delta {
            fn to_field<N>(self, name: N) -> Field
            where
                N: AsRef<str>,
            {
                let column: &str = name.as_ref();
                delta_to_field(column, self.operation, self.old_value, self.new_value)
            }
        }
    };
}
138
/// Implements [`ToField`] for a delta type whose `old_value` and `new_value`
/// are handed to `delta_to_field` as references.
macro_rules! impl_to_field_from_delta_via_ref {
    ($delta:ty) => {
        impl ToField for $delta {
            fn to_field<N>(self, name: N) -> Field
            where
                N: AsRef<str>,
            {
                let column: &str = name.as_ref();
                delta_to_field(column, self.operation, &self.old_value, &self.new_value)
            }
        }
    };
}
153
154impl_to_field_from_delta_via_move!(&DeltaInt32);
155impl_to_field_from_delta_via_move!(&DeltaInt64);
156impl_to_field_from_delta_via_ref!(&DeltaBigDecimal);
157impl_to_field_from_delta_via_ref!(&DeltaBigInt);
158impl_to_field_from_delta_via_move!(&DeltaBool);
159impl_to_field_from_delta_via_ref!(&DeltaBytes);
160impl_to_field_from_delta_via_ref!(&DeltaString);
161
/// Unit tests for the [`ToField`] implementations on tuples and store deltas.
#[cfg(test)]
mod test {
    use crate::change::ToField;
    use crate::pb::sf::substreams::sink::database::v1::{field::UpdateOp, Field};
    use substreams::pb::substreams::store_delta::Operation;
    use substreams::scalar::{BigDecimal, BigInt};
    use substreams::store::{DeltaBigDecimal, DeltaBigInt, DeltaBool, DeltaBytes, DeltaString};

    const FIELD_NAME: &str = "field.name.1";

    #[test]
    fn i32_change() {
        let i32_change = (None, 1i32);
        assert_eq!(
            create_expected_field(FIELD_NAME, Some("1".to_string())),
            i32_change.to_field(FIELD_NAME)
        );
    }

    /// A missing new value must produce a field with an empty value.
    #[test]
    fn removed_value_change() {
        let removed: (i32, Option<i32>) = (1, None);
        assert_eq!(
            create_expected_field(FIELD_NAME, None),
            removed.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn big_decimal_change() {
        let bd_change = (None, BigDecimal::from(1i32));
        assert_eq!(
            create_expected_field(FIELD_NAME, Some("1".to_string())),
            bd_change.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn delta_big_decimal_change() {
        let delta = DeltaBigDecimal {
            operation: Operation::Update,
            ordinal: 0,
            key: "change".to_string(),
            old_value: BigDecimal::from(10),
            new_value: BigDecimal::from(20),
        };

        assert_eq!(
            create_expected_field(FIELD_NAME, Some("20".to_string())),
            delta.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn big_int_change() {
        let bi_change = (None, BigInt::from(1i32));
        assert_eq!(
            create_expected_field(FIELD_NAME, Some("1".to_string())),
            bi_change.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn delta_big_int_change() {
        let delta = DeltaBigInt {
            operation: Operation::Update,
            ordinal: 0,
            key: "change".to_string(),
            old_value: BigInt::from(10),
            new_value: BigInt::from(20),
        };

        assert_eq!(
            create_expected_field(FIELD_NAME, Some("20".to_string())),
            delta.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn string_change() {
        let string_change = (None, String::from("string"));
        assert_eq!(
            create_expected_field(FIELD_NAME, Some("string".to_string())),
            string_change.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn delta_string_change() {
        let delta = DeltaString {
            operation: Operation::Update,
            ordinal: 0,
            key: "change".to_string(),
            old_value: String::from("string1"),
            new_value: String::from("string2"),
        };

        assert_eq!(
            create_expected_field(FIELD_NAME, Some("string2".to_string())),
            delta.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn bytes_change() {
        let bytes_change: (Option<Vec<u8>>, Vec<u8>) = (None, Vec::from("bytes"));
        assert_eq!(
            create_expected_field(FIELD_NAME, Some("6279746573".to_string())),
            bytes_change.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn delta_bytes_change() {
        let delta = DeltaBytes {
            operation: Operation::Update,
            ordinal: 0,
            key: "change".to_string(),
            old_value: Vec::from("bytes1"),
            new_value: Vec::from("bytes2"),
        };

        assert_eq!(
            create_expected_field(FIELD_NAME, Some("627974657332".to_string())),
            delta.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn bool_change() {
        let bool_change = (None, true);
        assert_eq!(
            create_expected_field(FIELD_NAME, Some("true".to_string())),
            bool_change.to_field(FIELD_NAME)
        );
    }

    #[test]
    fn delta_bool_change() {
        let delta = DeltaBool {
            operation: Operation::Update,
            ordinal: 0,
            key: "change".to_string(),
            old_value: true,
            new_value: false,
        };

        assert_eq!(
            create_expected_field(FIELD_NAME, Some(false.to_string())),
            delta.to_field(FIELD_NAME)
        );
    }

    /// Builds the `Set` field the tests expect; a `None` value means an
    /// empty string.
    fn create_expected_field<N: AsRef<str>>(name: N, value: Option<String>) -> Field {
        Field {
            name: name.as_ref().to_string(),
            value: value.unwrap_or_default(),
            update_op: UpdateOp::Set as i32,
        }
    }
}