1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::{
9 SchemaType,
10 identity::{DigestIdentifier, PublicKey},
11};
12use ave_network::ComunicateInfo;
13
14use crate::{
15 ActorMessage, NetworkMessage, Node, NodeMessage, NodeResponse,
16 governance::{
17 Governance, GovernanceMessage, GovernanceResponse,
18 model::{HashThisRole, RoleTypes},
19 witnesses_register::{TrackerDeliveryMode, TrackerDeliveryRange},
20 },
21 helpers::network::service::NetworkSender,
22 model::{
23 common::{
24 check_subject_creation, check_witness_access, emit_fail,
25 node::get_subject_data,
26 subject::{
27 acquire_subject, create_subject, get_gov, get_gov_sn,
28 get_tracker_window as resolve_tracker_window, update_ledger,
29 },
30 },
31 event::Ledger,
32 },
33 node::SubjectData,
34 tracker::{Tracker, TrackerMessage, TrackerResponse},
35 update::{UpdateSubjectKind, UpdateWitnessOffer},
36};
37
38use tracing::{Span, debug, error, info_span, warn};
39
40use super::error::DistributorError;
41
42pub struct DistriWorker {
43 pub our_key: Arc<PublicKey>,
44 pub network: Arc<NetworkSender>,
45 pub ledger_batch_size: u64,
46}
47
48impl DistriWorker {
49 fn requester_id(
50 kind: &str,
51 subject_id: &DigestIdentifier,
52 info: &ComunicateInfo,
53 sender: &PublicKey,
54 ) -> String {
55 format!(
56 "{kind}:{subject_id}:{sender}:{}:{}",
57 info.request_id, info.version
58 )
59 }
60
61 async fn get_ledger(
62 &self,
63 ctx: &mut ActorContext<Self>,
64 subject_id: &DigestIdentifier,
65 hi_sn: u64,
66 lo_sn: Option<u64>,
67 is_gov: bool,
68 ) -> Result<(Vec<Ledger>, bool), ActorError> {
69 let path = ActorPath::from(format!(
70 "/user/node/subject_manager/{}",
71 subject_id
72 ));
73
74 if is_gov {
75 let governance_actor =
76 ctx.system().get_actor::<Governance>(&path).await?;
77
78 let response = governance_actor
79 .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
80 .await?;
81
82 match response {
83 GovernanceResponse::Ledger { ledger, is_all } => {
84 Ok((ledger, is_all))
85 }
86 _ => Err(ActorError::UnexpectedResponse {
87 expected: "GovernanceResponse::Ledger".to_owned(),
88 path,
89 }),
90 }
91 } else {
92 let lease = acquire_subject(
93 ctx,
94 subject_id,
95 format!("send_distribution:{subject_id}"),
96 None,
97 true,
98 )
99 .await?;
100 let tracker_actor =
101 ctx.system().get_actor::<Tracker>(&path).await?;
102 let response = tracker_actor
103 .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
104 .await;
105 lease.finish(ctx).await?;
106 let response = response?;
107
108 match response {
109 TrackerResponse::Ledger { ledger, is_all } => {
110 Ok((ledger, is_all))
111 }
112 _ => Err(ActorError::UnexpectedResponse {
113 expected: "TrackerResponse::Ledger".to_owned(),
114 path,
115 }),
116 }
117 }
118 }
119
120 fn build_response_info(
121 &self,
122 sender: PublicKey,
123 info: &ComunicateInfo,
124 receiver_actor: String,
125 ) -> ComunicateInfo {
126 ComunicateInfo {
127 receiver: sender,
128 request_id: info.request_id.clone(),
129 version: info.version,
130 receiver_actor,
131 }
132 }
133
134 async fn send_network_message(
135 &self,
136 info: ComunicateInfo,
137 message: ActorMessage,
138 ) -> Result<(), ActorError> {
139 self.network
140 .send_command(ave_network::CommandHelper::SendMessage {
141 message: NetworkMessage { info, message },
142 })
143 .await
144 }
145
146 async fn send_no_offer_response(
147 &self,
148 info: &ComunicateInfo,
149 sender: PublicKey,
150 receiver_actor: String,
151 ) -> Result<(), ActorError> {
152 let new_info = self.build_response_info(sender, info, receiver_actor);
153 self.send_network_message(new_info, ActorMessage::UpdateNoOffer)
154 .await
155 }
156
157 async fn get_governance_version(
158 &self,
159 ctx: &mut ActorContext<Self>,
160 subject_id: &DigestIdentifier,
161 ) -> Result<u64, ActorError> {
162 let data = get_subject_data(ctx, subject_id).await?;
163 let Some(SubjectData::Governance { .. }) = data else {
164 return Err(DistributorError::SubjectNotFound.into());
165 };
166
167 let governance_path = ActorPath::from(format!(
168 "/user/node/subject_manager/{}",
169 subject_id
170 ));
171 let governance_actor = ctx
172 .system()
173 .get_actor::<Governance>(&governance_path)
174 .await?;
175 let response =
176 governance_actor.ask(GovernanceMessage::GetVersion).await?;
177 let GovernanceResponse::Version(version) = response else {
178 return Err(ActorError::UnexpectedResponse {
179 path: governance_path,
180 expected: "GovernanceResponse::Version".to_owned(),
181 });
182 };
183
184 Ok(version)
185 }
186
187 async fn authorized_subj(
188 &self,
189 ctx: &ActorContext<Self>,
190 subject_id: &DigestIdentifier,
191 ) -> Result<(bool, Option<SubjectData>), ActorError> {
192 let node_path = ActorPath::from("/user/node");
193 let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
194
195 let response = node_actor
196 .ask(NodeMessage::AuthData(subject_id.to_owned()))
197 .await?;
198 match response {
199 NodeResponse::AuthData { auth, subject_data } => {
200 Ok((auth, subject_data))
201 }
202 _ => Err(ActorError::UnexpectedResponse {
203 expected: "NodeResponse::AuthData".to_owned(),
204 path: node_path,
205 }),
206 }
207 }
208
209 async fn check_auth(
210 &self,
211 ctx: &mut ActorContext<Self>,
212 sender: PublicKey,
213 info: &ComunicateInfo,
214 ledger: &Ledger,
215 ) -> Result<(bool, bool), ActorError> {
216 let subject_id = ledger.get_subject_id();
217 let (auth, subject_data) =
219 self.authorized_subj(ctx, &subject_id).await?;
220
221 let (schema_id, governance_id) = if let Some(ref data) = subject_data {
223 match data {
225 SubjectData::Tracker {
226 governance_id,
227 schema_id,
228 ..
229 } => (schema_id.clone(), Some(governance_id.clone())),
230 SubjectData::Governance { .. } => {
231 (SchemaType::Governance, None)
232 }
233 }
234 } else {
235 if let Some(create) = ledger.get_create_event() {
237 if !create.schema_id.is_gov() && create.governance_id.is_empty()
238 {
239 return Err(
240 DistributorError::MissingGovernanceIdInCreate {
241 subject_id: subject_id.clone(),
242 }
243 .into(),
244 );
245 }
246
247 let gov_id = if create.schema_id.is_gov() {
248 None
249 } else {
250 Some(create.governance_id.clone())
251 };
252
253 (create.schema_id, gov_id)
254 } else {
255 self.request_ledger_from_sender(
258 &subject_id,
259 sender.clone(),
260 info,
261 None,
262 )
263 .await?;
264 return Err(DistributorError::UpdatingSubject.into());
265 }
266 };
267
268 let is_gov = schema_id.is_gov();
269 if is_gov {
271 if !auth {
273 return Err(DistributorError::GovernanceNotAuthorized.into());
274 }
275 } else {
276 let Some(governance_id) = governance_id else {
278 error!(
279 subject_id = %subject_id,
280 "Tracker subject is missing governance_id during authorization check"
281 );
282 return Err(DistributorError::MissingGovernanceId {
283 subject_id: subject_id.clone(),
284 }
285 .into());
286 };
287 let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
288 DistributorError::GetGovernanceFailed {
289 details: e.to_string(),
290 }
291 })?;
292
293 match gov.version.cmp(&ledger.gov_version) {
294 std::cmp::Ordering::Less => {
295 return Err(DistributorError::GovernanceVersionMismatch {
296 our_version: gov.version,
297 their_version: ledger.gov_version,
298 }
299 .into());
300 }
301 std::cmp::Ordering::Equal => {}
302 std::cmp::Ordering::Greater => {}
303 }
304 }
305
306 Ok((is_gov, subject_data.is_some()))
307 }
308
309 async fn get_tracker_window(
310 &self,
311 ctx: &mut ActorContext<Self>,
312 subject_id: &DigestIdentifier,
313 sender: PublicKey,
314 actual_sn: Option<u64>,
315 ) -> Result<(u64, Option<u64>, bool, Vec<TrackerDeliveryRange>), ActorError>
316 {
317 let data = get_subject_data(ctx, subject_id).await?;
318
319 let Some(SubjectData::Tracker {
320 governance_id,
321 schema_id,
322 namespace,
323 ..
324 }) = data
325 else {
326 return Err(DistributorError::SubjectNotFound.into());
327 };
328
329 let (sn, clear_sn, is_all, ranges) = resolve_tracker_window(
330 ctx,
331 &governance_id,
332 subject_id,
333 sender.clone(),
334 namespace.clone(),
335 schema_id.clone(),
336 actual_sn,
337 )
338 .await?;
339
340 let Some(sn) = sn else {
341 let witness_sn = check_witness_access(
342 ctx,
343 &governance_id,
344 subject_id,
345 sender,
346 namespace,
347 schema_id,
348 )
349 .await?;
350
351 return match (actual_sn, witness_sn) {
352 (Some(actual_sn), Some(witness_sn))
353 if actual_sn >= witness_sn =>
354 {
355 Err(DistributorError::ActualSnBiggerThanWitness {
356 actual_sn,
357 witness_sn,
358 }
359 .into())
360 }
361 _ => Err(DistributorError::SenderNoAccess.into()),
362 };
363 };
364
365 Ok((sn, clear_sn, is_all, ranges))
366 }
367
368 fn tracker_delivery_mode(
369 ranges: &[TrackerDeliveryRange],
370 sn: u64,
371 ) -> Option<TrackerDeliveryMode> {
372 ranges
373 .iter()
374 .find(|range| range.from_sn <= sn && sn <= range.to_sn)
375 .map(|range| range.mode.clone())
376 }
377
378 fn project_tracker_ledger(
379 ledger: Vec<Ledger>,
380 ranges: &[TrackerDeliveryRange],
381 ) -> Result<Vec<Ledger>, ActorError> {
382 let mut projected = Vec::with_capacity(ledger.len());
383
384 for event in ledger {
385 let Some(mode) = Self::tracker_delivery_mode(ranges, event.sn)
386 else {
387 return Err(ActorError::FunctionalCritical {
388 description: format!(
389 "Missing tracker delivery range for sn {}",
390 event.sn
391 ),
392 });
393 };
394
395 match mode {
396 TrackerDeliveryMode::Clear => projected.push(event),
397 TrackerDeliveryMode::Opaque => projected
398 .push(event.to_tracker_opaque().map_err(ActorError::from)?),
399 }
400 }
401
402 Ok(projected)
403 }
404
405 async fn check_witness(
406 &self,
407 ctx: &mut ActorContext<Self>,
408 subject_id: &DigestIdentifier,
409 sender: PublicKey,
410 ) -> Result<(u64, bool), ActorError> {
411 let data = get_subject_data(ctx, subject_id).await?;
412
413 let Some(data) = data else {
414 return Err(DistributorError::SubjectNotFound.into());
415 };
416
417 match data {
418 SubjectData::Tracker {
419 governance_id,
420 schema_id,
421 namespace,
422 ..
423 } => {
424 let Some(sn) = check_witness_access(
425 ctx,
426 &governance_id,
427 subject_id,
428 sender.clone(),
429 namespace,
430 schema_id,
431 )
432 .await?
433 else {
434 return Err(DistributorError::SenderNoAccess.into());
435 };
436
437 Ok((sn, false))
438 }
439 SubjectData::Governance { .. } => {
440 let gov = get_gov(ctx, subject_id).await.map_err(|e| {
441 DistributorError::GetGovernanceFailed {
442 details: e.to_string(),
443 }
444 })?;
445
446 if !gov.has_this_role(HashThisRole::Gov {
447 who: sender.clone(),
448 role: RoleTypes::Witness,
449 }) {
450 return Err(DistributorError::SenderNotMember {
451 sender: sender.to_string(),
452 }
453 .into());
454 }
455
456 Ok((get_gov_sn(ctx, subject_id).await?, true))
457 }
458 }
459 }
460
461 async fn build_last_sn_offer(
462 &self,
463 ctx: &mut ActorContext<Self>,
464 subject_id: &DigestIdentifier,
465 sender: PublicKey,
466 actual_sn: Option<u64>,
467 ) -> Result<UpdateWitnessOffer, ActorError> {
468 let data = get_subject_data(ctx, subject_id).await?;
469 let Some(data) = data else {
470 return Err(DistributorError::SubjectNotFound.into());
471 };
472
473 match data {
474 SubjectData::Tracker { .. } => {
475 let (sn, clear_sn, _, ranges) = self
476 .get_tracker_window(
477 ctx,
478 subject_id,
479 sender.clone(),
480 actual_sn,
481 )
482 .await?;
483 Ok(UpdateWitnessOffer {
484 kind: UpdateSubjectKind::Tracker,
485 sn,
486 clear_sn,
487 ranges,
488 })
489 }
490 SubjectData::Governance { .. } => {
491 let (sn, ..) =
492 self.check_witness(ctx, subject_id, sender.clone()).await?;
493 Ok(UpdateWitnessOffer {
494 kind: UpdateSubjectKind::Governance,
495 sn,
496 clear_sn: None,
497 ranges: Vec::new(),
498 })
499 }
500 }
501 }
502
503 async fn build_distribution_batch(
504 &self,
505 ctx: &mut ActorContext<Self>,
506 subject_id: &DigestIdentifier,
507 sender: PublicKey,
508 actual_sn: Option<u64>,
509 target_sn: Option<u64>,
510 ) -> Result<(Vec<Ledger>, bool, u64), ActorError> {
511 let data = get_subject_data(ctx, subject_id).await?;
512 let Some(data) = data else {
513 return Err(DistributorError::SubjectNotFound.into());
514 };
515
516 match data {
517 SubjectData::Tracker { .. } => {
518 let (window_sn, clear_sn, _, ranges) = self
519 .get_tracker_window(ctx, subject_id, sender, actual_sn)
520 .await?;
521
522 if let Some(actual_sn) = actual_sn
523 && actual_sn >= window_sn
524 {
525 return Err(DistributorError::ActualSnBiggerThanWitness {
526 actual_sn,
527 witness_sn: window_sn,
528 }
529 .into());
530 }
531
532 let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
533 let batch_hi_sn = from_sn
534 .saturating_add(self.ledger_batch_size)
535 .saturating_sub(1)
536 .min(window_sn);
537 let preferred_hi_sn = clear_sn
538 .filter(|clear_sn| {
539 actual_sn.is_none_or(|actual_sn| *clear_sn > actual_sn)
540 })
541 .unwrap_or(window_sn);
542 let preferred_hi_sn = if from_sn == 0
543 && preferred_hi_sn == 0
544 && window_sn > 0
545 {
546 window_sn
547 } else {
548 preferred_hi_sn
549 };
550 let hi_sn = target_sn
551 .unwrap_or(preferred_hi_sn)
552 .min(preferred_hi_sn)
553 .min(batch_hi_sn);
554
555 let (ledger, raw_is_all) = self
556 .get_ledger(ctx, subject_id, hi_sn, actual_sn, false)
557 .await?;
558
559 let ledger = Self::project_tracker_ledger(ledger, &ranges)?;
560 let is_all = raw_is_all && hi_sn == window_sn;
561 Ok((ledger, is_all, hi_sn))
562 }
563 SubjectData::Governance { .. } => {
564 let (witness_hi_sn, ..) =
565 self.check_witness(ctx, subject_id, sender).await?;
566
567 if let Some(actual_sn) = actual_sn
568 && actual_sn >= witness_hi_sn
569 {
570 return Err(DistributorError::ActualSnBiggerThanWitness {
571 actual_sn,
572 witness_sn: witness_hi_sn,
573 }
574 .into());
575 }
576
577 let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
578 let batch_hi_sn = from_sn
579 .saturating_add(self.ledger_batch_size)
580 .saturating_sub(1)
581 .min(witness_hi_sn);
582 let batch_hi_sn =
583 target_sn.unwrap_or(batch_hi_sn).min(batch_hi_sn);
584
585 let (ledger, raw_is_all) = self
586 .get_ledger(ctx, subject_id, batch_hi_sn, actual_sn, true)
587 .await?;
588
589 let is_all = raw_is_all && batch_hi_sn == witness_hi_sn;
590 Ok((ledger, is_all, batch_hi_sn))
591 }
592 }
593 }
594
595 async fn request_ledger_from_sender(
596 &self,
597 subject_id: &DigestIdentifier,
598 sender: PublicKey,
599 info: &ComunicateInfo,
600 actual_sn: Option<u64>,
601 ) -> Result<(), ActorError> {
602 let new_info = self.build_response_info(
603 sender,
604 info,
605 format!("/user/node/distributor_{}", subject_id),
606 );
607
608 self.send_network_message(
609 new_info,
610 ActorMessage::DistributionLedgerReq {
611 actual_sn,
612 target_sn: None,
613 subject_id: subject_id.clone(),
614 },
615 )
616 .await
617 }
618
619 async fn handle_get_last_sn(
620 &self,
621 ctx: &mut ActorContext<Self>,
622 subject_id: DigestIdentifier,
623 actual_sn: Option<u64>,
624 info: ComunicateInfo,
625 sender: PublicKey,
626 receiver_actor: String,
627 ) -> Result<(), ActorError> {
628 let offer = self
629 .build_last_sn_offer(ctx, &subject_id, sender.clone(), actual_sn)
630 .await?;
631 let new_info =
632 self.build_response_info(sender.clone(), &info, receiver_actor);
633
634 self.send_network_message(
635 new_info,
636 ActorMessage::UpdateOffer {
637 offer: offer.clone(),
638 },
639 )
640 .await?;
641
642 debug!(
643 msg_type = "GetLastSn",
644 subject_id = %subject_id,
645 sn = offer.sn,
646 clear_sn = ?offer.clear_sn,
647 sender = %sender,
648 "Last SN response sent successfully"
649 );
650
651 Ok(())
652 }
653
654 async fn handle_get_governance_version(
655 &self,
656 ctx: &mut ActorContext<Self>,
657 subject_id: DigestIdentifier,
658 info: ComunicateInfo,
659 sender: PublicKey,
660 receiver_actor: String,
661 ) -> Result<(), ActorError> {
662 let version = self.get_governance_version(ctx, &subject_id).await?;
663 let new_info =
664 self.build_response_info(sender.clone(), &info, receiver_actor);
665
666 self.send_network_message(
667 new_info,
668 ActorMessage::GovernanceVersionRes { version },
669 )
670 .await?;
671
672 Ok(())
673 }
674
675 async fn handle_send_distribution(
676 &self,
677 ctx: &mut ActorContext<Self>,
678 actual_sn: Option<u64>,
679 target_sn: Option<u64>,
680 info: ComunicateInfo,
681 subject_id: DigestIdentifier,
682 sender: PublicKey,
683 ) -> Result<(), ActorError> {
684 let (ledger, is_all, hi_sn) = self
685 .build_distribution_batch(
686 ctx,
687 &subject_id,
688 sender.clone(),
689 actual_sn,
690 target_sn,
691 )
692 .await?;
693
694 let new_info = self.build_response_info(
695 sender.clone(),
696 &info,
697 format!("/user/node/distributor_{}", subject_id),
698 );
699
700 self.send_network_message(
701 new_info,
702 ActorMessage::DistributionLedgerRes {
703 ledger: ledger.clone(),
704 is_all,
705 },
706 )
707 .await?;
708
709 debug!(
710 msg_type = "SendDistribution",
711 subject_id = %subject_id,
712 sender = %sender,
713 ledger_count = ledger.len(),
714 is_all = is_all,
715 hi_sn = hi_sn,
716 actual_sn = ?actual_sn,
717 "Ledger distribution sent successfully"
718 );
719
720 Ok(())
721 }
722}
723
724#[async_trait]
725impl Actor for DistriWorker {
726 type Event = ();
727 type Message = DistriWorkerMessage;
728 type Response = ();
729
730 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
731 parent_span.map_or_else(
732 || info_span!("DistriWorker", id),
733 |parent_span| info_span!(parent: parent_span, "DistriWorker", id),
734 )
735 }
736}
737
738#[derive(Debug, Clone)]
739pub enum DistriWorkerMessage {
740 GetLastSn {
741 subject_id: DigestIdentifier,
742 actual_sn: Option<u64>,
743 info: ComunicateInfo,
744 sender: PublicKey,
745 receiver_actor: String,
746 },
747 GetGovernanceVersion {
748 subject_id: DigestIdentifier,
749 info: ComunicateInfo,
750 sender: PublicKey,
751 receiver_actor: String,
752 },
753 SendDistribution {
755 actual_sn: Option<u64>,
756 target_sn: Option<u64>,
757 subject_id: DigestIdentifier,
758 info: ComunicateInfo,
759 sender: PublicKey,
760 },
761 LastEventDistribution {
763 ledger: Box<Ledger>,
764 info: ComunicateInfo,
765 sender: PublicKey,
766 },
767 LedgerDistribution {
768 ledger: Vec<Ledger>,
769 is_all: bool,
770 info: ComunicateInfo,
771 sender: PublicKey,
772 },
773}
774
775impl Message for DistriWorkerMessage {}
776
777impl NotPersistentActor for DistriWorker {}
778
779#[async_trait]
780impl Handler<Self> for DistriWorker {
781 async fn handle_message(
782 &mut self,
783 _sender: ActorPath,
784 msg: DistriWorkerMessage,
785 ctx: &mut ActorContext<Self>,
786 ) -> Result<(), ActorError> {
787 match msg {
788 DistriWorkerMessage::GetLastSn {
789 subject_id,
790 actual_sn,
791 info,
792 sender,
793 receiver_actor,
794 } => match self
795 .handle_get_last_sn(
796 ctx,
797 subject_id.clone(),
798 actual_sn,
799 info.clone(),
800 sender.clone(),
801 receiver_actor.clone(),
802 )
803 .await
804 {
805 Ok(()) => {}
806 Err(e) => {
807 if let ActorError::Functional { .. } = e {
808 warn!(
809 msg_type = "GetLastSn",
810 subject_id = %subject_id,
811 sender = %sender,
812 error = %e,
813 "Witness check failed"
814 );
815 self.send_no_offer_response(
816 &info,
817 sender.clone(),
818 receiver_actor,
819 )
820 .await?;
821 return Ok(());
822 } else {
823 error!(
824 msg_type = "GetLastSn",
825 subject_id = %subject_id,
826 sender = %sender,
827 error = %e,
828 "Witness check failed"
829 );
830 return Err(emit_fail(ctx, e).await);
831 }
832 }
833 },
834 DistriWorkerMessage::GetGovernanceVersion {
835 subject_id,
836 info,
837 sender,
838 receiver_actor,
839 } => match self
840 .handle_get_governance_version(
841 ctx,
842 subject_id.clone(),
843 info,
844 sender.clone(),
845 receiver_actor,
846 )
847 .await
848 {
849 Ok(()) => {}
850 Err(e) => {
851 if let ActorError::Functional { .. } = e {
852 warn!(
853 msg_type = "GetGovernanceVersion",
854 subject_id = %subject_id,
855 sender = %sender,
856 error = %e,
857 "Subject is not a governance"
858 );
859 return Err(e);
860 } else {
861 error!(
862 msg_type = "GetGovernanceVersion",
863 subject_id = %subject_id,
864 sender = %sender,
865 error = %e,
866 "Failed to send governance version response to network"
867 );
868 return Err(emit_fail(ctx, e).await);
869 }
870 }
871 },
872 DistriWorkerMessage::SendDistribution {
873 actual_sn,
874 target_sn,
875 info,
876 subject_id,
877 sender,
878 } => match self
879 .handle_send_distribution(
880 ctx,
881 actual_sn,
882 target_sn,
883 info,
884 subject_id.clone(),
885 sender.clone(),
886 )
887 .await
888 {
889 Ok(()) => {}
890 Err(e) => {
891 if let ActorError::Functional { .. } = e {
892 warn!(
893 msg_type = "SendDistribution",
894 subject_id = %subject_id,
895 sender = %sender,
896 error = %e,
897 "Witness check failed"
898 );
899 return Err(e);
900 } else {
901 error!(
902 msg_type = "SendDistribution",
903 subject_id = %subject_id,
904 sender = %sender,
905 error = %e,
906 "Failed to send ledger response to network"
907 );
908 return Err(emit_fail(ctx, e).await);
909 }
910 }
911 },
912 DistriWorkerMessage::LastEventDistribution {
913 ledger,
914 info,
915 sender,
916 } => {
917 let subject_id = ledger.get_subject_id();
918 let sn = ledger.sn;
919
920 let (is_gov, ..) =
921 match self
922 .check_auth(ctx, sender.clone(), &info, &ledger)
923 .await
924 {
925 Ok(is_gov) => is_gov,
926 Err(e) => {
927 if let ActorError::Functional { .. } = e {
928 warn!(
929 msg_type = "LastEventDistribution",
930 subject_id = %subject_id,
931 sn = sn,
932 sender = %sender,
933 error = %e,
934 "Authorization check failed"
935 );
936 return Err(e);
937 } else {
938 error!(
939 msg_type = "LastEventDistribution",
940 subject_id = %subject_id,
941 sn = sn,
942 sender = %sender,
943 error = %e,
944 "Authorization check failed"
945 );
946 return Err(emit_fail(ctx, e).await);
947 }
948 }
949 };
950
951 let lease = if ledger.is_create_event() {
952 if let Err(e) = create_subject(ctx, *ledger.clone()).await {
953 if let ActorError::Functional { .. } = e {
954 warn!(
955 msg_type = "LastEventDistribution",
956 subject_id = %subject_id,
957 sn = sn,
958 error = %e,
959 "Failed to create subject from create event"
960 );
961 return Err(e);
962 } else {
963 error!(
964 msg_type = "LastEventDistribution",
965 subject_id = %subject_id,
966 sn = sn,
967 error = %e,
968 "Failed to create subject from create event"
969 );
970 return Err(emit_fail(ctx, e).await);
971 }
972 };
973
974 None
975 } else {
976 let requester = Self::requester_id(
977 "last_event_distribution",
978 &subject_id,
979 &info,
980 &sender,
981 );
982 let lease = if !is_gov {
983 match acquire_subject(
984 ctx,
985 &subject_id,
986 requester.clone(),
987 None,
988 true,
989 )
990 .await
991 {
992 Ok(lease) => Some(lease),
993 Err(e) => {
994 error!(
995 msg_type = "LastEventDistribution",
996 subject_id = %subject_id,
997 error = %e,
998 "Failed to bring up tracker for subject update"
999 );
1000 let error = DistributorError::UpTrackerFailed {
1001 details: e.to_string(),
1002 };
1003 return Err(emit_fail(ctx, error.into()).await);
1004 }
1005 }
1006 } else {
1007 None
1008 };
1009
1010 let update_result =
1011 update_ledger(ctx, &subject_id, vec![*ledger.clone()])
1012 .await;
1013
1014 if let Some(lease) = lease.clone()
1015 && update_result.is_err()
1016 {
1017 lease.finish(ctx).await?;
1018 }
1019
1020 match update_result {
1021 Ok((last_sn, _, _)) if last_sn < ledger.sn => {
1022 debug!(
1023 msg_type = "LastEventDistribution",
1024 subject_id = %subject_id,
1025 last_sn = last_sn,
1026 received_sn = sn,
1027 "SN gap detected, requesting update"
1028 );
1029
1030 if let Err(e) = self
1031 .request_ledger_from_sender(
1032 &subject_id,
1033 sender.clone(),
1034 &info,
1035 Some(last_sn),
1036 )
1037 .await
1038 {
1039 error!(
1040 msg_type = "LastEventDistribution",
1041 subject_id = %subject_id,
1042 last_sn = last_sn,
1043 error = %e,
1044 "Failed to request ledger from network"
1045 );
1046 return Err(emit_fail(ctx, e).await);
1047 }
1048
1049 if let Some(lease) = lease.clone() {
1050 lease.finish(ctx).await?;
1051 }
1052
1053 return Ok(());
1054 }
1055 Ok((..)) => lease,
1056 Err(e) => {
1057 if let ActorError::Functional { .. } = e.clone() {
1058 warn!(
1059 msg_type = "LastEventDistribution",
1060 subject_id = %subject_id,
1061 sn = sn,
1062 error = %e,
1063 "Failed to update subject ledger"
1064 );
1065 return Err(e);
1066 } else {
1067 error!(
1068 msg_type = "LastEventDistribution",
1069 subject_id = %subject_id,
1070 sn = sn,
1071 error = %e,
1072 "Failed to update subject ledger"
1073 );
1074 return Err(emit_fail(ctx, e).await);
1075 }
1076 }
1077 }
1078 };
1079
1080 let new_info = self.build_response_info(
1081 sender.clone(),
1082 &info,
1083 format!(
1084 "/user/{}/{}",
1085 info.request_id,
1086 info.receiver.clone()
1087 ),
1088 );
1089
1090 if let Err(e) = self
1091 .send_network_message(
1092 new_info,
1093 ActorMessage::DistributionLastEventRes,
1094 )
1095 .await
1096 {
1097 error!(
1098 msg_type = "LastEventDistribution",
1099 subject_id = %subject_id,
1100 sn = sn,
1101 error = %e,
1102 "Failed to send distribution acknowledgment"
1103 );
1104 return Err(emit_fail(ctx, e).await);
1105 };
1106
1107 if let Some(lease) = lease {
1108 lease.finish(ctx).await?;
1109 }
1110
1111 debug!(
1112 msg_type = "LastEventDistribution",
1113 subject_id = %subject_id,
1114 sn = sn,
1115 sender = %sender,
1116 is_gov = is_gov,
1117 "Last event distribution processed successfully"
1118 );
1119 }
1120 DistriWorkerMessage::LedgerDistribution {
1121 mut ledger,
1122 is_all,
1123 info,
1124 sender,
1125 } => {
1126 if ledger.is_empty() {
1127 warn!(
1128 msg_type = "LedgerDistribution",
1129 sender = %sender,
1130 "Received empty ledger distribution"
1131 );
1132 return Err(DistributorError::EmptyEvents.into());
1133 }
1134
1135 let subject_id = ledger[0].get_subject_id();
1136 let ledger_count = ledger.len();
1137 let first_sn = ledger[0].sn;
1138 let (is_gov, is_register) = match self
1139 .check_auth(ctx, sender.clone(), &info, &ledger[0])
1140 .await
1141 {
1142 Ok(data) => data,
1143 Err(e) => {
1144 if let ActorError::Functional { .. } = e {
1145 warn!(
1146 msg_type = "LedgerDistribution",
1147 subject_id = %subject_id,
1148 sender = %sender,
1149 ledger_count = ledger_count,
1150 error = %e,
1151 "Authorization check failed"
1152 );
1153 return Err(e);
1154 } else {
1155 error!(
1156 msg_type = "LedgerDistribution",
1157 subject_id = %subject_id,
1158 sender = %sender,
1159 ledger_count = ledger_count,
1160 error = %e,
1161 "Authorization check failed"
1162 );
1163 return Err(emit_fail(ctx, e).await);
1164 }
1165 }
1166 };
1167
1168 let lease = if ledger[0].is_create_event() && !is_register {
1169 let create_ledger = ledger[0].clone();
1170 let requester = Self::requester_id(
1171 "ledger_distribution_create",
1172 &subject_id,
1173 &info,
1174 &sender,
1175 );
1176
1177 let lease = if is_gov {
1178 if let Err(e) =
1179 create_subject(ctx, create_ledger.clone()).await
1180 {
1181 if let ActorError::Functional { .. } = e {
1182 warn!(
1183 msg_type = "LedgerDistribution",
1184 subject_id = %subject_id,
1185 error = %e,
1186 "Failed to create subject from ledger"
1187 );
1188 return Err(e);
1189 } else {
1190 error!(
1191 msg_type = "LedgerDistribution",
1192 subject_id = %subject_id,
1193 error = %e,
1194 "Failed to create subject from ledger"
1195 );
1196 return Err(emit_fail(ctx, e).await);
1197 }
1198 };
1199 None
1200 } else {
1201 let request = create_ledger
1202 .get_create_event()
1203 .ok_or_else(|| {
1204 error!(
1205 msg_type = "LedgerDistribution",
1206 subject_id = %subject_id,
1207 "Create ledger is missing create event payload"
1208 );
1209 DistributorError::MissingCreateEventInCreateLedger {
1210 subject_id: subject_id.clone(),
1211 }
1212 })?;
1213
1214 if let Err(e) = check_subject_creation(
1215 ctx,
1216 &request.governance_id,
1217 create_ledger.ledger_seal_signature.signer.clone(),
1218 create_ledger.gov_version,
1219 request.namespace.to_string(),
1220 request.schema_id,
1221 )
1222 .await
1223 {
1224 if let ActorError::Functional { .. } = e {
1225 warn!(
1226 msg_type = "LedgerDistribution",
1227 subject_id = %subject_id,
1228 error = %e,
1229 "Failed to validate subject creation from ledger"
1230 );
1231 return Err(e);
1232 } else {
1233 error!(
1234 msg_type = "LedgerDistribution",
1235 subject_id = %subject_id,
1236 error = %e,
1237 "Failed to validate subject creation from ledger"
1238 );
1239 return Err(emit_fail(ctx, e).await);
1240 }
1241 }
1242
1243 match acquire_subject(
1244 ctx,
1245 &subject_id,
1246 requester,
1247 Some(create_ledger),
1248 true,
1249 )
1250 .await
1251 {
1252 Ok(lease) => Some(lease),
1253 Err(e) => {
1254 if let ActorError::Functional { .. } = e {
1255 warn!(
1256 msg_type = "LedgerDistribution",
1257 subject_id = %subject_id,
1258 error = %e,
1259 "Failed to create subject from ledger"
1260 );
1261 return Err(e);
1262 } else {
1263 error!(
1264 msg_type = "LedgerDistribution",
1265 subject_id = %subject_id,
1266 error = %e,
1267 "Failed to create subject from ledger"
1268 );
1269 return Err(emit_fail(ctx, e).await);
1270 }
1271 }
1272 }
1273 };
1274
1275 let _event = ledger.remove(0);
1276 lease
1277 } else {
1278 if ledger[0].is_create_event() && is_register {
1279 let _event = ledger.remove(0);
1280 }
1281
1282 let requester = Self::requester_id(
1283 "ledger_distribution",
1284 &subject_id,
1285 &info,
1286 &sender,
1287 );
1288 if !ledger.is_empty() && !is_gov {
1289 match acquire_subject(
1290 ctx,
1291 &subject_id,
1292 requester.clone(),
1293 None,
1294 true,
1295 )
1296 .await
1297 {
1298 Ok(lease) => Some(lease),
1299 Err(e) => {
1300 error!(
1301 msg_type = "LedgerDistribution",
1302 subject_id = %subject_id,
1303 error = %e,
1304 "Failed to bring up tracker for subject update"
1305 );
1306 let error = DistributorError::UpTrackerFailed {
1307 details: e.to_string(),
1308 };
1309 return Err(emit_fail(ctx, error.into()).await);
1310 }
1311 }
1312 } else {
1313 None
1314 }
1315 };
1316
1317 let lease = if !ledger.is_empty() {
1318 let update_result =
1319 update_ledger(ctx, &subject_id, ledger).await;
1320
1321 if let Some(lease) = lease.clone()
1322 && update_result.is_err()
1323 {
1324 lease.finish(ctx).await?;
1325 }
1326
1327 match update_result {
1328 Ok((last_sn, _, _)) => {
1329 if !is_all {
1330 debug!(
1331 msg_type = "LedgerDistribution",
1332 subject_id = %subject_id,
1333 last_sn = last_sn,
1334 "Partial ledger received, requesting more"
1335 );
1336
1337 if let Err(e) = self
1338 .request_ledger_from_sender(
1339 &subject_id,
1340 sender.clone(),
1341 &info,
1342 Some(last_sn),
1343 )
1344 .await
1345 {
1346 error!(
1347 msg_type = "LedgerDistribution",
1348 subject_id = %subject_id,
1349 last_sn = last_sn,
1350 error = %e,
1351 "Failed to request more ledger entries"
1352 );
1353 return Err(emit_fail(ctx, e).await);
1354 };
1355 }
1356
1357 lease
1358 }
1359 Err(e) => {
1360 if let ActorError::Functional { .. } = e.clone() {
1361 warn!(
1362 msg_type = "LedgerDistribution",
1363 subject_id = %subject_id,
1364 first_sn = first_sn,
1365 ledger_count = ledger_count,
1366 error = %e,
1367 "Failed to update subject ledger"
1368 );
1369 return Err(e);
1370 } else {
1371 error!(
1372 msg_type = "LedgerDistribution",
1373 subject_id = %subject_id,
1374 first_sn = first_sn,
1375 ledger_count = ledger_count,
1376 error = %e,
1377 "Failed to update subject ledger"
1378 );
1379 return Err(emit_fail(ctx, e).await);
1380 }
1381 }
1382 }
1383 } else {
1384 lease
1385 };
1386
1387 if let Some(lease) = lease {
1388 lease.finish(ctx).await?;
1389 }
1390
1391 debug!(
1392 msg_type = "LedgerDistribution",
1393 subject_id = %subject_id,
1394 sender = %sender,
1395 ledger_count = ledger_count,
1396 is_all = is_all,
1397 is_gov = is_gov,
1398 "Ledger distribution processed successfully"
1399 );
1400 }
1401 };
1402
1403 Ok(())
1404 }
1405}