1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::{
9 Namespace, SchemaType,
10 identity::{DigestIdentifier, PublicKey},
11 request::EventRequest,
12};
13use network::ComunicateInfo;
14
15use crate::{
16 ActorMessage, NetworkMessage, Node, NodeMessage, NodeResponse,
17 governance::{
18 Governance, GovernanceMessage, GovernanceResponse,
19 model::{HashThisRole, RoleTypes},
20 },
21 helpers::network::service::NetworkSender,
22 model::{
23 common::{
24 check_witness_access, emit_fail,
25 node::{get_subject_data, i_owner_new_owner, try_to_update},
26 subject::{create_subject, get_gov, get_gov_sn, update_ledger},
27 },
28 event::Ledger,
29 },
30 node::SubjectData,
31 subject::SignedLedger,
32 tracker::{Tracker, TrackerMessage, TrackerResponse},
33};
34
35use tracing::{Span, debug, error, info_span, warn};
36
37use super::error::DistributorError;
38
39pub struct DistriWorker {
40 pub our_key: Arc<PublicKey>,
41 pub network: Arc<NetworkSender>,
42}
43
44impl DistriWorker {
45 async fn down_tracker(
46 &self,
47 ctx: &ActorContext<Self>,
48 subject_id: &DigestIdentifier,
49 ) -> Result<(), ActorError> {
50 let subject_path =
51 ActorPath::from(format!("/user/node/{}", subject_id));
52
53 let subject_actor =
54 ctx.system().get_actor::<Tracker>(&subject_path).await?;
55 subject_actor.ask_stop().await
56 }
57
58 async fn get_ledger(
59 &self,
60 ctx: &mut ActorContext<Self>,
61 subject_id: &DigestIdentifier,
62 hi_sn: u64,
63 lo_sn: Option<u64>,
64 is_gov: bool,
65 ) -> Result<(Vec<SignedLedger>, bool), ActorError> {
66 let path = ActorPath::from(format!("/user/node/{}", subject_id));
67
68 if is_gov {
69 let governance_actor =
70 ctx.system().get_actor::<Governance>(&path).await?;
71
72 let response = governance_actor
73 .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
74 .await?;
75
76 match response {
77 GovernanceResponse::Ledger { ledger, is_all } => {
78 Ok((ledger, is_all))
79 }
80 _ => Err(ActorError::UnexpectedResponse {
81 expected: "GovernanceResponse::Ledger".to_owned(),
82 path,
83 }),
84 }
85 } else {
86 let response = if let Ok(tracker_actor) =
87 ctx.system().get_actor::<Tracker>(&path).await
88 {
89 tracker_actor
90 .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
91 .await?
92 } else {
93 Self::up_tracker(ctx, subject_id, true).await?;
94
95 let tracker_actor =
96 ctx.system().get_actor::<Tracker>(&path).await?;
97
98 let response = tracker_actor
99 .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
100 .await?;
101
102 tracker_actor.ask_stop().await?;
103
104 response
105 };
106
107 match response {
108 TrackerResponse::Ledger { ledger, is_all } => {
109 Ok((ledger, is_all))
110 }
111 _ => Err(ActorError::UnexpectedResponse {
112 expected: "TrackerResponse::Ledger".to_owned(),
113 path,
114 }),
115 }
116 }
117 }
118
119 async fn authorized_subj(
120 &self,
121 ctx: &ActorContext<Self>,
122 subject_id: &DigestIdentifier,
123 ) -> Result<(bool, Option<SubjectData>), ActorError> {
124 let node_path = ActorPath::from("/user/node");
125 let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
126
127 let response = node_actor
128 .ask(NodeMessage::AuthData(subject_id.to_owned()))
129 .await?;
130 match response {
131 NodeResponse::AuthData { auth, subject_data } => {
132 Ok((auth, subject_data))
133 }
134 _ => Err(ActorError::UnexpectedResponse {
135 expected: "NodeResponse::AuthData".to_owned(),
136 path: node_path,
137 }),
138 }
139 }
140
141 async fn check_auth(
142 &self,
143 ctx: &mut ActorContext<Self>,
144 signer: PublicKey,
145 ledger: Ledger,
146 ) -> Result<(bool, bool), ActorError> {
147 let subject_id = ledger.get_subject_id();
148 let (auth, subject_data) =
150 self.authorized_subj(ctx, &subject_id).await?;
151
152 let (schema_id, namespace, governance_id) = if let Some(ref data) =
154 subject_data
155 {
156 match data {
158 SubjectData::Tracker {
159 governance_id,
160 schema_id,
161 namespace,
162 ..
163 } => {
164 let namespace = Namespace::from(namespace.clone());
165 (schema_id.clone(), namespace, Some(governance_id.clone()))
166 }
167 SubjectData::Governance { .. } => {
168 (SchemaType::Governance, Namespace::new(), None)
169 }
170 }
171 } else {
172 if let EventRequest::Create(create) = ledger.event_request.content()
174 {
175 if !create.schema_id.is_gov() && create.governance_id.is_empty()
176 {
177 return Err(
178 DistributorError::MissingGovernanceIdInCreate {
179 subject_id: subject_id.clone(),
180 }
181 .into(),
182 );
183 }
184
185 let gov_id = if create.schema_id.is_gov() {
186 None
187 } else {
188 Some(create.governance_id.clone())
189 };
190
191 (create.schema_id.clone(), create.namespace.clone(), gov_id)
192 } else {
193 try_to_update(ctx, subject_id, Some(signer)).await?;
195 return Err(DistributorError::UpdatingSubject.into());
196 }
197 };
198
199 let is_gov = schema_id.is_gov();
200 if is_gov {
202 if !auth {
204 return Err(DistributorError::GovernanceNotAuthorized.into());
205 }
206 } else {
207 if !auth {
209 let governance_id =
210 governance_id.expect("governance_id is Some for Trackers");
211 let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
212 DistributorError::GetGovernanceFailed {
213 details: e.to_string(),
214 }
215 })?;
216
217 match gov.version.cmp(&ledger.gov_version) {
218 std::cmp::Ordering::Less => {
219 return Err(
220 DistributorError::GovernanceVersionMismatch {
221 our_version: gov.version,
222 their_version: ledger.gov_version,
223 }
224 .into(),
225 );
226 }
227 std::cmp::Ordering::Equal => {}
228 std::cmp::Ordering::Greater => {}
229 };
230
231 if !gov.has_this_role(HashThisRole::SchemaWitness {
232 who: (*self.our_key).clone(),
233 creator: signer,
234 schema_id,
235 namespace,
236 }) {
237 return Err(DistributorError::NotWitness.into());
238 }
239 }
240 }
241
242 Ok((is_gov, subject_data.is_some()))
243 }
244
245 async fn check_witness(
246 &self,
247 ctx: &mut ActorContext<Self>,
248 subject_id: &DigestIdentifier,
249 sender: PublicKey,
250 ) -> Result<(u64, bool), ActorError> {
251 let data = get_subject_data(ctx, subject_id).await?;
252
253 let Some(data) = data else {
254 return Err(DistributorError::SubjectNotFound.into());
255 };
256
257 match data {
258 SubjectData::Tracker {
259 governance_id,
260 schema_id,
261 namespace,
262 ..
263 } => {
264 let Some(sn) = check_witness_access(
265 ctx,
266 &governance_id,
267 subject_id,
268 sender,
269 namespace,
270 schema_id,
271 )
272 .await?
273 else {
274 return Err(DistributorError::SenderNoAccess.into());
275 };
276
277 Ok((sn, false))
278 }
279 SubjectData::Governance { .. } => {
280 let gov = get_gov(ctx, subject_id).await.map_err(|e| {
281 DistributorError::GetGovernanceFailed {
282 details: e.to_string(),
283 }
284 })?;
285
286 if !gov.has_this_role(HashThisRole::Gov {
287 who: sender.clone(),
288 role: RoleTypes::Witness,
289 }) {
290 return Err(DistributorError::SenderNotMember {
291 sender: sender.to_string(),
292 }
293 .into());
294 }
295
296 Ok((get_gov_sn(ctx, subject_id).await?, true))
297 }
298 }
299 }
300
301 pub async fn up_tracker(
302 ctx: &mut ActorContext<Self>,
303 subject_id: &DigestIdentifier,
304 light: bool,
305 ) -> Result<(), ActorError> {
306 let node_path = ActorPath::from("/user/node");
307 let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
308
309 let response = node_actor
311 .ask(NodeMessage::UpSubject {
312 subject_id: subject_id.to_owned(),
313 light,
314 })
315 .await?;
316
317 match response {
318 NodeResponse::Ok => Ok(()),
319 _ => Err(ActorError::UnexpectedResponse {
320 expected: "NodeResponse::Ok".to_owned(),
321 path: node_path,
322 }),
323 }
324 }
325}
326
327#[async_trait]
328impl Actor for DistriWorker {
329 type Event = ();
330 type Message = DistriWorkerMessage;
331 type Response = ();
332
333 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
334 parent_span.map_or_else(
335 || info_span!("DistriWorker", id),
336 |parent_span| info_span!(parent: parent_span, "DistriWorker", id),
337 )
338 }
339}
340
341#[derive(Debug, Clone)]
342pub enum DistriWorkerMessage {
343 GetLastSn {
344 subject_id: DigestIdentifier,
345 info: ComunicateInfo,
346 sender: PublicKey,
347 receiver_actor: String,
348 },
349 SendDistribution {
351 actual_sn: Option<u64>,
352 subject_id: DigestIdentifier,
353 info: ComunicateInfo,
354 sender: PublicKey,
355 },
356 LastEventDistribution {
358 ledger: Box<SignedLedger>,
359 info: ComunicateInfo,
360 sender: PublicKey,
361 },
362 LedgerDistribution {
363 ledger: Vec<SignedLedger>,
364 is_all: bool,
365 info: ComunicateInfo,
366 sender: PublicKey,
367 },
368}
369
370impl Message for DistriWorkerMessage {}
371
372impl NotPersistentActor for DistriWorker {}
373
374#[async_trait]
375impl Handler<Self> for DistriWorker {
376 async fn handle_message(
377 &mut self,
378 _sender: ActorPath,
379 msg: DistriWorkerMessage,
380 ctx: &mut ActorContext<Self>,
381 ) -> Result<(), ActorError> {
382 match msg {
383 DistriWorkerMessage::GetLastSn {
384 subject_id,
385 info,
386 sender,
387 receiver_actor,
388 } => {
389 let (sn, ..) = match self
390 .check_witness(ctx, &subject_id, sender.clone())
391 .await
392 {
393 Ok(sn) => sn,
394 Err(e) => {
395 if let ActorError::Functional { .. } = e {
396 warn!(
397 msg_type = "GetLastSn",
398 subject_id = %subject_id,
399 sender = %sender,
400 error = %e,
401 "Witness check failed"
402 );
403 return Err(e);
404 } else {
405 error!(
406 msg_type = "GetLastSn",
407 subject_id = %subject_id,
408 sender = %sender,
409 error = %e,
410 "Witness check failed"
411 );
412 return Err(emit_fail(ctx, e).await);
413 }
414 }
415 };
416
417 let new_info = ComunicateInfo {
418 receiver: sender.clone(),
419 request_id: info.request_id,
420 version: info.version,
421 receiver_actor,
422 };
423
424 if let Err(e) = self
425 .network
426 .send_command(network::CommandHelper::SendMessage {
427 message: NetworkMessage {
428 info: new_info,
429 message: ActorMessage::AuthLastSn { sn },
430 },
431 })
432 .await
433 {
434 error!(
435 msg_type = "GetLastSn",
436 subject_id = %subject_id,
437 sn = sn,
438 error = %e,
439 "Failed to send last SN response to network"
440 );
441 return Err(emit_fail(ctx, e).await);
442 };
443
444 debug!(
445 msg_type = "GetLastSn",
446 subject_id = %subject_id,
447 sn = sn,
448 sender = %sender,
449 "Last SN response sent successfully"
450 );
451 }
452 DistriWorkerMessage::SendDistribution {
453 actual_sn,
454 info,
455 subject_id,
456 sender,
457 } => {
458 let (hi_sn, is_gov) = match self
459 .check_witness(ctx, &subject_id, sender.clone())
460 .await
461 {
462 Ok(sn) => sn,
463 Err(e) => {
464 if let ActorError::Functional { .. } = e {
465 warn!(
466 msg_type = "SendDistribution",
467 subject_id = %subject_id,
468 sender = %sender,
469 error = %e,
470 "Witness check failed"
471 );
472 return Err(e);
473 } else {
474 error!(
475 msg_type = "SendDistribution",
476 subject_id = %subject_id,
477 sender = %sender,
478 error = %e,
479 "Witness check failed"
480 );
481 return Err(emit_fail(ctx, e).await);
482 }
483 }
484 };
485
486 if let Some(actual_sn) = actual_sn
487 && actual_sn >= hi_sn
488 {
489 warn!(
490 msg_type = "SendDistribution",
491 subject_id = %subject_id,
492 actual_sn = actual_sn,
493 witness_sn = hi_sn,
494 "Requester SN is >= witness SN, nothing to send"
495 );
496 return Err(DistributorError::ActualSnBiggerThanWitness {
497 actual_sn,
498 witness_sn: hi_sn,
499 }
500 .into());
501 };
502
503 let (ledger, is_all) = match self
504 .get_ledger(ctx, &subject_id, hi_sn, actual_sn, is_gov)
505 .await
506 {
507 Ok(res) => res,
508 Err(e) => {
509 error!(
510 msg_type = "SendDistribution",
511 subject_id = %subject_id,
512 hi_sn = hi_sn,
513 actual_sn = ?actual_sn,
514 is_gov = is_gov,
515 error = %e,
516 "Failed to obtain ledger"
517 );
518 return Err(emit_fail(ctx, e).await);
519 }
520 };
521
522 let new_info = ComunicateInfo {
523 receiver: sender.clone(),
524 request_id: info.request_id,
525 version: info.version,
526 receiver_actor: format!(
527 "/user/node/distributor_{}",
528 subject_id
529 ),
530 };
531
532 if let Err(e) = self
533 .network
534 .send_command(network::CommandHelper::SendMessage {
535 message: NetworkMessage {
536 info: new_info,
537 message: ActorMessage::DistributionLedgerRes {
538 ledger: ledger.clone(),
539 is_all,
540 },
541 },
542 })
543 .await
544 {
545 error!(
546 msg_type = "SendDistribution",
547 subject_id = %subject_id,
548 ledger_count = ledger.len(),
549 is_all = is_all,
550 error = %e,
551 "Failed to send ledger response to network"
552 );
553 return Err(emit_fail(ctx, e).await);
554 };
555
556 debug!(
557 msg_type = "SendDistribution",
558 subject_id = %subject_id,
559 sender = %sender,
560 ledger_count = ledger.len(),
561 is_all = is_all,
562 hi_sn = hi_sn,
563 actual_sn = ?actual_sn,
564 "Ledger distribution sent successfully"
565 );
566 }
567 DistriWorkerMessage::LastEventDistribution {
568 ledger,
569 info,
570 sender,
571 } => {
572 let subject_id = ledger.content().get_subject_id();
573 let sn = ledger.content().sn;
574
575 let (is_gov, ..) = match self
576 .check_auth(ctx, sender.clone(), ledger.content().clone())
577 .await
578 {
579 Ok(is_gov) => is_gov,
580 Err(e) => {
581 if let ActorError::Functional { .. } = e {
582 warn!(
583 msg_type = "LastEventDistribution",
584 subject_id = %subject_id,
585 sn = sn,
586 sender = %sender,
587 error = %e,
588 "Authorization check failed"
589 );
590 return Err(e);
591 } else {
592 error!(
593 msg_type = "LastEventDistribution",
594 subject_id = %subject_id,
595 sn = sn,
596 sender = %sender,
597 error = %e,
598 "Authorization check failed"
599 );
600 return Err(emit_fail(ctx, e).await);
601 }
602 }
603 };
604
605 let (owner, new_owner) = if ledger
606 .content()
607 .event_request
608 .content()
609 .is_create_event()
610 {
611 if let Err(e) = create_subject(ctx, *ledger.clone()).await {
612 if let ActorError::Functional { .. } = e {
613 warn!(
614 msg_type = "LastEventDistribution",
615 subject_id = %subject_id,
616 sn = sn,
617 error = %e,
618 "Failed to create subject from create event"
619 );
620 return Err(e);
621 } else {
622 error!(
623 msg_type = "LastEventDistribution",
624 subject_id = %subject_id,
625 sn = sn,
626 error = %e,
627 "Failed to create subject from create event"
628 );
629 return Err(emit_fail(ctx, e).await);
630 }
631 };
632
633 (ledger.signature().signer.clone(), None)
634 } else {
635 let (i_owner, i_new_owner) =
636 match i_owner_new_owner(ctx, &subject_id).await {
637 Ok(res) => res,
638 Err(e) => {
639 error!(
640 msg_type = "LastEventDistribution",
641 subject_id = %subject_id,
642 error = %e,
643 "Failed to check owner status"
644 );
645 return Err(emit_fail(ctx, e).await);
646 }
647 };
648
649 if !i_new_owner.unwrap_or_default()
650 && !i_owner
651 && !is_gov
652 && let Err(e) =
653 Self::up_tracker(ctx, &subject_id, false).await
654 {
655 error!(
656 msg_type = "LastEventDistribution",
657 subject_id = %subject_id,
658 error = %e,
659 "Failed to bring up tracker for witness subject"
660 );
661 let error = DistributorError::UpTrackerFailed {
662 details: e.to_string(),
663 };
664 return Err(emit_fail(ctx, error.into()).await);
665 }
666
667 match update_ledger(ctx, &subject_id, vec![*ledger.clone()])
668 .await
669 {
670 Ok((last_sn, owner, new_owner))
671 if last_sn < ledger.content().sn =>
672 {
673 debug!(
674 msg_type = "LastEventDistribution",
675 subject_id = %subject_id,
676 last_sn = last_sn,
677 received_sn = sn,
678 "SN gap detected, requesting full ledger"
679 );
680
681 let new_info = ComunicateInfo {
682 receiver: sender,
683 request_id: info.request_id,
684 version: info.version,
685 receiver_actor: format!(
686 "/user/node/distributor_{}",
687 subject_id
688 ),
689 };
690
691 if let Err(e) = self.network.send_command(network::CommandHelper::SendMessage {
692 message: NetworkMessage {
693 info: new_info,
694 message: ActorMessage::DistributionLedgerReq {
695 actual_sn: Some(last_sn),
696 subject_id: subject_id.clone(),
697 },
698 },
699 }).await {
700 error!(
701 msg_type = "LastEventDistribution",
702 subject_id = %subject_id,
703 last_sn = last_sn,
704 error = %e,
705 "Failed to request ledger from network"
706 );
707 return Err(emit_fail(ctx, e).await);
708 };
709
710 let i_new_owner = if let Some(new_owner) = new_owner
711 {
712 new_owner == *self.our_key
713 } else {
714 false
715 };
716
717 if !is_gov
718 && owner != *self.our_key
719 && !i_new_owner
720 && let Err(e) =
721 self.down_tracker(ctx, &subject_id).await
722 {
723 error!(
724 msg_type = "LastEventDistribution",
725 subject_id = %subject_id,
726 error = %e,
727 "Failed to stop tracker after ledger request"
728 );
729 return Err(e);
730 }
731
732 return Ok(());
733 }
734 Ok((.., owner, new_owner)) => (owner, new_owner),
735 Err(e) => {
736 if let ActorError::Functional { .. } = e.clone() {
737 warn!(
738 msg_type = "LastEventDistribution",
739 subject_id = %subject_id,
740 sn = sn,
741 error = %e,
742 "Failed to update subject ledger"
743 );
744 return Err(e);
745 } else {
746 error!(
747 msg_type = "LastEventDistribution",
748 subject_id = %subject_id,
749 sn = sn,
750 error = %e,
751 "Failed to update subject ledger"
752 );
753 return Err(emit_fail(ctx, e).await);
754 }
755 }
756 }
757 };
758
759 let new_info = ComunicateInfo {
760 receiver: sender.clone(),
761 receiver_actor: format!(
762 "/user/{}/{}",
763 info.request_id,
764 info.receiver.clone()
765 ),
766 request_id: info.request_id,
767 version: info.version,
768 };
769
770 if let Err(e) = self
771 .network
772 .send_command(network::CommandHelper::SendMessage {
773 message: NetworkMessage {
774 info: new_info,
775 message: ActorMessage::DistributionLastEventRes,
776 },
777 })
778 .await
779 {
780 error!(
781 msg_type = "LastEventDistribution",
782 subject_id = %subject_id,
783 sn = sn,
784 error = %e,
785 "Failed to send distribution acknowledgment"
786 );
787 return Err(emit_fail(ctx, e).await);
788 };
789
790 let i_new_owner = if let Some(ref new_owner) = new_owner {
791 *new_owner == *self.our_key
792 } else {
793 false
794 };
795
796 if !is_gov
797 && owner != *self.our_key
798 && !i_new_owner
799 && let Err(e) = self.down_tracker(ctx, &subject_id).await
800 {
801 error!(
802 msg_type = "LastEventDistribution",
803 subject_id = %subject_id,
804 error = %e,
805 "Failed to stop tracker after processing"
806 );
807 return Err(e);
808 }
809
810 debug!(
811 msg_type = "LastEventDistribution",
812 subject_id = %subject_id,
813 sn = sn,
814 sender = %sender,
815 is_gov = is_gov,
816 "Last event distribution processed successfully"
817 );
818 }
819 DistriWorkerMessage::LedgerDistribution {
820 mut ledger,
821 is_all,
822 info,
823 sender,
824 } => {
825 if ledger.is_empty() {
826 warn!(
827 msg_type = "LedgerDistribution",
828 sender = %sender,
829 "Received empty ledger distribution"
830 );
831 return Err(DistributorError::EmptyEvents.into());
832 }
833
834 let subject_id = ledger[0].content().get_subject_id();
835 let ledger_count = ledger.len();
836 let first_sn = ledger[0].content().sn;
837
838 let (is_gov, is_register) = match self
839 .check_auth(
840 ctx,
841 sender.clone(),
842 ledger[0].content().clone(),
843 )
844 .await
845 {
846 Ok(data) => data,
847 Err(e) => {
848 if let ActorError::Functional { .. } = e {
849 warn!(
850 msg_type = "LedgerDistribution",
851 subject_id = %subject_id,
852 sender = %sender,
853 ledger_count = ledger_count,
854 error = %e,
855 "Authorization check failed"
856 );
857 return Err(e);
858 } else {
859 error!(
860 msg_type = "LedgerDistribution",
861 subject_id = %subject_id,
862 sender = %sender,
863 ledger_count = ledger_count,
864 error = %e,
865 "Authorization check failed"
866 );
867 return Err(emit_fail(ctx, e).await);
868 }
869 }
870 };
871
872 let (i_owner, i_new_owner) = if ledger[0]
873 .content()
874 .event_request
875 .content()
876 .is_create_event()
877 && !is_register
878 {
879 if let Err(e) = create_subject(ctx, ledger[0].clone()).await
880 {
881 if let ActorError::Functional { .. } = e {
882 warn!(
883 msg_type = "LedgerDistribution",
884 subject_id = %subject_id,
885 error = %e,
886 "Failed to create subject from ledger"
887 );
888 return Err(e);
889 } else {
890 error!(
891 msg_type = "LedgerDistribution",
892 subject_id = %subject_id,
893 error = %e,
894 "Failed to create subject from ledger"
895 );
896 return Err(emit_fail(ctx, e).await);
897 }
898 };
899
900 let event = ledger.remove(0);
901 (event.signature().signer == *self.our_key, false)
902 } else {
903 if ledger[0]
905 .content()
906 .event_request
907 .content()
908 .is_create_event()
909 && is_register
910 {
911 let _event = ledger.remove(0);
912 }
913
914 let (i_owner, i_new_owner) =
915 match i_owner_new_owner(ctx, &subject_id).await {
916 Ok(res) => res,
917 Err(e) => {
918 error!(
919 msg_type = "LedgerDistribution",
920 subject_id = %subject_id,
921 error = %e,
922 "Failed to check owner status"
923 );
924 return Err(emit_fail(ctx, e).await);
925 }
926 };
927
928 let i_new_owner = i_new_owner.unwrap_or_default();
929 if !i_new_owner
930 && !i_owner
931 && !is_gov
932 && let Err(e) =
933 Self::up_tracker(ctx, &subject_id, false).await
934 {
935 error!(
936 msg_type = "LedgerDistribution",
937 subject_id = %subject_id,
938 error = %e,
939 "Failed to bring up tracker for witness subject"
940 );
941 let error = DistributorError::UpTrackerFailed {
942 details: e.to_string(),
943 };
944 return Err(emit_fail(ctx, error.into()).await);
945 }
946
947 (i_owner, i_new_owner)
948 };
949
950 let (i_owner, i_new_owner) = if !ledger.is_empty() {
951 match update_ledger(ctx, &subject_id, ledger).await {
952 Ok((last_sn, owner, new_owner)) => {
953 let i_new_owner = if let Some(new_owner) = new_owner
954 {
955 new_owner == *self.our_key
956 } else {
957 false
958 };
959
960 if !is_all {
961 debug!(
962 msg_type = "LedgerDistribution",
963 subject_id = %subject_id,
964 last_sn = last_sn,
965 "Partial ledger received, requesting more"
966 );
967
968 let new_info = ComunicateInfo {
969 receiver: sender.clone(),
970 request_id: info.request_id,
971 version: info.version,
972 receiver_actor: format!(
973 "/user/node/distributor_{}",
974 subject_id
975 ),
976 };
977
978 if let Err(e) = self
979 .network
980 .send_command(network::CommandHelper::SendMessage {
981 message: NetworkMessage {
982 info: new_info,
983 message: ActorMessage::DistributionLedgerReq {
984 actual_sn: Some(last_sn),
985 subject_id: subject_id.clone(),
986 },
987 },
988 })
989 .await
990 {
991 error!(
992 msg_type = "LedgerDistribution",
993 subject_id = %subject_id,
994 last_sn = last_sn,
995 error = %e,
996 "Failed to request more ledger entries"
997 );
998 return Err(emit_fail(ctx, e).await);
999 };
1000 }
1001
1002 (owner == *self.our_key, i_new_owner)
1003 }
1004 Err(e) => {
1005 if let ActorError::Functional { .. } = e.clone() {
1006 warn!(
1007 msg_type = "LedgerDistribution",
1008 subject_id = %subject_id,
1009 first_sn = first_sn,
1010 ledger_count = ledger_count,
1011 error = %e,
1012 "Failed to update subject ledger"
1013 );
1014 return Err(e);
1015 } else {
1016 error!(
1017 msg_type = "LedgerDistribution",
1018 subject_id = %subject_id,
1019 first_sn = first_sn,
1020 ledger_count = ledger_count,
1021 error = %e,
1022 "Failed to update subject ledger"
1023 );
1024 return Err(emit_fail(ctx, e).await);
1025 }
1026 }
1027 }
1028 } else {
1029 (i_owner, i_new_owner)
1030 };
1031
1032 if !is_gov
1033 && !i_owner
1034 && !i_new_owner
1035 && let Err(e) = self.down_tracker(ctx, &subject_id).await
1036 {
1037 error!(
1038 msg_type = "LedgerDistribution",
1039 subject_id = %subject_id,
1040 error = %e,
1041 "Failed to stop tracker after processing"
1042 );
1043 return Err(e);
1044 }
1045
1046 debug!(
1047 msg_type = "LedgerDistribution",
1048 subject_id = %subject_id,
1049 sender = %sender,
1050 ledger_count = ledger_count,
1051 is_all = is_all,
1052 is_gov = is_gov,
1053 "Ledger distribution processed successfully"
1054 );
1055 }
1056 };
1057
1058 Ok(())
1059 }
1060}