Skip to main content

ave_core/distribution/
worker.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::{
9    SchemaType,
10    identity::{DigestIdentifier, PublicKey},
11};
12use ave_network::ComunicateInfo;
13
14use crate::{
15    ActorMessage, NetworkMessage, Node, NodeMessage, NodeResponse,
16    governance::{
17        Governance, GovernanceMessage, GovernanceResponse,
18        model::{HashThisRole, RoleTypes},
19        witnesses_register::{TrackerDeliveryMode, TrackerDeliveryRange},
20    },
21    helpers::network::service::NetworkSender,
22    model::{
23        common::{
24            check_subject_creation, check_witness_access, emit_fail,
25            node::get_subject_data,
26            subject::{
27                acquire_subject, create_subject, get_gov, get_gov_sn,
28                get_tracker_window as resolve_tracker_window, update_ledger,
29            },
30        },
31        event::Ledger,
32    },
33    node::SubjectData,
34    tracker::{Tracker, TrackerMessage, TrackerResponse},
35    update::{UpdateSubjectKind, UpdateWitnessOffer},
36};
37
38use tracing::{Span, debug, error, info_span, warn};
39
40use super::error::DistributorError;
41
/// Worker actor of the distribution module.
///
/// Its helper methods build ledger offers and batches for requesting peers
/// and send the replies through `network`.
pub struct DistriWorker {
    /// Public key shared with this worker.
    // NOTE(review): not read in the part of the file reviewed here;
    // presumably identifies the local node — confirm against its users.
    pub our_key: Arc<PublicKey>,
    /// Handle used to send commands (outgoing messages) to the network layer.
    pub network: Arc<NetworkSender>,
    /// Batch size used to cap the highest sequence number requested for a
    /// single distribution batch (`from_sn + ledger_batch_size - 1`).
    pub ledger_batch_size: u64,
}
47
48impl DistriWorker {
49    fn requester_id(
50        kind: &str,
51        subject_id: &DigestIdentifier,
52        info: &ComunicateInfo,
53        sender: &PublicKey,
54    ) -> String {
55        format!(
56            "{kind}:{subject_id}:{sender}:{}:{}",
57            info.request_id, info.version
58        )
59    }
60
61    async fn get_ledger(
62        &self,
63        ctx: &mut ActorContext<Self>,
64        subject_id: &DigestIdentifier,
65        hi_sn: u64,
66        lo_sn: Option<u64>,
67        is_gov: bool,
68    ) -> Result<(Vec<Ledger>, bool), ActorError> {
69        let path = ActorPath::from(format!(
70            "/user/node/subject_manager/{}",
71            subject_id
72        ));
73
74        if is_gov {
75            let governance_actor =
76                ctx.system().get_actor::<Governance>(&path).await?;
77
78            let response = governance_actor
79                .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
80                .await?;
81
82            match response {
83                GovernanceResponse::Ledger { ledger, is_all } => {
84                    Ok((ledger, is_all))
85                }
86                _ => Err(ActorError::UnexpectedResponse {
87                    expected: "GovernanceResponse::Ledger".to_owned(),
88                    path,
89                }),
90            }
91        } else {
92            let lease = acquire_subject(
93                ctx,
94                subject_id,
95                format!("send_distribution:{subject_id}"),
96                None,
97                true,
98            )
99            .await?;
100            let tracker_actor =
101                ctx.system().get_actor::<Tracker>(&path).await?;
102            let response = tracker_actor
103                .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
104                .await;
105            lease.finish(ctx).await?;
106            let response = response?;
107
108            match response {
109                TrackerResponse::Ledger { ledger, is_all } => {
110                    Ok((ledger, is_all))
111                }
112                _ => Err(ActorError::UnexpectedResponse {
113                    expected: "TrackerResponse::Ledger".to_owned(),
114                    path,
115                }),
116            }
117        }
118    }
119
120    fn build_response_info(
121        &self,
122        sender: PublicKey,
123        info: &ComunicateInfo,
124        receiver_actor: String,
125    ) -> ComunicateInfo {
126        ComunicateInfo {
127            receiver: sender,
128            request_id: info.request_id.clone(),
129            version: info.version,
130            receiver_actor,
131        }
132    }
133
134    async fn send_network_message(
135        &self,
136        info: ComunicateInfo,
137        message: ActorMessage,
138    ) -> Result<(), ActorError> {
139        self.network
140            .send_command(ave_network::CommandHelper::SendMessage {
141                message: NetworkMessage { info, message },
142            })
143            .await
144    }
145
146    async fn send_no_offer_response(
147        &self,
148        info: &ComunicateInfo,
149        sender: PublicKey,
150        receiver_actor: String,
151    ) -> Result<(), ActorError> {
152        let new_info = self.build_response_info(sender, info, receiver_actor);
153        self.send_network_message(new_info, ActorMessage::UpdateNoOffer)
154            .await
155    }
156
157    async fn get_governance_version(
158        &self,
159        ctx: &mut ActorContext<Self>,
160        subject_id: &DigestIdentifier,
161    ) -> Result<u64, ActorError> {
162        let data = get_subject_data(ctx, subject_id).await?;
163        let Some(SubjectData::Governance { .. }) = data else {
164            return Err(DistributorError::SubjectNotFound.into());
165        };
166
167        let governance_path = ActorPath::from(format!(
168            "/user/node/subject_manager/{}",
169            subject_id
170        ));
171        let governance_actor = ctx
172            .system()
173            .get_actor::<Governance>(&governance_path)
174            .await?;
175        let response =
176            governance_actor.ask(GovernanceMessage::GetVersion).await?;
177        let GovernanceResponse::Version(version) = response else {
178            return Err(ActorError::UnexpectedResponse {
179                path: governance_path,
180                expected: "GovernanceResponse::Version".to_owned(),
181            });
182        };
183
184        Ok(version)
185    }
186
187    async fn authorized_subj(
188        &self,
189        ctx: &ActorContext<Self>,
190        subject_id: &DigestIdentifier,
191    ) -> Result<(bool, Option<SubjectData>), ActorError> {
192        let node_path = ActorPath::from("/user/node");
193        let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
194
195        let response = node_actor
196            .ask(NodeMessage::AuthData(subject_id.to_owned()))
197            .await?;
198        match response {
199            NodeResponse::AuthData { auth, subject_data } => {
200                Ok((auth, subject_data))
201            }
202            _ => Err(ActorError::UnexpectedResponse {
203                expected: "NodeResponse::AuthData".to_owned(),
204                path: node_path,
205            }),
206        }
207    }
208
209    async fn check_auth(
210        &self,
211        ctx: &mut ActorContext<Self>,
212        sender: PublicKey,
213        info: &ComunicateInfo,
214        ledger: &Ledger,
215    ) -> Result<(bool, bool), ActorError> {
216        let subject_id = ledger.get_subject_id();
217        // Si está auth o si soy el dueño del sujeto.
218        let (auth, subject_data) =
219            self.authorized_subj(ctx, &subject_id).await?;
220
221        // Extraer schema_id y governance_id según si conocemos el sujeto o no
222        let (schema_id, governance_id) = if let Some(ref data) = subject_data {
223            // Lo conozco
224            match data {
225                SubjectData::Tracker {
226                    governance_id,
227                    schema_id,
228                    ..
229                } => (schema_id.clone(), Some(governance_id.clone())),
230                SubjectData::Governance { .. } => {
231                    (SchemaType::Governance, None)
232                }
233            }
234        } else {
235            // No lo conozco - debe ser evento Create
236            if let Some(create) = ledger.get_create_event() {
237                if !create.schema_id.is_gov() && create.governance_id.is_empty()
238                {
239                    return Err(
240                        DistributorError::MissingGovernanceIdInCreate {
241                            subject_id: subject_id.clone(),
242                        }
243                        .into(),
244                    );
245                }
246
247                let gov_id = if create.schema_id.is_gov() {
248                    None
249                } else {
250                    Some(create.governance_id.clone())
251                };
252
253                (create.schema_id, gov_id)
254            } else {
255                // No es el primer evento: pedir el histórico directamente
256                // al sender que nos acaba de escribir.
257                self.request_ledger_from_sender(
258                    &subject_id,
259                    sender.clone(),
260                    info,
261                    None,
262                )
263                .await?;
264                return Err(DistributorError::UpdatingSubject.into());
265            }
266        };
267
268        let is_gov = schema_id.is_gov();
269        // Verificar autorización
270        if is_gov {
271            // Es una gobernanza
272            if !auth {
273                return Err(DistributorError::GovernanceNotAuthorized.into());
274            }
275        } else {
276            // Es un Tracker - verificar rol de witness si no está autorizado
277            let Some(governance_id) = governance_id else {
278                error!(
279                    subject_id = %subject_id,
280                    "Tracker subject is missing governance_id during authorization check"
281                );
282                return Err(DistributorError::MissingGovernanceId {
283                    subject_id: subject_id.clone(),
284                }
285                .into());
286            };
287            let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
288                DistributorError::GetGovernanceFailed {
289                    details: e.to_string(),
290                }
291            })?;
292
293            match gov.version.cmp(&ledger.gov_version) {
294                std::cmp::Ordering::Less => {
295                    return Err(DistributorError::GovernanceVersionMismatch {
296                        our_version: gov.version,
297                        their_version: ledger.gov_version,
298                    }
299                    .into());
300                }
301                std::cmp::Ordering::Equal => {}
302                std::cmp::Ordering::Greater => {}
303            }
304        }
305
306        Ok((is_gov, subject_data.is_some()))
307    }
308
309    async fn get_tracker_window(
310        &self,
311        ctx: &mut ActorContext<Self>,
312        subject_id: &DigestIdentifier,
313        sender: PublicKey,
314        actual_sn: Option<u64>,
315    ) -> Result<(u64, Option<u64>, bool, Vec<TrackerDeliveryRange>), ActorError>
316    {
317        let data = get_subject_data(ctx, subject_id).await?;
318
319        let Some(SubjectData::Tracker {
320            governance_id,
321            schema_id,
322            namespace,
323            ..
324        }) = data
325        else {
326            return Err(DistributorError::SubjectNotFound.into());
327        };
328
329        let (sn, clear_sn, is_all, ranges) = resolve_tracker_window(
330            ctx,
331            &governance_id,
332            subject_id,
333            sender.clone(),
334            namespace.clone(),
335            schema_id.clone(),
336            actual_sn,
337        )
338        .await?;
339
340        let Some(sn) = sn else {
341            let witness_sn = check_witness_access(
342                ctx,
343                &governance_id,
344                subject_id,
345                sender,
346                namespace,
347                schema_id,
348            )
349            .await?;
350
351            return match (actual_sn, witness_sn) {
352                (Some(actual_sn), Some(witness_sn))
353                    if actual_sn >= witness_sn =>
354                {
355                    Err(DistributorError::ActualSnBiggerThanWitness {
356                        actual_sn,
357                        witness_sn,
358                    }
359                    .into())
360                }
361                _ => Err(DistributorError::SenderNoAccess.into()),
362            };
363        };
364
365        Ok((sn, clear_sn, is_all, ranges))
366    }
367
368    fn tracker_delivery_mode(
369        ranges: &[TrackerDeliveryRange],
370        sn: u64,
371    ) -> Option<TrackerDeliveryMode> {
372        ranges
373            .iter()
374            .find(|range| range.from_sn <= sn && sn <= range.to_sn)
375            .map(|range| range.mode.clone())
376    }
377
378    fn project_tracker_ledger(
379        ledger: Vec<Ledger>,
380        ranges: &[TrackerDeliveryRange],
381    ) -> Result<Vec<Ledger>, ActorError> {
382        let mut projected = Vec::with_capacity(ledger.len());
383
384        for event in ledger {
385            let Some(mode) = Self::tracker_delivery_mode(ranges, event.sn)
386            else {
387                return Err(ActorError::FunctionalCritical {
388                    description: format!(
389                        "Missing tracker delivery range for sn {}",
390                        event.sn
391                    ),
392                });
393            };
394
395            match mode {
396                TrackerDeliveryMode::Clear => projected.push(event),
397                TrackerDeliveryMode::Opaque => projected
398                    .push(event.to_tracker_opaque().map_err(ActorError::from)?),
399            }
400        }
401
402        Ok(projected)
403    }
404
405    async fn check_witness(
406        &self,
407        ctx: &mut ActorContext<Self>,
408        subject_id: &DigestIdentifier,
409        sender: PublicKey,
410    ) -> Result<(u64, bool), ActorError> {
411        let data = get_subject_data(ctx, subject_id).await?;
412
413        let Some(data) = data else {
414            return Err(DistributorError::SubjectNotFound.into());
415        };
416
417        match data {
418            SubjectData::Tracker {
419                governance_id,
420                schema_id,
421                namespace,
422                ..
423            } => {
424                let Some(sn) = check_witness_access(
425                    ctx,
426                    &governance_id,
427                    subject_id,
428                    sender.clone(),
429                    namespace,
430                    schema_id,
431                )
432                .await?
433                else {
434                    return Err(DistributorError::SenderNoAccess.into());
435                };
436
437                Ok((sn, false))
438            }
439            SubjectData::Governance { .. } => {
440                let gov = get_gov(ctx, subject_id).await.map_err(|e| {
441                    DistributorError::GetGovernanceFailed {
442                        details: e.to_string(),
443                    }
444                })?;
445
446                if !gov.has_this_role(HashThisRole::Gov {
447                    who: sender.clone(),
448                    role: RoleTypes::Witness,
449                }) {
450                    return Err(DistributorError::SenderNotMember {
451                        sender: sender.to_string(),
452                    }
453                    .into());
454                }
455
456                Ok((get_gov_sn(ctx, subject_id).await?, true))
457            }
458        }
459    }
460
461    async fn build_last_sn_offer(
462        &self,
463        ctx: &mut ActorContext<Self>,
464        subject_id: &DigestIdentifier,
465        sender: PublicKey,
466        actual_sn: Option<u64>,
467    ) -> Result<UpdateWitnessOffer, ActorError> {
468        let data = get_subject_data(ctx, subject_id).await?;
469        let Some(data) = data else {
470            return Err(DistributorError::SubjectNotFound.into());
471        };
472
473        match data {
474            SubjectData::Tracker { .. } => {
475                let (sn, clear_sn, _, ranges) = self
476                    .get_tracker_window(
477                        ctx,
478                        subject_id,
479                        sender.clone(),
480                        actual_sn,
481                    )
482                    .await?;
483                Ok(UpdateWitnessOffer {
484                    kind: UpdateSubjectKind::Tracker,
485                    sn,
486                    clear_sn,
487                    ranges,
488                })
489            }
490            SubjectData::Governance { .. } => {
491                let (sn, ..) =
492                    self.check_witness(ctx, subject_id, sender.clone()).await?;
493                Ok(UpdateWitnessOffer {
494                    kind: UpdateSubjectKind::Governance,
495                    sn,
496                    clear_sn: None,
497                    ranges: Vec::new(),
498                })
499            }
500        }
501    }
502
503    async fn build_distribution_batch(
504        &self,
505        ctx: &mut ActorContext<Self>,
506        subject_id: &DigestIdentifier,
507        sender: PublicKey,
508        actual_sn: Option<u64>,
509        target_sn: Option<u64>,
510    ) -> Result<(Vec<Ledger>, bool, u64), ActorError> {
511        let data = get_subject_data(ctx, subject_id).await?;
512        let Some(data) = data else {
513            return Err(DistributorError::SubjectNotFound.into());
514        };
515
516        match data {
517            SubjectData::Tracker { .. } => {
518                let (window_sn, clear_sn, _, ranges) = self
519                    .get_tracker_window(ctx, subject_id, sender, actual_sn)
520                    .await?;
521
522                if let Some(actual_sn) = actual_sn
523                    && actual_sn >= window_sn
524                {
525                    return Err(DistributorError::ActualSnBiggerThanWitness {
526                        actual_sn,
527                        witness_sn: window_sn,
528                    }
529                    .into());
530                }
531
532                let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
533                let batch_hi_sn = from_sn
534                    .saturating_add(self.ledger_batch_size)
535                    .saturating_sub(1)
536                    .min(window_sn);
537                let preferred_hi_sn = clear_sn
538                    .filter(|clear_sn| {
539                        actual_sn.is_none_or(|actual_sn| *clear_sn > actual_sn)
540                    })
541                    .unwrap_or(window_sn);
542                let preferred_hi_sn =
543                    if from_sn == 0 && preferred_hi_sn == 0 && window_sn > 0 {
544                        window_sn
545                    } else {
546                        preferred_hi_sn
547                    };
548                let hi_sn = target_sn
549                    .unwrap_or(preferred_hi_sn)
550                    .min(preferred_hi_sn)
551                    .min(batch_hi_sn);
552
553                let (ledger, raw_is_all) = self
554                    .get_ledger(ctx, subject_id, hi_sn, actual_sn, false)
555                    .await?;
556
557                let ledger = Self::project_tracker_ledger(ledger, &ranges)?;
558                let is_all = raw_is_all && hi_sn == window_sn;
559                Ok((ledger, is_all, hi_sn))
560            }
561            SubjectData::Governance { .. } => {
562                let (witness_hi_sn, ..) =
563                    self.check_witness(ctx, subject_id, sender).await?;
564
565                if let Some(actual_sn) = actual_sn
566                    && actual_sn >= witness_hi_sn
567                {
568                    return Err(DistributorError::ActualSnBiggerThanWitness {
569                        actual_sn,
570                        witness_sn: witness_hi_sn,
571                    }
572                    .into());
573                }
574
575                let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
576                let batch_hi_sn = from_sn
577                    .saturating_add(self.ledger_batch_size)
578                    .saturating_sub(1)
579                    .min(witness_hi_sn);
580                let batch_hi_sn =
581                    target_sn.unwrap_or(batch_hi_sn).min(batch_hi_sn);
582
583                let (ledger, raw_is_all) = self
584                    .get_ledger(ctx, subject_id, batch_hi_sn, actual_sn, true)
585                    .await?;
586
587                let is_all = raw_is_all && batch_hi_sn == witness_hi_sn;
588                Ok((ledger, is_all, batch_hi_sn))
589            }
590        }
591    }
592
593    async fn request_ledger_from_sender(
594        &self,
595        subject_id: &DigestIdentifier,
596        sender: PublicKey,
597        info: &ComunicateInfo,
598        actual_sn: Option<u64>,
599    ) -> Result<(), ActorError> {
600        let new_info = self.build_response_info(
601            sender,
602            info,
603            format!("/user/node/distributor_{}", subject_id),
604        );
605
606        self.send_network_message(
607            new_info,
608            ActorMessage::DistributionLedgerReq {
609                actual_sn,
610                target_sn: None,
611                subject_id: subject_id.clone(),
612            },
613        )
614        .await
615    }
616
617    async fn handle_get_last_sn(
618        &self,
619        ctx: &mut ActorContext<Self>,
620        subject_id: DigestIdentifier,
621        actual_sn: Option<u64>,
622        info: ComunicateInfo,
623        sender: PublicKey,
624        receiver_actor: String,
625    ) -> Result<(), ActorError> {
626        let offer = self
627            .build_last_sn_offer(ctx, &subject_id, sender.clone(), actual_sn)
628            .await?;
629        let new_info =
630            self.build_response_info(sender.clone(), &info, receiver_actor);
631
632        self.send_network_message(
633            new_info,
634            ActorMessage::UpdateOffer {
635                offer: offer.clone(),
636            },
637        )
638        .await?;
639
640        debug!(
641            msg_type = "GetLastSn",
642            subject_id = %subject_id,
643            sn = offer.sn,
644            clear_sn = ?offer.clear_sn,
645            sender = %sender,
646            "Last SN response sent successfully"
647        );
648
649        Ok(())
650    }
651
652    async fn handle_get_governance_version(
653        &self,
654        ctx: &mut ActorContext<Self>,
655        subject_id: DigestIdentifier,
656        info: ComunicateInfo,
657        sender: PublicKey,
658        receiver_actor: String,
659    ) -> Result<(), ActorError> {
660        let version = self.get_governance_version(ctx, &subject_id).await?;
661        let new_info =
662            self.build_response_info(sender.clone(), &info, receiver_actor);
663
664        self.send_network_message(
665            new_info,
666            ActorMessage::GovernanceVersionRes { version },
667        )
668        .await?;
669
670        Ok(())
671    }
672
673    async fn handle_send_distribution(
674        &self,
675        ctx: &mut ActorContext<Self>,
676        actual_sn: Option<u64>,
677        target_sn: Option<u64>,
678        info: ComunicateInfo,
679        subject_id: DigestIdentifier,
680        sender: PublicKey,
681    ) -> Result<(), ActorError> {
682        let (ledger, is_all, hi_sn) = self
683            .build_distribution_batch(
684                ctx,
685                &subject_id,
686                sender.clone(),
687                actual_sn,
688                target_sn,
689            )
690            .await?;
691
692        let new_info = self.build_response_info(
693            sender.clone(),
694            &info,
695            format!("/user/node/distributor_{}", subject_id),
696        );
697
698        self.send_network_message(
699            new_info,
700            ActorMessage::DistributionLedgerRes {
701                ledger: ledger.clone(),
702                is_all,
703            },
704        )
705        .await?;
706
707        debug!(
708            msg_type = "SendDistribution",
709            subject_id = %subject_id,
710            sender = %sender,
711            ledger_count = ledger.len(),
712            is_all = is_all,
713            hi_sn = hi_sn,
714            actual_sn = ?actual_sn,
715            "Ledger distribution sent successfully"
716        );
717
718        Ok(())
719    }
720}
721
722#[async_trait]
723impl Actor for DistriWorker {
724    type Event = ();
725    type Message = DistriWorkerMessage;
726    type Response = ();
727
728    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
729        parent_span.map_or_else(
730            || info_span!("DistriWorker", id),
731            |parent_span| info_span!(parent: parent_span, "DistriWorker", id),
732        )
733    }
734}
735
/// Messages handled by [`DistriWorker`].
///
/// Every variant carries the `info` of the incoming network message and the
/// public key of the peer (`sender`) that originated it.
#[derive(Debug, Clone)]
pub enum DistriWorkerMessage {
    /// A peer asks what we can offer for `subject_id`; `actual_sn` is the
    /// sequence number it reports (`None` when it reports none). The reply
    /// (an offer, or a "no offer" message when the check fails functionally)
    /// is addressed to `receiver_actor`.
    GetLastSn {
        subject_id: DigestIdentifier,
        actual_sn: Option<u64>,
        info: ComunicateInfo,
        sender: PublicKey,
        receiver_actor: String,
    },
    /// A peer asks for the version of the governance `subject_id`; the reply
    /// is addressed to `receiver_actor`.
    GetGovernanceVersion {
        subject_id: DigestIdentifier,
        info: ComunicateInfo,
        sender: PublicKey,
        receiver_actor: String,
    },
    // A node requested a copy of the ledger from us.
    /// `actual_sn` is the last sequence number the requester holds and
    /// `target_sn` an optional upper bound for the batch.
    SendDistribution {
        actual_sn: Option<u64>,
        target_sn: Option<u64>,
        subject_id: DigestIdentifier,
        info: ComunicateInfo,
        sender: PublicKey,
    },
    // A replica has arrived: store it and report that we have received it.
    LastEventDistribution {
        ledger: Box<Ledger>,
        info: ComunicateInfo,
        sender: PublicKey,
    },
    /// A batch of ledger events received from `sender`.
    // NOTE(review): `is_all` presumably mirrors the flag sent with
    // `DistributionLedgerRes` (no more events pending) — confirm in the
    // handler, which is outside the reviewed portion of the file.
    LedgerDistribution {
        ledger: Vec<Ledger>,
        is_all: bool,
        info: ComunicateInfo,
        sender: PublicKey,
    },
}
772
// Marks `DistriWorkerMessage` as an actor message; no trait items are overridden.
impl Message for DistriWorkerMessage {}
774
// Declares `DistriWorker` as a non-persistent actor; no trait items are overridden.
impl NotPersistentActor for DistriWorker {}
776
777#[async_trait]
778impl Handler<Self> for DistriWorker {
779    async fn handle_message(
780        &mut self,
781        _sender: ActorPath,
782        msg: DistriWorkerMessage,
783        ctx: &mut ActorContext<Self>,
784    ) -> Result<(), ActorError> {
785        match msg {
786            DistriWorkerMessage::GetLastSn {
787                subject_id,
788                actual_sn,
789                info,
790                sender,
791                receiver_actor,
792            } => match self
793                .handle_get_last_sn(
794                    ctx,
795                    subject_id.clone(),
796                    actual_sn,
797                    info.clone(),
798                    sender.clone(),
799                    receiver_actor.clone(),
800                )
801                .await
802            {
803                Ok(()) => {}
804                Err(e) => {
805                    if let ActorError::Functional { .. } = e {
806                        warn!(
807                            msg_type = "GetLastSn",
808                            subject_id = %subject_id,
809                            sender = %sender,
810                            error = %e,
811                            "Witness check failed"
812                        );
813                        self.send_no_offer_response(
814                            &info,
815                            sender.clone(),
816                            receiver_actor,
817                        )
818                        .await?;
819                        return Ok(());
820                    } else {
821                        error!(
822                            msg_type = "GetLastSn",
823                            subject_id = %subject_id,
824                            sender = %sender,
825                            error = %e,
826                            "Witness check failed"
827                        );
828                        return Err(emit_fail(ctx, e).await);
829                    }
830                }
831            },
832            DistriWorkerMessage::GetGovernanceVersion {
833                subject_id,
834                info,
835                sender,
836                receiver_actor,
837            } => match self
838                .handle_get_governance_version(
839                    ctx,
840                    subject_id.clone(),
841                    info,
842                    sender.clone(),
843                    receiver_actor,
844                )
845                .await
846            {
847                Ok(()) => {}
848                Err(e) => {
849                    if let ActorError::Functional { .. } = e {
850                        warn!(
851                            msg_type = "GetGovernanceVersion",
852                            subject_id = %subject_id,
853                            sender = %sender,
854                            error = %e,
855                            "Subject is not a governance"
856                        );
857                        return Err(e);
858                    } else {
859                        error!(
860                            msg_type = "GetGovernanceVersion",
861                            subject_id = %subject_id,
862                            sender = %sender,
863                            error = %e,
864                            "Failed to send governance version response to network"
865                        );
866                        return Err(emit_fail(ctx, e).await);
867                    }
868                }
869            },
870            DistriWorkerMessage::SendDistribution {
871                actual_sn,
872                target_sn,
873                info,
874                subject_id,
875                sender,
876            } => match self
877                .handle_send_distribution(
878                    ctx,
879                    actual_sn,
880                    target_sn,
881                    info,
882                    subject_id.clone(),
883                    sender.clone(),
884                )
885                .await
886            {
887                Ok(()) => {}
888                Err(e) => {
889                    if let ActorError::Functional { .. } = e {
890                        warn!(
891                            msg_type = "SendDistribution",
892                            subject_id = %subject_id,
893                            sender = %sender,
894                            error = %e,
895                            "Witness check failed"
896                        );
897                        return Err(e);
898                    } else {
899                        error!(
900                            msg_type = "SendDistribution",
901                            subject_id = %subject_id,
902                            sender = %sender,
903                            error = %e,
904                            "Failed to send ledger response to network"
905                        );
906                        return Err(emit_fail(ctx, e).await);
907                    }
908                }
909            },
910            DistriWorkerMessage::LastEventDistribution {
911                ledger,
912                info,
913                sender,
914            } => {
915                let subject_id = ledger.get_subject_id();
916                let sn = ledger.sn;
917
918                let (is_gov, ..) = match self
919                    .check_auth(ctx, sender.clone(), &info, &ledger)
920                    .await
921                {
922                    Ok(is_gov) => is_gov,
923                    Err(e) => {
924                        if let ActorError::Functional { .. } = e {
925                            warn!(
926                                msg_type = "LastEventDistribution",
927                                subject_id = %subject_id,
928                                sn = sn,
929                                sender = %sender,
930                                error = %e,
931                                "Authorization check failed"
932                            );
933                            return Err(e);
934                        } else {
935                            error!(
936                                msg_type = "LastEventDistribution",
937                                subject_id = %subject_id,
938                                sn = sn,
939                                sender = %sender,
940                                error = %e,
941                                "Authorization check failed"
942                            );
943                            return Err(emit_fail(ctx, e).await);
944                        }
945                    }
946                };
947
948                let lease = if ledger.is_create_event() {
949                    if let Err(e) = create_subject(ctx, *ledger.clone()).await {
950                        if let ActorError::Functional { .. } = e {
951                            warn!(
952                                msg_type = "LastEventDistribution",
953                                subject_id = %subject_id,
954                                sn = sn,
955                                error = %e,
956                                "Failed to create subject from create event"
957                            );
958                            return Err(e);
959                        } else {
960                            error!(
961                                msg_type = "LastEventDistribution",
962                                subject_id = %subject_id,
963                                sn = sn,
964                                error = %e,
965                                "Failed to create subject from create event"
966                            );
967                            return Err(emit_fail(ctx, e).await);
968                        }
969                    };
970
971                    None
972                } else {
973                    let requester = Self::requester_id(
974                        "last_event_distribution",
975                        &subject_id,
976                        &info,
977                        &sender,
978                    );
979                    let lease = if !is_gov {
980                        match acquire_subject(
981                            ctx,
982                            &subject_id,
983                            requester.clone(),
984                            None,
985                            true,
986                        )
987                        .await
988                        {
989                            Ok(lease) => Some(lease),
990                            Err(e) => {
991                                error!(
992                                    msg_type = "LastEventDistribution",
993                                    subject_id = %subject_id,
994                                    error = %e,
995                                    "Failed to bring up tracker for subject update"
996                                );
997                                let error = DistributorError::UpTrackerFailed {
998                                    details: e.to_string(),
999                                };
1000                                return Err(emit_fail(ctx, error.into()).await);
1001                            }
1002                        }
1003                    } else {
1004                        None
1005                    };
1006
1007                    let update_result =
1008                        update_ledger(ctx, &subject_id, vec![*ledger.clone()])
1009                            .await;
1010
1011                    if let Some(lease) = lease.clone()
1012                        && update_result.is_err()
1013                    {
1014                        lease.finish(ctx).await?;
1015                    }
1016
1017                    match update_result {
1018                        Ok((last_sn, _, _)) if last_sn < ledger.sn => {
1019                            debug!(
1020                                msg_type = "LastEventDistribution",
1021                                subject_id = %subject_id,
1022                                last_sn = last_sn,
1023                                received_sn = sn,
1024                                "SN gap detected, requesting update"
1025                            );
1026
1027                            if let Err(e) = self
1028                                .request_ledger_from_sender(
1029                                    &subject_id,
1030                                    sender.clone(),
1031                                    &info,
1032                                    Some(last_sn),
1033                                )
1034                                .await
1035                            {
1036                                error!(
1037                                    msg_type = "LastEventDistribution",
1038                                    subject_id = %subject_id,
1039                                    last_sn = last_sn,
1040                                    error = %e,
1041                                    "Failed to request ledger from network"
1042                                );
1043                                return Err(emit_fail(ctx, e).await);
1044                            }
1045
1046                            if let Some(lease) = lease.clone() {
1047                                lease.finish(ctx).await?;
1048                            }
1049
1050                            return Ok(());
1051                        }
1052                        Ok((..)) => lease,
1053                        Err(e) => {
1054                            if let ActorError::Functional { .. } = e.clone() {
1055                                warn!(
1056                                    msg_type = "LastEventDistribution",
1057                                    subject_id = %subject_id,
1058                                    sn = sn,
1059                                    error = %e,
1060                                    "Failed to update subject ledger"
1061                                );
1062                                return Err(e);
1063                            } else {
1064                                error!(
1065                                    msg_type = "LastEventDistribution",
1066                                    subject_id = %subject_id,
1067                                    sn = sn,
1068                                    error = %e,
1069                                    "Failed to update subject ledger"
1070                                );
1071                                return Err(emit_fail(ctx, e).await);
1072                            }
1073                        }
1074                    }
1075                };
1076
1077                let new_info = self.build_response_info(
1078                    sender.clone(),
1079                    &info,
1080                    format!(
1081                        "/user/{}/{}",
1082                        info.request_id,
1083                        info.receiver.clone()
1084                    ),
1085                );
1086
1087                if let Err(e) = self
1088                    .send_network_message(
1089                        new_info,
1090                        ActorMessage::DistributionLastEventRes,
1091                    )
1092                    .await
1093                {
1094                    error!(
1095                        msg_type = "LastEventDistribution",
1096                        subject_id = %subject_id,
1097                        sn = sn,
1098                        error = %e,
1099                        "Failed to send distribution acknowledgment"
1100                    );
1101                    return Err(emit_fail(ctx, e).await);
1102                };
1103
1104                if let Some(lease) = lease {
1105                    lease.finish(ctx).await?;
1106                }
1107
1108                debug!(
1109                    msg_type = "LastEventDistribution",
1110                    subject_id = %subject_id,
1111                    sn = sn,
1112                    sender = %sender,
1113                    is_gov = is_gov,
1114                    "Last event distribution processed successfully"
1115                );
1116            }
1117            DistriWorkerMessage::LedgerDistribution {
1118                mut ledger,
1119                is_all,
1120                info,
1121                sender,
1122            } => {
1123                if ledger.is_empty() {
1124                    warn!(
1125                        msg_type = "LedgerDistribution",
1126                        sender = %sender,
1127                        "Received empty ledger distribution"
1128                    );
1129                    return Err(DistributorError::EmptyEvents.into());
1130                }
1131
1132                let subject_id = ledger[0].get_subject_id();
1133                let ledger_count = ledger.len();
1134                let first_sn = ledger[0].sn;
1135                let (is_gov, is_register) = match self
1136                    .check_auth(ctx, sender.clone(), &info, &ledger[0])
1137                    .await
1138                {
1139                    Ok(data) => data,
1140                    Err(e) => {
1141                        if let ActorError::Functional { .. } = e {
1142                            warn!(
1143                                msg_type = "LedgerDistribution",
1144                                subject_id = %subject_id,
1145                                sender = %sender,
1146                                ledger_count = ledger_count,
1147                                error = %e,
1148                                "Authorization check failed"
1149                            );
1150                            return Err(e);
1151                        } else {
1152                            error!(
1153                                msg_type = "LedgerDistribution",
1154                                subject_id = %subject_id,
1155                                sender = %sender,
1156                                ledger_count = ledger_count,
1157                                error = %e,
1158                                "Authorization check failed"
1159                            );
1160                            return Err(emit_fail(ctx, e).await);
1161                        }
1162                    }
1163                };
1164
1165                let lease = if ledger[0].is_create_event() && !is_register {
1166                    let create_ledger = ledger[0].clone();
1167                    let requester = Self::requester_id(
1168                        "ledger_distribution_create",
1169                        &subject_id,
1170                        &info,
1171                        &sender,
1172                    );
1173
1174                    let lease = if is_gov {
1175                        if let Err(e) =
1176                            create_subject(ctx, create_ledger.clone()).await
1177                        {
1178                            if let ActorError::Functional { .. } = e {
1179                                warn!(
1180                                    msg_type = "LedgerDistribution",
1181                                    subject_id = %subject_id,
1182                                    error = %e,
1183                                    "Failed to create subject from ledger"
1184                                );
1185                                return Err(e);
1186                            } else {
1187                                error!(
1188                                    msg_type = "LedgerDistribution",
1189                                    subject_id = %subject_id,
1190                                    error = %e,
1191                                    "Failed to create subject from ledger"
1192                                );
1193                                return Err(emit_fail(ctx, e).await);
1194                            }
1195                        };
1196                        None
1197                    } else {
1198                        let request = create_ledger
1199                            .get_create_event()
1200                            .ok_or_else(|| {
1201                                error!(
1202                                    msg_type = "LedgerDistribution",
1203                                    subject_id = %subject_id,
1204                                    "Create ledger is missing create event payload"
1205                                );
1206                                DistributorError::MissingCreateEventInCreateLedger {
1207                                    subject_id: subject_id.clone(),
1208                                }
1209                            })?;
1210
1211                        if let Err(e) = check_subject_creation(
1212                            ctx,
1213                            &request.governance_id,
1214                            create_ledger.ledger_seal_signature.signer.clone(),
1215                            create_ledger.gov_version,
1216                            request.namespace.to_string(),
1217                            request.schema_id,
1218                        )
1219                        .await
1220                        {
1221                            if let ActorError::Functional { .. } = e {
1222                                warn!(
1223                                    msg_type = "LedgerDistribution",
1224                                    subject_id = %subject_id,
1225                                    error = %e,
1226                                    "Failed to validate subject creation from ledger"
1227                                );
1228                                return Err(e);
1229                            } else {
1230                                error!(
1231                                    msg_type = "LedgerDistribution",
1232                                    subject_id = %subject_id,
1233                                    error = %e,
1234                                    "Failed to validate subject creation from ledger"
1235                                );
1236                                return Err(emit_fail(ctx, e).await);
1237                            }
1238                        }
1239
1240                        match acquire_subject(
1241                            ctx,
1242                            &subject_id,
1243                            requester,
1244                            Some(create_ledger),
1245                            true,
1246                        )
1247                        .await
1248                        {
1249                            Ok(lease) => Some(lease),
1250                            Err(e) => {
1251                                if let ActorError::Functional { .. } = e {
1252                                    warn!(
1253                                        msg_type = "LedgerDistribution",
1254                                        subject_id = %subject_id,
1255                                        error = %e,
1256                                        "Failed to create subject from ledger"
1257                                    );
1258                                    return Err(e);
1259                                } else {
1260                                    error!(
1261                                        msg_type = "LedgerDistribution",
1262                                        subject_id = %subject_id,
1263                                        error = %e,
1264                                        "Failed to create subject from ledger"
1265                                    );
1266                                    return Err(emit_fail(ctx, e).await);
1267                                }
1268                            }
1269                        }
1270                    };
1271
1272                    let _event = ledger.remove(0);
1273                    lease
1274                } else {
1275                    if ledger[0].is_create_event() && is_register {
1276                        let _event = ledger.remove(0);
1277                    }
1278
1279                    let requester = Self::requester_id(
1280                        "ledger_distribution",
1281                        &subject_id,
1282                        &info,
1283                        &sender,
1284                    );
1285                    if !ledger.is_empty() && !is_gov {
1286                        match acquire_subject(
1287                            ctx,
1288                            &subject_id,
1289                            requester.clone(),
1290                            None,
1291                            true,
1292                        )
1293                        .await
1294                        {
1295                            Ok(lease) => Some(lease),
1296                            Err(e) => {
1297                                error!(
1298                                    msg_type = "LedgerDistribution",
1299                                    subject_id = %subject_id,
1300                                    error = %e,
1301                                    "Failed to bring up tracker for subject update"
1302                                );
1303                                let error = DistributorError::UpTrackerFailed {
1304                                    details: e.to_string(),
1305                                };
1306                                return Err(emit_fail(ctx, error.into()).await);
1307                            }
1308                        }
1309                    } else {
1310                        None
1311                    }
1312                };
1313
1314                let lease = if !ledger.is_empty() {
1315                    let update_result =
1316                        update_ledger(ctx, &subject_id, ledger).await;
1317
1318                    if let Some(lease) = lease.clone()
1319                        && update_result.is_err()
1320                    {
1321                        lease.finish(ctx).await?;
1322                    }
1323
1324                    match update_result {
1325                        Ok((last_sn, _, _)) => {
1326                            if !is_all {
1327                                debug!(
1328                                    msg_type = "LedgerDistribution",
1329                                    subject_id = %subject_id,
1330                                    last_sn = last_sn,
1331                                    "Partial ledger received, requesting more"
1332                                );
1333
1334                                if let Err(e) = self
1335                                    .request_ledger_from_sender(
1336                                        &subject_id,
1337                                        sender.clone(),
1338                                        &info,
1339                                        Some(last_sn),
1340                                    )
1341                                    .await
1342                                {
1343                                    error!(
1344                                        msg_type = "LedgerDistribution",
1345                                        subject_id = %subject_id,
1346                                        last_sn = last_sn,
1347                                        error = %e,
1348                                        "Failed to request more ledger entries"
1349                                    );
1350                                    return Err(emit_fail(ctx, e).await);
1351                                };
1352                            }
1353
1354                            lease
1355                        }
1356                        Err(e) => {
1357                            if let ActorError::Functional { .. } = e.clone() {
1358                                warn!(
1359                                    msg_type = "LedgerDistribution",
1360                                    subject_id = %subject_id,
1361                                    first_sn = first_sn,
1362                                    ledger_count = ledger_count,
1363                                    error = %e,
1364                                    "Failed to update subject ledger"
1365                                );
1366                                return Err(e);
1367                            } else {
1368                                error!(
1369                                    msg_type = "LedgerDistribution",
1370                                    subject_id = %subject_id,
1371                                    first_sn = first_sn,
1372                                    ledger_count = ledger_count,
1373                                    error = %e,
1374                                    "Failed to update subject ledger"
1375                                );
1376                                return Err(emit_fail(ctx, e).await);
1377                            }
1378                        }
1379                    }
1380                } else {
1381                    lease
1382                };
1383
1384                if let Some(lease) = lease {
1385                    lease.finish(ctx).await?;
1386                }
1387
1388                debug!(
1389                    msg_type = "LedgerDistribution",
1390                    subject_id = %subject_id,
1391                    sender = %sender,
1392                    ledger_count = ledger_count,
1393                    is_all = is_all,
1394                    is_gov = is_gov,
1395                    "Ledger distribution processed successfully"
1396                );
1397            }
1398        };
1399
1400        Ok(())
1401    }
1402}