Skip to main content

bsv_wallet_toolbox/storage/sync/
process_sync_chunk.rs

1//! processSyncChunk implementation -- applies incoming entities with ID remapping.
2//!
3//! For each entity type in the incoming chunk:
4//! 1. Check if the entity exists locally (by natural key, not ID).
5//! 2. If exists: merge using MergeEntity, update if changed, map foreign->local ID.
6//! 3. If not exists: insert, map foreign->local ID.
7//! 4. Update sync_map max_updated_at to the latest timestamp seen.
8//!
9//! Entities are processed in dependency order so that foreign key remapping
10//! has all needed mappings available when processing dependent entities.
11
12use chrono::NaiveDateTime;
13use serde::{Deserialize, Serialize};
14
15use crate::error::{WalletError, WalletResult};
16use crate::storage::find_args::*;
17use crate::storage::sync::merge::MergeEntity;
18use crate::storage::sync::sync_map::{SyncChunk, SyncMap};
19use crate::storage::traits::provider::StorageProvider;
20use crate::storage::TrxToken;
21
22/// Process an incoming SyncChunk by merging its entities into local storage.
23///
24/// Entities are processed in dependency order:
25/// 1. User (no dependencies)
26/// 2. ProvenTx, OutputBasket, TxLabel, OutputTag (depend on User)
27/// 3. Transaction (depends on ProvenTx)
28/// 4. Output (depends on Transaction, OutputBasket)
29/// 5. TxLabelMap (depends on TxLabel, Transaction)
30/// 6. OutputTagMap (depends on OutputTag, Output)
31/// 7. Certificate (depends on User)
32/// 8. CertificateField (depends on Certificate)
33/// 9. Commission (depends on Transaction)
34/// 10. ProvenTxReq (depends on ProvenTx)
35pub async fn process_sync_chunk(
36    storage: &dyn StorageProvider,
37    chunk: SyncChunk,
38    sync_map: &mut SyncMap,
39    trx: Option<&TrxToken>,
40) -> WalletResult<ProcessSyncChunkResult> {
41    let mut result = ProcessSyncChunkResult::default();
42
43    // 1. User
44    if let Some(ref user) = chunk.user {
45        let (local_user, _created) = storage.find_or_insert_user(&user.identity_key, trx).await?;
46
47        // Update if incoming is newer
48        if user.updated_at > local_user.updated_at {
49            storage
50                .update_user(
51                    local_user.user_id,
52                    &UserPartial {
53                        active_storage: Some(user.active_storage.clone()),
54                        ..Default::default()
55                    },
56                    trx,
57                )
58                .await?;
59            result.updates += 1;
60        }
61    }
62
63    // Look up local user_id for the chunk's identity key
64    let local_user = storage
65        .find_user_by_identity_key(&chunk.user_identity_key, trx)
66        .await?;
67    let local_user_id = match local_user {
68        Some(u) => u.user_id,
69        None => {
70            return Err(WalletError::Internal(format!(
71                "processSyncChunk: user not found for identity key {}",
72                chunk.user_identity_key
73            )));
74        }
75    };
76
77    // 2. ProvenTx (no FK dependencies beyond user)
78    if let Some(proven_txs) = chunk.proven_txs {
79        for incoming in &proven_txs {
80            sync_map.proven_tx.update_max(incoming.updated_at);
81            sync_map.proven_tx.count += 1;
82
83            // Find by natural key: txid
84            let existing = storage
85                .find_proven_txs(
86                    &FindProvenTxsArgs {
87                        partial: ProvenTxPartial {
88                            txid: Some(incoming.txid.clone()),
89                            ..Default::default()
90                        },
91                        ..Default::default()
92                    },
93                    trx,
94                )
95                .await?;
96
97            let foreign_id = incoming.proven_tx_id;
98            if let Some(local) = existing.into_iter().next() {
99                // Existing: merge
100                if local.should_update(incoming) {
101                    storage
102                        .update_proven_tx(
103                            local.proven_tx_id,
104                            &ProvenTxPartial {
105                                height: Some(incoming.height),
106                                block_hash: Some(incoming.block_hash.clone()),
107                                ..Default::default()
108                            },
109                            trx,
110                        )
111                        .await?;
112                    result.updates += 1;
113                }
114                sync_map
115                    .proven_tx
116                    .map_id(foreign_id, local.proven_tx_id)
117                    .map_err(|e| WalletError::Internal(e))?;
118            } else {
119                // New: insert
120                let local_id = storage.insert_proven_tx(incoming, trx).await?;
121                sync_map
122                    .proven_tx
123                    .map_id(foreign_id, local_id)
124                    .map_err(|e| WalletError::Internal(e))?;
125                result.inserts += 1;
126            }
127        }
128    }
129
130    // 3. OutputBasket
131    if let Some(output_baskets) = chunk.output_baskets {
132        for incoming in &output_baskets {
133            sync_map.output_basket.update_max(incoming.updated_at);
134            sync_map.output_basket.count += 1;
135
136            let foreign_id = incoming.basket_id;
137            let local = storage
138                .find_or_insert_output_basket(local_user_id, &incoming.name, trx)
139                .await?;
140
141            if local.should_update(incoming) {
142                storage
143                    .update_output_basket(
144                        local.basket_id,
145                        &OutputBasketPartial {
146                            is_deleted: Some(incoming.is_deleted),
147                            ..Default::default()
148                        },
149                        trx,
150                    )
151                    .await?;
152                result.updates += 1;
153            }
154            sync_map
155                .output_basket
156                .map_id(foreign_id, local.basket_id)
157                .map_err(|e| WalletError::Internal(e))?;
158        }
159    }
160
161    // 4. TxLabel
162    if let Some(tx_labels) = chunk.tx_labels {
163        for incoming in &tx_labels {
164            sync_map.tx_label.update_max(incoming.updated_at);
165            sync_map.tx_label.count += 1;
166
167            let foreign_id = incoming.tx_label_id;
168            let local = storage
169                .find_or_insert_tx_label(local_user_id, &incoming.label, trx)
170                .await?;
171
172            if local.should_update(incoming) {
173                storage
174                    .update_tx_label(
175                        local.tx_label_id,
176                        &TxLabelPartial {
177                            is_deleted: Some(incoming.is_deleted),
178                            ..Default::default()
179                        },
180                        trx,
181                    )
182                    .await?;
183                result.updates += 1;
184            }
185            sync_map
186                .tx_label
187                .map_id(foreign_id, local.tx_label_id)
188                .map_err(|e| WalletError::Internal(e))?;
189        }
190    }
191
192    // 5. OutputTag
193    if let Some(output_tags) = chunk.output_tags {
194        for incoming in &output_tags {
195            sync_map.output_tag.update_max(incoming.updated_at);
196            sync_map.output_tag.count += 1;
197
198            let foreign_id = incoming.output_tag_id;
199            let local = storage
200                .find_or_insert_output_tag(local_user_id, &incoming.tag, trx)
201                .await?;
202
203            if local.should_update(incoming) {
204                storage
205                    .update_output_tag(
206                        local.output_tag_id,
207                        &OutputTagPartial {
208                            is_deleted: Some(incoming.is_deleted),
209                            ..Default::default()
210                        },
211                        trx,
212                    )
213                    .await?;
214                result.updates += 1;
215            }
216            sync_map
217                .output_tag
218                .map_id(foreign_id, local.output_tag_id)
219                .map_err(|e| WalletError::Internal(e))?;
220        }
221    }
222
223    // 6. Transaction (depends on ProvenTx for proven_tx_id FK)
224    if let Some(transactions) = chunk.transactions {
225        for incoming in &transactions {
226            sync_map.transaction.update_max(incoming.updated_at);
227            sync_map.transaction.count += 1;
228
229            let foreign_id = incoming.transaction_id;
230
231            // Remap proven_tx_id FK if present
232            let local_proven_tx_id = match incoming.proven_tx_id {
233                Some(fk) => sync_map.proven_tx.get_local_id(fk).or(Some(fk)),
234                None => None,
235            };
236
237            // Find by natural key: reference + user_id (unique per user)
238            let existing = storage
239                .find_transactions(
240                    &FindTransactionsArgs {
241                        partial: TransactionPartial {
242                            user_id: Some(local_user_id),
243                            reference: Some(incoming.reference.clone()),
244                            ..Default::default()
245                        },
246                        ..Default::default()
247                    },
248                    trx,
249                )
250                .await?;
251
252            if let Some(local) = existing.into_iter().next() {
253                if local.should_update(incoming) {
254                    storage
255                        .update_transaction(
256                            local.transaction_id,
257                            &TransactionPartial {
258                                proven_tx_id: local_proven_tx_id,
259                                status: Some(incoming.status.clone()),
260                                txid: incoming.txid.clone(),
261                                ..Default::default()
262                            },
263                            trx,
264                        )
265                        .await?;
266                    result.updates += 1;
267                }
268                sync_map
269                    .transaction
270                    .map_id(foreign_id, local.transaction_id)
271                    .map_err(|e| WalletError::Internal(e))?;
272            } else {
273                // Insert with remapped user_id and proven_tx_id
274                let mut to_insert = incoming.clone();
275                to_insert.user_id = local_user_id;
276                to_insert.proven_tx_id = local_proven_tx_id;
277                to_insert.transaction_id = 0; // auto-increment
278                let local_id = storage.insert_transaction(&to_insert, trx).await?;
279                sync_map
280                    .transaction
281                    .map_id(foreign_id, local_id)
282                    .map_err(|e| WalletError::Internal(e))?;
283                result.inserts += 1;
284            }
285        }
286    }
287
288    // 7. Output (depends on Transaction and OutputBasket for FKs)
289    if let Some(outputs) = chunk.outputs {
290        for incoming in &outputs {
291            sync_map.output.update_max(incoming.updated_at);
292            sync_map.output.count += 1;
293
294            let foreign_id = incoming.output_id;
295
296            // Remap transaction_id FK
297            let local_tx_id = sync_map
298                .transaction
299                .get_local_id(incoming.transaction_id)
300                .unwrap_or(incoming.transaction_id);
301
302            // Remap basket_id FK if present
303            let local_basket_id = match incoming.basket_id {
304                Some(fk) => sync_map.output_basket.get_local_id(fk).or(Some(fk)),
305                None => None,
306            };
307
308            // Remap spent_by FK if present
309            let local_spent_by = match incoming.spent_by {
310                Some(fk) => sync_map.transaction.get_local_id(fk).or(Some(fk)),
311                None => None,
312            };
313
314            // Find by natural key: transaction_id + vout
315            let existing = storage
316                .find_outputs(
317                    &FindOutputsArgs {
318                        partial: OutputPartial {
319                            transaction_id: Some(local_tx_id),
320                            vout: Some(incoming.vout),
321                            ..Default::default()
322                        },
323                        ..Default::default()
324                    },
325                    trx,
326                )
327                .await?;
328
329            if let Some(local) = existing.into_iter().next() {
330                if local.should_update(incoming) {
331                    storage
332                        .update_output(
333                            local.output_id,
334                            &OutputPartial {
335                                basket_id: local_basket_id,
336                                spendable: Some(incoming.spendable),
337                                spent_by: local_spent_by,
338                                ..Default::default()
339                            },
340                            trx,
341                        )
342                        .await?;
343                    result.updates += 1;
344                }
345                sync_map
346                    .output
347                    .map_id(foreign_id, local.output_id)
348                    .map_err(|e| WalletError::Internal(e))?;
349            } else {
350                let mut to_insert = incoming.clone();
351                to_insert.user_id = local_user_id;
352                to_insert.transaction_id = local_tx_id;
353                to_insert.basket_id = local_basket_id;
354                to_insert.spent_by = local_spent_by;
355                to_insert.output_id = 0; // auto-increment
356                let local_id = storage.insert_output(&to_insert, trx).await?;
357                sync_map
358                    .output
359                    .map_id(foreign_id, local_id)
360                    .map_err(|e| WalletError::Internal(e))?;
361                result.inserts += 1;
362            }
363        }
364    }
365
366    // 8. TxLabelMap (depends on TxLabel and Transaction)
367    if let Some(tx_label_maps) = chunk.tx_label_maps {
368        for incoming in &tx_label_maps {
369            sync_map.tx_label_map.update_max(incoming.updated_at);
370            sync_map.tx_label_map.count += 1;
371
372            // Remap FKs
373            let local_tx_label_id = sync_map
374                .tx_label
375                .get_local_id(incoming.tx_label_id)
376                .unwrap_or(incoming.tx_label_id);
377            let local_tx_id = sync_map
378                .transaction
379                .get_local_id(incoming.transaction_id)
380                .unwrap_or(incoming.transaction_id);
381
382            // Find by natural key: tx_label_id + transaction_id
383            let existing = storage
384                .find_tx_label_maps(
385                    &FindTxLabelMapsArgs {
386                        partial: TxLabelMapPartial {
387                            tx_label_id: Some(local_tx_label_id),
388                            transaction_id: Some(local_tx_id),
389                            ..Default::default()
390                        },
391                        ..Default::default()
392                    },
393                    trx,
394                )
395                .await?;
396
397            if let Some(local) = existing.into_iter().next() {
398                if local.should_update(incoming) {
399                    storage
400                        .update_tx_label_map(
401                            local.transaction_id,
402                            local.tx_label_id,
403                            &TxLabelMapPartial {
404                                is_deleted: Some(incoming.is_deleted),
405                                ..Default::default()
406                            },
407                            trx,
408                        )
409                        .await?;
410                    result.updates += 1;
411                }
412            } else {
413                let mut to_insert = incoming.clone();
414                to_insert.tx_label_id = local_tx_label_id;
415                to_insert.transaction_id = local_tx_id;
416                storage.insert_tx_label_map(&to_insert, trx).await?;
417                result.inserts += 1;
418            }
419        }
420    }
421
422    // 9. OutputTagMap (depends on OutputTag and Output)
423    if let Some(output_tag_maps) = chunk.output_tag_maps {
424        for incoming in &output_tag_maps {
425            sync_map.output_tag_map.update_max(incoming.updated_at);
426            sync_map.output_tag_map.count += 1;
427
428            // Remap FKs
429            let local_tag_id = sync_map
430                .output_tag
431                .get_local_id(incoming.output_tag_id)
432                .unwrap_or(incoming.output_tag_id);
433            let local_output_id = sync_map
434                .output
435                .get_local_id(incoming.output_id)
436                .unwrap_or(incoming.output_id);
437
438            // Find by natural key: output_tag_id + output_id
439            let existing = storage
440                .find_output_tag_maps(
441                    &FindOutputTagMapsArgs {
442                        partial: OutputTagMapPartial {
443                            output_tag_id: Some(local_tag_id),
444                            output_id: Some(local_output_id),
445                            ..Default::default()
446                        },
447                        ..Default::default()
448                    },
449                    trx,
450                )
451                .await?;
452
453            if let Some(local) = existing.into_iter().next() {
454                if local.should_update(incoming) {
455                    storage
456                        .update_output_tag_map(
457                            local.output_id,
458                            local.output_tag_id,
459                            &OutputTagMapPartial {
460                                is_deleted: Some(incoming.is_deleted),
461                                ..Default::default()
462                            },
463                            trx,
464                        )
465                        .await?;
466                    result.updates += 1;
467                }
468            } else {
469                let mut to_insert = incoming.clone();
470                to_insert.output_tag_id = local_tag_id;
471                to_insert.output_id = local_output_id;
472                storage.insert_output_tag_map(&to_insert, trx).await?;
473                result.inserts += 1;
474            }
475        }
476    }
477
478    // 10. Certificate (depends on User)
479    if let Some(certificates) = chunk.certificates {
480        for incoming in &certificates {
481            sync_map.certificate.update_max(incoming.updated_at);
482            sync_map.certificate.count += 1;
483
484            let foreign_id = incoming.certificate_id;
485
486            // Find by natural key: serial_number + certifier (unique per certificate)
487            let existing = storage
488                .find_certificates(
489                    &FindCertificatesArgs {
490                        partial: CertificatePartial {
491                            user_id: Some(local_user_id),
492                            serial_number: Some(incoming.serial_number.clone()),
493                            certifier: Some(incoming.certifier.clone()),
494                            ..Default::default()
495                        },
496                        ..Default::default()
497                    },
498                    trx,
499                )
500                .await?;
501
502            if let Some(local) = existing.into_iter().next() {
503                if local.should_update(incoming) {
504                    storage
505                        .update_certificate(
506                            local.certificate_id,
507                            &CertificatePartial {
508                                subject: Some(incoming.subject.clone()),
509                                verifier: incoming.verifier.clone().map(Some).unwrap_or(None),
510                                revocation_outpoint: Some(incoming.revocation_outpoint.clone()),
511                                signature: Some(incoming.signature.clone()),
512                                is_deleted: Some(incoming.is_deleted),
513                                ..Default::default()
514                            },
515                            trx,
516                        )
517                        .await?;
518                    result.updates += 1;
519                }
520                sync_map
521                    .certificate
522                    .map_id(foreign_id, local.certificate_id)
523                    .map_err(|e| WalletError::Internal(e))?;
524            } else {
525                let mut to_insert = incoming.clone();
526                to_insert.user_id = local_user_id;
527                to_insert.certificate_id = 0;
528                let local_id = storage.insert_certificate(&to_insert, trx).await?;
529                sync_map
530                    .certificate
531                    .map_id(foreign_id, local_id)
532                    .map_err(|e| WalletError::Internal(e))?;
533                result.inserts += 1;
534            }
535        }
536    }
537
538    // 11. CertificateField (depends on Certificate)
539    if let Some(certificate_fields) = chunk.certificate_fields {
540        for incoming in &certificate_fields {
541            sync_map.certificate_field.update_max(incoming.updated_at);
542            sync_map.certificate_field.count += 1;
543
544            // Remap certificate_id FK
545            let local_cert_id = sync_map
546                .certificate
547                .get_local_id(incoming.certificate_id)
548                .unwrap_or(incoming.certificate_id);
549
550            // Find by natural key: certificate_id + field_name
551            let existing = storage
552                .find_certificate_fields(
553                    &FindCertificateFieldsArgs {
554                        partial: CertificateFieldPartial {
555                            certificate_id: Some(local_cert_id),
556                            field_name: Some(incoming.field_name.clone()),
557                            ..Default::default()
558                        },
559                        ..Default::default()
560                    },
561                    trx,
562                )
563                .await?;
564
565            if let Some(local) = existing.into_iter().next() {
566                if local.should_update(incoming) {
567                    storage
568                        .update_certificate_field(
569                            local.certificate_id,
570                            &local.field_name,
571                            &CertificateFieldPartial {
572                                field_value: Some(incoming.field_value.clone()),
573                                master_key: Some(incoming.master_key.clone()),
574                                ..Default::default()
575                            },
576                            trx,
577                        )
578                        .await?;
579                    result.updates += 1;
580                }
581            } else {
582                let mut to_insert = incoming.clone();
583                to_insert.user_id = local_user_id;
584                to_insert.certificate_id = local_cert_id;
585                storage.insert_certificate_field(&to_insert, trx).await?;
586                result.inserts += 1;
587            }
588        }
589    }
590
591    // 12. Commission (depends on Transaction)
592    if let Some(commissions) = chunk.commissions {
593        for incoming in &commissions {
594            sync_map.commission.update_max(incoming.updated_at);
595            sync_map.commission.count += 1;
596
597            let foreign_id = incoming.commission_id;
598
599            // Remap transaction_id FK
600            let local_tx_id = sync_map
601                .transaction
602                .get_local_id(incoming.transaction_id)
603                .unwrap_or(incoming.transaction_id);
604
605            // Find by natural key: transaction_id + key_offset
606            let existing = storage
607                .find_commissions(
608                    &FindCommissionsArgs {
609                        partial: CommissionPartial {
610                            user_id: Some(local_user_id),
611                            transaction_id: Some(local_tx_id),
612                            key_offset: Some(incoming.key_offset.clone()),
613                            ..Default::default()
614                        },
615                        ..Default::default()
616                    },
617                    trx,
618                )
619                .await?;
620
621            if let Some(local) = existing.into_iter().next() {
622                if local.should_update(incoming) {
623                    storage
624                        .update_commission(
625                            local.commission_id,
626                            &CommissionPartial {
627                                satoshis: Some(incoming.satoshis),
628                                is_redeemed: Some(incoming.is_redeemed),
629                                ..Default::default()
630                            },
631                            trx,
632                        )
633                        .await?;
634                    result.updates += 1;
635                }
636                sync_map
637                    .commission
638                    .map_id(foreign_id, local.commission_id)
639                    .map_err(|e| WalletError::Internal(e))?;
640            } else {
641                let mut to_insert = incoming.clone();
642                to_insert.user_id = local_user_id;
643                to_insert.transaction_id = local_tx_id;
644                to_insert.commission_id = 0;
645                let local_id = storage.insert_commission(&to_insert, trx).await?;
646                sync_map
647                    .commission
648                    .map_id(foreign_id, local_id)
649                    .map_err(|e| WalletError::Internal(e))?;
650                result.inserts += 1;
651            }
652        }
653    }
654
655    // 13. ProvenTxReq (depends on ProvenTx)
656    if let Some(proven_tx_reqs) = chunk.proven_tx_reqs {
657        for incoming in &proven_tx_reqs {
658            sync_map.proven_tx_req.update_max(incoming.updated_at);
659            sync_map.proven_tx_req.count += 1;
660
661            let foreign_id = incoming.proven_tx_req_id;
662
663            // Remap proven_tx_id FK if present
664            let local_proven_tx_id = match incoming.proven_tx_id {
665                Some(fk) => sync_map.proven_tx.get_local_id(fk).or(Some(fk)),
666                None => None,
667            };
668
669            // Find by natural key: txid
670            let existing = storage
671                .find_proven_tx_reqs(
672                    &FindProvenTxReqsArgs {
673                        partial: ProvenTxReqPartial {
674                            txid: Some(incoming.txid.clone()),
675                            ..Default::default()
676                        },
677                        ..Default::default()
678                    },
679                    trx,
680                )
681                .await?;
682
683            if let Some(local) = existing.into_iter().next() {
684                if local.should_update(incoming) {
685                    storage
686                        .update_proven_tx_req(
687                            local.proven_tx_req_id,
688                            &ProvenTxReqPartial {
689                                proven_tx_id: local_proven_tx_id,
690                                status: Some(incoming.status.clone()),
691                                notified: Some(incoming.notified),
692                                ..Default::default()
693                            },
694                            trx,
695                        )
696                        .await?;
697                    result.updates += 1;
698                }
699                sync_map
700                    .proven_tx_req
701                    .map_id(foreign_id, local.proven_tx_req_id)
702                    .map_err(|e| WalletError::Internal(e))?;
703            } else {
704                let mut to_insert = incoming.clone();
705                to_insert.proven_tx_id = local_proven_tx_id;
706                to_insert.proven_tx_req_id = 0;
707                let local_id = storage.insert_proven_tx_req(&to_insert, trx).await?;
708                sync_map
709                    .proven_tx_req
710                    .map_id(foreign_id, local_id)
711                    .map_err(|e| WalletError::Internal(e))?;
712                result.inserts += 1;
713            }
714        }
715    }
716
717    Ok(result)
718}
719
720/// Result of processing a sync chunk.
721#[derive(Debug, Default, Serialize, Deserialize)]
722#[serde(rename_all = "camelCase")]
723pub struct ProcessSyncChunkResult {
724    /// Whether the sync is complete (no more chunks pending).
725    /// TS server may return `0`/`1` instead of `false`/`true`.
726    #[serde(deserialize_with = "crate::serde_helpers::bool_from_int_or_bool")]
727    pub done: bool,
728    /// Latest updated_at timestamp seen in this chunk.
729    #[serde(
730        rename = "maxUpdated_at",
731        default,
732        with = "crate::serde_datetime::option"
733    )]
734    pub max_updated_at: Option<NaiveDateTime>,
735    /// Number of entities inserted.
736    pub inserts: i64,
737    /// Number of entities updated.
738    pub updates: i64,
739}