Skip to main content

ave_core/distribution/
worker.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::{
9    SchemaType,
10    identity::{DigestIdentifier, PublicKey},
11};
12use ave_network::ComunicateInfo;
13
14use crate::{
15    ActorMessage, NetworkMessage, Node, NodeMessage, NodeResponse,
16    governance::{
17        Governance, GovernanceMessage, GovernanceResponse,
18        model::{HashThisRole, RoleTypes},
19        witnesses_register::{TrackerDeliveryMode, TrackerDeliveryRange},
20    },
21    helpers::network::service::NetworkSender,
22    model::{
23        common::{
24            check_subject_creation, check_witness_access, emit_fail,
25            node::get_subject_data,
26            subject::{
27                acquire_subject, create_subject, get_gov, get_gov_sn,
28                get_tracker_window as resolve_tracker_window, update_ledger,
29            },
30        },
31        event::Ledger,
32    },
33    node::SubjectData,
34    tracker::{Tracker, TrackerMessage, TrackerResponse},
35    update::{UpdateSubjectKind, UpdateWitnessOffer},
36};
37
38use tracing::{Span, debug, error, info_span, warn};
39
40use super::error::DistributorError;
41
42pub struct DistriWorker {
43    pub our_key: Arc<PublicKey>,
44    pub network: Arc<NetworkSender>,
45    pub ledger_batch_size: u64,
46}
47
48impl DistriWorker {
49    fn requester_id(
50        kind: &str,
51        subject_id: &DigestIdentifier,
52        info: &ComunicateInfo,
53        sender: &PublicKey,
54    ) -> String {
55        format!(
56            "{kind}:{subject_id}:{sender}:{}:{}",
57            info.request_id, info.version
58        )
59    }
60
61    async fn get_ledger(
62        &self,
63        ctx: &mut ActorContext<Self>,
64        subject_id: &DigestIdentifier,
65        hi_sn: u64,
66        lo_sn: Option<u64>,
67        is_gov: bool,
68    ) -> Result<(Vec<Ledger>, bool), ActorError> {
69        let path = ActorPath::from(format!(
70            "/user/node/subject_manager/{}",
71            subject_id
72        ));
73
74        if is_gov {
75            let governance_actor =
76                ctx.system().get_actor::<Governance>(&path).await?;
77
78            let response = governance_actor
79                .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
80                .await?;
81
82            match response {
83                GovernanceResponse::Ledger { ledger, is_all } => {
84                    Ok((ledger, is_all))
85                }
86                _ => Err(ActorError::UnexpectedResponse {
87                    expected: "GovernanceResponse::Ledger".to_owned(),
88                    path,
89                }),
90            }
91        } else {
92            let lease = acquire_subject(
93                ctx,
94                subject_id,
95                format!("send_distribution:{subject_id}"),
96                None,
97                true,
98            )
99            .await?;
100            let tracker_actor =
101                ctx.system().get_actor::<Tracker>(&path).await?;
102            let response = tracker_actor
103                .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
104                .await;
105            lease.finish(ctx).await?;
106            let response = response?;
107
108            match response {
109                TrackerResponse::Ledger { ledger, is_all } => {
110                    Ok((ledger, is_all))
111                }
112                _ => Err(ActorError::UnexpectedResponse {
113                    expected: "TrackerResponse::Ledger".to_owned(),
114                    path,
115                }),
116            }
117        }
118    }
119
120    fn build_response_info(
121        &self,
122        sender: PublicKey,
123        info: &ComunicateInfo,
124        receiver_actor: String,
125    ) -> ComunicateInfo {
126        ComunicateInfo {
127            receiver: sender,
128            request_id: info.request_id.clone(),
129            version: info.version,
130            receiver_actor,
131        }
132    }
133
134    async fn send_network_message(
135        &self,
136        info: ComunicateInfo,
137        message: ActorMessage,
138    ) -> Result<(), ActorError> {
139        self.network
140            .send_command(ave_network::CommandHelper::SendMessage {
141                message: NetworkMessage { info, message },
142            })
143            .await
144    }
145
146    async fn send_no_offer_response(
147        &self,
148        info: &ComunicateInfo,
149        sender: PublicKey,
150        receiver_actor: String,
151    ) -> Result<(), ActorError> {
152        let new_info = self.build_response_info(sender, info, receiver_actor);
153        self.send_network_message(new_info, ActorMessage::UpdateNoOffer)
154            .await
155    }
156
157    async fn get_governance_version(
158        &self,
159        ctx: &mut ActorContext<Self>,
160        subject_id: &DigestIdentifier,
161    ) -> Result<u64, ActorError> {
162        let data = get_subject_data(ctx, subject_id).await?;
163        let Some(SubjectData::Governance { .. }) = data else {
164            return Err(DistributorError::SubjectNotFound.into());
165        };
166
167        let governance_path = ActorPath::from(format!(
168            "/user/node/subject_manager/{}",
169            subject_id
170        ));
171        let governance_actor = ctx
172            .system()
173            .get_actor::<Governance>(&governance_path)
174            .await?;
175        let response =
176            governance_actor.ask(GovernanceMessage::GetVersion).await?;
177        let GovernanceResponse::Version(version) = response else {
178            return Err(ActorError::UnexpectedResponse {
179                path: governance_path,
180                expected: "GovernanceResponse::Version".to_owned(),
181            });
182        };
183
184        Ok(version)
185    }
186
187    async fn authorized_subj(
188        &self,
189        ctx: &ActorContext<Self>,
190        subject_id: &DigestIdentifier,
191    ) -> Result<(bool, Option<SubjectData>), ActorError> {
192        let node_path = ActorPath::from("/user/node");
193        let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
194
195        let response = node_actor
196            .ask(NodeMessage::AuthData(subject_id.to_owned()))
197            .await?;
198        match response {
199            NodeResponse::AuthData { auth, subject_data } => {
200                Ok((auth, subject_data))
201            }
202            _ => Err(ActorError::UnexpectedResponse {
203                expected: "NodeResponse::AuthData".to_owned(),
204                path: node_path,
205            }),
206        }
207    }
208
209    async fn check_auth(
210        &self,
211        ctx: &mut ActorContext<Self>,
212        sender: PublicKey,
213        info: &ComunicateInfo,
214        ledger: &Ledger,
215    ) -> Result<(bool, bool), ActorError> {
216        let subject_id = ledger.get_subject_id();
217        // Si está auth o si soy el dueño del sujeto.
218        let (auth, subject_data) =
219            self.authorized_subj(ctx, &subject_id).await?;
220
221        // Extraer schema_id y governance_id según si conocemos el sujeto o no
222        let (schema_id, governance_id) = if let Some(ref data) = subject_data {
223            // Lo conozco
224            match data {
225                SubjectData::Tracker {
226                    governance_id,
227                    schema_id,
228                    ..
229                } => (schema_id.clone(), Some(governance_id.clone())),
230                SubjectData::Governance { .. } => {
231                    (SchemaType::Governance, None)
232                }
233            }
234        } else {
235            // No lo conozco - debe ser evento Create
236            if let Some(create) = ledger.get_create_event() {
237                if !create.schema_id.is_gov() && create.governance_id.is_empty()
238                {
239                    return Err(
240                        DistributorError::MissingGovernanceIdInCreate {
241                            subject_id: subject_id.clone(),
242                        }
243                        .into(),
244                    );
245                }
246
247                let gov_id = if create.schema_id.is_gov() {
248                    None
249                } else {
250                    Some(create.governance_id.clone())
251                };
252
253                (create.schema_id, gov_id)
254            } else {
255                // No es el primer evento: pedir el histórico directamente
256                // al sender que nos acaba de escribir.
257                self.request_ledger_from_sender(
258                    &subject_id,
259                    sender.clone(),
260                    info,
261                    None,
262                )
263                .await?;
264                return Err(DistributorError::UpdatingSubject.into());
265            }
266        };
267
268        let is_gov = schema_id.is_gov();
269        // Verificar autorización
270        if is_gov {
271            // Es una gobernanza
272            if !auth {
273                return Err(DistributorError::GovernanceNotAuthorized.into());
274            }
275        } else {
276            // Es un Tracker - verificar rol de witness si no está autorizado
277            let Some(governance_id) = governance_id else {
278                error!(
279                    subject_id = %subject_id,
280                    "Tracker subject is missing governance_id during authorization check"
281                );
282                return Err(DistributorError::MissingGovernanceId {
283                    subject_id: subject_id.clone(),
284                }
285                .into());
286            };
287            let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
288                DistributorError::GetGovernanceFailed {
289                    details: e.to_string(),
290                }
291            })?;
292
293            match gov.version.cmp(&ledger.gov_version) {
294                std::cmp::Ordering::Less => {
295                    return Err(DistributorError::GovernanceVersionMismatch {
296                        our_version: gov.version,
297                        their_version: ledger.gov_version,
298                    }
299                    .into());
300                }
301                std::cmp::Ordering::Equal => {}
302                std::cmp::Ordering::Greater => {}
303            }
304        }
305
306        Ok((is_gov, subject_data.is_some()))
307    }
308
309    async fn get_tracker_window(
310        &self,
311        ctx: &mut ActorContext<Self>,
312        subject_id: &DigestIdentifier,
313        sender: PublicKey,
314        actual_sn: Option<u64>,
315    ) -> Result<(u64, Option<u64>, bool, Vec<TrackerDeliveryRange>), ActorError>
316    {
317        let data = get_subject_data(ctx, subject_id).await?;
318
319        let Some(SubjectData::Tracker {
320            governance_id,
321            schema_id,
322            namespace,
323            ..
324        }) = data
325        else {
326            return Err(DistributorError::SubjectNotFound.into());
327        };
328
329        let (sn, clear_sn, is_all, ranges) = resolve_tracker_window(
330            ctx,
331            &governance_id,
332            subject_id,
333            sender.clone(),
334            namespace.clone(),
335            schema_id.clone(),
336            actual_sn,
337        )
338        .await?;
339
340        let Some(sn) = sn else {
341            let witness_sn = check_witness_access(
342                ctx,
343                &governance_id,
344                subject_id,
345                sender,
346                namespace,
347                schema_id,
348            )
349            .await?;
350
351            return match (actual_sn, witness_sn) {
352                (Some(actual_sn), Some(witness_sn))
353                    if actual_sn >= witness_sn =>
354                {
355                    Err(DistributorError::ActualSnBiggerThanWitness {
356                        actual_sn,
357                        witness_sn,
358                    }
359                    .into())
360                }
361                _ => Err(DistributorError::SenderNoAccess.into()),
362            };
363        };
364
365        Ok((sn, clear_sn, is_all, ranges))
366    }
367
368    fn tracker_delivery_mode(
369        ranges: &[TrackerDeliveryRange],
370        sn: u64,
371    ) -> Option<TrackerDeliveryMode> {
372        ranges
373            .iter()
374            .find(|range| range.from_sn <= sn && sn <= range.to_sn)
375            .map(|range| range.mode.clone())
376    }
377
378    fn project_tracker_ledger(
379        ledger: Vec<Ledger>,
380        ranges: &[TrackerDeliveryRange],
381    ) -> Result<Vec<Ledger>, ActorError> {
382        let mut projected = Vec::with_capacity(ledger.len());
383
384        for event in ledger {
385            let Some(mode) = Self::tracker_delivery_mode(ranges, event.sn)
386            else {
387                return Err(ActorError::FunctionalCritical {
388                    description: format!(
389                        "Missing tracker delivery range for sn {}",
390                        event.sn
391                    ),
392                });
393            };
394
395            match mode {
396                TrackerDeliveryMode::Clear => projected.push(event),
397                TrackerDeliveryMode::Opaque => projected
398                    .push(event.to_tracker_opaque().map_err(ActorError::from)?),
399            }
400        }
401
402        Ok(projected)
403    }
404
405    async fn check_witness(
406        &self,
407        ctx: &mut ActorContext<Self>,
408        subject_id: &DigestIdentifier,
409        sender: PublicKey,
410    ) -> Result<(u64, bool), ActorError> {
411        let data = get_subject_data(ctx, subject_id).await?;
412
413        let Some(data) = data else {
414            return Err(DistributorError::SubjectNotFound.into());
415        };
416
417        match data {
418            SubjectData::Tracker {
419                governance_id,
420                schema_id,
421                namespace,
422                ..
423            } => {
424                let Some(sn) = check_witness_access(
425                    ctx,
426                    &governance_id,
427                    subject_id,
428                    sender.clone(),
429                    namespace,
430                    schema_id,
431                )
432                .await?
433                else {
434                    return Err(DistributorError::SenderNoAccess.into());
435                };
436
437                Ok((sn, false))
438            }
439            SubjectData::Governance { .. } => {
440                let gov = get_gov(ctx, subject_id).await.map_err(|e| {
441                    DistributorError::GetGovernanceFailed {
442                        details: e.to_string(),
443                    }
444                })?;
445
446                if !gov.has_this_role(HashThisRole::Gov {
447                    who: sender.clone(),
448                    role: RoleTypes::Witness,
449                }) {
450                    return Err(DistributorError::SenderNotMember {
451                        sender: sender.to_string(),
452                    }
453                    .into());
454                }
455
456                Ok((get_gov_sn(ctx, subject_id).await?, true))
457            }
458        }
459    }
460
461    async fn build_last_sn_offer(
462        &self,
463        ctx: &mut ActorContext<Self>,
464        subject_id: &DigestIdentifier,
465        sender: PublicKey,
466        actual_sn: Option<u64>,
467    ) -> Result<UpdateWitnessOffer, ActorError> {
468        let data = get_subject_data(ctx, subject_id).await?;
469        let Some(data) = data else {
470            return Err(DistributorError::SubjectNotFound.into());
471        };
472
473        match data {
474            SubjectData::Tracker { .. } => {
475                let (sn, clear_sn, _, ranges) = self
476                    .get_tracker_window(
477                        ctx,
478                        subject_id,
479                        sender.clone(),
480                        actual_sn,
481                    )
482                    .await?;
483                Ok(UpdateWitnessOffer {
484                    kind: UpdateSubjectKind::Tracker,
485                    sn,
486                    clear_sn,
487                    ranges,
488                })
489            }
490            SubjectData::Governance { .. } => {
491                let (sn, ..) =
492                    self.check_witness(ctx, subject_id, sender.clone()).await?;
493                Ok(UpdateWitnessOffer {
494                    kind: UpdateSubjectKind::Governance,
495                    sn,
496                    clear_sn: None,
497                    ranges: Vec::new(),
498                })
499            }
500        }
501    }
502
503    async fn build_distribution_batch(
504        &self,
505        ctx: &mut ActorContext<Self>,
506        subject_id: &DigestIdentifier,
507        sender: PublicKey,
508        actual_sn: Option<u64>,
509        target_sn: Option<u64>,
510    ) -> Result<(Vec<Ledger>, bool, u64), ActorError> {
511        let data = get_subject_data(ctx, subject_id).await?;
512        let Some(data) = data else {
513            return Err(DistributorError::SubjectNotFound.into());
514        };
515
516        match data {
517            SubjectData::Tracker { .. } => {
518                let (window_sn, clear_sn, _, ranges) = self
519                    .get_tracker_window(ctx, subject_id, sender, actual_sn)
520                    .await?;
521
522                if let Some(actual_sn) = actual_sn
523                    && actual_sn >= window_sn
524                {
525                    return Err(DistributorError::ActualSnBiggerThanWitness {
526                        actual_sn,
527                        witness_sn: window_sn,
528                    }
529                    .into());
530                }
531
532                let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
533                let batch_hi_sn = from_sn
534                    .saturating_add(self.ledger_batch_size)
535                    .saturating_sub(1)
536                    .min(window_sn);
537                let preferred_hi_sn = clear_sn
538                    .filter(|clear_sn| {
539                        actual_sn.is_none_or(|actual_sn| *clear_sn > actual_sn)
540                    })
541                    .unwrap_or(window_sn);
542                let preferred_hi_sn = if from_sn == 0
543                    && preferred_hi_sn == 0
544                    && window_sn > 0
545                {
546                    window_sn
547                } else {
548                    preferred_hi_sn
549                };
550                let hi_sn = target_sn
551                    .unwrap_or(preferred_hi_sn)
552                    .min(preferred_hi_sn)
553                    .min(batch_hi_sn);
554
555                let (ledger, raw_is_all) = self
556                    .get_ledger(ctx, subject_id, hi_sn, actual_sn, false)
557                    .await?;
558
559                let ledger = Self::project_tracker_ledger(ledger, &ranges)?;
560                let is_all = raw_is_all && hi_sn == window_sn;
561                Ok((ledger, is_all, hi_sn))
562            }
563            SubjectData::Governance { .. } => {
564                let (witness_hi_sn, ..) =
565                    self.check_witness(ctx, subject_id, sender).await?;
566
567                if let Some(actual_sn) = actual_sn
568                    && actual_sn >= witness_hi_sn
569                {
570                    return Err(DistributorError::ActualSnBiggerThanWitness {
571                        actual_sn,
572                        witness_sn: witness_hi_sn,
573                    }
574                    .into());
575                }
576
577                let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
578                let batch_hi_sn = from_sn
579                    .saturating_add(self.ledger_batch_size)
580                    .saturating_sub(1)
581                    .min(witness_hi_sn);
582                let batch_hi_sn =
583                    target_sn.unwrap_or(batch_hi_sn).min(batch_hi_sn);
584
585                let (ledger, raw_is_all) = self
586                    .get_ledger(ctx, subject_id, batch_hi_sn, actual_sn, true)
587                    .await?;
588
589                let is_all = raw_is_all && batch_hi_sn == witness_hi_sn;
590                Ok((ledger, is_all, batch_hi_sn))
591            }
592        }
593    }
594
595    async fn request_ledger_from_sender(
596        &self,
597        subject_id: &DigestIdentifier,
598        sender: PublicKey,
599        info: &ComunicateInfo,
600        actual_sn: Option<u64>,
601    ) -> Result<(), ActorError> {
602        let new_info = self.build_response_info(
603            sender,
604            info,
605            format!("/user/node/distributor_{}", subject_id),
606        );
607
608        self.send_network_message(
609            new_info,
610            ActorMessage::DistributionLedgerReq {
611                actual_sn,
612                target_sn: None,
613                subject_id: subject_id.clone(),
614            },
615        )
616        .await
617    }
618
619    async fn handle_get_last_sn(
620        &self,
621        ctx: &mut ActorContext<Self>,
622        subject_id: DigestIdentifier,
623        actual_sn: Option<u64>,
624        info: ComunicateInfo,
625        sender: PublicKey,
626        receiver_actor: String,
627    ) -> Result<(), ActorError> {
628        let offer = self
629            .build_last_sn_offer(ctx, &subject_id, sender.clone(), actual_sn)
630            .await?;
631        let new_info =
632            self.build_response_info(sender.clone(), &info, receiver_actor);
633
634        self.send_network_message(
635            new_info,
636            ActorMessage::UpdateOffer {
637                offer: offer.clone(),
638            },
639        )
640        .await?;
641
642        debug!(
643            msg_type = "GetLastSn",
644            subject_id = %subject_id,
645            sn = offer.sn,
646            clear_sn = ?offer.clear_sn,
647            sender = %sender,
648            "Last SN response sent successfully"
649        );
650
651        Ok(())
652    }
653
654    async fn handle_get_governance_version(
655        &self,
656        ctx: &mut ActorContext<Self>,
657        subject_id: DigestIdentifier,
658        info: ComunicateInfo,
659        sender: PublicKey,
660        receiver_actor: String,
661    ) -> Result<(), ActorError> {
662        let version = self.get_governance_version(ctx, &subject_id).await?;
663        let new_info =
664            self.build_response_info(sender.clone(), &info, receiver_actor);
665
666        self.send_network_message(
667            new_info,
668            ActorMessage::GovernanceVersionRes { version },
669        )
670        .await?;
671
672        Ok(())
673    }
674
675    async fn handle_send_distribution(
676        &self,
677        ctx: &mut ActorContext<Self>,
678        actual_sn: Option<u64>,
679        target_sn: Option<u64>,
680        info: ComunicateInfo,
681        subject_id: DigestIdentifier,
682        sender: PublicKey,
683    ) -> Result<(), ActorError> {
684        let (ledger, is_all, hi_sn) = self
685            .build_distribution_batch(
686                ctx,
687                &subject_id,
688                sender.clone(),
689                actual_sn,
690                target_sn,
691            )
692            .await?;
693
694        let new_info = self.build_response_info(
695            sender.clone(),
696            &info,
697            format!("/user/node/distributor_{}", subject_id),
698        );
699
700        self.send_network_message(
701            new_info,
702            ActorMessage::DistributionLedgerRes {
703                ledger: ledger.clone(),
704                is_all,
705            },
706        )
707        .await?;
708
709        debug!(
710            msg_type = "SendDistribution",
711            subject_id = %subject_id,
712            sender = %sender,
713            ledger_count = ledger.len(),
714            is_all = is_all,
715            hi_sn = hi_sn,
716            actual_sn = ?actual_sn,
717            "Ledger distribution sent successfully"
718        );
719
720        Ok(())
721    }
722}
723
724#[async_trait]
725impl Actor for DistriWorker {
726    type Event = ();
727    type Message = DistriWorkerMessage;
728    type Response = ();
729
730    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
731        parent_span.map_or_else(
732            || info_span!("DistriWorker", id),
733            |parent_span| info_span!(parent: parent_span, "DistriWorker", id),
734        )
735    }
736}
737
738#[derive(Debug, Clone)]
739pub enum DistriWorkerMessage {
740    GetLastSn {
741        subject_id: DigestIdentifier,
742        actual_sn: Option<u64>,
743        info: ComunicateInfo,
744        sender: PublicKey,
745        receiver_actor: String,
746    },
747    GetGovernanceVersion {
748        subject_id: DigestIdentifier,
749        info: ComunicateInfo,
750        sender: PublicKey,
751        receiver_actor: String,
752    },
753    // Un nodo nos solicitó la copia del ledger.
754    SendDistribution {
755        actual_sn: Option<u64>,
756        target_sn: Option<u64>,
757        subject_id: DigestIdentifier,
758        info: ComunicateInfo,
759        sender: PublicKey,
760    },
761    // Nos llega una replica, guardarla en informar que la hemos recivido
762    LastEventDistribution {
763        ledger: Box<Ledger>,
764        info: ComunicateInfo,
765        sender: PublicKey,
766    },
767    LedgerDistribution {
768        ledger: Vec<Ledger>,
769        is_all: bool,
770        info: ComunicateInfo,
771        sender: PublicKey,
772    },
773}
774
775impl Message for DistriWorkerMessage {}
776
777impl NotPersistentActor for DistriWorker {}
778
779#[async_trait]
780impl Handler<Self> for DistriWorker {
781    async fn handle_message(
782        &mut self,
783        _sender: ActorPath,
784        msg: DistriWorkerMessage,
785        ctx: &mut ActorContext<Self>,
786    ) -> Result<(), ActorError> {
787        match msg {
788            DistriWorkerMessage::GetLastSn {
789                subject_id,
790                actual_sn,
791                info,
792                sender,
793                receiver_actor,
794            } => match self
795                .handle_get_last_sn(
796                    ctx,
797                    subject_id.clone(),
798                    actual_sn,
799                    info.clone(),
800                    sender.clone(),
801                    receiver_actor.clone(),
802                )
803                .await
804            {
805                Ok(()) => {}
806                Err(e) => {
807                    if let ActorError::Functional { .. } = e {
808                        warn!(
809                            msg_type = "GetLastSn",
810                            subject_id = %subject_id,
811                            sender = %sender,
812                            error = %e,
813                            "Witness check failed"
814                        );
815                        self.send_no_offer_response(
816                            &info,
817                            sender.clone(),
818                            receiver_actor,
819                        )
820                        .await?;
821                        return Ok(());
822                    } else {
823                        error!(
824                            msg_type = "GetLastSn",
825                            subject_id = %subject_id,
826                            sender = %sender,
827                            error = %e,
828                            "Witness check failed"
829                        );
830                        return Err(emit_fail(ctx, e).await);
831                    }
832                }
833            },
834            DistriWorkerMessage::GetGovernanceVersion {
835                subject_id,
836                info,
837                sender,
838                receiver_actor,
839            } => match self
840                .handle_get_governance_version(
841                    ctx,
842                    subject_id.clone(),
843                    info,
844                    sender.clone(),
845                    receiver_actor,
846                )
847                .await
848            {
849                Ok(()) => {}
850                Err(e) => {
851                    if let ActorError::Functional { .. } = e {
852                        warn!(
853                            msg_type = "GetGovernanceVersion",
854                            subject_id = %subject_id,
855                            sender = %sender,
856                            error = %e,
857                            "Subject is not a governance"
858                        );
859                        return Err(e);
860                    } else {
861                        error!(
862                            msg_type = "GetGovernanceVersion",
863                            subject_id = %subject_id,
864                            sender = %sender,
865                            error = %e,
866                            "Failed to send governance version response to network"
867                        );
868                        return Err(emit_fail(ctx, e).await);
869                    }
870                }
871            },
872            DistriWorkerMessage::SendDistribution {
873                actual_sn,
874                target_sn,
875                info,
876                subject_id,
877                sender,
878            } => match self
879                .handle_send_distribution(
880                    ctx,
881                    actual_sn,
882                    target_sn,
883                    info,
884                    subject_id.clone(),
885                    sender.clone(),
886                )
887                .await
888            {
889                Ok(()) => {}
890                Err(e) => {
891                    if let ActorError::Functional { .. } = e {
892                        warn!(
893                            msg_type = "SendDistribution",
894                            subject_id = %subject_id,
895                            sender = %sender,
896                            error = %e,
897                            "Witness check failed"
898                        );
899                        return Err(e);
900                    } else {
901                        error!(
902                            msg_type = "SendDistribution",
903                            subject_id = %subject_id,
904                            sender = %sender,
905                            error = %e,
906                            "Failed to send ledger response to network"
907                        );
908                        return Err(emit_fail(ctx, e).await);
909                    }
910                }
911            },
912            DistriWorkerMessage::LastEventDistribution {
913                ledger,
914                info,
915                sender,
916            } => {
917                let subject_id = ledger.get_subject_id();
918                let sn = ledger.sn;
919
920                let (is_gov, ..) =
921                    match self
922                        .check_auth(ctx, sender.clone(), &info, &ledger)
923                        .await
924                    {
925                        Ok(is_gov) => is_gov,
926                        Err(e) => {
927                            if let ActorError::Functional { .. } = e {
928                                warn!(
929                                    msg_type = "LastEventDistribution",
930                                    subject_id = %subject_id,
931                                    sn = sn,
932                                    sender = %sender,
933                                    error = %e,
934                                    "Authorization check failed"
935                                );
936                                return Err(e);
937                            } else {
938                                error!(
939                                    msg_type = "LastEventDistribution",
940                                    subject_id = %subject_id,
941                                    sn = sn,
942                                    sender = %sender,
943                                    error = %e,
944                                    "Authorization check failed"
945                                );
946                                return Err(emit_fail(ctx, e).await);
947                            }
948                        }
949                    };
950
951                let lease = if ledger.is_create_event() {
952                    if let Err(e) = create_subject(ctx, *ledger.clone()).await {
953                        if let ActorError::Functional { .. } = e {
954                            warn!(
955                                msg_type = "LastEventDistribution",
956                                subject_id = %subject_id,
957                                sn = sn,
958                                error = %e,
959                                "Failed to create subject from create event"
960                            );
961                            return Err(e);
962                        } else {
963                            error!(
964                                msg_type = "LastEventDistribution",
965                                subject_id = %subject_id,
966                                sn = sn,
967                                error = %e,
968                                "Failed to create subject from create event"
969                            );
970                            return Err(emit_fail(ctx, e).await);
971                        }
972                    };
973
974                    None
975                } else {
976                    let requester = Self::requester_id(
977                        "last_event_distribution",
978                        &subject_id,
979                        &info,
980                        &sender,
981                    );
982                    let lease = if !is_gov {
983                        match acquire_subject(
984                            ctx,
985                            &subject_id,
986                            requester.clone(),
987                            None,
988                            true,
989                        )
990                        .await
991                        {
992                            Ok(lease) => Some(lease),
993                            Err(e) => {
994                                error!(
995                                    msg_type = "LastEventDistribution",
996                                    subject_id = %subject_id,
997                                    error = %e,
998                                    "Failed to bring up tracker for subject update"
999                                );
1000                                let error = DistributorError::UpTrackerFailed {
1001                                    details: e.to_string(),
1002                                };
1003                                return Err(emit_fail(ctx, error.into()).await);
1004                            }
1005                        }
1006                    } else {
1007                        None
1008                    };
1009
1010                    let update_result =
1011                        update_ledger(ctx, &subject_id, vec![*ledger.clone()])
1012                            .await;
1013
1014                    if let Some(lease) = lease.clone()
1015                        && update_result.is_err()
1016                    {
1017                        lease.finish(ctx).await?;
1018                    }
1019
1020                    match update_result {
1021                        Ok((last_sn, _, _)) if last_sn < ledger.sn => {
1022                            debug!(
1023                                msg_type = "LastEventDistribution",
1024                                subject_id = %subject_id,
1025                                last_sn = last_sn,
1026                                received_sn = sn,
1027                                "SN gap detected, requesting update"
1028                            );
1029
1030                            if let Err(e) = self
1031                                .request_ledger_from_sender(
1032                                    &subject_id,
1033                                    sender.clone(),
1034                                    &info,
1035                                    Some(last_sn),
1036                                )
1037                                .await
1038                            {
1039                                error!(
1040                                    msg_type = "LastEventDistribution",
1041                                    subject_id = %subject_id,
1042                                    last_sn = last_sn,
1043                                    error = %e,
1044                                    "Failed to request ledger from network"
1045                                );
1046                                return Err(emit_fail(ctx, e).await);
1047                            }
1048
1049                            if let Some(lease) = lease.clone() {
1050                                lease.finish(ctx).await?;
1051                            }
1052
1053                            return Ok(());
1054                        }
1055                        Ok((..)) => lease,
1056                        Err(e) => {
1057                            if let ActorError::Functional { .. } = e.clone() {
1058                                warn!(
1059                                    msg_type = "LastEventDistribution",
1060                                    subject_id = %subject_id,
1061                                    sn = sn,
1062                                    error = %e,
1063                                    "Failed to update subject ledger"
1064                                );
1065                                return Err(e);
1066                            } else {
1067                                error!(
1068                                    msg_type = "LastEventDistribution",
1069                                    subject_id = %subject_id,
1070                                    sn = sn,
1071                                    error = %e,
1072                                    "Failed to update subject ledger"
1073                                );
1074                                return Err(emit_fail(ctx, e).await);
1075                            }
1076                        }
1077                    }
1078                };
1079
1080                let new_info = self.build_response_info(
1081                    sender.clone(),
1082                    &info,
1083                    format!(
1084                        "/user/{}/{}",
1085                        info.request_id,
1086                        info.receiver.clone()
1087                    ),
1088                );
1089
1090                if let Err(e) = self
1091                    .send_network_message(
1092                        new_info,
1093                        ActorMessage::DistributionLastEventRes,
1094                    )
1095                    .await
1096                {
1097                    error!(
1098                        msg_type = "LastEventDistribution",
1099                        subject_id = %subject_id,
1100                        sn = sn,
1101                        error = %e,
1102                        "Failed to send distribution acknowledgment"
1103                    );
1104                    return Err(emit_fail(ctx, e).await);
1105                };
1106
1107                if let Some(lease) = lease {
1108                    lease.finish(ctx).await?;
1109                }
1110
1111                debug!(
1112                    msg_type = "LastEventDistribution",
1113                    subject_id = %subject_id,
1114                    sn = sn,
1115                    sender = %sender,
1116                    is_gov = is_gov,
1117                    "Last event distribution processed successfully"
1118                );
1119            }
1120            DistriWorkerMessage::LedgerDistribution {
1121                mut ledger,
1122                is_all,
1123                info,
1124                sender,
1125            } => {
1126                if ledger.is_empty() {
1127                    warn!(
1128                        msg_type = "LedgerDistribution",
1129                        sender = %sender,
1130                        "Received empty ledger distribution"
1131                    );
1132                    return Err(DistributorError::EmptyEvents.into());
1133                }
1134
1135                let subject_id = ledger[0].get_subject_id();
1136                let ledger_count = ledger.len();
1137                let first_sn = ledger[0].sn;
1138                let (is_gov, is_register) = match self
1139                    .check_auth(ctx, sender.clone(), &info, &ledger[0])
1140                    .await
1141                {
1142                    Ok(data) => data,
1143                    Err(e) => {
1144                        if let ActorError::Functional { .. } = e {
1145                            warn!(
1146                                msg_type = "LedgerDistribution",
1147                                subject_id = %subject_id,
1148                                sender = %sender,
1149                                ledger_count = ledger_count,
1150                                error = %e,
1151                                "Authorization check failed"
1152                            );
1153                            return Err(e);
1154                        } else {
1155                            error!(
1156                                msg_type = "LedgerDistribution",
1157                                subject_id = %subject_id,
1158                                sender = %sender,
1159                                ledger_count = ledger_count,
1160                                error = %e,
1161                                "Authorization check failed"
1162                            );
1163                            return Err(emit_fail(ctx, e).await);
1164                        }
1165                    }
1166                };
1167
1168                let lease = if ledger[0].is_create_event() && !is_register {
1169                    let create_ledger = ledger[0].clone();
1170                    let requester = Self::requester_id(
1171                        "ledger_distribution_create",
1172                        &subject_id,
1173                        &info,
1174                        &sender,
1175                    );
1176
1177                    let lease = if is_gov {
1178                        if let Err(e) =
1179                            create_subject(ctx, create_ledger.clone()).await
1180                        {
1181                            if let ActorError::Functional { .. } = e {
1182                                warn!(
1183                                    msg_type = "LedgerDistribution",
1184                                    subject_id = %subject_id,
1185                                    error = %e,
1186                                    "Failed to create subject from ledger"
1187                                );
1188                                return Err(e);
1189                            } else {
1190                                error!(
1191                                    msg_type = "LedgerDistribution",
1192                                    subject_id = %subject_id,
1193                                    error = %e,
1194                                    "Failed to create subject from ledger"
1195                                );
1196                                return Err(emit_fail(ctx, e).await);
1197                            }
1198                        };
1199                        None
1200                    } else {
1201                        let request = create_ledger
1202                            .get_create_event()
1203                            .ok_or_else(|| {
1204                                error!(
1205                                    msg_type = "LedgerDistribution",
1206                                    subject_id = %subject_id,
1207                                    "Create ledger is missing create event payload"
1208                                );
1209                                DistributorError::MissingCreateEventInCreateLedger {
1210                                    subject_id: subject_id.clone(),
1211                                }
1212                            })?;
1213
1214                        if let Err(e) = check_subject_creation(
1215                            ctx,
1216                            &request.governance_id,
1217                            create_ledger.ledger_seal_signature.signer.clone(),
1218                            create_ledger.gov_version,
1219                            request.namespace.to_string(),
1220                            request.schema_id,
1221                        )
1222                        .await
1223                        {
1224                            if let ActorError::Functional { .. } = e {
1225                                warn!(
1226                                    msg_type = "LedgerDistribution",
1227                                    subject_id = %subject_id,
1228                                    error = %e,
1229                                    "Failed to validate subject creation from ledger"
1230                                );
1231                                return Err(e);
1232                            } else {
1233                                error!(
1234                                    msg_type = "LedgerDistribution",
1235                                    subject_id = %subject_id,
1236                                    error = %e,
1237                                    "Failed to validate subject creation from ledger"
1238                                );
1239                                return Err(emit_fail(ctx, e).await);
1240                            }
1241                        }
1242
1243                        match acquire_subject(
1244                            ctx,
1245                            &subject_id,
1246                            requester,
1247                            Some(create_ledger),
1248                            true,
1249                        )
1250                        .await
1251                        {
1252                            Ok(lease) => Some(lease),
1253                            Err(e) => {
1254                                if let ActorError::Functional { .. } = e {
1255                                    warn!(
1256                                        msg_type = "LedgerDistribution",
1257                                        subject_id = %subject_id,
1258                                        error = %e,
1259                                        "Failed to create subject from ledger"
1260                                    );
1261                                    return Err(e);
1262                                } else {
1263                                    error!(
1264                                        msg_type = "LedgerDistribution",
1265                                        subject_id = %subject_id,
1266                                        error = %e,
1267                                        "Failed to create subject from ledger"
1268                                    );
1269                                    return Err(emit_fail(ctx, e).await);
1270                                }
1271                            }
1272                        }
1273                    };
1274
1275                    let _event = ledger.remove(0);
1276                    lease
1277                } else {
1278                    if ledger[0].is_create_event() && is_register {
1279                        let _event = ledger.remove(0);
1280                    }
1281
1282                    let requester = Self::requester_id(
1283                        "ledger_distribution",
1284                        &subject_id,
1285                        &info,
1286                        &sender,
1287                    );
1288                    if !ledger.is_empty() && !is_gov {
1289                        match acquire_subject(
1290                            ctx,
1291                            &subject_id,
1292                            requester.clone(),
1293                            None,
1294                            true,
1295                        )
1296                        .await
1297                        {
1298                            Ok(lease) => Some(lease),
1299                            Err(e) => {
1300                                error!(
1301                                    msg_type = "LedgerDistribution",
1302                                    subject_id = %subject_id,
1303                                    error = %e,
1304                                    "Failed to bring up tracker for subject update"
1305                                );
1306                                let error = DistributorError::UpTrackerFailed {
1307                                    details: e.to_string(),
1308                                };
1309                                return Err(emit_fail(ctx, error.into()).await);
1310                            }
1311                        }
1312                    } else {
1313                        None
1314                    }
1315                };
1316
1317                let lease = if !ledger.is_empty() {
1318                    let update_result =
1319                        update_ledger(ctx, &subject_id, ledger).await;
1320
1321                    if let Some(lease) = lease.clone()
1322                        && update_result.is_err()
1323                    {
1324                        lease.finish(ctx).await?;
1325                    }
1326
1327                    match update_result {
1328                        Ok((last_sn, _, _)) => {
1329                            if !is_all {
1330                                debug!(
1331                                    msg_type = "LedgerDistribution",
1332                                    subject_id = %subject_id,
1333                                    last_sn = last_sn,
1334                                    "Partial ledger received, requesting more"
1335                                );
1336
1337                                if let Err(e) = self
1338                                    .request_ledger_from_sender(
1339                                        &subject_id,
1340                                        sender.clone(),
1341                                        &info,
1342                                        Some(last_sn),
1343                                    )
1344                                    .await
1345                                {
1346                                    error!(
1347                                        msg_type = "LedgerDistribution",
1348                                        subject_id = %subject_id,
1349                                        last_sn = last_sn,
1350                                        error = %e,
1351                                        "Failed to request more ledger entries"
1352                                    );
1353                                    return Err(emit_fail(ctx, e).await);
1354                                };
1355                            }
1356
1357                            lease
1358                        }
1359                        Err(e) => {
1360                            if let ActorError::Functional { .. } = e.clone() {
1361                                warn!(
1362                                    msg_type = "LedgerDistribution",
1363                                    subject_id = %subject_id,
1364                                    first_sn = first_sn,
1365                                    ledger_count = ledger_count,
1366                                    error = %e,
1367                                    "Failed to update subject ledger"
1368                                );
1369                                return Err(e);
1370                            } else {
1371                                error!(
1372                                    msg_type = "LedgerDistribution",
1373                                    subject_id = %subject_id,
1374                                    first_sn = first_sn,
1375                                    ledger_count = ledger_count,
1376                                    error = %e,
1377                                    "Failed to update subject ledger"
1378                                );
1379                                return Err(emit_fail(ctx, e).await);
1380                            }
1381                        }
1382                    }
1383                } else {
1384                    lease
1385                };
1386
1387                if let Some(lease) = lease {
1388                    lease.finish(ctx).await?;
1389                }
1390
1391                debug!(
1392                    msg_type = "LedgerDistribution",
1393                    subject_id = %subject_id,
1394                    sender = %sender,
1395                    ledger_count = ledger_count,
1396                    is_all = is_all,
1397                    is_gov = is_gov,
1398                    "Ledger distribution processed successfully"
1399                );
1400            }
1401        };
1402
1403        Ok(())
1404    }
1405}