1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::{
9 Namespace, SchemaType,
10 identity::{DigestIdentifier, PublicKey},
11 request::EventRequest,
12};
13use network::ComunicateInfo;
14
15use crate::{
16 ActorMessage, NetworkMessage, Node, NodeMessage, NodeResponse,
17 governance::{
18 Governance, GovernanceMessage, GovernanceResponse,
19 model::{HashThisRole, RoleTypes},
20 },
21 helpers::network::service::NetworkSender,
22 model::{
23 common::{
24 check_subject_creation, check_witness_access, emit_fail,
25 node::{get_subject_data, try_to_update},
26 subject::{
27 acquire_subject, create_subject, get_gov, get_gov_sn,
28 update_ledger,
29 },
30 },
31 event::Ledger,
32 },
33 node::SubjectData,
34 subject::SignedLedger,
35 tracker::{Tracker, TrackerMessage, TrackerResponse},
36};
37
38use tracing::{Span, debug, error, info_span, warn};
39
40use super::error::DistributorError;
41
42pub struct DistriWorker {
43 pub our_key: Arc<PublicKey>,
44 pub network: Arc<NetworkSender>,
45}
46
47impl DistriWorker {
48 fn requester_id(
49 kind: &str,
50 subject_id: &DigestIdentifier,
51 info: &ComunicateInfo,
52 sender: &PublicKey,
53 ) -> String {
54 format!(
55 "{kind}:{subject_id}:{sender}:{}:{}",
56 info.request_id, info.version
57 )
58 }
59
60 async fn get_ledger(
61 &self,
62 ctx: &mut ActorContext<Self>,
63 subject_id: &DigestIdentifier,
64 hi_sn: u64,
65 lo_sn: Option<u64>,
66 is_gov: bool,
67 ) -> Result<(Vec<SignedLedger>, bool), ActorError> {
68 let path = ActorPath::from(format!(
69 "/user/node/subject_manager/{}",
70 subject_id
71 ));
72
73 if is_gov {
74 let governance_actor =
75 ctx.system().get_actor::<Governance>(&path).await?;
76
77 let response = governance_actor
78 .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
79 .await?;
80
81 match response {
82 GovernanceResponse::Ledger { ledger, is_all } => {
83 Ok((ledger, is_all))
84 }
85 _ => Err(ActorError::UnexpectedResponse {
86 expected: "GovernanceResponse::Ledger".to_owned(),
87 path,
88 }),
89 }
90 } else {
91 let lease = acquire_subject(
92 ctx,
93 subject_id,
94 format!("send_distribution:{subject_id}"),
95 None,
96 true,
97 )
98 .await?;
99 let tracker_actor =
100 ctx.system().get_actor::<Tracker>(&path).await?;
101 let response = tracker_actor
102 .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
103 .await;
104 lease.finish(ctx).await?;
105 let response = response?;
106
107 match response {
108 TrackerResponse::Ledger { ledger, is_all } => {
109 Ok((ledger, is_all))
110 }
111 _ => Err(ActorError::UnexpectedResponse {
112 expected: "TrackerResponse::Ledger".to_owned(),
113 path,
114 }),
115 }
116 }
117 }
118
119 async fn authorized_subj(
120 &self,
121 ctx: &ActorContext<Self>,
122 subject_id: &DigestIdentifier,
123 ) -> Result<(bool, Option<SubjectData>), ActorError> {
124 let node_path = ActorPath::from("/user/node");
125 let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
126
127 let response = node_actor
128 .ask(NodeMessage::AuthData(subject_id.to_owned()))
129 .await?;
130 match response {
131 NodeResponse::AuthData { auth, subject_data } => {
132 Ok((auth, subject_data))
133 }
134 _ => Err(ActorError::UnexpectedResponse {
135 expected: "NodeResponse::AuthData".to_owned(),
136 path: node_path,
137 }),
138 }
139 }
140
141 async fn check_auth(
142 &self,
143 ctx: &mut ActorContext<Self>,
144 signer: PublicKey,
145 ledger: Ledger,
146 ) -> Result<(bool, bool), ActorError> {
147 let subject_id = ledger.get_subject_id();
148 let (auth, subject_data) =
150 self.authorized_subj(ctx, &subject_id).await?;
151
152 let (schema_id, namespace, governance_id) = if let Some(ref data) =
154 subject_data
155 {
156 match data {
158 SubjectData::Tracker {
159 governance_id,
160 schema_id,
161 namespace,
162 ..
163 } => {
164 let namespace = Namespace::from(namespace.clone());
165 (schema_id.clone(), namespace, Some(governance_id.clone()))
166 }
167 SubjectData::Governance { .. } => {
168 (SchemaType::Governance, Namespace::new(), None)
169 }
170 }
171 } else {
172 if let EventRequest::Create(create) = ledger.event_request.content()
174 {
175 if !create.schema_id.is_gov() && create.governance_id.is_empty()
176 {
177 return Err(
178 DistributorError::MissingGovernanceIdInCreate {
179 subject_id: subject_id.clone(),
180 }
181 .into(),
182 );
183 }
184
185 let gov_id = if create.schema_id.is_gov() {
186 None
187 } else {
188 Some(create.governance_id.clone())
189 };
190
191 (create.schema_id.clone(), create.namespace.clone(), gov_id)
192 } else {
193 try_to_update(ctx, subject_id, Some(signer)).await?;
195 return Err(DistributorError::UpdatingSubject.into());
196 }
197 };
198
199 let is_gov = schema_id.is_gov();
200 if is_gov {
202 if !auth {
204 return Err(DistributorError::GovernanceNotAuthorized.into());
205 }
206 } else {
207 if !auth {
209 let governance_id =
210 governance_id.expect("governance_id is Some for Trackers");
211 let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
212 DistributorError::GetGovernanceFailed {
213 details: e.to_string(),
214 }
215 })?;
216
217 match gov.version.cmp(&ledger.gov_version) {
218 std::cmp::Ordering::Less => {
219 return Err(
220 DistributorError::GovernanceVersionMismatch {
221 our_version: gov.version,
222 their_version: ledger.gov_version,
223 }
224 .into(),
225 );
226 }
227 std::cmp::Ordering::Equal => {}
228 std::cmp::Ordering::Greater => {}
229 };
230
231 if !gov.has_this_role(HashThisRole::SchemaWitness {
232 who: (*self.our_key).clone(),
233 creator: signer,
234 schema_id,
235 namespace,
236 }) {
237 return Err(DistributorError::NotWitness.into());
238 }
239 }
240 }
241
242 Ok((is_gov, subject_data.is_some()))
243 }
244
245 async fn check_witness(
246 &self,
247 ctx: &mut ActorContext<Self>,
248 subject_id: &DigestIdentifier,
249 sender: PublicKey,
250 ) -> Result<(u64, bool), ActorError> {
251 let data = get_subject_data(ctx, subject_id).await?;
252
253 let Some(data) = data else {
254 return Err(DistributorError::SubjectNotFound.into());
255 };
256
257 match data {
258 SubjectData::Tracker {
259 governance_id,
260 schema_id,
261 namespace,
262 ..
263 } => {
264 let Some(sn) = check_witness_access(
265 ctx,
266 &governance_id,
267 subject_id,
268 sender,
269 namespace,
270 schema_id,
271 )
272 .await?
273 else {
274 return Err(DistributorError::SenderNoAccess.into());
275 };
276
277 Ok((sn, false))
278 }
279 SubjectData::Governance { .. } => {
280 let gov = get_gov(ctx, subject_id).await.map_err(|e| {
281 DistributorError::GetGovernanceFailed {
282 details: e.to_string(),
283 }
284 })?;
285
286 if !gov.has_this_role(HashThisRole::Gov {
287 who: sender.clone(),
288 role: RoleTypes::Witness,
289 }) {
290 return Err(DistributorError::SenderNotMember {
291 sender: sender.to_string(),
292 }
293 .into());
294 }
295
296 Ok((get_gov_sn(ctx, subject_id).await?, true))
297 }
298 }
299 }
300}
301
302#[async_trait]
303impl Actor for DistriWorker {
304 type Event = ();
305 type Message = DistriWorkerMessage;
306 type Response = ();
307
308 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
309 parent_span.map_or_else(
310 || info_span!("DistriWorker", id),
311 |parent_span| info_span!(parent: parent_span, "DistriWorker", id),
312 )
313 }
314}
315
316#[derive(Debug, Clone)]
317pub enum DistriWorkerMessage {
318 GetLastSn {
319 subject_id: DigestIdentifier,
320 info: ComunicateInfo,
321 sender: PublicKey,
322 receiver_actor: String,
323 },
324 GetGovernanceVersion {
325 subject_id: DigestIdentifier,
326 info: ComunicateInfo,
327 sender: PublicKey,
328 receiver_actor: String,
329 },
330 SendDistribution {
332 actual_sn: Option<u64>,
333 subject_id: DigestIdentifier,
334 info: ComunicateInfo,
335 sender: PublicKey,
336 },
337 LastEventDistribution {
339 ledger: Box<SignedLedger>,
340 info: ComunicateInfo,
341 sender: PublicKey,
342 },
343 LedgerDistribution {
344 ledger: Vec<SignedLedger>,
345 is_all: bool,
346 info: ComunicateInfo,
347 sender: PublicKey,
348 },
349}
350
351impl Message for DistriWorkerMessage {}
352
353impl NotPersistentActor for DistriWorker {}
354
355#[async_trait]
356impl Handler<Self> for DistriWorker {
357 async fn handle_message(
358 &mut self,
359 _sender: ActorPath,
360 msg: DistriWorkerMessage,
361 ctx: &mut ActorContext<Self>,
362 ) -> Result<(), ActorError> {
363 match msg {
364 DistriWorkerMessage::GetLastSn {
365 subject_id,
366 info,
367 sender,
368 receiver_actor,
369 } => {
370 let (sn, ..) = match self
371 .check_witness(ctx, &subject_id, sender.clone())
372 .await
373 {
374 Ok(sn) => sn,
375 Err(e) => {
376 if let ActorError::Functional { .. } = e {
377 warn!(
378 msg_type = "GetLastSn",
379 subject_id = %subject_id,
380 sender = %sender,
381 error = %e,
382 "Witness check failed"
383 );
384 return Err(e);
385 } else {
386 error!(
387 msg_type = "GetLastSn",
388 subject_id = %subject_id,
389 sender = %sender,
390 error = %e,
391 "Witness check failed"
392 );
393 return Err(emit_fail(ctx, e).await);
394 }
395 }
396 };
397
398 let new_info = ComunicateInfo {
399 receiver: sender.clone(),
400 request_id: info.request_id,
401 version: info.version,
402 receiver_actor,
403 };
404
405 if let Err(e) = self
406 .network
407 .send_command(network::CommandHelper::SendMessage {
408 message: NetworkMessage {
409 info: new_info,
410 message: ActorMessage::AuthLastSn { sn },
411 },
412 })
413 .await
414 {
415 error!(
416 msg_type = "GetLastSn",
417 subject_id = %subject_id,
418 sn = sn,
419 error = %e,
420 "Failed to send last SN response to network"
421 );
422 return Err(emit_fail(ctx, e).await);
423 };
424
425 debug!(
426 msg_type = "GetLastSn",
427 subject_id = %subject_id,
428 sn = sn,
429 sender = %sender,
430 "Last SN response sent successfully"
431 );
432 }
433 DistriWorkerMessage::GetGovernanceVersion {
434 subject_id,
435 info,
436 sender,
437 receiver_actor,
438 } => {
439 let witness_ok = self
440 .check_witness(ctx, &subject_id, sender.clone())
441 .await
442 .is_ok();
443 let auth_ok = if witness_ok {
444 true
445 } else {
446 let auth_path = ActorPath::from("/user/node/auth");
447 match ctx
448 .system()
449 .get_actor::<crate::auth::Auth>(&auth_path)
450 .await
451 {
452 Ok(auth_actor) => match auth_actor
453 .ask(crate::auth::AuthMessage::GetAuth {
454 subject_id: subject_id.clone(),
455 })
456 .await
457 {
458 Ok(crate::auth::AuthResponse::Witnesses(
459 witnesses,
460 )) => witnesses.contains(&sender),
461 _ => false,
462 },
463 Err(_) => false,
464 }
465 };
466
467 if !auth_ok {
468 return Err(DistributorError::SenderNoAccess.into());
469 }
470
471 let governance_path = ActorPath::from(format!(
472 "/user/node/subject_manager/{}",
473 subject_id
474 ));
475 let governance_actor = ctx
476 .system()
477 .get_actor::<Governance>(&governance_path)
478 .await?;
479 let response =
480 governance_actor.ask(GovernanceMessage::GetVersion).await?;
481 let GovernanceResponse::Version(version) = response else {
482 return Err(ActorError::UnexpectedResponse {
483 path: governance_path,
484 expected: "GovernanceResponse::Version".to_owned(),
485 });
486 };
487
488 let new_info = ComunicateInfo {
489 receiver: sender.clone(),
490 request_id: info.request_id,
491 version: info.version,
492 receiver_actor,
493 };
494
495 if let Err(e) = self
496 .network
497 .send_command(network::CommandHelper::SendMessage {
498 message: NetworkMessage {
499 info: new_info,
500 message: ActorMessage::GovernanceVersionRes {
501 version,
502 },
503 },
504 })
505 .await
506 {
507 return Err(emit_fail(ctx, e).await);
508 }
509 }
510 DistriWorkerMessage::SendDistribution {
511 actual_sn,
512 info,
513 subject_id,
514 sender,
515 } => {
516 let (hi_sn, is_gov) = match self
517 .check_witness(ctx, &subject_id, sender.clone())
518 .await
519 {
520 Ok(sn) => sn,
521 Err(e) => {
522 if let ActorError::Functional { .. } = e {
523 warn!(
524 msg_type = "SendDistribution",
525 subject_id = %subject_id,
526 sender = %sender,
527 error = %e,
528 "Witness check failed"
529 );
530 return Err(e);
531 } else {
532 error!(
533 msg_type = "SendDistribution",
534 subject_id = %subject_id,
535 sender = %sender,
536 error = %e,
537 "Witness check failed"
538 );
539 return Err(emit_fail(ctx, e).await);
540 }
541 }
542 };
543
544 if let Some(actual_sn) = actual_sn
545 && actual_sn >= hi_sn
546 {
547 warn!(
548 msg_type = "SendDistribution",
549 subject_id = %subject_id,
550 actual_sn = actual_sn,
551 witness_sn = hi_sn,
552 "Requester SN is >= witness SN, nothing to send"
553 );
554 return Err(DistributorError::ActualSnBiggerThanWitness {
555 actual_sn,
556 witness_sn: hi_sn,
557 }
558 .into());
559 };
560
561 let (ledger, is_all) = match self
562 .get_ledger(ctx, &subject_id, hi_sn, actual_sn, is_gov)
563 .await
564 {
565 Ok(res) => res,
566 Err(e) => {
567 error!(
568 msg_type = "SendDistribution",
569 subject_id = %subject_id,
570 hi_sn = hi_sn,
571 actual_sn = ?actual_sn,
572 is_gov = is_gov,
573 error = %e,
574 "Failed to obtain ledger"
575 );
576 return Err(emit_fail(ctx, e).await);
577 }
578 };
579
580 let new_info = ComunicateInfo {
581 receiver: sender.clone(),
582 request_id: info.request_id,
583 version: info.version,
584 receiver_actor: format!(
585 "/user/node/distributor_{}",
586 subject_id
587 ),
588 };
589
590 if let Err(e) = self
591 .network
592 .send_command(network::CommandHelper::SendMessage {
593 message: NetworkMessage {
594 info: new_info,
595 message: ActorMessage::DistributionLedgerRes {
596 ledger: ledger.clone(),
597 is_all,
598 },
599 },
600 })
601 .await
602 {
603 error!(
604 msg_type = "SendDistribution",
605 subject_id = %subject_id,
606 ledger_count = ledger.len(),
607 is_all = is_all,
608 error = %e,
609 "Failed to send ledger response to network"
610 );
611 return Err(emit_fail(ctx, e).await);
612 };
613
614 debug!(
615 msg_type = "SendDistribution",
616 subject_id = %subject_id,
617 sender = %sender,
618 ledger_count = ledger.len(),
619 is_all = is_all,
620 hi_sn = hi_sn,
621 actual_sn = ?actual_sn,
622 "Ledger distribution sent successfully"
623 );
624 }
625 DistriWorkerMessage::LastEventDistribution {
626 ledger,
627 info,
628 sender,
629 } => {
630 let subject_id = ledger.content().get_subject_id();
631 let sn = ledger.content().sn;
632
633 let (is_gov, ..) = match self
634 .check_auth(ctx, sender.clone(), ledger.content().clone())
635 .await
636 {
637 Ok(is_gov) => is_gov,
638 Err(e) => {
639 if let ActorError::Functional { .. } = e {
640 warn!(
641 msg_type = "LastEventDistribution",
642 subject_id = %subject_id,
643 sn = sn,
644 sender = %sender,
645 error = %e,
646 "Authorization check failed"
647 );
648 return Err(e);
649 } else {
650 error!(
651 msg_type = "LastEventDistribution",
652 subject_id = %subject_id,
653 sn = sn,
654 sender = %sender,
655 error = %e,
656 "Authorization check failed"
657 );
658 return Err(emit_fail(ctx, e).await);
659 }
660 }
661 };
662
663 let lease = if ledger
664 .content()
665 .event_request
666 .content()
667 .is_create_event()
668 {
669 if let Err(e) = create_subject(ctx, *ledger.clone()).await {
670 if let ActorError::Functional { .. } = e {
671 warn!(
672 msg_type = "LastEventDistribution",
673 subject_id = %subject_id,
674 sn = sn,
675 error = %e,
676 "Failed to create subject from create event"
677 );
678 return Err(e);
679 } else {
680 error!(
681 msg_type = "LastEventDistribution",
682 subject_id = %subject_id,
683 sn = sn,
684 error = %e,
685 "Failed to create subject from create event"
686 );
687 return Err(emit_fail(ctx, e).await);
688 }
689 };
690
691 None
692 } else {
693 let requester = Self::requester_id(
694 "last_event_distribution",
695 &subject_id,
696 &info,
697 &sender,
698 );
699 let lease = if !is_gov {
700 match acquire_subject(
701 ctx,
702 &subject_id,
703 requester.clone(),
704 None,
705 true,
706 )
707 .await
708 {
709 Ok(lease) => Some(lease),
710 Err(e) => {
711 error!(
712 msg_type = "LastEventDistribution",
713 subject_id = %subject_id,
714 error = %e,
715 "Failed to bring up tracker for subject update"
716 );
717 let error = DistributorError::UpTrackerFailed {
718 details: e.to_string(),
719 };
720 return Err(emit_fail(ctx, error.into()).await);
721 }
722 }
723 } else {
724 None
725 };
726
727 let update_result =
728 update_ledger(ctx, &subject_id, vec![*ledger.clone()])
729 .await;
730
731 if let Some(lease) = lease.clone()
732 && update_result.is_err()
733 {
734 lease.finish(ctx).await?;
735 }
736
737 match update_result {
738 Ok((last_sn, _, _))
739 if last_sn < ledger.content().sn =>
740 {
741 debug!(
742 msg_type = "LastEventDistribution",
743 subject_id = %subject_id,
744 last_sn = last_sn,
745 received_sn = sn,
746 "SN gap detected, requesting full ledger"
747 );
748
749 let new_info = ComunicateInfo {
750 receiver: sender,
751 request_id: info.request_id,
752 version: info.version,
753 receiver_actor: format!(
754 "/user/node/distributor_{}",
755 subject_id
756 ),
757 };
758
759 if let Err(e) = self.network.send_command(network::CommandHelper::SendMessage {
760 message: NetworkMessage {
761 info: new_info,
762 message: ActorMessage::DistributionLedgerReq {
763 actual_sn: Some(last_sn),
764 subject_id: subject_id.clone(),
765 },
766 },
767 }).await {
768 error!(
769 msg_type = "LastEventDistribution",
770 subject_id = %subject_id,
771 last_sn = last_sn,
772 error = %e,
773 "Failed to request ledger from network"
774 );
775 return Err(emit_fail(ctx, e).await);
776 };
777
778 if let Some(lease) = lease.clone() {
779 lease.finish(ctx).await?;
780 }
781
782 return Ok(());
783 }
784 Ok((..)) => lease,
785 Err(e) => {
786 if let ActorError::Functional { .. } = e.clone() {
787 warn!(
788 msg_type = "LastEventDistribution",
789 subject_id = %subject_id,
790 sn = sn,
791 error = %e,
792 "Failed to update subject ledger"
793 );
794 return Err(e);
795 } else {
796 error!(
797 msg_type = "LastEventDistribution",
798 subject_id = %subject_id,
799 sn = sn,
800 error = %e,
801 "Failed to update subject ledger"
802 );
803 return Err(emit_fail(ctx, e).await);
804 }
805 }
806 }
807 };
808
809 let new_info = ComunicateInfo {
810 receiver: sender.clone(),
811 receiver_actor: format!(
812 "/user/{}/{}",
813 info.request_id,
814 info.receiver.clone()
815 ),
816 request_id: info.request_id.clone(),
817 version: info.version,
818 };
819
820 if let Err(e) = self
821 .network
822 .send_command(network::CommandHelper::SendMessage {
823 message: NetworkMessage {
824 info: new_info,
825 message: ActorMessage::DistributionLastEventRes,
826 },
827 })
828 .await
829 {
830 error!(
831 msg_type = "LastEventDistribution",
832 subject_id = %subject_id,
833 sn = sn,
834 error = %e,
835 "Failed to send distribution acknowledgment"
836 );
837 return Err(emit_fail(ctx, e).await);
838 };
839
840 if let Some(lease) = lease {
841 lease.finish(ctx).await?;
842 }
843
844 debug!(
845 msg_type = "LastEventDistribution",
846 subject_id = %subject_id,
847 sn = sn,
848 sender = %sender,
849 is_gov = is_gov,
850 "Last event distribution processed successfully"
851 );
852 }
853 DistriWorkerMessage::LedgerDistribution {
854 mut ledger,
855 is_all,
856 info,
857 sender,
858 } => {
859 if ledger.is_empty() {
860 warn!(
861 msg_type = "LedgerDistribution",
862 sender = %sender,
863 "Received empty ledger distribution"
864 );
865 return Err(DistributorError::EmptyEvents.into());
866 }
867
868 let subject_id = ledger[0].content().get_subject_id();
869 let ledger_count = ledger.len();
870 let first_sn = ledger[0].content().sn;
871
872 let (is_gov, is_register) = match self
873 .check_auth(
874 ctx,
875 sender.clone(),
876 ledger[0].content().clone(),
877 )
878 .await
879 {
880 Ok(data) => data,
881 Err(e) => {
882 if let ActorError::Functional { .. } = e {
883 warn!(
884 msg_type = "LedgerDistribution",
885 subject_id = %subject_id,
886 sender = %sender,
887 ledger_count = ledger_count,
888 error = %e,
889 "Authorization check failed"
890 );
891 return Err(e);
892 } else {
893 error!(
894 msg_type = "LedgerDistribution",
895 subject_id = %subject_id,
896 sender = %sender,
897 ledger_count = ledger_count,
898 error = %e,
899 "Authorization check failed"
900 );
901 return Err(emit_fail(ctx, e).await);
902 }
903 }
904 };
905
906 let lease = if ledger[0]
907 .content()
908 .event_request
909 .content()
910 .is_create_event()
911 && !is_register
912 {
913 let create_ledger = ledger[0].clone();
914 let requester = Self::requester_id(
915 "ledger_distribution_create",
916 &subject_id,
917 &info,
918 &sender,
919 );
920
921 let lease = if is_gov {
922 if let Err(e) =
923 create_subject(ctx, create_ledger.clone()).await
924 {
925 if let ActorError::Functional { .. } = e {
926 warn!(
927 msg_type = "LedgerDistribution",
928 subject_id = %subject_id,
929 error = %e,
930 "Failed to create subject from ledger"
931 );
932 return Err(e);
933 } else {
934 error!(
935 msg_type = "LedgerDistribution",
936 subject_id = %subject_id,
937 error = %e,
938 "Failed to create subject from ledger"
939 );
940 return Err(emit_fail(ctx, e).await);
941 }
942 };
943 None
944 } else {
945 let EventRequest::Create(request) = create_ledger
946 .content()
947 .event_request
948 .content()
949 .clone()
950 else {
951 return Err(DistributorError::EmptyEvents.into());
952 };
953
954 if let Err(e) = check_subject_creation(
955 ctx,
956 &request.governance_id,
957 create_ledger.signature().signer.clone(),
958 create_ledger.content().gov_version,
959 request.namespace.to_string(),
960 request.schema_id,
961 )
962 .await
963 {
964 if let ActorError::Functional { .. } = e {
965 warn!(
966 msg_type = "LedgerDistribution",
967 subject_id = %subject_id,
968 error = %e,
969 "Failed to validate subject creation from ledger"
970 );
971 return Err(e);
972 } else {
973 error!(
974 msg_type = "LedgerDistribution",
975 subject_id = %subject_id,
976 error = %e,
977 "Failed to validate subject creation from ledger"
978 );
979 return Err(emit_fail(ctx, e).await);
980 }
981 }
982
983 match acquire_subject(
984 ctx,
985 &subject_id,
986 requester,
987 Some(create_ledger),
988 true,
989 )
990 .await
991 {
992 Ok(lease) => Some(lease),
993 Err(e) => {
994 if let ActorError::Functional { .. } = e {
995 warn!(
996 msg_type = "LedgerDistribution",
997 subject_id = %subject_id,
998 error = %e,
999 "Failed to create subject from ledger"
1000 );
1001 return Err(e);
1002 } else {
1003 error!(
1004 msg_type = "LedgerDistribution",
1005 subject_id = %subject_id,
1006 error = %e,
1007 "Failed to create subject from ledger"
1008 );
1009 return Err(emit_fail(ctx, e).await);
1010 }
1011 }
1012 }
1013 };
1014
1015 let _event = ledger.remove(0);
1016 lease
1017 } else {
1018 if ledger[0]
1019 .content()
1020 .event_request
1021 .content()
1022 .is_create_event()
1023 && is_register
1024 {
1025 let _event = ledger.remove(0);
1026 }
1027
1028 let requester = Self::requester_id(
1029 "ledger_distribution",
1030 &subject_id,
1031 &info,
1032 &sender,
1033 );
1034 if !ledger.is_empty() && !is_gov {
1035 match acquire_subject(
1036 ctx,
1037 &subject_id,
1038 requester.clone(),
1039 None,
1040 true,
1041 )
1042 .await
1043 {
1044 Ok(lease) => Some(lease),
1045 Err(e) => {
1046 error!(
1047 msg_type = "LedgerDistribution",
1048 subject_id = %subject_id,
1049 error = %e,
1050 "Failed to bring up tracker for subject update"
1051 );
1052 let error = DistributorError::UpTrackerFailed {
1053 details: e.to_string(),
1054 };
1055 return Err(emit_fail(ctx, error.into()).await);
1056 }
1057 }
1058 } else {
1059 None
1060 }
1061 };
1062
1063 let lease = if !ledger.is_empty() {
1064 let update_result =
1065 update_ledger(ctx, &subject_id, ledger).await;
1066
1067 if let Some(lease) = lease.clone()
1068 && update_result.is_err()
1069 {
1070 lease.finish(ctx).await?;
1071 }
1072
1073 match update_result {
1074 Ok((last_sn, _, _)) => {
1075 if !is_all {
1076 debug!(
1077 msg_type = "LedgerDistribution",
1078 subject_id = %subject_id,
1079 last_sn = last_sn,
1080 "Partial ledger received, requesting more"
1081 );
1082
1083 let new_info = ComunicateInfo {
1084 receiver: sender.clone(),
1085 request_id: info.request_id.clone(),
1086 version: info.version,
1087 receiver_actor: format!(
1088 "/user/node/distributor_{}",
1089 subject_id
1090 ),
1091 };
1092
1093 if let Err(e) = self
1094 .network
1095 .send_command(network::CommandHelper::SendMessage {
1096 message: NetworkMessage {
1097 info: new_info,
1098 message: ActorMessage::DistributionLedgerReq {
1099 actual_sn: Some(last_sn),
1100 subject_id: subject_id.clone(),
1101 },
1102 },
1103 })
1104 .await
1105 {
1106 error!(
1107 msg_type = "LedgerDistribution",
1108 subject_id = %subject_id,
1109 last_sn = last_sn,
1110 error = %e,
1111 "Failed to request more ledger entries"
1112 );
1113 return Err(emit_fail(ctx, e).await);
1114 };
1115 }
1116
1117 lease
1118 }
1119 Err(e) => {
1120 if let ActorError::Functional { .. } = e.clone() {
1121 warn!(
1122 msg_type = "LedgerDistribution",
1123 subject_id = %subject_id,
1124 first_sn = first_sn,
1125 ledger_count = ledger_count,
1126 error = %e,
1127 "Failed to update subject ledger"
1128 );
1129 return Err(e);
1130 } else {
1131 error!(
1132 msg_type = "LedgerDistribution",
1133 subject_id = %subject_id,
1134 first_sn = first_sn,
1135 ledger_count = ledger_count,
1136 error = %e,
1137 "Failed to update subject ledger"
1138 );
1139 return Err(emit_fail(ctx, e).await);
1140 }
1141 }
1142 }
1143 } else {
1144 lease
1145 };
1146
1147 if let Some(lease) = lease {
1148 lease.finish(ctx).await?;
1149 }
1150
1151 debug!(
1152 msg_type = "LedgerDistribution",
1153 subject_id = %subject_id,
1154 sender = %sender,
1155 ledger_count = ledger_count,
1156 is_all = is_all,
1157 is_gov = is_gov,
1158 "Ledger distribution processed successfully"
1159 );
1160 }
1161 };
1162
1163 Ok(())
1164 }
1165}