1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5 Actor, ActorContext, ActorError, ActorPath, Handler, Message,
6 NotPersistentActor,
7};
8use ave_common::{
9 SchemaType,
10 identity::{DigestIdentifier, PublicKey},
11};
12use ave_network::ComunicateInfo;
13
14use crate::{
15 ActorMessage, NetworkMessage, Node, NodeMessage, NodeResponse,
16 governance::{
17 Governance, GovernanceMessage, GovernanceResponse,
18 model::{HashThisRole, RoleTypes},
19 witnesses_register::{TrackerDeliveryMode, TrackerDeliveryRange},
20 },
21 helpers::network::service::NetworkSender,
22 model::{
23 common::{
24 check_subject_creation, check_witness_access, emit_fail,
25 node::get_subject_data,
26 subject::{
27 acquire_subject, create_subject, get_gov, get_gov_sn,
28 get_tracker_window as resolve_tracker_window, update_ledger,
29 },
30 },
31 event::Ledger,
32 },
33 node::SubjectData,
34 tracker::{Tracker, TrackerMessage, TrackerResponse},
35 update::{UpdateSubjectKind, UpdateWitnessOffer},
36};
37
38use tracing::{Span, debug, error, info_span, warn};
39
40use super::error::DistributorError;
41
/// Worker actor that handles ledger distribution traffic.
///
/// It answers last-SN, governance-version and ledger requests received from
/// other nodes, and applies ledgers that other nodes distribute to this one.
pub struct DistriWorker {
    /// Public key held by the worker.
    // NOTE(review): not read by any code visible in this file — presumably
    // the local node's key; confirm against the callers that build the worker.
    pub our_key: Arc<PublicKey>,
    /// Handle used to send outgoing commands to the network service.
    pub network: Arc<NetworkSender>,
    /// Upper bound on how many sequence numbers a single distribution batch
    /// may span (see `build_distribution_batch`).
    pub ledger_batch_size: u64,
}
47
48impl DistriWorker {
49 fn requester_id(
50 kind: &str,
51 subject_id: &DigestIdentifier,
52 info: &ComunicateInfo,
53 sender: &PublicKey,
54 ) -> String {
55 format!(
56 "{kind}:{subject_id}:{sender}:{}:{}",
57 info.request_id, info.version
58 )
59 }
60
61 async fn get_ledger(
62 &self,
63 ctx: &mut ActorContext<Self>,
64 subject_id: &DigestIdentifier,
65 hi_sn: u64,
66 lo_sn: Option<u64>,
67 is_gov: bool,
68 ) -> Result<(Vec<Ledger>, bool), ActorError> {
69 let path = ActorPath::from(format!(
70 "/user/node/subject_manager/{}",
71 subject_id
72 ));
73
74 if is_gov {
75 let governance_actor =
76 ctx.system().get_actor::<Governance>(&path).await?;
77
78 let response = governance_actor
79 .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
80 .await?;
81
82 match response {
83 GovernanceResponse::Ledger { ledger, is_all } => {
84 Ok((ledger, is_all))
85 }
86 _ => Err(ActorError::UnexpectedResponse {
87 expected: "GovernanceResponse::Ledger".to_owned(),
88 path,
89 }),
90 }
91 } else {
92 let lease = acquire_subject(
93 ctx,
94 subject_id,
95 format!("send_distribution:{subject_id}"),
96 None,
97 true,
98 )
99 .await?;
100 let tracker_actor =
101 ctx.system().get_actor::<Tracker>(&path).await?;
102 let response = tracker_actor
103 .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
104 .await;
105 lease.finish(ctx).await?;
106 let response = response?;
107
108 match response {
109 TrackerResponse::Ledger { ledger, is_all } => {
110 Ok((ledger, is_all))
111 }
112 _ => Err(ActorError::UnexpectedResponse {
113 expected: "TrackerResponse::Ledger".to_owned(),
114 path,
115 }),
116 }
117 }
118 }
119
120 fn build_response_info(
121 &self,
122 sender: PublicKey,
123 info: &ComunicateInfo,
124 receiver_actor: String,
125 ) -> ComunicateInfo {
126 ComunicateInfo {
127 receiver: sender,
128 request_id: info.request_id.clone(),
129 version: info.version,
130 receiver_actor,
131 }
132 }
133
134 async fn send_network_message(
135 &self,
136 info: ComunicateInfo,
137 message: ActorMessage,
138 ) -> Result<(), ActorError> {
139 self.network
140 .send_command(ave_network::CommandHelper::SendMessage {
141 message: NetworkMessage { info, message },
142 })
143 .await
144 }
145
146 async fn send_no_offer_response(
147 &self,
148 info: &ComunicateInfo,
149 sender: PublicKey,
150 receiver_actor: String,
151 ) -> Result<(), ActorError> {
152 let new_info = self.build_response_info(sender, info, receiver_actor);
153 self.send_network_message(new_info, ActorMessage::UpdateNoOffer)
154 .await
155 }
156
157 async fn get_governance_version(
158 &self,
159 ctx: &mut ActorContext<Self>,
160 subject_id: &DigestIdentifier,
161 ) -> Result<u64, ActorError> {
162 let data = get_subject_data(ctx, subject_id).await?;
163 let Some(SubjectData::Governance { .. }) = data else {
164 return Err(DistributorError::SubjectNotFound.into());
165 };
166
167 let governance_path = ActorPath::from(format!(
168 "/user/node/subject_manager/{}",
169 subject_id
170 ));
171 let governance_actor = ctx
172 .system()
173 .get_actor::<Governance>(&governance_path)
174 .await?;
175 let response =
176 governance_actor.ask(GovernanceMessage::GetVersion).await?;
177 let GovernanceResponse::Version(version) = response else {
178 return Err(ActorError::UnexpectedResponse {
179 path: governance_path,
180 expected: "GovernanceResponse::Version".to_owned(),
181 });
182 };
183
184 Ok(version)
185 }
186
187 async fn authorized_subj(
188 &self,
189 ctx: &ActorContext<Self>,
190 subject_id: &DigestIdentifier,
191 ) -> Result<(bool, Option<SubjectData>), ActorError> {
192 let node_path = ActorPath::from("/user/node");
193 let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
194
195 let response = node_actor
196 .ask(NodeMessage::AuthData(subject_id.to_owned()))
197 .await?;
198 match response {
199 NodeResponse::AuthData { auth, subject_data } => {
200 Ok((auth, subject_data))
201 }
202 _ => Err(ActorError::UnexpectedResponse {
203 expected: "NodeResponse::AuthData".to_owned(),
204 path: node_path,
205 }),
206 }
207 }
208
209 async fn check_auth(
210 &self,
211 ctx: &mut ActorContext<Self>,
212 sender: PublicKey,
213 info: &ComunicateInfo,
214 ledger: &Ledger,
215 ) -> Result<(bool, bool), ActorError> {
216 let subject_id = ledger.get_subject_id();
217 let (auth, subject_data) =
219 self.authorized_subj(ctx, &subject_id).await?;
220
221 let (schema_id, governance_id) = if let Some(ref data) = subject_data {
223 match data {
225 SubjectData::Tracker {
226 governance_id,
227 schema_id,
228 ..
229 } => (schema_id.clone(), Some(governance_id.clone())),
230 SubjectData::Governance { .. } => {
231 (SchemaType::Governance, None)
232 }
233 }
234 } else {
235 if let Some(create) = ledger.get_create_event() {
237 if !create.schema_id.is_gov() && create.governance_id.is_empty()
238 {
239 return Err(
240 DistributorError::MissingGovernanceIdInCreate {
241 subject_id: subject_id.clone(),
242 }
243 .into(),
244 );
245 }
246
247 let gov_id = if create.schema_id.is_gov() {
248 None
249 } else {
250 Some(create.governance_id.clone())
251 };
252
253 (create.schema_id, gov_id)
254 } else {
255 self.request_ledger_from_sender(
258 &subject_id,
259 sender.clone(),
260 info,
261 None,
262 )
263 .await?;
264 return Err(DistributorError::UpdatingSubject.into());
265 }
266 };
267
268 let is_gov = schema_id.is_gov();
269 if is_gov {
271 if !auth {
273 return Err(DistributorError::GovernanceNotAuthorized.into());
274 }
275 } else {
276 let Some(governance_id) = governance_id else {
278 error!(
279 subject_id = %subject_id,
280 "Tracker subject is missing governance_id during authorization check"
281 );
282 return Err(DistributorError::MissingGovernanceId {
283 subject_id: subject_id.clone(),
284 }
285 .into());
286 };
287 let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
288 DistributorError::GetGovernanceFailed {
289 details: e.to_string(),
290 }
291 })?;
292
293 match gov.version.cmp(&ledger.gov_version) {
294 std::cmp::Ordering::Less => {
295 return Err(DistributorError::GovernanceVersionMismatch {
296 our_version: gov.version,
297 their_version: ledger.gov_version,
298 }
299 .into());
300 }
301 std::cmp::Ordering::Equal => {}
302 std::cmp::Ordering::Greater => {}
303 }
304 }
305
306 Ok((is_gov, subject_data.is_some()))
307 }
308
309 async fn get_tracker_window(
310 &self,
311 ctx: &mut ActorContext<Self>,
312 subject_id: &DigestIdentifier,
313 sender: PublicKey,
314 actual_sn: Option<u64>,
315 ) -> Result<(u64, Option<u64>, bool, Vec<TrackerDeliveryRange>), ActorError>
316 {
317 let data = get_subject_data(ctx, subject_id).await?;
318
319 let Some(SubjectData::Tracker {
320 governance_id,
321 schema_id,
322 namespace,
323 ..
324 }) = data
325 else {
326 return Err(DistributorError::SubjectNotFound.into());
327 };
328
329 let (sn, clear_sn, is_all, ranges) = resolve_tracker_window(
330 ctx,
331 &governance_id,
332 subject_id,
333 sender.clone(),
334 namespace.clone(),
335 schema_id.clone(),
336 actual_sn,
337 )
338 .await?;
339
340 let Some(sn) = sn else {
341 let witness_sn = check_witness_access(
342 ctx,
343 &governance_id,
344 subject_id,
345 sender,
346 namespace,
347 schema_id,
348 )
349 .await?;
350
351 return match (actual_sn, witness_sn) {
352 (Some(actual_sn), Some(witness_sn))
353 if actual_sn >= witness_sn =>
354 {
355 Err(DistributorError::ActualSnBiggerThanWitness {
356 actual_sn,
357 witness_sn,
358 }
359 .into())
360 }
361 _ => Err(DistributorError::SenderNoAccess.into()),
362 };
363 };
364
365 Ok((sn, clear_sn, is_all, ranges))
366 }
367
368 fn tracker_delivery_mode(
369 ranges: &[TrackerDeliveryRange],
370 sn: u64,
371 ) -> Option<TrackerDeliveryMode> {
372 ranges
373 .iter()
374 .find(|range| range.from_sn <= sn && sn <= range.to_sn)
375 .map(|range| range.mode.clone())
376 }
377
378 fn project_tracker_ledger(
379 ledger: Vec<Ledger>,
380 ranges: &[TrackerDeliveryRange],
381 ) -> Result<Vec<Ledger>, ActorError> {
382 let mut projected = Vec::with_capacity(ledger.len());
383
384 for event in ledger {
385 let Some(mode) = Self::tracker_delivery_mode(ranges, event.sn)
386 else {
387 return Err(ActorError::FunctionalCritical {
388 description: format!(
389 "Missing tracker delivery range for sn {}",
390 event.sn
391 ),
392 });
393 };
394
395 match mode {
396 TrackerDeliveryMode::Clear => projected.push(event),
397 TrackerDeliveryMode::Opaque => projected
398 .push(event.to_tracker_opaque().map_err(ActorError::from)?),
399 }
400 }
401
402 Ok(projected)
403 }
404
405 async fn check_witness(
406 &self,
407 ctx: &mut ActorContext<Self>,
408 subject_id: &DigestIdentifier,
409 sender: PublicKey,
410 ) -> Result<(u64, bool), ActorError> {
411 let data = get_subject_data(ctx, subject_id).await?;
412
413 let Some(data) = data else {
414 return Err(DistributorError::SubjectNotFound.into());
415 };
416
417 match data {
418 SubjectData::Tracker {
419 governance_id,
420 schema_id,
421 namespace,
422 ..
423 } => {
424 let Some(sn) = check_witness_access(
425 ctx,
426 &governance_id,
427 subject_id,
428 sender.clone(),
429 namespace,
430 schema_id,
431 )
432 .await?
433 else {
434 return Err(DistributorError::SenderNoAccess.into());
435 };
436
437 Ok((sn, false))
438 }
439 SubjectData::Governance { .. } => {
440 let gov = get_gov(ctx, subject_id).await.map_err(|e| {
441 DistributorError::GetGovernanceFailed {
442 details: e.to_string(),
443 }
444 })?;
445
446 if !gov.has_this_role(HashThisRole::Gov {
447 who: sender.clone(),
448 role: RoleTypes::Witness,
449 }) {
450 return Err(DistributorError::SenderNotMember {
451 sender: sender.to_string(),
452 }
453 .into());
454 }
455
456 Ok((get_gov_sn(ctx, subject_id).await?, true))
457 }
458 }
459 }
460
461 async fn build_last_sn_offer(
462 &self,
463 ctx: &mut ActorContext<Self>,
464 subject_id: &DigestIdentifier,
465 sender: PublicKey,
466 actual_sn: Option<u64>,
467 ) -> Result<UpdateWitnessOffer, ActorError> {
468 let data = get_subject_data(ctx, subject_id).await?;
469 let Some(data) = data else {
470 return Err(DistributorError::SubjectNotFound.into());
471 };
472
473 match data {
474 SubjectData::Tracker { .. } => {
475 let (sn, clear_sn, _, ranges) = self
476 .get_tracker_window(
477 ctx,
478 subject_id,
479 sender.clone(),
480 actual_sn,
481 )
482 .await?;
483 Ok(UpdateWitnessOffer {
484 kind: UpdateSubjectKind::Tracker,
485 sn,
486 clear_sn,
487 ranges,
488 })
489 }
490 SubjectData::Governance { .. } => {
491 let (sn, ..) =
492 self.check_witness(ctx, subject_id, sender.clone()).await?;
493 Ok(UpdateWitnessOffer {
494 kind: UpdateSubjectKind::Governance,
495 sn,
496 clear_sn: None,
497 ranges: Vec::new(),
498 })
499 }
500 }
501 }
502
503 async fn build_distribution_batch(
504 &self,
505 ctx: &mut ActorContext<Self>,
506 subject_id: &DigestIdentifier,
507 sender: PublicKey,
508 actual_sn: Option<u64>,
509 target_sn: Option<u64>,
510 ) -> Result<(Vec<Ledger>, bool, u64), ActorError> {
511 let data = get_subject_data(ctx, subject_id).await?;
512 let Some(data) = data else {
513 return Err(DistributorError::SubjectNotFound.into());
514 };
515
516 match data {
517 SubjectData::Tracker { .. } => {
518 let (window_sn, clear_sn, _, ranges) = self
519 .get_tracker_window(ctx, subject_id, sender, actual_sn)
520 .await?;
521
522 if let Some(actual_sn) = actual_sn
523 && actual_sn >= window_sn
524 {
525 return Err(DistributorError::ActualSnBiggerThanWitness {
526 actual_sn,
527 witness_sn: window_sn,
528 }
529 .into());
530 }
531
532 let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
533 let batch_hi_sn = from_sn
534 .saturating_add(self.ledger_batch_size)
535 .saturating_sub(1)
536 .min(window_sn);
537 let preferred_hi_sn = clear_sn
538 .filter(|clear_sn| {
539 actual_sn.is_none_or(|actual_sn| *clear_sn > actual_sn)
540 })
541 .unwrap_or(window_sn);
542 let preferred_hi_sn =
543 if from_sn == 0 && preferred_hi_sn == 0 && window_sn > 0 {
544 window_sn
545 } else {
546 preferred_hi_sn
547 };
548 let hi_sn = target_sn
549 .unwrap_or(preferred_hi_sn)
550 .min(preferred_hi_sn)
551 .min(batch_hi_sn);
552
553 let (ledger, raw_is_all) = self
554 .get_ledger(ctx, subject_id, hi_sn, actual_sn, false)
555 .await?;
556
557 let ledger = Self::project_tracker_ledger(ledger, &ranges)?;
558 let is_all = raw_is_all && hi_sn == window_sn;
559 Ok((ledger, is_all, hi_sn))
560 }
561 SubjectData::Governance { .. } => {
562 let (witness_hi_sn, ..) =
563 self.check_witness(ctx, subject_id, sender).await?;
564
565 if let Some(actual_sn) = actual_sn
566 && actual_sn >= witness_hi_sn
567 {
568 return Err(DistributorError::ActualSnBiggerThanWitness {
569 actual_sn,
570 witness_sn: witness_hi_sn,
571 }
572 .into());
573 }
574
575 let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
576 let batch_hi_sn = from_sn
577 .saturating_add(self.ledger_batch_size)
578 .saturating_sub(1)
579 .min(witness_hi_sn);
580 let batch_hi_sn =
581 target_sn.unwrap_or(batch_hi_sn).min(batch_hi_sn);
582
583 let (ledger, raw_is_all) = self
584 .get_ledger(ctx, subject_id, batch_hi_sn, actual_sn, true)
585 .await?;
586
587 let is_all = raw_is_all && batch_hi_sn == witness_hi_sn;
588 Ok((ledger, is_all, batch_hi_sn))
589 }
590 }
591 }
592
593 async fn request_ledger_from_sender(
594 &self,
595 subject_id: &DigestIdentifier,
596 sender: PublicKey,
597 info: &ComunicateInfo,
598 actual_sn: Option<u64>,
599 ) -> Result<(), ActorError> {
600 let new_info = self.build_response_info(
601 sender,
602 info,
603 format!("/user/node/distributor_{}", subject_id),
604 );
605
606 self.send_network_message(
607 new_info,
608 ActorMessage::DistributionLedgerReq {
609 actual_sn,
610 target_sn: None,
611 subject_id: subject_id.clone(),
612 },
613 )
614 .await
615 }
616
617 async fn handle_get_last_sn(
618 &self,
619 ctx: &mut ActorContext<Self>,
620 subject_id: DigestIdentifier,
621 actual_sn: Option<u64>,
622 info: ComunicateInfo,
623 sender: PublicKey,
624 receiver_actor: String,
625 ) -> Result<(), ActorError> {
626 let offer = self
627 .build_last_sn_offer(ctx, &subject_id, sender.clone(), actual_sn)
628 .await?;
629 let new_info =
630 self.build_response_info(sender.clone(), &info, receiver_actor);
631
632 self.send_network_message(
633 new_info,
634 ActorMessage::UpdateOffer {
635 offer: offer.clone(),
636 },
637 )
638 .await?;
639
640 debug!(
641 msg_type = "GetLastSn",
642 subject_id = %subject_id,
643 sn = offer.sn,
644 clear_sn = ?offer.clear_sn,
645 sender = %sender,
646 "Last SN response sent successfully"
647 );
648
649 Ok(())
650 }
651
652 async fn handle_get_governance_version(
653 &self,
654 ctx: &mut ActorContext<Self>,
655 subject_id: DigestIdentifier,
656 info: ComunicateInfo,
657 sender: PublicKey,
658 receiver_actor: String,
659 ) -> Result<(), ActorError> {
660 let version = self.get_governance_version(ctx, &subject_id).await?;
661 let new_info =
662 self.build_response_info(sender.clone(), &info, receiver_actor);
663
664 self.send_network_message(
665 new_info,
666 ActorMessage::GovernanceVersionRes { version },
667 )
668 .await?;
669
670 Ok(())
671 }
672
673 async fn handle_send_distribution(
674 &self,
675 ctx: &mut ActorContext<Self>,
676 actual_sn: Option<u64>,
677 target_sn: Option<u64>,
678 info: ComunicateInfo,
679 subject_id: DigestIdentifier,
680 sender: PublicKey,
681 ) -> Result<(), ActorError> {
682 let (ledger, is_all, hi_sn) = self
683 .build_distribution_batch(
684 ctx,
685 &subject_id,
686 sender.clone(),
687 actual_sn,
688 target_sn,
689 )
690 .await?;
691
692 let new_info = self.build_response_info(
693 sender.clone(),
694 &info,
695 format!("/user/node/distributor_{}", subject_id),
696 );
697
698 self.send_network_message(
699 new_info,
700 ActorMessage::DistributionLedgerRes {
701 ledger: ledger.clone(),
702 is_all,
703 },
704 )
705 .await?;
706
707 debug!(
708 msg_type = "SendDistribution",
709 subject_id = %subject_id,
710 sender = %sender,
711 ledger_count = ledger.len(),
712 is_all = is_all,
713 hi_sn = hi_sn,
714 actual_sn = ?actual_sn,
715 "Ledger distribution sent successfully"
716 );
717
718 Ok(())
719 }
720}
721
722#[async_trait]
723impl Actor for DistriWorker {
724 type Event = ();
725 type Message = DistriWorkerMessage;
726 type Response = ();
727
728 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
729 parent_span.map_or_else(
730 || info_span!("DistriWorker", id),
731 |parent_span| info_span!(parent: parent_span, "DistriWorker", id),
732 )
733 }
734}
735
/// Messages handled by [`DistriWorker`].
///
/// In every variant `info` is the communication info of the incoming request
/// (its request id and version are reused for replies) and `sender` is the
/// public key that replies and follow-up requests are addressed to.
#[derive(Debug, Clone)]
pub enum DistriWorkerMessage {
    /// Query for the witness offer of a subject. Answered with an
    /// `UpdateOffer`, or with `UpdateNoOffer` when the check fails with a
    /// functional error.
    GetLastSn {
        /// Subject the query is about.
        subject_id: DigestIdentifier,
        /// SN reported by the requester, if any; forwarded to the window and
        /// access checks.
        actual_sn: Option<u64>,
        info: ComunicateInfo,
        sender: PublicKey,
        /// Value placed in the reply's `receiver_actor` field.
        receiver_actor: String,
    },
    /// Query for the version of a governance subject. Answered with a
    /// `GovernanceVersionRes`.
    GetGovernanceVersion {
        /// Governance subject the query is about.
        subject_id: DigestIdentifier,
        info: ComunicateInfo,
        sender: PublicKey,
        /// Value placed in the reply's `receiver_actor` field.
        receiver_actor: String,
    },
    /// Request for a batch of ledger events. Answered with a
    /// `DistributionLedgerRes`.
    SendDistribution {
        /// SN reported by the requester; the batch starts right after it
        /// (or at 0 when `None`).
        actual_sn: Option<u64>,
        /// Optional upper SN for the batch.
        target_sn: Option<u64>,
        /// Subject whose ledger is requested.
        subject_id: DigestIdentifier,
        info: ComunicateInfo,
        sender: PublicKey,
    },
    /// A single ledger event distributed to this node.
    LastEventDistribution {
        /// The distributed event.
        ledger: Box<Ledger>,
        info: ComunicateInfo,
        sender: PublicKey,
    },
    /// A batch of ledger events distributed to this node.
    LedgerDistribution {
        /// The distributed events; the first one identifies the subject.
        ledger: Vec<Ledger>,
        /// When false, more events are requested from `sender` after the
        /// batch has been applied.
        is_all: bool,
        info: ComunicateInfo,
        sender: PublicKey,
    },
}
772
// Marks `DistriWorkerMessage` as an actor message; no trait items are
// overridden here.
impl Message for DistriWorkerMessage {}
774
// Marker impl with no overridden items.
// NOTE(review): presumably opts the worker out of actor persistence — confirm
// against the `ave_actors` documentation.
impl NotPersistentActor for DistriWorker {}
776
#[async_trait]
impl Handler<Self> for DistriWorker {
    /// Dispatches a [`DistriWorkerMessage`].
    ///
    /// Error handling follows one pattern throughout: an
    /// `ActorError::Functional` is logged at `warn` level and returned (or,
    /// for `GetLastSn`, answered with a no-offer reply), while any other
    /// error is logged at `error` level and passed through `emit_fail`
    /// before being returned.
    async fn handle_message(
        &mut self,
        _sender: ActorPath,
        msg: DistriWorkerMessage,
        ctx: &mut ActorContext<Self>,
    ) -> Result<(), ActorError> {
        match msg {
            // Remote node asks how far it can be served for a subject.
            DistriWorkerMessage::GetLastSn {
                subject_id,
                actual_sn,
                info,
                sender,
                receiver_actor,
            } => match self
                .handle_get_last_sn(
                    ctx,
                    subject_id.clone(),
                    actual_sn,
                    info.clone(),
                    sender.clone(),
                    receiver_actor.clone(),
                )
                .await
            {
                Ok(()) => {}
                Err(e) => {
                    if let ActorError::Functional { .. } = e {
                        warn!(
                            msg_type = "GetLastSn",
                            subject_id = %subject_id,
                            sender = %sender,
                            error = %e,
                            "Witness check failed"
                        );
                        // A functional failure is not an error for the
                        // caller: tell the requester there is no offer.
                        self.send_no_offer_response(
                            &info,
                            sender.clone(),
                            receiver_actor,
                        )
                        .await?;
                        return Ok(());
                    } else {
                        error!(
                            msg_type = "GetLastSn",
                            subject_id = %subject_id,
                            sender = %sender,
                            error = %e,
                            "Witness check failed"
                        );
                        return Err(emit_fail(ctx, e).await);
                    }
                }
            },
            // Remote node asks for the version of a governance.
            DistriWorkerMessage::GetGovernanceVersion {
                subject_id,
                info,
                sender,
                receiver_actor,
            } => match self
                .handle_get_governance_version(
                    ctx,
                    subject_id.clone(),
                    info,
                    sender.clone(),
                    receiver_actor,
                )
                .await
            {
                Ok(()) => {}
                Err(e) => {
                    if let ActorError::Functional { .. } = e {
                        warn!(
                            msg_type = "GetGovernanceVersion",
                            subject_id = %subject_id,
                            sender = %sender,
                            error = %e,
                            "Subject is not a governance"
                        );
                        return Err(e);
                    } else {
                        error!(
                            msg_type = "GetGovernanceVersion",
                            subject_id = %subject_id,
                            sender = %sender,
                            error = %e,
                            "Failed to send governance version response to network"
                        );
                        return Err(emit_fail(ctx, e).await);
                    }
                }
            },
            // Remote node asks for a batch of ledger events.
            DistriWorkerMessage::SendDistribution {
                actual_sn,
                target_sn,
                info,
                subject_id,
                sender,
            } => match self
                .handle_send_distribution(
                    ctx,
                    actual_sn,
                    target_sn,
                    info,
                    subject_id.clone(),
                    sender.clone(),
                )
                .await
            {
                Ok(()) => {}
                Err(e) => {
                    if let ActorError::Functional { .. } = e {
                        warn!(
                            msg_type = "SendDistribution",
                            subject_id = %subject_id,
                            sender = %sender,
                            error = %e,
                            "Witness check failed"
                        );
                        return Err(e);
                    } else {
                        error!(
                            msg_type = "SendDistribution",
                            subject_id = %subject_id,
                            sender = %sender,
                            error = %e,
                            "Failed to send ledger response to network"
                        );
                        return Err(emit_fail(ctx, e).await);
                    }
                }
            },
            // A single event was distributed to us.
            DistriWorkerMessage::LastEventDistribution {
                ledger,
                info,
                sender,
            } => {
                let subject_id = ledger.get_subject_id();
                let sn = ledger.sn;

                // `check_auth` returns `(is_gov, is_known)`; only the first
                // element is needed here.
                let (is_gov, ..) = match self
                    .check_auth(ctx, sender.clone(), &info, &ledger)
                    .await
                {
                    Ok(is_gov) => is_gov,
                    Err(e) => {
                        if let ActorError::Functional { .. } = e {
                            warn!(
                                msg_type = "LastEventDistribution",
                                subject_id = %subject_id,
                                sn = sn,
                                sender = %sender,
                                error = %e,
                                "Authorization check failed"
                            );
                            return Err(e);
                        } else {
                            error!(
                                msg_type = "LastEventDistribution",
                                subject_id = %subject_id,
                                sn = sn,
                                sender = %sender,
                                error = %e,
                                "Authorization check failed"
                            );
                            return Err(emit_fail(ctx, e).await);
                        }
                    }
                };

                // A create event creates the subject directly and needs no
                // lease; any other event is applied through `update_ledger`,
                // under a subject lease when the subject is a tracker.
                let lease = if ledger.is_create_event() {
                    if let Err(e) = create_subject(ctx, *ledger.clone()).await {
                        if let ActorError::Functional { .. } = e {
                            warn!(
                                msg_type = "LastEventDistribution",
                                subject_id = %subject_id,
                                sn = sn,
                                error = %e,
                                "Failed to create subject from create event"
                            );
                            return Err(e);
                        } else {
                            error!(
                                msg_type = "LastEventDistribution",
                                subject_id = %subject_id,
                                sn = sn,
                                error = %e,
                                "Failed to create subject from create event"
                            );
                            return Err(emit_fail(ctx, e).await);
                        }
                    };

                    None
                } else {
                    let requester = Self::requester_id(
                        "last_event_distribution",
                        &subject_id,
                        &info,
                        &sender,
                    );
                    let lease = if !is_gov {
                        match acquire_subject(
                            ctx,
                            &subject_id,
                            requester.clone(),
                            None,
                            true,
                        )
                        .await
                        {
                            Ok(lease) => Some(lease),
                            Err(e) => {
                                error!(
                                    msg_type = "LastEventDistribution",
                                    subject_id = %subject_id,
                                    error = %e,
                                    "Failed to bring up tracker for subject update"
                                );
                                let error = DistributorError::UpTrackerFailed {
                                    details: e.to_string(),
                                };
                                return Err(emit_fail(ctx, error.into()).await);
                            }
                        }
                    } else {
                        None
                    };

                    let update_result =
                        update_ledger(ctx, &subject_id, vec![*ledger.clone()])
                            .await;

                    // Finish the lease before the error arms below return.
                    if let Some(lease) = lease.clone()
                        && update_result.is_err()
                    {
                        lease.finish(ctx).await?;
                    }

                    match update_result {
                        // Our ledger is still behind the received event:
                        // ask the sender for the missing events and stop
                        // here without acknowledging.
                        Ok((last_sn, _, _)) if last_sn < ledger.sn => {
                            debug!(
                                msg_type = "LastEventDistribution",
                                subject_id = %subject_id,
                                last_sn = last_sn,
                                received_sn = sn,
                                "SN gap detected, requesting update"
                            );

                            if let Err(e) = self
                                .request_ledger_from_sender(
                                    &subject_id,
                                    sender.clone(),
                                    &info,
                                    Some(last_sn),
                                )
                                .await
                            {
                                error!(
                                    msg_type = "LastEventDistribution",
                                    subject_id = %subject_id,
                                    last_sn = last_sn,
                                    error = %e,
                                    "Failed to request ledger from network"
                                );
                                return Err(emit_fail(ctx, e).await);
                            }

                            if let Some(lease) = lease.clone() {
                                lease.finish(ctx).await?;
                            }

                            return Ok(());
                        }
                        Ok((..)) => lease,
                        Err(e) => {
                            if let ActorError::Functional { .. } = e.clone() {
                                warn!(
                                    msg_type = "LastEventDistribution",
                                    subject_id = %subject_id,
                                    sn = sn,
                                    error = %e,
                                    "Failed to update subject ledger"
                                );
                                return Err(e);
                            } else {
                                error!(
                                    msg_type = "LastEventDistribution",
                                    subject_id = %subject_id,
                                    sn = sn,
                                    error = %e,
                                    "Failed to update subject ledger"
                                );
                                return Err(emit_fail(ctx, e).await);
                            }
                        }
                    }
                };

                // Acknowledge the event to the sender; the reply is
                // addressed to `/user/<request_id>/<info.receiver>`.
                let new_info = self.build_response_info(
                    sender.clone(),
                    &info,
                    format!(
                        "/user/{}/{}",
                        info.request_id,
                        info.receiver.clone()
                    ),
                );

                if let Err(e) = self
                    .send_network_message(
                        new_info,
                        ActorMessage::DistributionLastEventRes,
                    )
                    .await
                {
                    error!(
                        msg_type = "LastEventDistribution",
                        subject_id = %subject_id,
                        sn = sn,
                        error = %e,
                        "Failed to send distribution acknowledgment"
                    );
                    return Err(emit_fail(ctx, e).await);
                };

                if let Some(lease) = lease {
                    lease.finish(ctx).await?;
                }

                debug!(
                    msg_type = "LastEventDistribution",
                    subject_id = %subject_id,
                    sn = sn,
                    sender = %sender,
                    is_gov = is_gov,
                    "Last event distribution processed successfully"
                );
            }
            // A batch of events was distributed to us.
            DistriWorkerMessage::LedgerDistribution {
                mut ledger,
                is_all,
                info,
                sender,
            } => {
                if ledger.is_empty() {
                    warn!(
                        msg_type = "LedgerDistribution",
                        sender = %sender,
                        "Received empty ledger distribution"
                    );
                    return Err(DistributorError::EmptyEvents.into());
                }

                // The first event identifies the subject and is the one
                // used for the authorization check.
                let subject_id = ledger[0].get_subject_id();
                let ledger_count = ledger.len();
                let first_sn = ledger[0].sn;
                let (is_gov, is_register) = match self
                    .check_auth(ctx, sender.clone(), &info, &ledger[0])
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        if let ActorError::Functional { .. } = e {
                            warn!(
                                msg_type = "LedgerDistribution",
                                subject_id = %subject_id,
                                sender = %sender,
                                ledger_count = ledger_count,
                                error = %e,
                                "Authorization check failed"
                            );
                            return Err(e);
                        } else {
                            error!(
                                msg_type = "LedgerDistribution",
                                subject_id = %subject_id,
                                sender = %sender,
                                ledger_count = ledger_count,
                                error = %e,
                                "Authorization check failed"
                            );
                            return Err(emit_fail(ctx, e).await);
                        }
                    }
                };

                // Batch starts with a create event for a subject we do not
                // know yet: create it first, then drop that event from the
                // batch.
                let lease = if ledger[0].is_create_event() && !is_register {
                    let create_ledger = ledger[0].clone();
                    let requester = Self::requester_id(
                        "ledger_distribution_create",
                        &subject_id,
                        &info,
                        &sender,
                    );

                    let lease = if is_gov {
                        if let Err(e) =
                            create_subject(ctx, create_ledger.clone()).await
                        {
                            if let ActorError::Functional { .. } = e {
                                warn!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    error = %e,
                                    "Failed to create subject from ledger"
                                );
                                return Err(e);
                            } else {
                                error!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    error = %e,
                                    "Failed to create subject from ledger"
                                );
                                return Err(emit_fail(ctx, e).await);
                            }
                        };
                        None
                    } else {
                        let request = create_ledger
                            .get_create_event()
                            .ok_or_else(|| {
                                error!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    "Create ledger is missing create event payload"
                                );
                                DistributorError::MissingCreateEventInCreateLedger {
                                    subject_id: subject_id.clone(),
                                }
                            })?;

                        // Validate the creation against the governance
                        // before acquiring the subject.
                        if let Err(e) = check_subject_creation(
                            ctx,
                            &request.governance_id,
                            create_ledger.ledger_seal_signature.signer.clone(),
                            create_ledger.gov_version,
                            request.namespace.to_string(),
                            request.schema_id,
                        )
                        .await
                        {
                            if let ActorError::Functional { .. } = e {
                                warn!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    error = %e,
                                    "Failed to validate subject creation from ledger"
                                );
                                return Err(e);
                            } else {
                                error!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    error = %e,
                                    "Failed to validate subject creation from ledger"
                                );
                                return Err(emit_fail(ctx, e).await);
                            }
                        }

                        // The create ledger is handed to `acquire_subject`
                        // together with the lease request.
                        match acquire_subject(
                            ctx,
                            &subject_id,
                            requester,
                            Some(create_ledger),
                            true,
                        )
                        .await
                        {
                            Ok(lease) => Some(lease),
                            Err(e) => {
                                if let ActorError::Functional { .. } = e {
                                    warn!(
                                        msg_type = "LedgerDistribution",
                                        subject_id = %subject_id,
                                        error = %e,
                                        "Failed to create subject from ledger"
                                    );
                                    return Err(e);
                                } else {
                                    error!(
                                        msg_type = "LedgerDistribution",
                                        subject_id = %subject_id,
                                        error = %e,
                                        "Failed to create subject from ledger"
                                    );
                                    return Err(emit_fail(ctx, e).await);
                                }
                            }
                        }
                    };

                    // The create event has been consumed above.
                    let _event = ledger.remove(0);
                    lease
                } else {
                    // Subject already known: a leading create event is
                    // redundant and is dropped.
                    if ledger[0].is_create_event() && is_register {
                        let _event = ledger.remove(0);
                    }

                    let requester = Self::requester_id(
                        "ledger_distribution",
                        &subject_id,
                        &info,
                        &sender,
                    );
                    // Only tracker subjects with remaining events need a
                    // lease.
                    if !ledger.is_empty() && !is_gov {
                        match acquire_subject(
                            ctx,
                            &subject_id,
                            requester.clone(),
                            None,
                            true,
                        )
                        .await
                        {
                            Ok(lease) => Some(lease),
                            Err(e) => {
                                error!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    error = %e,
                                    "Failed to bring up tracker for subject update"
                                );
                                let error = DistributorError::UpTrackerFailed {
                                    details: e.to_string(),
                                };
                                return Err(emit_fail(ctx, error.into()).await);
                            }
                        }
                    } else {
                        None
                    }
                };

                // Apply whatever events remain after the create handling.
                let lease = if !ledger.is_empty() {
                    let update_result =
                        update_ledger(ctx, &subject_id, ledger).await;

                    // Finish the lease before the error arms below return.
                    if let Some(lease) = lease.clone()
                        && update_result.is_err()
                    {
                        lease.finish(ctx).await?;
                    }

                    match update_result {
                        Ok((last_sn, _, _)) => {
                            // The sender has more events for us: ask for
                            // the next batch starting after `last_sn`.
                            if !is_all {
                                debug!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    last_sn = last_sn,
                                    "Partial ledger received, requesting more"
                                );

                                if let Err(e) = self
                                    .request_ledger_from_sender(
                                        &subject_id,
                                        sender.clone(),
                                        &info,
                                        Some(last_sn),
                                    )
                                    .await
                                {
                                    error!(
                                        msg_type = "LedgerDistribution",
                                        subject_id = %subject_id,
                                        last_sn = last_sn,
                                        error = %e,
                                        "Failed to request more ledger entries"
                                    );
                                    return Err(emit_fail(ctx, e).await);
                                };
                            }

                            lease
                        }
                        Err(e) => {
                            if let ActorError::Functional { .. } = e.clone() {
                                warn!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    first_sn = first_sn,
                                    ledger_count = ledger_count,
                                    error = %e,
                                    "Failed to update subject ledger"
                                );
                                return Err(e);
                            } else {
                                error!(
                                    msg_type = "LedgerDistribution",
                                    subject_id = %subject_id,
                                    first_sn = first_sn,
                                    ledger_count = ledger_count,
                                    error = %e,
                                    "Failed to update subject ledger"
                                );
                                return Err(emit_fail(ctx, e).await);
                            }
                        }
                    }
                } else {
                    lease
                };

                if let Some(lease) = lease {
                    lease.finish(ctx).await?;
                }

                debug!(
                    msg_type = "LedgerDistribution",
                    subject_id = %subject_id,
                    sender = %sender,
                    ledger_count = ledger_count,
                    is_all = is_all,
                    is_gov = is_gov,
                    "Ledger distribution processed successfully"
                );
            }
        };

        Ok(())
    }
}