1use chrono::NaiveDateTime;
13use serde::{Deserialize, Serialize};
14
15use crate::error::{WalletError, WalletResult};
16use crate::storage::find_args::*;
17use crate::storage::sync::merge::MergeEntity;
18use crate::storage::sync::sync_map::{SyncChunk, SyncMap};
19use crate::storage::traits::provider::StorageProvider;
20use crate::storage::TrxToken;
21
22pub async fn process_sync_chunk(
36 storage: &dyn StorageProvider,
37 chunk: SyncChunk,
38 sync_map: &mut SyncMap,
39 trx: Option<&TrxToken>,
40) -> WalletResult<ProcessSyncChunkResult> {
41 let mut result = ProcessSyncChunkResult::default();
42
43 if let Some(ref user) = chunk.user {
45 let (local_user, _created) = storage.find_or_insert_user(&user.identity_key, trx).await?;
46
47 if user.updated_at > local_user.updated_at {
49 storage
50 .update_user(
51 local_user.user_id,
52 &UserPartial {
53 active_storage: Some(user.active_storage.clone()),
54 ..Default::default()
55 },
56 trx,
57 )
58 .await?;
59 result.updates += 1;
60 }
61 }
62
63 let local_user = storage
65 .find_user_by_identity_key(&chunk.user_identity_key, trx)
66 .await?;
67 let local_user_id = match local_user {
68 Some(u) => u.user_id,
69 None => {
70 return Err(WalletError::Internal(format!(
71 "processSyncChunk: user not found for identity key {}",
72 chunk.user_identity_key
73 )));
74 }
75 };
76
77 if let Some(proven_txs) = chunk.proven_txs {
79 for incoming in &proven_txs {
80 sync_map.proven_tx.update_max(incoming.updated_at);
81 sync_map.proven_tx.count += 1;
82
83 let existing = storage
85 .find_proven_txs(
86 &FindProvenTxsArgs {
87 partial: ProvenTxPartial {
88 txid: Some(incoming.txid.clone()),
89 ..Default::default()
90 },
91 ..Default::default()
92 },
93 trx,
94 )
95 .await?;
96
97 let foreign_id = incoming.proven_tx_id;
98 if let Some(local) = existing.into_iter().next() {
99 if local.should_update(incoming) {
101 storage
102 .update_proven_tx(
103 local.proven_tx_id,
104 &ProvenTxPartial {
105 height: Some(incoming.height),
106 block_hash: Some(incoming.block_hash.clone()),
107 ..Default::default()
108 },
109 trx,
110 )
111 .await?;
112 result.updates += 1;
113 }
114 sync_map
115 .proven_tx
116 .map_id(foreign_id, local.proven_tx_id)
117 .map_err(|e| WalletError::Internal(e))?;
118 } else {
119 let local_id = storage.insert_proven_tx(incoming, trx).await?;
121 sync_map
122 .proven_tx
123 .map_id(foreign_id, local_id)
124 .map_err(|e| WalletError::Internal(e))?;
125 result.inserts += 1;
126 }
127 }
128 }
129
130 if let Some(output_baskets) = chunk.output_baskets {
132 for incoming in &output_baskets {
133 sync_map.output_basket.update_max(incoming.updated_at);
134 sync_map.output_basket.count += 1;
135
136 let foreign_id = incoming.basket_id;
137 let local = storage
138 .find_or_insert_output_basket(local_user_id, &incoming.name, trx)
139 .await?;
140
141 if local.should_update(incoming) {
142 storage
143 .update_output_basket(
144 local.basket_id,
145 &OutputBasketPartial {
146 is_deleted: Some(incoming.is_deleted),
147 ..Default::default()
148 },
149 trx,
150 )
151 .await?;
152 result.updates += 1;
153 }
154 sync_map
155 .output_basket
156 .map_id(foreign_id, local.basket_id)
157 .map_err(|e| WalletError::Internal(e))?;
158 }
159 }
160
161 if let Some(tx_labels) = chunk.tx_labels {
163 for incoming in &tx_labels {
164 sync_map.tx_label.update_max(incoming.updated_at);
165 sync_map.tx_label.count += 1;
166
167 let foreign_id = incoming.tx_label_id;
168 let local = storage
169 .find_or_insert_tx_label(local_user_id, &incoming.label, trx)
170 .await?;
171
172 if local.should_update(incoming) {
173 storage
174 .update_tx_label(
175 local.tx_label_id,
176 &TxLabelPartial {
177 is_deleted: Some(incoming.is_deleted),
178 ..Default::default()
179 },
180 trx,
181 )
182 .await?;
183 result.updates += 1;
184 }
185 sync_map
186 .tx_label
187 .map_id(foreign_id, local.tx_label_id)
188 .map_err(|e| WalletError::Internal(e))?;
189 }
190 }
191
192 if let Some(output_tags) = chunk.output_tags {
194 for incoming in &output_tags {
195 sync_map.output_tag.update_max(incoming.updated_at);
196 sync_map.output_tag.count += 1;
197
198 let foreign_id = incoming.output_tag_id;
199 let local = storage
200 .find_or_insert_output_tag(local_user_id, &incoming.tag, trx)
201 .await?;
202
203 if local.should_update(incoming) {
204 storage
205 .update_output_tag(
206 local.output_tag_id,
207 &OutputTagPartial {
208 is_deleted: Some(incoming.is_deleted),
209 ..Default::default()
210 },
211 trx,
212 )
213 .await?;
214 result.updates += 1;
215 }
216 sync_map
217 .output_tag
218 .map_id(foreign_id, local.output_tag_id)
219 .map_err(|e| WalletError::Internal(e))?;
220 }
221 }
222
223 if let Some(transactions) = chunk.transactions {
225 for incoming in &transactions {
226 sync_map.transaction.update_max(incoming.updated_at);
227 sync_map.transaction.count += 1;
228
229 let foreign_id = incoming.transaction_id;
230
231 let local_proven_tx_id = match incoming.proven_tx_id {
233 Some(fk) => sync_map.proven_tx.get_local_id(fk).or(Some(fk)),
234 None => None,
235 };
236
237 let existing = storage
239 .find_transactions(
240 &FindTransactionsArgs {
241 partial: TransactionPartial {
242 user_id: Some(local_user_id),
243 reference: Some(incoming.reference.clone()),
244 ..Default::default()
245 },
246 ..Default::default()
247 },
248 trx,
249 )
250 .await?;
251
252 if let Some(local) = existing.into_iter().next() {
253 if local.should_update(incoming) {
254 storage
255 .update_transaction(
256 local.transaction_id,
257 &TransactionPartial {
258 proven_tx_id: local_proven_tx_id,
259 status: Some(incoming.status.clone()),
260 txid: incoming.txid.clone(),
261 ..Default::default()
262 },
263 trx,
264 )
265 .await?;
266 result.updates += 1;
267 }
268 sync_map
269 .transaction
270 .map_id(foreign_id, local.transaction_id)
271 .map_err(|e| WalletError::Internal(e))?;
272 } else {
273 let mut to_insert = incoming.clone();
275 to_insert.user_id = local_user_id;
276 to_insert.proven_tx_id = local_proven_tx_id;
277 to_insert.transaction_id = 0; let local_id = storage.insert_transaction(&to_insert, trx).await?;
279 sync_map
280 .transaction
281 .map_id(foreign_id, local_id)
282 .map_err(|e| WalletError::Internal(e))?;
283 result.inserts += 1;
284 }
285 }
286 }
287
288 if let Some(outputs) = chunk.outputs {
290 for incoming in &outputs {
291 sync_map.output.update_max(incoming.updated_at);
292 sync_map.output.count += 1;
293
294 let foreign_id = incoming.output_id;
295
296 let local_tx_id = sync_map
298 .transaction
299 .get_local_id(incoming.transaction_id)
300 .unwrap_or(incoming.transaction_id);
301
302 let local_basket_id = match incoming.basket_id {
304 Some(fk) => sync_map.output_basket.get_local_id(fk).or(Some(fk)),
305 None => None,
306 };
307
308 let local_spent_by = match incoming.spent_by {
310 Some(fk) => sync_map.transaction.get_local_id(fk).or(Some(fk)),
311 None => None,
312 };
313
314 let existing = storage
316 .find_outputs(
317 &FindOutputsArgs {
318 partial: OutputPartial {
319 transaction_id: Some(local_tx_id),
320 vout: Some(incoming.vout),
321 ..Default::default()
322 },
323 ..Default::default()
324 },
325 trx,
326 )
327 .await?;
328
329 if let Some(local) = existing.into_iter().next() {
330 if local.should_update(incoming) {
331 storage
332 .update_output(
333 local.output_id,
334 &OutputPartial {
335 basket_id: local_basket_id,
336 spendable: Some(incoming.spendable),
337 spent_by: local_spent_by,
338 ..Default::default()
339 },
340 trx,
341 )
342 .await?;
343 result.updates += 1;
344 }
345 sync_map
346 .output
347 .map_id(foreign_id, local.output_id)
348 .map_err(|e| WalletError::Internal(e))?;
349 } else {
350 let mut to_insert = incoming.clone();
351 to_insert.user_id = local_user_id;
352 to_insert.transaction_id = local_tx_id;
353 to_insert.basket_id = local_basket_id;
354 to_insert.spent_by = local_spent_by;
355 to_insert.output_id = 0; let local_id = storage.insert_output(&to_insert, trx).await?;
357 sync_map
358 .output
359 .map_id(foreign_id, local_id)
360 .map_err(|e| WalletError::Internal(e))?;
361 result.inserts += 1;
362 }
363 }
364 }
365
366 if let Some(tx_label_maps) = chunk.tx_label_maps {
368 for incoming in &tx_label_maps {
369 sync_map.tx_label_map.update_max(incoming.updated_at);
370 sync_map.tx_label_map.count += 1;
371
372 let local_tx_label_id = sync_map
374 .tx_label
375 .get_local_id(incoming.tx_label_id)
376 .unwrap_or(incoming.tx_label_id);
377 let local_tx_id = sync_map
378 .transaction
379 .get_local_id(incoming.transaction_id)
380 .unwrap_or(incoming.transaction_id);
381
382 let existing = storage
384 .find_tx_label_maps(
385 &FindTxLabelMapsArgs {
386 partial: TxLabelMapPartial {
387 tx_label_id: Some(local_tx_label_id),
388 transaction_id: Some(local_tx_id),
389 ..Default::default()
390 },
391 ..Default::default()
392 },
393 trx,
394 )
395 .await?;
396
397 if let Some(local) = existing.into_iter().next() {
398 if local.should_update(incoming) {
399 storage
400 .update_tx_label_map(
401 local.transaction_id,
402 local.tx_label_id,
403 &TxLabelMapPartial {
404 is_deleted: Some(incoming.is_deleted),
405 ..Default::default()
406 },
407 trx,
408 )
409 .await?;
410 result.updates += 1;
411 }
412 } else {
413 let mut to_insert = incoming.clone();
414 to_insert.tx_label_id = local_tx_label_id;
415 to_insert.transaction_id = local_tx_id;
416 storage.insert_tx_label_map(&to_insert, trx).await?;
417 result.inserts += 1;
418 }
419 }
420 }
421
422 if let Some(output_tag_maps) = chunk.output_tag_maps {
424 for incoming in &output_tag_maps {
425 sync_map.output_tag_map.update_max(incoming.updated_at);
426 sync_map.output_tag_map.count += 1;
427
428 let local_tag_id = sync_map
430 .output_tag
431 .get_local_id(incoming.output_tag_id)
432 .unwrap_or(incoming.output_tag_id);
433 let local_output_id = sync_map
434 .output
435 .get_local_id(incoming.output_id)
436 .unwrap_or(incoming.output_id);
437
438 let existing = storage
440 .find_output_tag_maps(
441 &FindOutputTagMapsArgs {
442 partial: OutputTagMapPartial {
443 output_tag_id: Some(local_tag_id),
444 output_id: Some(local_output_id),
445 ..Default::default()
446 },
447 ..Default::default()
448 },
449 trx,
450 )
451 .await?;
452
453 if let Some(local) = existing.into_iter().next() {
454 if local.should_update(incoming) {
455 storage
456 .update_output_tag_map(
457 local.output_id,
458 local.output_tag_id,
459 &OutputTagMapPartial {
460 is_deleted: Some(incoming.is_deleted),
461 ..Default::default()
462 },
463 trx,
464 )
465 .await?;
466 result.updates += 1;
467 }
468 } else {
469 let mut to_insert = incoming.clone();
470 to_insert.output_tag_id = local_tag_id;
471 to_insert.output_id = local_output_id;
472 storage.insert_output_tag_map(&to_insert, trx).await?;
473 result.inserts += 1;
474 }
475 }
476 }
477
478 if let Some(certificates) = chunk.certificates {
480 for incoming in &certificates {
481 sync_map.certificate.update_max(incoming.updated_at);
482 sync_map.certificate.count += 1;
483
484 let foreign_id = incoming.certificate_id;
485
486 let existing = storage
488 .find_certificates(
489 &FindCertificatesArgs {
490 partial: CertificatePartial {
491 user_id: Some(local_user_id),
492 serial_number: Some(incoming.serial_number.clone()),
493 certifier: Some(incoming.certifier.clone()),
494 ..Default::default()
495 },
496 ..Default::default()
497 },
498 trx,
499 )
500 .await?;
501
502 if let Some(local) = existing.into_iter().next() {
503 if local.should_update(incoming) {
504 storage
505 .update_certificate(
506 local.certificate_id,
507 &CertificatePartial {
508 subject: Some(incoming.subject.clone()),
509 verifier: incoming.verifier.clone().map(Some).unwrap_or(None),
510 revocation_outpoint: Some(incoming.revocation_outpoint.clone()),
511 signature: Some(incoming.signature.clone()),
512 is_deleted: Some(incoming.is_deleted),
513 ..Default::default()
514 },
515 trx,
516 )
517 .await?;
518 result.updates += 1;
519 }
520 sync_map
521 .certificate
522 .map_id(foreign_id, local.certificate_id)
523 .map_err(|e| WalletError::Internal(e))?;
524 } else {
525 let mut to_insert = incoming.clone();
526 to_insert.user_id = local_user_id;
527 to_insert.certificate_id = 0;
528 let local_id = storage.insert_certificate(&to_insert, trx).await?;
529 sync_map
530 .certificate
531 .map_id(foreign_id, local_id)
532 .map_err(|e| WalletError::Internal(e))?;
533 result.inserts += 1;
534 }
535 }
536 }
537
538 if let Some(certificate_fields) = chunk.certificate_fields {
540 for incoming in &certificate_fields {
541 sync_map.certificate_field.update_max(incoming.updated_at);
542 sync_map.certificate_field.count += 1;
543
544 let local_cert_id = sync_map
546 .certificate
547 .get_local_id(incoming.certificate_id)
548 .unwrap_or(incoming.certificate_id);
549
550 let existing = storage
552 .find_certificate_fields(
553 &FindCertificateFieldsArgs {
554 partial: CertificateFieldPartial {
555 certificate_id: Some(local_cert_id),
556 field_name: Some(incoming.field_name.clone()),
557 ..Default::default()
558 },
559 ..Default::default()
560 },
561 trx,
562 )
563 .await?;
564
565 if let Some(local) = existing.into_iter().next() {
566 if local.should_update(incoming) {
567 storage
568 .update_certificate_field(
569 local.certificate_id,
570 &local.field_name,
571 &CertificateFieldPartial {
572 field_value: Some(incoming.field_value.clone()),
573 master_key: Some(incoming.master_key.clone()),
574 ..Default::default()
575 },
576 trx,
577 )
578 .await?;
579 result.updates += 1;
580 }
581 } else {
582 let mut to_insert = incoming.clone();
583 to_insert.user_id = local_user_id;
584 to_insert.certificate_id = local_cert_id;
585 storage.insert_certificate_field(&to_insert, trx).await?;
586 result.inserts += 1;
587 }
588 }
589 }
590
591 if let Some(commissions) = chunk.commissions {
593 for incoming in &commissions {
594 sync_map.commission.update_max(incoming.updated_at);
595 sync_map.commission.count += 1;
596
597 let foreign_id = incoming.commission_id;
598
599 let local_tx_id = sync_map
601 .transaction
602 .get_local_id(incoming.transaction_id)
603 .unwrap_or(incoming.transaction_id);
604
605 let existing = storage
607 .find_commissions(
608 &FindCommissionsArgs {
609 partial: CommissionPartial {
610 user_id: Some(local_user_id),
611 transaction_id: Some(local_tx_id),
612 key_offset: Some(incoming.key_offset.clone()),
613 ..Default::default()
614 },
615 ..Default::default()
616 },
617 trx,
618 )
619 .await?;
620
621 if let Some(local) = existing.into_iter().next() {
622 if local.should_update(incoming) {
623 storage
624 .update_commission(
625 local.commission_id,
626 &CommissionPartial {
627 satoshis: Some(incoming.satoshis),
628 is_redeemed: Some(incoming.is_redeemed),
629 ..Default::default()
630 },
631 trx,
632 )
633 .await?;
634 result.updates += 1;
635 }
636 sync_map
637 .commission
638 .map_id(foreign_id, local.commission_id)
639 .map_err(|e| WalletError::Internal(e))?;
640 } else {
641 let mut to_insert = incoming.clone();
642 to_insert.user_id = local_user_id;
643 to_insert.transaction_id = local_tx_id;
644 to_insert.commission_id = 0;
645 let local_id = storage.insert_commission(&to_insert, trx).await?;
646 sync_map
647 .commission
648 .map_id(foreign_id, local_id)
649 .map_err(|e| WalletError::Internal(e))?;
650 result.inserts += 1;
651 }
652 }
653 }
654
655 if let Some(proven_tx_reqs) = chunk.proven_tx_reqs {
657 for incoming in &proven_tx_reqs {
658 sync_map.proven_tx_req.update_max(incoming.updated_at);
659 sync_map.proven_tx_req.count += 1;
660
661 let foreign_id = incoming.proven_tx_req_id;
662
663 let local_proven_tx_id = match incoming.proven_tx_id {
665 Some(fk) => sync_map.proven_tx.get_local_id(fk).or(Some(fk)),
666 None => None,
667 };
668
669 let existing = storage
671 .find_proven_tx_reqs(
672 &FindProvenTxReqsArgs {
673 partial: ProvenTxReqPartial {
674 txid: Some(incoming.txid.clone()),
675 ..Default::default()
676 },
677 ..Default::default()
678 },
679 trx,
680 )
681 .await?;
682
683 if let Some(local) = existing.into_iter().next() {
684 if local.should_update(incoming) {
685 storage
686 .update_proven_tx_req(
687 local.proven_tx_req_id,
688 &ProvenTxReqPartial {
689 proven_tx_id: local_proven_tx_id,
690 status: Some(incoming.status.clone()),
691 notified: Some(incoming.notified),
692 ..Default::default()
693 },
694 trx,
695 )
696 .await?;
697 result.updates += 1;
698 }
699 sync_map
700 .proven_tx_req
701 .map_id(foreign_id, local.proven_tx_req_id)
702 .map_err(|e| WalletError::Internal(e))?;
703 } else {
704 let mut to_insert = incoming.clone();
705 to_insert.proven_tx_id = local_proven_tx_id;
706 to_insert.proven_tx_req_id = 0;
707 let local_id = storage.insert_proven_tx_req(&to_insert, trx).await?;
708 sync_map
709 .proven_tx_req
710 .map_id(foreign_id, local_id)
711 .map_err(|e| WalletError::Internal(e))?;
712 result.inserts += 1;
713 }
714 }
715 }
716
717 Ok(result)
718}
719
/// Result value returned by `process_sync_chunk`.
///
/// Field names are (de)serialized in camelCase, except `max_updated_at`, which is
/// explicitly renamed to `maxUpdated_at`.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProcessSyncChunkResult {
    /// Completion flag. Deserialized through `crate::serde_helpers::bool_from_int_or_bool`
    /// (presumably accepts either an integer or a boolean; confirm against the helper).
    /// NOTE(review): `process_sync_chunk` never sets this, so it stays `false` there.
    #[serde(deserialize_with = "crate::serde_helpers::bool_from_int_or_bool")]
    pub done: bool,
    /// Optional timestamp, (de)serialized via `crate::serde_datetime::option` under the
    /// key `maxUpdated_at`; falls back to `None` when the key is absent (`default`).
    /// NOTE(review): `process_sync_chunk` never sets this; the per-entity maxima are
    /// tracked on the `SyncMap` entries instead.
    #[serde(
        rename = "maxUpdated_at",
        default,
        with = "crate::serde_datetime::option"
    )]
    pub max_updated_at: Option<NaiveDateTime>,
    /// Number of rows inserted through explicit `insert_*` storage calls.
    pub inserts: i64,
    /// Number of rows changed through explicit `update_*` storage calls.
    pub updates: i64,
}