Skip to main content

ave_core/distribution/
worker.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::{
9    Namespace, SchemaType,
10    identity::{DigestIdentifier, PublicKey},
11    request::EventRequest,
12};
13use network::ComunicateInfo;
14
15use crate::{
16    ActorMessage, NetworkMessage, Node, NodeMessage, NodeResponse,
17    governance::{
18        Governance, GovernanceMessage, GovernanceResponse,
19        model::{HashThisRole, RoleTypes},
20    },
21    helpers::network::service::NetworkSender,
22    model::{
23        common::{
24            check_subject_creation, check_witness_access, emit_fail,
25            node::{get_subject_data, try_to_update},
26            subject::{
27                acquire_subject, create_subject, get_gov, get_gov_sn,
28                update_ledger,
29            },
30        },
31        event::Ledger,
32    },
33    node::SubjectData,
34    subject::SignedLedger,
35    tracker::{Tracker, TrackerMessage, TrackerResponse},
36};
37
38use tracing::{Span, debug, error, info_span, warn};
39
40use super::error::DistributorError;
41
42pub struct DistriWorker {
43    pub our_key: Arc<PublicKey>,
44    pub network: Arc<NetworkSender>,
45}
46
47impl DistriWorker {
48    fn requester_id(
49        kind: &str,
50        subject_id: &DigestIdentifier,
51        info: &ComunicateInfo,
52        sender: &PublicKey,
53    ) -> String {
54        format!(
55            "{kind}:{subject_id}:{sender}:{}:{}",
56            info.request_id, info.version
57        )
58    }
59
60    async fn get_ledger(
61        &self,
62        ctx: &mut ActorContext<Self>,
63        subject_id: &DigestIdentifier,
64        hi_sn: u64,
65        lo_sn: Option<u64>,
66        is_gov: bool,
67    ) -> Result<(Vec<SignedLedger>, bool), ActorError> {
68        let path = ActorPath::from(format!(
69            "/user/node/subject_manager/{}",
70            subject_id
71        ));
72
73        if is_gov {
74            let governance_actor =
75                ctx.system().get_actor::<Governance>(&path).await?;
76
77            let response = governance_actor
78                .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
79                .await?;
80
81            match response {
82                GovernanceResponse::Ledger { ledger, is_all } => {
83                    Ok((ledger, is_all))
84                }
85                _ => Err(ActorError::UnexpectedResponse {
86                    expected: "GovernanceResponse::Ledger".to_owned(),
87                    path,
88                }),
89            }
90        } else {
91            let lease = acquire_subject(
92                ctx,
93                subject_id,
94                format!("send_distribution:{subject_id}"),
95                None,
96                true,
97            )
98            .await?;
99            let tracker_actor =
100                ctx.system().get_actor::<Tracker>(&path).await?;
101            let response = tracker_actor
102                .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
103                .await;
104            lease.finish(ctx).await?;
105            let response = response?;
106
107            match response {
108                TrackerResponse::Ledger { ledger, is_all } => {
109                    Ok((ledger, is_all))
110                }
111                _ => Err(ActorError::UnexpectedResponse {
112                    expected: "TrackerResponse::Ledger".to_owned(),
113                    path,
114                }),
115            }
116        }
117    }
118
119    async fn authorized_subj(
120        &self,
121        ctx: &ActorContext<Self>,
122        subject_id: &DigestIdentifier,
123    ) -> Result<(bool, Option<SubjectData>), ActorError> {
124        let node_path = ActorPath::from("/user/node");
125        let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
126
127        let response = node_actor
128            .ask(NodeMessage::AuthData(subject_id.to_owned()))
129            .await?;
130        match response {
131            NodeResponse::AuthData { auth, subject_data } => {
132                Ok((auth, subject_data))
133            }
134            _ => Err(ActorError::UnexpectedResponse {
135                expected: "NodeResponse::AuthData".to_owned(),
136                path: node_path,
137            }),
138        }
139    }
140
141    async fn check_auth(
142        &self,
143        ctx: &mut ActorContext<Self>,
144        signer: PublicKey,
145        ledger: Ledger,
146    ) -> Result<(bool, bool), ActorError> {
147        let subject_id = ledger.get_subject_id();
148        // Si está auth o si soy el dueño del sujeto.
149        let (auth, subject_data) =
150            self.authorized_subj(ctx, &subject_id).await?;
151
152        // Extraer schema_id, namespace y governance_id según si conocemos el sujeto o no
153        let (schema_id, namespace, governance_id) = if let Some(ref data) =
154            subject_data
155        {
156            // Lo conozco
157            match data {
158                SubjectData::Tracker {
159                    governance_id,
160                    schema_id,
161                    namespace,
162                    ..
163                } => {
164                    let namespace = Namespace::from(namespace.clone());
165                    (schema_id.clone(), namespace, Some(governance_id.clone()))
166                }
167                SubjectData::Governance { .. } => {
168                    (SchemaType::Governance, Namespace::new(), None)
169                }
170            }
171        } else {
172            // No lo conozco - debe ser evento Create
173            if let EventRequest::Create(create) = ledger.event_request.content()
174            {
175                if !create.schema_id.is_gov() && create.governance_id.is_empty()
176                {
177                    return Err(
178                        DistributorError::MissingGovernanceIdInCreate {
179                            subject_id: subject_id.clone(),
180                        }
181                        .into(),
182                    );
183                }
184
185                let gov_id = if create.schema_id.is_gov() {
186                    None
187                } else {
188                    Some(create.governance_id.clone())
189                };
190
191                (create.schema_id.clone(), create.namespace.clone(), gov_id)
192            } else {
193                // No es el primer evento, necesito el primero
194                try_to_update(ctx, subject_id, Some(signer)).await?;
195                return Err(DistributorError::UpdatingSubject.into());
196            }
197        };
198
199        let is_gov = schema_id.is_gov();
200        // Verificar autorización
201        if is_gov {
202            // Es una gobernanza
203            if !auth {
204                return Err(DistributorError::GovernanceNotAuthorized.into());
205            }
206        } else {
207            // Es un Tracker - verificar rol de witness si no está autorizado
208            if !auth {
209                let governance_id =
210                    governance_id.expect("governance_id is Some for Trackers");
211                let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
212                    DistributorError::GetGovernanceFailed {
213                        details: e.to_string(),
214                    }
215                })?;
216
217                match gov.version.cmp(&ledger.gov_version) {
218                    std::cmp::Ordering::Less => {
219                        return Err(
220                            DistributorError::GovernanceVersionMismatch {
221                                our_version: gov.version,
222                                their_version: ledger.gov_version,
223                            }
224                            .into(),
225                        );
226                    }
227                    std::cmp::Ordering::Equal => {}
228                    std::cmp::Ordering::Greater => {}
229                };
230
231                if !gov.has_this_role(HashThisRole::SchemaWitness {
232                    who: (*self.our_key).clone(),
233                    creator: signer,
234                    schema_id,
235                    namespace,
236                }) {
237                    return Err(DistributorError::NotWitness.into());
238                }
239            }
240        }
241
242        Ok((is_gov, subject_data.is_some()))
243    }
244
245    async fn check_witness(
246        &self,
247        ctx: &mut ActorContext<Self>,
248        subject_id: &DigestIdentifier,
249        sender: PublicKey,
250    ) -> Result<(u64, bool), ActorError> {
251        let data = get_subject_data(ctx, subject_id).await?;
252
253        let Some(data) = data else {
254            return Err(DistributorError::SubjectNotFound.into());
255        };
256
257        match data {
258            SubjectData::Tracker {
259                governance_id,
260                schema_id,
261                namespace,
262                ..
263            } => {
264                let Some(sn) = check_witness_access(
265                    ctx,
266                    &governance_id,
267                    subject_id,
268                    sender,
269                    namespace,
270                    schema_id,
271                )
272                .await?
273                else {
274                    return Err(DistributorError::SenderNoAccess.into());
275                };
276
277                Ok((sn, false))
278            }
279            SubjectData::Governance { .. } => {
280                let gov = get_gov(ctx, subject_id).await.map_err(|e| {
281                    DistributorError::GetGovernanceFailed {
282                        details: e.to_string(),
283                    }
284                })?;
285
286                if !gov.has_this_role(HashThisRole::Gov {
287                    who: sender.clone(),
288                    role: RoleTypes::Witness,
289                }) {
290                    return Err(DistributorError::SenderNotMember {
291                        sender: sender.to_string(),
292                    }
293                    .into());
294                }
295
296                Ok((get_gov_sn(ctx, subject_id).await?, true))
297            }
298        }
299    }
300}
301
302#[async_trait]
303impl Actor for DistriWorker {
304    type Event = ();
305    type Message = DistriWorkerMessage;
306    type Response = ();
307
308    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
309        parent_span.map_or_else(
310            || info_span!("DistriWorker", id),
311            |parent_span| info_span!(parent: parent_span, "DistriWorker", id),
312        )
313    }
314}
315
316#[derive(Debug, Clone)]
317pub enum DistriWorkerMessage {
318    GetLastSn {
319        subject_id: DigestIdentifier,
320        info: ComunicateInfo,
321        sender: PublicKey,
322        receiver_actor: String,
323    },
324    GetGovernanceVersion {
325        subject_id: DigestIdentifier,
326        info: ComunicateInfo,
327        sender: PublicKey,
328        receiver_actor: String,
329    },
330    // Un nodo nos solicitó la copia del ledger.
331    SendDistribution {
332        actual_sn: Option<u64>,
333        subject_id: DigestIdentifier,
334        info: ComunicateInfo,
335        sender: PublicKey,
336    },
337    // Nos llega una replica, guardarla en informar que la hemos recivido
338    LastEventDistribution {
339        ledger: Box<SignedLedger>,
340        info: ComunicateInfo,
341        sender: PublicKey,
342    },
343    LedgerDistribution {
344        ledger: Vec<SignedLedger>,
345        is_all: bool,
346        info: ComunicateInfo,
347        sender: PublicKey,
348    },
349}
350
351impl Message for DistriWorkerMessage {}
352
353impl NotPersistentActor for DistriWorker {}
354
355#[async_trait]
356impl Handler<Self> for DistriWorker {
357    async fn handle_message(
358        &mut self,
359        _sender: ActorPath,
360        msg: DistriWorkerMessage,
361        ctx: &mut ActorContext<Self>,
362    ) -> Result<(), ActorError> {
363        match msg {
364            DistriWorkerMessage::GetLastSn {
365                subject_id,
366                info,
367                sender,
368                receiver_actor,
369            } => {
370                let (sn, ..) = match self
371                    .check_witness(ctx, &subject_id, sender.clone())
372                    .await
373                {
374                    Ok(sn) => sn,
375                    Err(e) => {
376                        if let ActorError::Functional { .. } = e {
377                            warn!(
378                                msg_type = "GetLastSn",
379                                subject_id = %subject_id,
380                                sender = %sender,
381                                error = %e,
382                                "Witness check failed"
383                            );
384                            return Err(e);
385                        } else {
386                            error!(
387                                msg_type = "GetLastSn",
388                                subject_id = %subject_id,
389                                sender = %sender,
390                                error = %e,
391                                "Witness check failed"
392                            );
393                            return Err(emit_fail(ctx, e).await);
394                        }
395                    }
396                };
397
398                let new_info = ComunicateInfo {
399                    receiver: sender.clone(),
400                    request_id: info.request_id,
401                    version: info.version,
402                    receiver_actor,
403                };
404
405                if let Err(e) = self
406                    .network
407                    .send_command(network::CommandHelper::SendMessage {
408                        message: NetworkMessage {
409                            info: new_info,
410                            message: ActorMessage::AuthLastSn { sn },
411                        },
412                    })
413                    .await
414                {
415                    error!(
416                        msg_type = "GetLastSn",
417                        subject_id = %subject_id,
418                        sn = sn,
419                        error = %e,
420                        "Failed to send last SN response to network"
421                    );
422                    return Err(emit_fail(ctx, e).await);
423                };
424
425                debug!(
426                    msg_type = "GetLastSn",
427                    subject_id = %subject_id,
428                    sn = sn,
429                    sender = %sender,
430                    "Last SN response sent successfully"
431                );
432            }
433            DistriWorkerMessage::GetGovernanceVersion {
434                subject_id,
435                info,
436                sender,
437                receiver_actor,
438            } => {
439                let witness_ok = self
440                    .check_witness(ctx, &subject_id, sender.clone())
441                    .await
442                    .is_ok();
443                let auth_ok = if witness_ok {
444                    true
445                } else {
446                    let auth_path = ActorPath::from("/user/node/auth");
447                    match ctx
448                        .system()
449                        .get_actor::<crate::auth::Auth>(&auth_path)
450                        .await
451                    {
452                        Ok(auth_actor) => match auth_actor
453                            .ask(crate::auth::AuthMessage::GetAuth {
454                                subject_id: subject_id.clone(),
455                            })
456                            .await
457                        {
458                            Ok(crate::auth::AuthResponse::Witnesses(
459                                witnesses,
460                            )) => witnesses.contains(&sender),
461                            _ => false,
462                        },
463                        Err(_) => false,
464                    }
465                };
466
467                if !auth_ok {
468                    return Err(DistributorError::SenderNoAccess.into());
469                }
470
471                let governance_path = ActorPath::from(format!(
472                    "/user/node/subject_manager/{}",
473                    subject_id
474                ));
475                let governance_actor = ctx
476                    .system()
477                    .get_actor::<Governance>(&governance_path)
478                    .await?;
479                let response =
480                    governance_actor.ask(GovernanceMessage::GetVersion).await?;
481                let GovernanceResponse::Version(version) = response else {
482                    return Err(ActorError::UnexpectedResponse {
483                        path: governance_path,
484                        expected: "GovernanceResponse::Version".to_owned(),
485                    });
486                };
487
488                let new_info = ComunicateInfo {
489                    receiver: sender.clone(),
490                    request_id: info.request_id,
491                    version: info.version,
492                    receiver_actor,
493                };
494
495                if let Err(e) = self
496                    .network
497                    .send_command(network::CommandHelper::SendMessage {
498                        message: NetworkMessage {
499                            info: new_info,
500                            message: ActorMessage::GovernanceVersionRes {
501                                version,
502                            },
503                        },
504                    })
505                    .await
506                {
507                    return Err(emit_fail(ctx, e).await);
508                }
509            }
510            DistriWorkerMessage::SendDistribution {
511                actual_sn,
512                info,
513                subject_id,
514                sender,
515            } => {
516                let (hi_sn, is_gov) = match self
517                    .check_witness(ctx, &subject_id, sender.clone())
518                    .await
519                {
520                    Ok(sn) => sn,
521                    Err(e) => {
522                        if let ActorError::Functional { .. } = e {
523                            warn!(
524                                msg_type = "SendDistribution",
525                                subject_id = %subject_id,
526                                sender = %sender,
527                                error = %e,
528                                "Witness check failed"
529                            );
530                            return Err(e);
531                        } else {
532                            error!(
533                                msg_type = "SendDistribution",
534                                subject_id = %subject_id,
535                                sender = %sender,
536                                error = %e,
537                                "Witness check failed"
538                            );
539                            return Err(emit_fail(ctx, e).await);
540                        }
541                    }
542                };
543
544                if let Some(actual_sn) = actual_sn
545                    && actual_sn >= hi_sn
546                {
547                    warn!(
548                        msg_type = "SendDistribution",
549                        subject_id = %subject_id,
550                        actual_sn = actual_sn,
551                        witness_sn = hi_sn,
552                        "Requester SN is >= witness SN, nothing to send"
553                    );
554                    return Err(DistributorError::ActualSnBiggerThanWitness {
555                        actual_sn,
556                        witness_sn: hi_sn,
557                    }
558                    .into());
559                };
560
561                let (ledger, is_all) = match self
562                    .get_ledger(ctx, &subject_id, hi_sn, actual_sn, is_gov)
563                    .await
564                {
565                    Ok(res) => res,
566                    Err(e) => {
567                        error!(
568                            msg_type = "SendDistribution",
569                            subject_id = %subject_id,
570                            hi_sn = hi_sn,
571                            actual_sn = ?actual_sn,
572                            is_gov = is_gov,
573                            error = %e,
574                            "Failed to obtain ledger"
575                        );
576                        return Err(emit_fail(ctx, e).await);
577                    }
578                };
579
580                let new_info = ComunicateInfo {
581                    receiver: sender.clone(),
582                    request_id: info.request_id,
583                    version: info.version,
584                    receiver_actor: format!(
585                        "/user/node/distributor_{}",
586                        subject_id
587                    ),
588                };
589
590                if let Err(e) = self
591                    .network
592                    .send_command(network::CommandHelper::SendMessage {
593                        message: NetworkMessage {
594                            info: new_info,
595                            message: ActorMessage::DistributionLedgerRes {
596                                ledger: ledger.clone(),
597                                is_all,
598                            },
599                        },
600                    })
601                    .await
602                {
603                    error!(
604                        msg_type = "SendDistribution",
605                        subject_id = %subject_id,
606                        ledger_count = ledger.len(),
607                        is_all = is_all,
608                        error = %e,
609                        "Failed to send ledger response to network"
610                    );
611                    return Err(emit_fail(ctx, e).await);
612                };
613
614                debug!(
615                    msg_type = "SendDistribution",
616                    subject_id = %subject_id,
617                    sender = %sender,
618                    ledger_count = ledger.len(),
619                    is_all = is_all,
620                    hi_sn = hi_sn,
621                    actual_sn = ?actual_sn,
622                    "Ledger distribution sent successfully"
623                );
624            }
625            DistriWorkerMessage::LastEventDistribution {
626                ledger,
627                info,
628                sender,
629            } => {
630                let subject_id = ledger.content().get_subject_id();
631                let sn = ledger.content().sn;
632
633                let (is_gov, ..) = match self
634                    .check_auth(ctx, sender.clone(), ledger.content().clone())
635                    .await
636                {
637                    Ok(is_gov) => is_gov,
638                    Err(e) => {
639                        if let ActorError::Functional { .. } = e {
640                            warn!(
641                                msg_type = "LastEventDistribution",
642                                subject_id = %subject_id,
643                                sn = sn,
644                                sender = %sender,
645                                error = %e,
646                                "Authorization check failed"
647                            );
648                            return Err(e);
649                        } else {
650                            error!(
651                                msg_type = "LastEventDistribution",
652                                subject_id = %subject_id,
653                                sn = sn,
654                                sender = %sender,
655                                error = %e,
656                                "Authorization check failed"
657                            );
658                            return Err(emit_fail(ctx, e).await);
659                        }
660                    }
661                };
662
663                let lease = if ledger
664                    .content()
665                    .event_request
666                    .content()
667                    .is_create_event()
668                {
669                    if let Err(e) = create_subject(ctx, *ledger.clone()).await {
670                        if let ActorError::Functional { .. } = e {
671                            warn!(
672                                msg_type = "LastEventDistribution",
673                                subject_id = %subject_id,
674                                sn = sn,
675                                error = %e,
676                                "Failed to create subject from create event"
677                            );
678                            return Err(e);
679                        } else {
680                            error!(
681                                msg_type = "LastEventDistribution",
682                                subject_id = %subject_id,
683                                sn = sn,
684                                error = %e,
685                                "Failed to create subject from create event"
686                            );
687                            return Err(emit_fail(ctx, e).await);
688                        }
689                    };
690
691                    None
692                } else {
693                    let requester = Self::requester_id(
694                        "last_event_distribution",
695                        &subject_id,
696                        &info,
697                        &sender,
698                    );
699                    let lease = if !is_gov {
700                        match acquire_subject(
701                            ctx,
702                            &subject_id,
703                            requester.clone(),
704                            None,
705                            true,
706                        )
707                        .await
708                        {
709                            Ok(lease) => Some(lease),
710                            Err(e) => {
711                                error!(
712                                    msg_type = "LastEventDistribution",
713                                    subject_id = %subject_id,
714                                    error = %e,
715                                    "Failed to bring up tracker for subject update"
716                                );
717                                let error = DistributorError::UpTrackerFailed {
718                                    details: e.to_string(),
719                                };
720                                return Err(emit_fail(ctx, error.into()).await);
721                            }
722                        }
723                    } else {
724                        None
725                    };
726
727                    let update_result =
728                        update_ledger(ctx, &subject_id, vec![*ledger.clone()])
729                            .await;
730
731                    if let Some(lease) = lease.clone()
732                        && update_result.is_err()
733                    {
734                        lease.finish(ctx).await?;
735                    }
736
737                    match update_result {
738                        Ok((last_sn, _, _))
739                            if last_sn < ledger.content().sn =>
740                        {
741                            debug!(
742                                msg_type = "LastEventDistribution",
743                                subject_id = %subject_id,
744                                last_sn = last_sn,
745                                received_sn = sn,
746                                "SN gap detected, requesting full ledger"
747                            );
748
749                            let new_info = ComunicateInfo {
750                                receiver: sender,
751                                request_id: info.request_id,
752                                version: info.version,
753                                receiver_actor: format!(
754                                    "/user/node/distributor_{}",
755                                    subject_id
756                                ),
757                            };
758
759                            if let Err(e) = self.network.send_command(network::CommandHelper::SendMessage {
760                                    message: NetworkMessage {
761                                        info: new_info,
762                                        message: ActorMessage::DistributionLedgerReq {
763                                            actual_sn: Some(last_sn),
764                                            subject_id: subject_id.clone(),
765                                        },
766                                    },
767                                }).await {
768                                    error!(
769                                        msg_type = "LastEventDistribution",
770                                        subject_id = %subject_id,
771                                        last_sn = last_sn,
772                                        error = %e,
773                                        "Failed to request ledger from network"
774                                    );
775                                    return Err(emit_fail(ctx, e).await);
776                                };
777
778                            if let Some(lease) = lease.clone() {
779                                lease.finish(ctx).await?;
780                            }
781
782                            return Ok(());
783                        }
784                        Ok((..)) => lease,
785                        Err(e) => {
786                            if let ActorError::Functional { .. } = e.clone() {
787                                warn!(
788                                    msg_type = "LastEventDistribution",
789                                    subject_id = %subject_id,
790                                    sn = sn,
791                                    error = %e,
792                                    "Failed to update subject ledger"
793                                );
794                                return Err(e);
795                            } else {
796                                error!(
797                                    msg_type = "LastEventDistribution",
798                                    subject_id = %subject_id,
799                                    sn = sn,
800                                    error = %e,
801                                    "Failed to update subject ledger"
802                                );
803                                return Err(emit_fail(ctx, e).await);
804                            }
805                        }
806                    }
807                };
808
809                let new_info = ComunicateInfo {
810                    receiver: sender.clone(),
811                    receiver_actor: format!(
812                        "/user/{}/{}",
813                        info.request_id,
814                        info.receiver.clone()
815                    ),
816                    request_id: info.request_id.clone(),
817                    version: info.version,
818                };
819
820                if let Err(e) = self
821                    .network
822                    .send_command(network::CommandHelper::SendMessage {
823                        message: NetworkMessage {
824                            info: new_info,
825                            message: ActorMessage::DistributionLastEventRes,
826                        },
827                    })
828                    .await
829                {
830                    error!(
831                        msg_type = "LastEventDistribution",
832                        subject_id = %subject_id,
833                        sn = sn,
834                        error = %e,
835                        "Failed to send distribution acknowledgment"
836                    );
837                    return Err(emit_fail(ctx, e).await);
838                };
839
840                if let Some(lease) = lease {
841                    lease.finish(ctx).await?;
842                }
843
844                debug!(
845                    msg_type = "LastEventDistribution",
846                    subject_id = %subject_id,
847                    sn = sn,
848                    sender = %sender,
849                    is_gov = is_gov,
850                    "Last event distribution processed successfully"
851                );
852            }
853            DistriWorkerMessage::LedgerDistribution {
854                mut ledger,
855                is_all,
856                info,
857                sender,
858            } => {
859                if ledger.is_empty() {
860                    warn!(
861                        msg_type = "LedgerDistribution",
862                        sender = %sender,
863                        "Received empty ledger distribution"
864                    );
865                    return Err(DistributorError::EmptyEvents.into());
866                }
867
868                let subject_id = ledger[0].content().get_subject_id();
869                let ledger_count = ledger.len();
870                let first_sn = ledger[0].content().sn;
871
872                let (is_gov, is_register) = match self
873                    .check_auth(
874                        ctx,
875                        sender.clone(),
876                        ledger[0].content().clone(),
877                    )
878                    .await
879                {
880                    Ok(data) => data,
881                    Err(e) => {
882                        if let ActorError::Functional { .. } = e {
883                            warn!(
884                                msg_type = "LedgerDistribution",
885                                subject_id = %subject_id,
886                                sender = %sender,
887                                ledger_count = ledger_count,
888                                error = %e,
889                                "Authorization check failed"
890                            );
891                            return Err(e);
892                        } else {
893                            error!(
894                                msg_type = "LedgerDistribution",
895                                subject_id = %subject_id,
896                                sender = %sender,
897                                ledger_count = ledger_count,
898                                error = %e,
899                                "Authorization check failed"
900                            );
901                            return Err(emit_fail(ctx, e).await);
902                        }
903                    }
904                };
905
906                let lease = if ledger[0]
907                    .content()
908                    .event_request
909                    .content()
910                    .is_create_event()
911                    && !is_register
912                {
913                    let create_ledger = ledger[0].clone();
914                    let requester = Self::requester_id(
915                        "ledger_distribution_create",
916                        &subject_id,
917                        &info,
918                        &sender,
919                    );
920
921                    let lease = if is_gov {
922                        if let Err(e) =
923                            create_subject(ctx, create_ledger.clone()).await
924                        {
925                            if let ActorError::Functional { .. } = e {
926                                warn!(
927                                    msg_type = "LedgerDistribution",
928                                    subject_id = %subject_id,
929                                    error = %e,
930                                    "Failed to create subject from ledger"
931                                );
932                                return Err(e);
933                            } else {
934                                error!(
935                                    msg_type = "LedgerDistribution",
936                                    subject_id = %subject_id,
937                                    error = %e,
938                                    "Failed to create subject from ledger"
939                                );
940                                return Err(emit_fail(ctx, e).await);
941                            }
942                        };
943                        None
944                    } else {
945                        let EventRequest::Create(request) = create_ledger
946                            .content()
947                            .event_request
948                            .content()
949                            .clone()
950                        else {
951                            return Err(DistributorError::EmptyEvents.into());
952                        };
953
954                        if let Err(e) = check_subject_creation(
955                            ctx,
956                            &request.governance_id,
957                            create_ledger.signature().signer.clone(),
958                            create_ledger.content().gov_version,
959                            request.namespace.to_string(),
960                            request.schema_id,
961                        )
962                        .await
963                        {
964                            if let ActorError::Functional { .. } = e {
965                                warn!(
966                                    msg_type = "LedgerDistribution",
967                                    subject_id = %subject_id,
968                                    error = %e,
969                                    "Failed to validate subject creation from ledger"
970                                );
971                                return Err(e);
972                            } else {
973                                error!(
974                                    msg_type = "LedgerDistribution",
975                                    subject_id = %subject_id,
976                                    error = %e,
977                                    "Failed to validate subject creation from ledger"
978                                );
979                                return Err(emit_fail(ctx, e).await);
980                            }
981                        }
982
983                        match acquire_subject(
984                            ctx,
985                            &subject_id,
986                            requester,
987                            Some(create_ledger),
988                            true,
989                        )
990                        .await
991                        {
992                            Ok(lease) => Some(lease),
993                            Err(e) => {
994                                if let ActorError::Functional { .. } = e {
995                                    warn!(
996                                        msg_type = "LedgerDistribution",
997                                        subject_id = %subject_id,
998                                        error = %e,
999                                        "Failed to create subject from ledger"
1000                                    );
1001                                    return Err(e);
1002                                } else {
1003                                    error!(
1004                                        msg_type = "LedgerDistribution",
1005                                        subject_id = %subject_id,
1006                                        error = %e,
1007                                        "Failed to create subject from ledger"
1008                                    );
1009                                    return Err(emit_fail(ctx, e).await);
1010                                }
1011                            }
1012                        }
1013                    };
1014
1015                    let _event = ledger.remove(0);
1016                    lease
1017                } else {
1018                    if ledger[0]
1019                        .content()
1020                        .event_request
1021                        .content()
1022                        .is_create_event()
1023                        && is_register
1024                    {
1025                        let _event = ledger.remove(0);
1026                    }
1027
1028                    let requester = Self::requester_id(
1029                        "ledger_distribution",
1030                        &subject_id,
1031                        &info,
1032                        &sender,
1033                    );
1034                    if !ledger.is_empty() && !is_gov {
1035                        match acquire_subject(
1036                            ctx,
1037                            &subject_id,
1038                            requester.clone(),
1039                            None,
1040                            true,
1041                        )
1042                        .await
1043                        {
1044                            Ok(lease) => Some(lease),
1045                            Err(e) => {
1046                                error!(
1047                                    msg_type = "LedgerDistribution",
1048                                    subject_id = %subject_id,
1049                                    error = %e,
1050                                    "Failed to bring up tracker for subject update"
1051                                );
1052                                let error = DistributorError::UpTrackerFailed {
1053                                    details: e.to_string(),
1054                                };
1055                                return Err(emit_fail(ctx, error.into()).await);
1056                            }
1057                        }
1058                    } else {
1059                        None
1060                    }
1061                };
1062
1063                let lease = if !ledger.is_empty() {
1064                    let update_result =
1065                        update_ledger(ctx, &subject_id, ledger).await;
1066
1067                    if let Some(lease) = lease.clone()
1068                        && update_result.is_err()
1069                    {
1070                        lease.finish(ctx).await?;
1071                    }
1072
1073                    match update_result {
1074                        Ok((last_sn, _, _)) => {
1075                            if !is_all {
1076                                debug!(
1077                                    msg_type = "LedgerDistribution",
1078                                    subject_id = %subject_id,
1079                                    last_sn = last_sn,
1080                                    "Partial ledger received, requesting more"
1081                                );
1082
1083                                let new_info = ComunicateInfo {
1084                                    receiver: sender.clone(),
1085                                    request_id: info.request_id.clone(),
1086                                    version: info.version,
1087                                    receiver_actor: format!(
1088                                        "/user/node/distributor_{}",
1089                                        subject_id
1090                                    ),
1091                                };
1092
1093                                if let Err(e) = self
1094                                    .network
1095                                    .send_command(network::CommandHelper::SendMessage {
1096                                        message: NetworkMessage {
1097                                            info: new_info,
1098                                            message: ActorMessage::DistributionLedgerReq {
1099                                                actual_sn: Some(last_sn),
1100                                                subject_id: subject_id.clone(),
1101                                            },
1102                                        },
1103                                    })
1104                                    .await
1105                                {
1106                                    error!(
1107                                        msg_type = "LedgerDistribution",
1108                                        subject_id = %subject_id,
1109                                        last_sn = last_sn,
1110                                        error = %e,
1111                                        "Failed to request more ledger entries"
1112                                    );
1113                                    return Err(emit_fail(ctx, e).await);
1114                                };
1115                            }
1116
1117                            lease
1118                        }
1119                        Err(e) => {
1120                            if let ActorError::Functional { .. } = e.clone() {
1121                                warn!(
1122                                    msg_type = "LedgerDistribution",
1123                                    subject_id = %subject_id,
1124                                    first_sn = first_sn,
1125                                    ledger_count = ledger_count,
1126                                    error = %e,
1127                                    "Failed to update subject ledger"
1128                                );
1129                                return Err(e);
1130                            } else {
1131                                error!(
1132                                    msg_type = "LedgerDistribution",
1133                                    subject_id = %subject_id,
1134                                    first_sn = first_sn,
1135                                    ledger_count = ledger_count,
1136                                    error = %e,
1137                                    "Failed to update subject ledger"
1138                                );
1139                                return Err(emit_fail(ctx, e).await);
1140                            }
1141                        }
1142                    }
1143                } else {
1144                    lease
1145                };
1146
1147                if let Some(lease) = lease {
1148                    lease.finish(ctx).await?;
1149                }
1150
1151                debug!(
1152                    msg_type = "LedgerDistribution",
1153                    subject_id = %subject_id,
1154                    sender = %sender,
1155                    ledger_count = ledger_count,
1156                    is_all = is_all,
1157                    is_gov = is_gov,
1158                    "Ledger distribution processed successfully"
1159                );
1160            }
1161        };
1162
1163        Ok(())
1164    }
1165}