Skip to main content

substreams_database_change/
tables.rs

1use crate::numeric::{NumericAddable, NumericComparable};
2use crate::pb::sf::substreams::sink::database::v1::{
3    field::UpdateOp, table_change::Operation, DatabaseChanges, Field, TableChange,
4};
5use std::cmp::Ordering;
6use std::collections::{BTreeMap, HashMap};
7use substreams::{
8    scalar::{BigDecimal, BigInt},
9    Hex,
10};
11
12#[derive(Debug)]
13pub struct Tables {
14    /// Map from table name to the primary keys within that table
15    tables: HashMap<String, Rows>,
16
17    /// Ordinal is used to track the order of changes, it is incremented for each row
18    /// in such way that at the end, we can correctly order the changes back correctly.
19    ordinal: Ordinal,
20}
21
22impl Tables {
23    pub fn new() -> Self {
24        Tables {
25            tables: HashMap::new(),
26            ordinal: Ordinal::new(),
27        }
28    }
29
30    /// Returns the number of rows in all tables.
31    pub fn all_row_count(&self) -> usize {
32        self.tables.values().map(|rows| rows.pks.len()).sum()
33    }
34
35    /// Create a new row in the table with the given primary key.
36    ///
37    /// ```
38    /// // With a Primary Key of type `Single`
39    /// use crate::substreams_database_change::tables::Tables;
40    /// let mut tables = Tables::new();
41    /// tables.create_row("myevent", "my_key",);
42    /// ```
43    ///
44    /// ```
45    /// // With a Primary Key of type `Composite`
46    /// use crate::substreams_database_change::tables::Tables;
47    /// let mut tables = Tables::new();
48    /// tables.create_row("myevent", [("evt_tx_hash", String::from("hello")), ("evt_index", String::from("world"))]);
49    /// ```
50    pub fn create_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
51        let rows: &mut Rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
52        let k = key.into();
53        let key_debug = format!("{:?}", k);
54        let row = rows.pks.entry(k).or_insert(Row::new(self.ordinal.next()));
55        match row.operation {
56            Operation::Unspecified => {
57                row.operation = Operation::Create;
58            }
59            Operation::Create => { /* Already the right operation */ }
60            Operation::Upsert => {
61                panic!(
62                    "cannot create a row after a scheduled upsert operation, create and upsert are exclusive - table: {} key: {}",
63                    table, key_debug,
64                )
65            }
66            Operation::Update => {
67                panic!("cannot create a row that was marked for update")
68            }
69            Operation::Delete => {
70                panic!(
71                    "cannot create a row after a scheduled delete operation - table: {} key: {}",
72                    table, key_debug,
73                )
74            }
75        }
76        row
77    }
78
79    /// Upsert (insert or update) a new row in the table with the given primary key.
80    ///
81    /// *Note* Ensure that the SQL sink driver you use supports upsert operations.
82    ///
83    /// ```
84    /// // With a Primary Key of type `Single`
85    /// use crate::substreams_database_change::tables::Tables;
86    /// let mut tables = Tables::new();
87    /// tables.upsert_row("myevent", "my_key",);
88    /// ```
89    ///
90    /// ```
91    /// // With a Primary Key of type `Composite`
92    /// use crate::substreams_database_change::tables::Tables;
93    /// let mut tables = Tables::new();
94    /// tables.upsert_row("myevent", [("evt_tx_hash", String::from("hello")), ("evt_index", String::from("world"))]);
95    /// ```
96    pub fn upsert_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
97        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
98        let k = key.into();
99        let key_debug = format!("{:?}", k);
100        let row = rows.pks.entry(k).or_insert(Row::new(self.ordinal.next()));
101        match row.operation {
102            Operation::Unspecified => {
103                row.operation = Operation::Upsert;
104            }
105            Operation::Create => {
106                panic!(
107                    "cannot upsert a row after a scheduled create operation, create and upsert are exclusive - table: {} key: {}",
108                    table, key_debug,
109                )
110            }
111            Operation::Upsert => { /* Already the right operation */ }
112            Operation::Update => {
113                panic!(
114                    "cannot upsert a row after a scheduled update operation, update and upsert are exclusive - table: {} key: {}",
115                    table, key_debug,
116                )
117            }
118            Operation::Delete => {
119                panic!(
120                    "cannot upsert a row after a scheduled delete operation - table: {} key: {}",
121                    table, key_debug,
122                )
123            }
124        }
125        row
126    }
127
128    pub fn update_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
129        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
130        let k = key.into();
131        let key_debug = format!("{:?}", k);
132        let row = rows.pks.entry(k).or_insert(Row::new(self.ordinal.next()));
133        match row.operation {
134            Operation::Unspecified => {
135                row.operation = Operation::Update;
136            }
137            Operation::Create => { /* Fine, updated columns will be part of Insert operation */ }
138            Operation::Upsert => { /* Fine, updated columns will be part of Upsert operation */ }
139            Operation::Update => { /* Already the right operation */ }
140            Operation::Delete => {
141                panic!(
142                    "cannot update a row after a scheduled delete operation - table: {} key: {}",
143                    table, key_debug,
144                )
145            }
146        }
147        row
148    }
149
150    pub fn delete_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
151        let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
152        let row = rows
153            .pks
154            .entry(key.into())
155            .or_insert(Row::new(self.ordinal.next()));
156
157        row.columns = HashMap::new();
158        row.operation = match row.operation {
159            Operation::Unspecified => Operation::Delete,
160            Operation::Create => {
161                // We are creating the row in this block, there is no need to emit a DELETE statement,
162                // we specify Unspecified and the row will be skipped when comes the time to emit the
163                // changes.
164                Operation::Unspecified
165            }
166            Operation::Upsert => {
167                // We cannot know if the row was created within that block or already present
168                // in the database. As such, we must emit a DELETE statement in the sink
169                // for this. Worst case, the DELETE will hit no row and be a no-op.
170                Operation::Delete
171            }
172            Operation::Update => {
173                // The row must be deleted, emit the operation
174                Operation::Delete
175            }
176            Operation::Delete => {
177                // Already delete type, continue using that as the operation
178                Operation::Delete
179            }
180        };
181
182        row
183    }
184
185    // Convert Tables into an DatabaseChanges protobuf object
186    pub fn to_database_changes(self) -> DatabaseChanges {
187        let mut changes = DatabaseChanges::default();
188
189        for (table, rows) in self.tables.into_iter() {
190            for (pk, row) in rows.pks.into_iter() {
191                if row.operation == Operation::Unspecified {
192                    continue;
193                }
194
195                let mut change = match pk {
196                    PrimaryKey::Single(pk) => {
197                        TableChange::new(table.clone(), pk, row.ordinal, row.operation)
198                    }
199                    PrimaryKey::Composite(keys) => TableChange::new_composite(
200                        table.clone(),
201                        keys.into_iter().collect(),
202                        row.ordinal,
203                        row.operation,
204                    ),
205                };
206
207                for (field, field_value) in row.columns.into_iter() {
208                    change.fields.push(Field {
209                        name: field,
210                        value: field_value.value.into_string(),
211                        update_op: field_value.update_op as i32,
212                    });
213                }
214
215                changes.table_changes.push(change);
216            }
217        }
218
219        changes.table_changes.sort_by_key(|change| change.ordinal);
220        changes
221    }
222}
223
224#[derive(Debug, Default, Clone, Copy)]
225struct Ordinal(u64);
226
227impl Ordinal {
228    fn new() -> Self {
229        Ordinal(0)
230    }
231
232    fn next(&mut self) -> u64 {
233        let current = self.0;
234        self.0 += 1;
235        current
236    }
237}
238
239#[derive(Hash, Debug, Eq, PartialEq)]
240pub enum PrimaryKey {
241    Single(String),
242    Composite(BTreeMap<String, String>),
243}
244
245impl From<&str> for PrimaryKey {
246    fn from(x: &str) -> Self {
247        Self::Single(x.to_string())
248    }
249}
250
251impl From<&String> for PrimaryKey {
252    fn from(x: &String) -> Self {
253        Self::Single(x.clone())
254    }
255}
256
257impl From<String> for PrimaryKey {
258    fn from(x: String) -> Self {
259        Self::Single(x)
260    }
261}
262
263impl<K: AsRef<str>, const N: usize> From<[(K, String); N]> for PrimaryKey {
264    fn from(arr: [(K, String); N]) -> Self {
265        if N == 0 {
266            return Self::Composite(BTreeMap::new());
267        }
268
269        let string_arr = arr.map(|(k, v)| (k.as_ref().to_string(), v));
270        Self::Composite(BTreeMap::from(string_arr))
271    }
272}
273
274impl<K: AsRef<str>, const N: usize> From<[(K, &str); N]> for PrimaryKey {
275    fn from(arr: [(K, &str); N]) -> Self {
276        if N == 0 {
277            return Self::Composite(BTreeMap::new());
278        }
279
280        let string_arr = arr.map(|(k, v)| (k.as_ref().to_string(), v.to_string()));
281        Self::Composite(BTreeMap::from(string_arr))
282    }
283}
284
285#[derive(Debug)]
286struct Rows {
287    // Map of primary keys within this table, to the fields within
288    pks: HashMap<PrimaryKey, Row>,
289}
290
291impl Rows {
292    fn new() -> Self {
293        Rows {
294            pks: HashMap::new(),
295        }
296    }
297}
298
299/// Holds field value and its update operation for UPSERT handling.
300#[derive(Debug, Clone)]
301struct FieldValue {
302    value: FieldInnerValue,
303    update_op: UpdateOp,
304}
305
306impl FieldValue {
307    fn new(value: String) -> Self {
308        FieldValue {
309            value: FieldInnerValue::from_string(value),
310            update_op: UpdateOp::Set,
311        }
312    }
313
314    fn new_numeric(value: BigDecimal, op: UpdateOp) -> Self {
315        FieldValue {
316            value: FieldInnerValue::Numeric(value),
317            update_op: op,
318        }
319    }
320
321    fn with_op(value: String, update_op: UpdateOp) -> Self {
322        FieldValue {
323            value: FieldInnerValue::from_string(value),
324            update_op,
325        }
326    }
327}
328
329/// Internal representation of field values that can be either text or numeric.
330/// Numeric values are stored as BigDecimal to avoid repeated parsing during
331/// accumulation operations (add/sub/max/min).
332#[derive(Debug, Clone)]
333enum FieldInnerValue {
334    /// Text representation - used for non-numeric values and initial set() calls
335    Text(String),
336    /// Numeric representation - used after first numeric operation to avoid re-parsing
337    Numeric(BigDecimal),
338}
339
340impl FieldInnerValue {
341    /// Create from a string value
342    fn from_string(s: String) -> Self {
343        FieldInnerValue::Text(s)
344    }
345
346    #[cfg(test)]
347    fn as_string(&self) -> String {
348        match self {
349            FieldInnerValue::Text(s) => s.clone(),
350            FieldInnerValue::Numeric(bd) => bd.to_string(),
351        }
352    }
353
354    /// Convert to string for database output
355    fn into_string(self) -> String {
356        match self {
357            FieldInnerValue::Text(s) => s,
358            FieldInnerValue::Numeric(bd) => bd.to_string(),
359        }
360    }
361
362    /// Get or convert to BigDecimal for numeric operations
363    /// Returns a mutable reference to the internal BigDecimal,
364    /// converting from Text if necessary
365    fn as_numeric_mut(&mut self) -> &mut BigDecimal {
366        use std::str::FromStr;
367
368        // If currently Text, parse and convert to Numeric
369        if let FieldInnerValue::Text(s) = self {
370            let bd = BigDecimal::from_str(s).expect("existing value should be valid BigDecimal");
371            *self = FieldInnerValue::Numeric(bd);
372        }
373
374        match self {
375            FieldInnerValue::Numeric(bd) => bd,
376            FieldInnerValue::Text(_) => unreachable!("already converted to Numeric, impossible"),
377        }
378    }
379}
380
381#[derive(Debug, Default)]
382pub struct Row {
383    /// Verify that we don't try to delete the same row as we're creating it
384    operation: Operation,
385    /// Map of field name to its value and update operation
386    columns: HashMap<String, FieldValue>,
387    /// First ordinal this row was created/modified at, for sorting
388    ordinal: u64,
389}
390
391impl Row {
392    fn new(ordinal: u64) -> Self {
393        Row {
394            operation: Operation::Unspecified,
395            columns: HashMap::new(),
396            ordinal,
397        }
398    }
399
400    /// Set a field to a value, this is the standard method for setting fields in a row.
401    ///
402    /// This method ensures that the value is converted to a database-compatible format
403    /// using the `ToDatabaseValue` trait. It is the primary way to set fields in a row
404    /// for most use cases.
405    ///
406    /// The `ToDatabaseValue` trait is implemented for various types, including primitive
407    /// types, strings, and custom types. This allows you to set fields with different
408    /// types of values without worrying about the underlying conversion. Check example
409    /// for more details.
410    ///
411    /// Check [ToDatabaseValue] for implemented automatic conversions.
412    ///
413    /// # Panics
414    ///
415    /// This method will panic if called on a row marked for deletion.
416    ///
417    /// # Example
418    ///
419    /// ```
420    /// use substreams::scalar::{BigInt, BigDecimal};
421    /// use crate::substreams_database_change::tables::Tables;
422    /// let mut tables = Tables::new();
423    /// let row = tables.create_row("myevent", "my_key");
424    /// row.set("name", "asset name");
425    /// row.set("decimals", 42);
426    /// row.set("count", BigDecimal::from(42));
427    /// row.set("value", BigInt::from(42));
428    /// ```
429    pub fn set<T: ToDatabaseValue>(&mut self, name: &str, value: T) -> &mut Self {
430        if self.operation == Operation::Delete {
431            panic!("cannot set fields on a delete operation")
432        }
433        // set() can always be called - it resets the field to a new value
434        // This allows sequences like: set -> add -> set (resets the accumulated value)
435        self.columns
436            .insert(name.to_string(), FieldValue::new(value.to_value()));
437        self
438    }
439
440    /// Add to the existing value: column = COALESCE(column, 0) + value
441    /// Used with upsert_row() for accumulating values like counters or balances.
442    /// If called multiple times for the same column within a block, values are accumulated.
443    pub fn add<T: NumericAddable>(&mut self, name: &str, value: T) -> &mut Self {
444        if self.operation == Operation::Delete {
445            panic!("cannot set fields on a delete operation")
446        }
447        self.accumulate_add(name, value, false);
448        self
449    }
450
451    /// Subtract from the existing value: column = COALESCE(column, 0) - value
452    /// Convenience method that negates the value and uses ADD operation.
453    /// If called multiple times for the same column within a block, values are accumulated.
454    pub fn sub<T: NumericAddable>(&mut self, name: &str, value: T) -> &mut Self {
455        if self.operation == Operation::Delete {
456            panic!("cannot set fields on a delete operation")
457        }
458        self.accumulate_add(name, value, true);
459        self
460    }
461
462    /// Internal helper to accumulate ADD values for the same column within a block.
463    /// - Set + Add/Sub: accumulate values, keep Set op (full value for INSERT)
464    /// - Add + Add/Sub: accumulate values, keep Add op (delta for UPDATE)
465    /// - No existing: store as Add op (delta)
466    /// - Other existing ops (Max/Min/SetIfNull): PANIC (invalid transition)
467    fn accumulate_add<T: NumericAddable>(&mut self, name: &str, value: T, subtract: bool) {
468        match self.columns.get_mut(name) {
469            Some(existing) => {
470                match existing.update_op {
471                    UpdateOp::Unspecified => panic!(
472                        "cannot call add/sub() on field '{}' after unspecified - incompatible operations",
473                        name
474                    ),
475                    UpdateOp::Set | UpdateOp::Add => {
476                        // Get or convert to BigDecimal (only parses once!)
477                        let target = existing.value.as_numeric_mut();
478
479                        if subtract {
480                            value.sub_assign_from(target);
481                        } else {
482                            value.add_assign_to(target);
483                        }
484                        // No .to_string() call - stays as BigDecimal!
485                    }
486                    UpdateOp::Max => panic!(
487                        "cannot call add/sub() on field '{}' after max() - incompatible operations",
488                        name
489                    ),
490                    UpdateOp::Min => panic!(
491                        "cannot call add/sub() on field '{}' after min() - incompatible operations",
492                        name
493                    ),
494                    UpdateOp::SetIfNull => panic!(
495                        "cannot call add/sub() on field '{}' after set_if_null() - incompatible operations",
496                        name
497                    ),
498                }
499            }
500            None => {
501                let mut target = value.to_big_decimal();
502
503                if subtract {
504                    target = -target;
505                }
506
507                // Store directly as Numeric
508                self.columns.insert(
509                    name.to_string(),
510                    FieldValue::new_numeric(target, UpdateOp::Add),
511                );
512            }
513        }
514    }
515
516    /// Set to the maximum of existing and new: column = GREATEST(COALESCE(column, value), value)
517    /// Used with upsert_row() for tracking high values.
518    /// Can only follow set() or another max() call on the same field.
519    pub fn max<T: NumericComparable>(&mut self, name: &str, value: T) -> &mut Self {
520        if self.operation == Operation::Delete {
521            panic!("cannot set fields on a delete operation")
522        }
523
524        // Use get_mut to check, validate, and update in one pass
525        match self.columns.get_mut(name) {
526            Some(existing) => {
527                match existing.update_op {
528                    UpdateOp::Unspecified => panic!(
529                        "cannot call max() on field '{}' after unspecified - incompatible operations",
530                        name
531                    ),
532                    UpdateOp::Set | UpdateOp::Max => {
533                        // Get or convert to BigDecimal
534                        let target = existing.value.as_numeric_mut();
535
536                        if value.cmp_to_big_decimal(target) == Ordering::Greater {
537                            *target = value.to_big_decimal();
538                        }
539                        existing.update_op = UpdateOp::Max;
540                    }
541                    UpdateOp::Add => panic!(
542                        "cannot call max() on field '{}' after add/sub() - incompatible operations",
543                        name
544                    ),
545                    UpdateOp::Min => panic!(
546                        "cannot call max() on field '{}' after min() - incompatible operations",
547                        name
548                    ),
549                    UpdateOp::SetIfNull => panic!(
550                        "cannot call max() on field '{}' after set_if_null() - incompatible operations",
551                        name
552                    ),
553                }
554            }
555            None => {
556                // No existing value - insert new entry with Max operation
557                self.columns.insert(
558                    name.to_string(),
559                    FieldValue::new_numeric(value.to_big_decimal(), UpdateOp::Max),
560                );
561            }
562        }
563        self
564    }
565
566    /// Set to the minimum of existing and new: column = LEAST(COALESCE(column, value), value)
567    /// Used with upsert_row() for tracking low values.
568    /// Can only follow set() or another min() call on the same field.
569    pub fn min<T: NumericComparable>(&mut self, name: &str, value: T) -> &mut Self {
570        if self.operation == Operation::Delete {
571            panic!("cannot set fields on a delete operation")
572        }
573
574        // Use get_mut to check, validate, and update in one pass
575        match self.columns.get_mut(name) {
576            Some(existing) => {
577                match existing.update_op {
578                    UpdateOp::Unspecified => panic!(
579                        "cannot call min() on field '{}' after unspecified - incompatible operations",
580                        name
581                    ),
582                    UpdateOp::Set | UpdateOp::Min => {
583                        // Get or convert to BigDecimal
584                        let target = existing.value.as_numeric_mut();
585                        if value.cmp_to_big_decimal(target) == Ordering::Less {
586                            *target = value.to_big_decimal();
587                        }
588                        existing.update_op = UpdateOp::Min;
589                    }
590                    UpdateOp::Add => panic!(
591                        "cannot call min() on field '{}' after add/sub() - incompatible operations",
592                        name
593                    ),
594                    UpdateOp::Max => panic!(
595                        "cannot call min() on field '{}' after max() - incompatible operations",
596                        name
597                    ),
598                    UpdateOp::SetIfNull => panic!(
599                        "cannot call min() on field '{}' after set_if_null() - incompatible operations",
600                        name
601                    ),
602                }
603            }
604            None => {
605                // No existing value - insert new entry with Min operation
606                self.columns.insert(
607                    name.to_string(),
608                    FieldValue::new_numeric(value.to_big_decimal(), UpdateOp::Min),
609                );
610            }
611        }
612        self
613    }
614
615    /// Set only if column is null: column = COALESCE(column, new_value)
616    /// Used with upsert_row() for setting initial values that should not be overwritten.
617    /// When called multiple times, the FIRST value is kept (subsequent calls are no-ops).
618    /// Cannot be mixed with other operations on the same field.
619    pub fn set_if_null<T: ToDatabaseValue>(&mut self, name: &str, value: T) -> &mut Self {
620        if self.operation == Operation::Delete {
621            panic!("cannot set fields on a delete operation")
622        }
623
624        // Parse the value first
625        let new_value = value.to_value();
626
627        // Use get_mut to check for existing value
628        match self.columns.get_mut(name) {
629            Some(existing) => {
630                match existing.update_op {
631                    UpdateOp::Unspecified => panic!(
632                        "cannot call set_if_null() on field '{}' after unspecified - incompatible operations",
633                        name
634                    ),
635                    UpdateOp::Set | UpdateOp::SetIfNull => {
636                        // After set() or set_if_null(), the value is already determined
637                        // Keep the existing value - subsequent set_if_null() calls are no-op
638                    }
639                    UpdateOp::Add => panic!(
640                        "cannot call set_if_null() on field '{}' after add/sub() - incompatible operations",
641                        name
642                    ),
643                    UpdateOp::Max => panic!(
644                        "cannot call set_if_null() on field '{}' after max() - incompatible operations",
645                        name
646                    ),
647                    UpdateOp::Min => panic!(
648                        "cannot call set_if_null() on field '{}' after min() - incompatible operations",
649                        name
650                    ),
651                }
652            }
653            None => {
654                // No existing value - insert new entry with SetIfNull operation
655                self.columns.insert(
656                    name.to_string(),
657                    FieldValue::with_op(new_value, UpdateOp::SetIfNull),
658                );
659            }
660        }
661        self
662    }
663
664    /// Set a field to a raw value, this is useful for setting values that are not
665    /// normalized across all databases. In there, you can put the raw value as you
666    /// would in a SQL statement of the database you are targeting.
667    ///
668    /// This will be pass as a string to the database which will interpret it itself.
669    pub fn set_raw(&mut self, name: &str, value: String) -> &mut Self {
670        self.columns
671            .insert(name.to_string(), FieldValue::new(value));
672        self
673    }
674
675    /// Set a field to an array of values compatible with PostgresSQL database,
676    /// this method is currently experimental and hidden as we plan to support
677    /// array natively in the model.
678    ///
679    /// For now, this method should be used with great care as it ties the model
680    /// to the database implementation.
681    pub fn set_psql_array<T: ToDatabaseValue>(&mut self, name: &str, value: Vec<T>) -> &mut Row {
682        if self.operation == Operation::Delete {
683            panic!("cannot set fields on a delete operation")
684        }
685
686        let values = value
687            .into_iter()
688            .map(|x| x.to_value())
689            .collect::<Vec<_>>()
690            .join(",");
691
692        self.columns.insert(
693            name.to_string(),
694            FieldValue::new(format!("'{{{}}}'", values)),
695        );
696        self
697    }
698
699    /// Set a field to an array of values compatible with Clickhouse database,
700    /// this method is currently experimental and hidden as we plan to support
701    /// array natively in the model.
702    ///
703    /// For now, this method should be used with great care as it ties the model
704    /// to the database implementation.
705    pub fn set_clickhouse_array<T: ToDatabaseValue>(
706        &mut self,
707        name: &str,
708        value: Vec<T>,
709    ) -> &mut Row {
710        if self.operation == Operation::Delete {
711            panic!("cannot set fields on a delete operation")
712        }
713
714        let values = value
715            .into_iter()
716            .map(|x| x.to_value())
717            .collect::<Vec<_>>()
718            .join(",");
719
720        self.columns
721            .insert(name.to_string(), FieldValue::new(format!("[{}]", values)));
722        self
723    }
724}
725
726macro_rules! impl_to_database_value_proxy_to_ref {
727    ($name:ty) => {
728        impl ToDatabaseValue for $name {
729            fn to_value(self) -> String {
730                ToDatabaseValue::to_value(&self)
731            }
732        }
733    };
734}
735
736macro_rules! impl_to_database_value_proxy_to_string {
737    ($name:ty) => {
738        impl ToDatabaseValue for $name {
739            fn to_value(self) -> String {
740                ToString::to_string(&self)
741            }
742        }
743    };
744}
745
746pub trait ToDatabaseValue {
747    fn to_value(self) -> String;
748}
749
750impl_to_database_value_proxy_to_string!(i8);
751impl_to_database_value_proxy_to_string!(i16);
752impl_to_database_value_proxy_to_string!(i32);
753impl_to_database_value_proxy_to_string!(i64);
754impl_to_database_value_proxy_to_string!(u8);
755impl_to_database_value_proxy_to_string!(u16);
756impl_to_database_value_proxy_to_string!(u32);
757impl_to_database_value_proxy_to_string!(u64);
758impl_to_database_value_proxy_to_string!(bool);
759impl_to_database_value_proxy_to_string!(::prost_types::Timestamp);
760impl_to_database_value_proxy_to_string!(&::prost_types::Timestamp);
761impl_to_database_value_proxy_to_string!(&str);
762impl_to_database_value_proxy_to_string!(BigDecimal);
763impl_to_database_value_proxy_to_string!(&BigDecimal);
764impl_to_database_value_proxy_to_string!(BigInt);
765impl_to_database_value_proxy_to_string!(&BigInt);
766
767impl_to_database_value_proxy_to_ref!(Vec<u8>);
768
769impl ToDatabaseValue for &String {
770    fn to_value(self) -> String {
771        self.clone()
772    }
773}
774
775impl ToDatabaseValue for String {
776    fn to_value(self) -> String {
777        self
778    }
779}
780
781impl ToDatabaseValue for &Vec<u8> {
782    fn to_value(self) -> String {
783        Hex::encode(self)
784    }
785}
786
787impl<T: AsRef<[u8]>> ToDatabaseValue for Hex<T> {
788    fn to_value(self) -> String {
789        ToString::to_string(&self)
790    }
791}
792
793impl<T: AsRef<[u8]>> ToDatabaseValue for &Hex<T> {
794    fn to_value(self) -> String {
795        ToString::to_string(self)
796    }
797}
798
799#[cfg(test)]
800mod test {
801    use crate::pb::sf::substreams::sink::database::v1::table_change::PrimaryKey as PrimaryKeyProto;
802    use crate::pb::sf::substreams::sink::database::v1::CompositePrimaryKey as CompositePrimaryKeyProto;
803    use crate::pb::sf::substreams::sink::database::v1::{DatabaseChanges, TableChange};
804    use crate::tables::PrimaryKey;
805    use crate::tables::Tables;
806    use crate::tables::ToDatabaseValue;
807    use pretty_assertions::assert_eq;
808
809    #[test]
810    fn to_database_value_proto_timestamp() {
811        assert_eq!(
812            ToDatabaseValue::to_value(::prost_types::Timestamp {
813                seconds: 60 * 60 + 60 + 1,
814                nanos: 1
815            }),
816            "1970-01-01T01:01:01.000000001Z"
817        );
818    }
819
820    #[test]
821    fn create_row_single_pk_direct() {
822        let mut tables = Tables::new();
823        tables.create_row("myevent", PrimaryKey::Single("myhash".to_string()));
824
825        assert_eq!(
826            tables.to_database_changes(),
827            DatabaseChanges {
828                table_changes: [change("myevent", "myhash", 0)].to_vec(),
829            }
830        );
831    }
832
833    #[test]
834    fn create_row_single_pk() {
835        let mut tables = Tables::new();
836        tables.create_row("myevent", "myhash");
837
838        assert_eq!(
839            tables.to_database_changes(),
840            DatabaseChanges {
841                table_changes: [change("myevent", "myhash", 0)].to_vec(),
842            }
843        );
844    }
845
846    #[test]
847    fn create_row_composite_pk() {
848        let mut tables = Tables::new();
849        tables.create_row(
850            "myevent",
851            [("evt_tx_hash", "hello"), ("evt_index", "world")],
852        );
853
854        assert_eq!(
855            tables.to_database_changes(),
856            DatabaseChanges {
857                table_changes: [change(
858                    "myevent",
859                    [("evt_tx_hash", "hello"), ("evt_index", "world")],
860                    0
861                )]
862                .to_vec()
863            }
864        );
865    }
866
867    #[test]
868    fn row_ordering() {
869        let mut tables = Tables::new();
870        tables.create_row("tableA", "one");
871        tables.create_row("tableC", "two");
872        tables.create_row("tableA", "three");
873        tables.create_row("tableD", "four");
874        tables.create_row("tableE", "five");
875        tables.create_row("tableC", "six");
876
877        assert_eq!(
878            tables.to_database_changes(),
879            DatabaseChanges {
880                table_changes: [
881                    change("tableA", "one", 0),
882                    change("tableC", "two", 1),
883                    change("tableA", "three", 2),
884                    change("tableD", "four", 3),
885                    change("tableE", "five", 4),
886                    change("tableC", "six", 5)
887                ]
888                .to_vec(),
889            }
890        );
891    }
892
893    fn change<K: Into<PrimaryKey>>(name: &str, key: K, ordinal: u64) -> TableChange {
894        TableChange {
895            table: name.to_string(),
896            ordinal,
897            operation: 1,
898            fields: [].into(),
899            primary_key: Some(match key.into() {
900                PrimaryKey::Single(pk) => PrimaryKeyProto::Pk(pk),
901                PrimaryKey::Composite(keys) => {
902                    PrimaryKeyProto::CompositePk(CompositePrimaryKeyProto {
903                        keys: keys.into_iter().collect(),
904                    })
905                }
906            }),
907        }
908    }
909}
910
911#[cfg(test)]
912mod update_op_tests {
913    use super::*;
914    use crate::pb::sf::substreams::sink::database::v1::field::UpdateOp;
915
916    // ============================================================
917    // Basic set() operation tests
918    // ============================================================
919
920    #[test]
921    fn set_stores_value_with_set_op() {
922        let mut tables = Tables::new();
923        let row = tables.create_row("test", "pk1");
924        row.set("balance", "1000");
925
926        let field = row.columns.get("balance").unwrap();
927        assert_eq!(field.value.as_string(), "1000");
928        assert_eq!(field.update_op, UpdateOp::Set);
929    }
930
931    #[test]
932    fn set_overwrites_previous_set() {
933        let mut tables = Tables::new();
934        let row = tables.create_row("test", "pk1");
935        row.set("balance", "1000");
936        row.set("balance", "2000");
937
938        let field = row.columns.get("balance").unwrap();
939        assert_eq!(field.value.as_string(), "2000");
940        assert_eq!(field.update_op, UpdateOp::Set);
941    }
942
943    // ============================================================
944    // Basic add() operation tests
945    // ============================================================
946
947    #[test]
948    fn add_alone_stores_with_add_op() {
949        let mut tables = Tables::new();
950        let row = tables.upsert_row("test", "pk1");
951        row.add("balance", "100");
952
953        let field = row.columns.get("balance").unwrap();
954        assert_eq!(field.value.as_string(), "100");
955        assert_eq!(field.update_op, UpdateOp::Add);
956    }
957
958    #[test]
959    fn add_alone_with_bigdecimal() {
960        let mut tables = Tables::new();
961        let row = tables.upsert_row("test", "pk1");
962        row.add("balance", "123.456789");
963
964        let field = row.columns.get("balance").unwrap();
965        assert_eq!(field.value.as_string(), "123.456789");
966        assert_eq!(field.update_op, UpdateOp::Add);
967    }
968
969    // ============================================================
970    // Basic sub() operation tests
971    // ============================================================
972
973    #[test]
974    fn sub_alone_stores_negated_with_add_op() {
975        let mut tables = Tables::new();
976        let row = tables.upsert_row("test", "pk1");
977        row.sub("balance", "100");
978
979        let field = row.columns.get("balance").unwrap();
980        assert_eq!(field.value.as_string(), "-100");
981        assert_eq!(field.update_op, UpdateOp::Add);
982    }
983
984    #[test]
985    fn sub_negates_negative_to_positive() {
986        let mut tables = Tables::new();
987        let row = tables.upsert_row("test", "pk1");
988        row.sub("balance", "-100");
989
990        let field = row.columns.get("balance").unwrap();
991        assert_eq!(field.value.as_string(), "100");
992        assert_eq!(field.update_op, UpdateOp::Add);
993    }
994
995    // ============================================================
996    // ADD + ADD accumulation tests
997    // ============================================================
998
999    #[test]
1000    fn add_plus_add_accumulates_keeps_add_op() {
1001        let mut tables = Tables::new();
1002        let row = tables.upsert_row("test", "pk1");
1003        row.add("balance", "100");
1004        row.add("balance", "50");
1005
1006        let field = row.columns.get("balance").unwrap();
1007        assert_eq!(field.value.as_string(), "150");
1008        assert_eq!(field.update_op, UpdateOp::Add);
1009    }
1010
1011    #[test]
1012    fn add_plus_sub_accumulates_keeps_add_op() {
1013        let mut tables = Tables::new();
1014        let row = tables.upsert_row("test", "pk1");
1015        row.add("balance", "100");
1016        row.sub("balance", "30");
1017
1018        let field = row.columns.get("balance").unwrap();
1019        assert_eq!(field.value.as_string(), "70");
1020        assert_eq!(field.update_op, UpdateOp::Add);
1021    }
1022
1023    #[test]
1024    fn sub_plus_add_accumulates_keeps_add_op() {
1025        let mut tables = Tables::new();
1026        let row = tables.upsert_row("test", "pk1");
1027        row.sub("balance", "100");
1028        row.add("balance", "30");
1029
1030        let field = row.columns.get("balance").unwrap();
1031        assert_eq!(field.value.as_string(), "-70");
1032        assert_eq!(field.update_op, UpdateOp::Add);
1033    }
1034
1035    #[test]
1036    fn sub_plus_sub_accumulates_keeps_add_op() {
1037        let mut tables = Tables::new();
1038        let row = tables.upsert_row("test", "pk1");
1039        row.sub("balance", "100");
1040        row.sub("balance", "50");
1041
1042        let field = row.columns.get("balance").unwrap();
1043        assert_eq!(field.value.as_string(), "-150");
1044        assert_eq!(field.update_op, UpdateOp::Add);
1045    }
1046
1047    // ============================================================
1048    // SET + ADD/SUB accumulation tests (critical for token creation + burn)
1049    // ============================================================
1050
1051    #[test]
1052    fn set_plus_add_accumulates_keeps_set_op() {
1053        let mut tables = Tables::new();
1054        let row = tables.upsert_row("test", "pk1");
1055        row.set("total_supply", "1000000000");
1056        row.add("total_supply", "500");
1057
1058        let field = row.columns.get("total_supply").unwrap();
1059        assert_eq!(field.value.as_string(), "1000000500");
1060        assert_eq!(field.update_op, UpdateOp::Set);
1061    }
1062
1063    #[test]
1064    fn set_plus_sub_accumulates_keeps_set_op() {
1065        let mut tables = Tables::new();
1066        let row = tables.upsert_row("test", "pk1");
1067        row.set("total_supply", "1000000000");
1068        row.sub("total_supply", "500");
1069
1070        let field = row.columns.get("total_supply").unwrap();
1071        assert_eq!(field.value.as_string(), "999999500");
1072        assert_eq!(field.update_op, UpdateOp::Set);
1073    }
1074
1075    #[test]
1076    fn set_plus_multiple_adds_accumulates_keeps_set_op() {
1077        let mut tables = Tables::new();
1078        let row = tables.upsert_row("test", "pk1");
1079        row.set("total_supply", "1000000000");
1080        row.add("total_supply", "100");
1081        row.add("total_supply", "200");
1082        row.sub("total_supply", "50");
1083
1084        let field = row.columns.get("total_supply").unwrap();
1085        assert_eq!(field.value.as_string(), "1000000250");
1086        assert_eq!(field.update_op, UpdateOp::Set);
1087    }
1088
1089    // ============================================================
1090    // set() can always reset a field - allowed transitions from any op
1091    // ============================================================
1092
1093    #[test]
1094    fn add_then_set_resets_value() {
1095        let mut tables = Tables::new();
1096        let row = tables.upsert_row("test", "pk1");
1097        row.add("balance", "100");
1098        row.set("balance", "999");
1099
1100        // set() resets the field completely
1101        let field = row.columns.get("balance").unwrap();
1102        assert_eq!(field.value.as_string(), "999");
1103        assert_eq!(field.update_op, UpdateOp::Set);
1104    }
1105
1106    #[test]
1107    fn max_then_set_resets_value() {
1108        let mut tables = Tables::new();
1109        let row = tables.upsert_row("test", "pk1");
1110        row.max("price", 100i64);
1111        row.set("price", "999");
1112
1113        // set() resets the field completely
1114        let field = row.columns.get("price").unwrap();
1115        assert_eq!(field.value.as_string(), "999");
1116        assert_eq!(field.update_op, UpdateOp::Set);
1117    }
1118
1119    #[test]
1120    fn min_then_set_resets_value() {
1121        let mut tables = Tables::new();
1122        let row = tables.upsert_row("test", "pk1");
1123        row.min("price", 100i64);
1124        row.set("price", "999");
1125
1126        // set() resets the field completely
1127        let field = row.columns.get("price").unwrap();
1128        assert_eq!(field.value.as_string(), "999");
1129        assert_eq!(field.update_op, UpdateOp::Set);
1130    }
1131
1132    #[test]
1133    fn set_if_null_then_set_resets_value() {
1134        let mut tables = Tables::new();
1135        let row = tables.upsert_row("test", "pk1");
1136        row.set_if_null("created", "2024-01-01");
1137        row.set("created", "2024-02-01");
1138
1139        // set() resets the field completely
1140        let field = row.columns.get("created").unwrap();
1141        assert_eq!(field.value.as_string(), "2024-02-01");
1142        assert_eq!(field.update_op, UpdateOp::Set);
1143    }
1144
1145    // ============================================================
1146    // Cross-operation incompatibility tests
1147    // ============================================================
1148
1149    #[test]
1150    #[should_panic(expected = "cannot call add/sub() on field 'x' after max()")]
1151    fn max_then_add_panics() {
1152        let mut tables = Tables::new();
1153        let row = tables.upsert_row("test", "pk1");
1154        row.max("x", 100i64);
1155        row.add("x", "50"); // Should panic
1156    }
1157
1158    #[test]
1159    #[should_panic(expected = "cannot call add/sub() on field 'x' after min()")]
1160    fn min_then_add_panics() {
1161        let mut tables = Tables::new();
1162        let row = tables.upsert_row("test", "pk1");
1163        row.min("x", 100i64);
1164        row.add("x", "50"); // Should panic
1165    }
1166
1167    #[test]
1168    #[should_panic(expected = "cannot call add/sub() on field 'x' after set_if_null()")]
1169    fn set_if_null_then_add_panics() {
1170        let mut tables = Tables::new();
1171        let row = tables.upsert_row("test", "pk1");
1172        row.set_if_null("x", "100");
1173        row.add("x", "50"); // Should panic
1174    }
1175
1176    #[test]
1177    #[should_panic(expected = "cannot call max() on field 'x' after add/sub()")]
1178    fn add_then_max_panics() {
1179        let mut tables = Tables::new();
1180        let row = tables.upsert_row("test", "pk1");
1181        row.add("x", "100");
1182        row.max("x", 50i64); // Should panic
1183    }
1184
1185    #[test]
1186    #[should_panic(expected = "cannot call max() on field 'x' after min()")]
1187    fn min_then_max_panics() {
1188        let mut tables = Tables::new();
1189        let row = tables.upsert_row("test", "pk1");
1190        row.min("x", 100i64);
1191        row.max("x", 50i64); // Should panic
1192    }
1193
1194    #[test]
1195    #[should_panic(expected = "cannot call min() on field 'x' after add/sub()")]
1196    fn add_then_min_panics() {
1197        let mut tables = Tables::new();
1198        let row = tables.upsert_row("test", "pk1");
1199        row.add("x", "100");
1200        row.min("x", 50i64); // Should panic
1201    }
1202
1203    #[test]
1204    #[should_panic(expected = "cannot call min() on field 'x' after max()")]
1205    fn max_then_min_panics() {
1206        let mut tables = Tables::new();
1207        let row = tables.upsert_row("test", "pk1");
1208        row.max("x", 100i64);
1209        row.min("x", 50i64); // Should panic
1210    }
1211
1212    #[test]
1213    #[should_panic(expected = "cannot call set_if_null() on field 'x' after add/sub()")]
1214    fn add_then_set_if_null_panics() {
1215        let mut tables = Tables::new();
1216        let row = tables.upsert_row("test", "pk1");
1217        row.add("x", "100");
1218        row.set_if_null("x", "50"); // Should panic
1219    }
1220
1221    #[test]
1222    #[should_panic(expected = "cannot call set_if_null() on field 'x' after max()")]
1223    fn max_then_set_if_null_panics() {
1224        let mut tables = Tables::new();
1225        let row = tables.upsert_row("test", "pk1");
1226        row.max("x", 100i64);
1227        row.set_if_null("x", "50"); // Should panic
1228    }
1229
1230    #[test]
1231    #[should_panic(expected = "cannot call set_if_null() on field 'x' after min()")]
1232    fn min_then_set_if_null_panics() {
1233        let mut tables = Tables::new();
1234        let row = tables.upsert_row("test", "pk1");
1235        row.min("x", 100i64);
1236        row.set_if_null("x", "50"); // Should panic
1237    }
1238
1239    #[test]
1240    #[should_panic(expected = "cannot call max() on field 'x' after set_if_null()")]
1241    fn set_if_null_then_max_panics() {
1242        let mut tables = Tables::new();
1243        let row = tables.upsert_row("test", "pk1");
1244        row.set_if_null("x", "100");
1245        row.max("x", 50i64); // Should panic
1246    }
1247
1248    #[test]
1249    #[should_panic(expected = "cannot call min() on field 'x' after set_if_null()")]
1250    fn set_if_null_then_min_panics() {
1251        let mut tables = Tables::new();
1252        let row = tables.upsert_row("test", "pk1");
1253        row.set_if_null("x", "100");
1254        row.min("x", 50i64); // Should panic
1255    }
1256
1257    // ============================================================
1258    // Valid set -> other op transitions
1259    // ============================================================
1260
1261    #[test]
1262    fn set_then_max_computes_maximum() {
1263        let mut tables = Tables::new();
1264        let row = tables.upsert_row("test", "pk1");
1265        row.set("price", "100");
1266        row.max("price", 50i64);
1267
1268        // max() computes max(100, 50) = 100
1269        let field = row.columns.get("price").unwrap();
1270        assert_eq!(field.value.as_string(), "100");
1271        assert_eq!(field.update_op, UpdateOp::Max);
1272    }
1273
1274    #[test]
1275    fn set_then_max_updates_when_new_is_greater() {
1276        let mut tables = Tables::new();
1277        let row = tables.upsert_row("test", "pk1");
1278        row.set("price", "50");
1279        row.max("price", 100i64);
1280
1281        // max() computes max(50, 100) = 100
1282        let field = row.columns.get("price").unwrap();
1283        assert_eq!(field.value.as_string(), "100");
1284        assert_eq!(field.update_op, UpdateOp::Max);
1285    }
1286
1287    #[test]
1288    fn set_then_min_computes_minimum() {
1289        let mut tables = Tables::new();
1290        let row = tables.upsert_row("test", "pk1");
1291        row.set("price", "100");
1292        row.min("price", 50i64);
1293
1294        // min() computes min(100, 50) = 50
1295        let field = row.columns.get("price").unwrap();
1296        assert_eq!(field.value.as_string(), "50");
1297        assert_eq!(field.update_op, UpdateOp::Min);
1298    }
1299
1300    #[test]
1301    fn set_then_min_keeps_existing_when_smaller() {
1302        let mut tables = Tables::new();
1303        let row = tables.upsert_row("test", "pk1");
1304        row.set("price", "50");
1305        row.min("price", 100i64);
1306
1307        // min() computes min(50, 100) = 50
1308        let field = row.columns.get("price").unwrap();
1309        assert_eq!(field.value.as_string(), "50");
1310        assert_eq!(field.update_op, UpdateOp::Min);
1311    }
1312
1313    #[test]
1314    fn set_then_set_if_null_is_noop() {
1315        let mut tables = Tables::new();
1316        let row = tables.upsert_row("test", "pk1");
1317        row.set("created", "2024-01-01");
1318        row.set_if_null("created", "2024-02-01"); // Should be no-op
1319
1320        // set() value is kept, set_if_null() is ignored
1321        let field = row.columns.get("created").unwrap();
1322        assert_eq!(field.value.as_string(), "2024-01-01");
1323        assert_eq!(field.update_op, UpdateOp::Set);
1324    }
1325
1326    // ============================================================
1327    // Multiple columns independence tests
1328    // ============================================================
1329
1330    #[test]
1331    fn multiple_columns_independent() {
1332        let mut tables = Tables::new();
1333        let row = tables.upsert_row("test", "pk1");
1334        row.set("balance", "1000");
1335        row.add("tx_count", "1");
1336        row.sub("balance", "100");
1337        row.add("tx_count", "1");
1338
1339        let balance = row.columns.get("balance").unwrap();
1340        assert_eq!(balance.value.as_string(), "900");
1341        assert_eq!(balance.update_op, UpdateOp::Set);
1342
1343        let tx_count = row.columns.get("tx_count").unwrap();
1344        assert_eq!(tx_count.value.as_string(), "2");
1345        assert_eq!(tx_count.update_op, UpdateOp::Add);
1346    }
1347
1348    // ============================================================
1349    // MAX operation tests
1350    // ============================================================
1351
1352    #[test]
1353    fn max_stores_with_max_op() {
1354        let mut tables = Tables::new();
1355        let row = tables.upsert_row("test", "pk1");
1356        row.max("high_price", 100i64);
1357
1358        let field = row.columns.get("high_price").unwrap();
1359        assert_eq!(field.value.as_string(), "100");
1360        assert_eq!(field.update_op, UpdateOp::Max);
1361    }
1362
1363    #[test]
1364    fn max_computes_maximum_value() {
1365        let mut tables = Tables::new();
1366        let row = tables.upsert_row("test", "pk1");
1367        row.max("high_price", 100i64);
1368        row.max("high_price", 50i64);
1369
1370        // max() now computes the actual maximum in database-changes
1371        let field = row.columns.get("high_price").unwrap();
1372        assert_eq!(field.value.as_string(), "100"); // Keeps 100 since it's greater than 50
1373        assert_eq!(field.update_op, UpdateOp::Max);
1374    }
1375
1376    #[test]
1377    fn max_updates_when_value_is_greater() {
1378        let mut tables = Tables::new();
1379        let row = tables.upsert_row("test", "pk1");
1380        row.max("high_price", 50i64);
1381        row.max("high_price", 100i64);
1382
1383        let field = row.columns.get("high_price").unwrap();
1384        assert_eq!(field.value.as_string(), "100"); // Updates to 100 since it's greater than 50
1385        assert_eq!(field.update_op, UpdateOp::Max);
1386    }
1387
1388    // ============================================================
1389    // MIN operation tests
1390    // ============================================================
1391
1392    #[test]
1393    fn min_stores_with_min_op() {
1394        let mut tables = Tables::new();
1395        let row = tables.upsert_row("test", "pk1");
1396        row.min("low_price", 100i64);
1397
1398        let field = row.columns.get("low_price").unwrap();
1399        assert_eq!(field.value.as_string(), "100");
1400        assert_eq!(field.update_op, UpdateOp::Min);
1401    }
1402
1403    #[test]
1404    fn min_computes_minimum_value() {
1405        let mut tables = Tables::new();
1406        let row = tables.upsert_row("test", "pk1");
1407        row.min("low_price", 50i64);
1408        row.min("low_price", 100i64);
1409
1410        // min() now computes the actual minimum in database-changes
1411        let field = row.columns.get("low_price").unwrap();
1412        assert_eq!(field.value.as_string(), "50"); // Keeps 50 since it's less than 100
1413        assert_eq!(field.update_op, UpdateOp::Min);
1414    }
1415
1416    #[test]
1417    fn min_updates_when_value_is_smaller() {
1418        let mut tables = Tables::new();
1419        let row = tables.upsert_row("test", "pk1");
1420        row.min("low_price", 100i64);
1421        row.min("low_price", 50i64);
1422
1423        let field = row.columns.get("low_price").unwrap();
1424        assert_eq!(field.value.as_string(), "50"); // Updates to 50 since it's less than 100
1425        assert_eq!(field.update_op, UpdateOp::Min);
1426    }
1427
1428    // ============================================================
1429    // SET_IF_NULL operation tests
1430    // ============================================================
1431
1432    #[test]
1433    fn set_if_null_stores_with_set_if_null_op() {
1434        let mut tables = Tables::new();
1435        let row = tables.upsert_row("test", "pk1");
1436        row.set_if_null("created_at", "2024-01-01");
1437
1438        let field = row.columns.get("created_at").unwrap();
1439        assert_eq!(field.value.as_string(), "2024-01-01");
1440        assert_eq!(field.update_op, UpdateOp::SetIfNull);
1441    }
1442
1443    #[test]
1444    fn set_if_null_keeps_first_value() {
1445        let mut tables = Tables::new();
1446        let row = tables.upsert_row("test", "pk1");
1447        row.set_if_null("created_at", "2024-01-01");
1448        row.set_if_null("created_at", "2024-02-01");
1449
1450        // set_if_null keeps the first value - subsequent calls are no-ops
1451        let field = row.columns.get("created_at").unwrap();
1452        assert_eq!(field.value.as_string(), "2024-01-01");
1453        assert_eq!(field.update_op, UpdateOp::SetIfNull);
1454    }
1455
1456    // ============================================================
1457    // BigDecimal precision tests
1458    // ============================================================
1459
1460    #[test]
1461    fn bigdecimal_precision_preserved() {
1462        let mut tables = Tables::new();
1463        let row = tables.upsert_row("test", "pk1");
1464        row.set("amount", "1234567890123456789.123456789");
1465        row.add("amount", "0.000000001");
1466
1467        let field = row.columns.get("amount").unwrap();
1468        assert_eq!(field.value.as_string(), "1234567890123456789.123456790");
1469        assert_eq!(field.update_op, UpdateOp::Set);
1470    }
1471
1472    #[test]
1473    fn multiple_adds_preserve_precision() {
1474        let mut tables = Tables::new();
1475        let row = tables.upsert_row("test", "pk1");
1476        row.add("amount", "0.1");
1477        row.add("amount", "0.1");
1478        row.add("amount", "0.1");
1479
1480        let field = row.columns.get("amount").unwrap();
1481        assert_eq!(field.value.as_string(), "0.3");
1482        assert_eq!(field.update_op, UpdateOp::Add);
1483    }
1484
1485    // ============================================================
1486    // Edge cases
1487    // ============================================================
1488
1489    #[test]
1490    fn add_with_zero() {
1491        let mut tables = Tables::new();
1492        let row = tables.upsert_row("test", "pk1");
1493        row.add("balance", "100");
1494        row.add("balance", "0");
1495
1496        let field = row.columns.get("balance").unwrap();
1497        assert_eq!(field.value.as_string(), "100");
1498        assert_eq!(field.update_op, UpdateOp::Add);
1499    }
1500
1501    #[test]
1502    fn set_then_add_zero() {
1503        let mut tables = Tables::new();
1504        let row = tables.upsert_row("test", "pk1");
1505        row.set("balance", "100");
1506        row.add("balance", "0");
1507
1508        let field = row.columns.get("balance").unwrap();
1509        assert_eq!(field.value.as_string(), "100");
1510        assert_eq!(field.update_op, UpdateOp::Set);
1511    }
1512
1513    #[test]
1514    fn add_resulting_in_zero() {
1515        let mut tables = Tables::new();
1516        let row = tables.upsert_row("test", "pk1");
1517        row.add("balance", "100");
1518        row.sub("balance", "100");
1519
1520        let field = row.columns.get("balance").unwrap();
1521        assert_eq!(field.value.as_string(), "0");
1522        assert_eq!(field.update_op, UpdateOp::Add);
1523    }
1524
1525    #[test]
1526    fn set_then_sub_to_zero() {
1527        let mut tables = Tables::new();
1528        let row = tables.upsert_row("test", "pk1");
1529        row.set("balance", "100");
1530        row.sub("balance", "100");
1531
1532        let field = row.columns.get("balance").unwrap();
1533        assert_eq!(field.value.as_string(), "0");
1534        assert_eq!(field.update_op, UpdateOp::Set);
1535    }
1536
1537    #[test]
1538    fn negative_result_from_accumulation() {
1539        let mut tables = Tables::new();
1540        let row = tables.upsert_row("test", "pk1");
1541        row.set("balance", "100");
1542        row.sub("balance", "200");
1543
1544        let field = row.columns.get("balance").unwrap();
1545        assert_eq!(field.value.as_string(), "-100");
1546        assert_eq!(field.update_op, UpdateOp::Set);
1547    }
1548
1549    // ============================================================
1550    // Non-numeric value edge cases
1551    // ============================================================
1552
1553    #[test]
1554    #[should_panic(expected = "add/sub() requires a valid numeric value")]
1555    fn add_non_numeric_panics() {
1556        let mut tables = Tables::new();
1557        let row = tables.upsert_row("test", "pk1");
1558        row.add("name", "hello"); // Should panic
1559    }
1560
1561    // ============================================================
1562    // Integration tests with to_database_changes
1563    // ============================================================
1564
1565    #[test]
1566    fn to_database_changes_includes_update_op() {
1567        let mut tables = Tables::new();
1568        let row = tables.upsert_row("tokens", "0xtoken");
1569        row.set("name", "MyToken");
1570        row.add("balance", "100");
1571
1572        let changes = tables.to_database_changes();
1573        assert_eq!(changes.table_changes.len(), 1);
1574
1575        let change = &changes.table_changes[0];
1576        assert_eq!(change.fields.len(), 2);
1577
1578        // Find balance field
1579        let balance_field = change.fields.iter().find(|f| f.name == "balance").unwrap();
1580        assert_eq!(balance_field.value, "100");
1581        assert_eq!(balance_field.update_op, UpdateOp::Add as i32);
1582
1583        // Find name field
1584        let name_field = change.fields.iter().find(|f| f.name == "name").unwrap();
1585        assert_eq!(name_field.value, "MyToken");
1586        assert_eq!(name_field.update_op, UpdateOp::Set as i32);
1587    }
1588
1589    // ============================================================
1590    // NumericComparable tests - verifying zero-allocation integer comparisons
1591    // ============================================================
1592
1593    #[test]
1594    fn max_with_i64_computes_correctly() {
1595        let mut tables = Tables::new();
1596        let row = tables.upsert_row("test", "pk1");
1597        row.max("value", 100i64);
1598        row.max("value", 50i64);
1599        row.max("value", 200i64);
1600
1601        let field = row.columns.get("value").unwrap();
1602        assert_eq!(field.value.as_string(), "200");
1603        assert_eq!(field.update_op, UpdateOp::Max);
1604    }
1605
1606    #[test]
1607    fn min_with_i64_computes_correctly() {
1608        let mut tables = Tables::new();
1609        let row = tables.upsert_row("test", "pk1");
1610        row.min("value", 100i64);
1611        row.min("value", 50i64);
1612        row.min("value", 200i64);
1613
1614        let field = row.columns.get("value").unwrap();
1615        assert_eq!(field.value.as_string(), "50");
1616        assert_eq!(field.update_op, UpdateOp::Min);
1617    }
1618
1619    #[test]
1620    fn max_with_u32_computes_correctly() {
1621        let mut tables = Tables::new();
1622        let row = tables.upsert_row("test", "pk1");
1623        row.max("value", 100u32);
1624        row.max("value", 50u32);
1625        row.max("value", 200u32);
1626
1627        let field = row.columns.get("value").unwrap();
1628        assert_eq!(field.value.as_string(), "200");
1629        assert_eq!(field.update_op, UpdateOp::Max);
1630    }
1631
1632    #[test]
1633    fn min_with_u32_computes_correctly() {
1634        let mut tables = Tables::new();
1635        let row = tables.upsert_row("test", "pk1");
1636        row.min("value", 100u32);
1637        row.min("value", 50u32);
1638        row.min("value", 200u32);
1639
1640        let field = row.columns.get("value").unwrap();
1641        assert_eq!(field.value.as_string(), "50");
1642        assert_eq!(field.update_op, UpdateOp::Min);
1643    }
1644
1645    #[test]
1646    fn max_with_i32_computes_correctly() {
1647        let mut tables = Tables::new();
1648        let row = tables.upsert_row("test", "pk1");
1649        row.max("value", 100i32);
1650        row.max("value", -50i32);
1651        row.max("value", 200i32);
1652
1653        let field = row.columns.get("value").unwrap();
1654        assert_eq!(field.value.as_string(), "200");
1655        assert_eq!(field.update_op, UpdateOp::Max);
1656    }
1657
1658    #[test]
1659    fn min_with_i32_computes_correctly() {
1660        let mut tables = Tables::new();
1661        let row = tables.upsert_row("test", "pk1");
1662        row.min("value", 100i32);
1663        row.min("value", -50i32);
1664        row.min("value", 200i32);
1665
1666        let field = row.columns.get("value").unwrap();
1667        assert_eq!(field.value.as_string(), "-50");
1668        assert_eq!(field.update_op, UpdateOp::Min);
1669    }
1670
1671    #[test]
1672    fn max_with_mixed_integer_types() {
1673        let mut tables = Tables::new();
1674        let row = tables.upsert_row("test", "pk1");
1675        row.max("value", 100i64);
1676        row.max("value", 50u32);
1677        row.max("value", 200i32);
1678
1679        let field = row.columns.get("value").unwrap();
1680        assert_eq!(field.value.as_string(), "200");
1681        assert_eq!(field.update_op, UpdateOp::Max);
1682    }
1683
1684    #[test]
1685    fn min_with_mixed_integer_types() {
1686        let mut tables = Tables::new();
1687        let row = tables.upsert_row("test", "pk1");
1688        row.min("value", 100i64);
1689        row.min("value", 50u32);
1690        row.min("value", 200i32);
1691
1692        let field = row.columns.get("value").unwrap();
1693        assert_eq!(field.value.as_string(), "50");
1694        assert_eq!(field.update_op, UpdateOp::Min);
1695    }
1696
1697    #[test]
1698    fn max_with_bigdecimal() {
1699        use std::str::FromStr;
1700        let mut tables = Tables::new();
1701        let row = tables.upsert_row("test", "pk1");
1702        row.max("value", BigDecimal::from_str("100.5").unwrap());
1703        row.max("value", BigDecimal::from_str("50.25").unwrap());
1704        row.max("value", BigDecimal::from_str("200.75").unwrap());
1705
1706        let field = row.columns.get("value").unwrap();
1707        assert_eq!(field.value.as_string(), "200.75");
1708        assert_eq!(field.update_op, UpdateOp::Max);
1709    }
1710
1711    #[test]
1712    fn min_with_bigdecimal() {
1713        use std::str::FromStr;
1714        let mut tables = Tables::new();
1715        let row = tables.upsert_row("test", "pk1");
1716        row.min("value", BigDecimal::from_str("100.5").unwrap());
1717        row.min("value", BigDecimal::from_str("50.25").unwrap());
1718        row.min("value", BigDecimal::from_str("200.75").unwrap());
1719
1720        let field = row.columns.get("value").unwrap();
1721        assert_eq!(field.value.as_string(), "50.25");
1722        assert_eq!(field.update_op, UpdateOp::Min);
1723    }
1724
1725    #[test]
1726    fn max_with_bigint() {
1727        let mut tables = Tables::new();
1728        let row = tables.upsert_row("test", "pk1");
1729        row.max("value", BigInt::from(100));
1730        row.max("value", BigInt::from(50));
1731        row.max("value", BigInt::from(200));
1732
1733        let field = row.columns.get("value").unwrap();
1734        assert_eq!(field.value.as_string(), "200");
1735        assert_eq!(field.update_op, UpdateOp::Max);
1736    }
1737
1738    #[test]
1739    fn min_with_bigint() {
1740        let mut tables = Tables::new();
1741        let row = tables.upsert_row("test", "pk1");
1742        row.min("value", BigInt::from(100));
1743        row.min("value", BigInt::from(50));
1744        row.min("value", BigInt::from(200));
1745
1746        let field = row.columns.get("value").unwrap();
1747        assert_eq!(field.value.as_string(), "50");
1748        assert_eq!(field.update_op, UpdateOp::Min);
1749    }
1750
1751    // ============================================================
1752    // Verification tests for comparison semantics
1753    // ============================================================
1754
1755    #[test]
1756    fn verify_integer_comparison_semantics() {
1757        use crate::numeric::NumericComparable;
1758        use std::str::FromStr;
1759
1760        let bd_100 = BigDecimal::from_str("100").unwrap();
1761
1762        // Test: 50 < 100 should return Less
1763        assert_eq!(50i64.cmp_to_big_decimal(&bd_100), Ordering::Less);
1764
1765        // Test: 200 > 100 should return Greater
1766        assert_eq!(200i64.cmp_to_big_decimal(&bd_100), Ordering::Greater);
1767
1768        // Test: 100 == 100 should return Equal
1769        assert_eq!(100i64.cmp_to_big_decimal(&bd_100), Ordering::Equal);
1770
1771        // Test with negative numbers
1772        assert_eq!((-50i32).cmp_to_big_decimal(&bd_100), Ordering::Less);
1773    }
1774}