Skip to main content

ave_core/distribution/
worker.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use ave_actors::{
5    Actor, ActorContext, ActorError, ActorPath, Handler, Message,
6    NotPersistentActor,
7};
8use ave_common::{
9    Namespace, SchemaType,
10    identity::{DigestIdentifier, PublicKey},
11    request::EventRequest,
12};
13use network::ComunicateInfo;
14
15use crate::{
16    ActorMessage, NetworkMessage, Node, NodeMessage, NodeResponse,
17    governance::{
18        Governance, GovernanceMessage, GovernanceResponse,
19        model::{HashThisRole, RoleTypes},
20    },
21    helpers::network::service::NetworkSender,
22    model::{
23        common::{
24            check_witness_access, emit_fail,
25            node::{get_subject_data, i_owner_new_owner, try_to_update},
26            subject::{create_subject, get_gov, get_gov_sn, update_ledger},
27        },
28        event::Ledger,
29    },
30    node::SubjectData,
31    subject::SignedLedger,
32    tracker::{Tracker, TrackerMessage, TrackerResponse},
33};
34
35use tracing::{Span, debug, error, info_span, warn};
36
37use super::error::DistributorError;
38
39pub struct DistriWorker {
40    pub our_key: Arc<PublicKey>,
41    pub network: Arc<NetworkSender>,
42}
43
44impl DistriWorker {
45    async fn down_tracker(
46        &self,
47        ctx: &ActorContext<Self>,
48        subject_id: &DigestIdentifier,
49    ) -> Result<(), ActorError> {
50        let subject_path =
51            ActorPath::from(format!("/user/node/{}", subject_id));
52
53        let subject_actor =
54            ctx.system().get_actor::<Tracker>(&subject_path).await?;
55        subject_actor.ask_stop().await
56    }
57
58    async fn get_ledger(
59        &self,
60        ctx: &mut ActorContext<Self>,
61        subject_id: &DigestIdentifier,
62        hi_sn: u64,
63        lo_sn: Option<u64>,
64        is_gov: bool,
65    ) -> Result<(Vec<SignedLedger>, bool), ActorError> {
66        let path = ActorPath::from(format!("/user/node/{}", subject_id));
67
68        if is_gov {
69            let governance_actor =
70                ctx.system().get_actor::<Governance>(&path).await?;
71
72            let response = governance_actor
73                .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
74                .await?;
75
76            match response {
77                GovernanceResponse::Ledger { ledger, is_all } => {
78                    Ok((ledger, is_all))
79                }
80                _ => Err(ActorError::UnexpectedResponse {
81                    expected: "GovernanceResponse::Ledger".to_owned(),
82                    path,
83                }),
84            }
85        } else {
86            let response = if let Ok(tracker_actor) =
87                ctx.system().get_actor::<Tracker>(&path).await
88            {
89                tracker_actor
90                    .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
91                    .await?
92            } else {
93                Self::up_tracker(ctx, subject_id, true).await?;
94
95                let tracker_actor =
96                    ctx.system().get_actor::<Tracker>(&path).await?;
97
98                let response = tracker_actor
99                    .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
100                    .await?;
101
102                tracker_actor.ask_stop().await?;
103
104                response
105            };
106
107            match response {
108                TrackerResponse::Ledger { ledger, is_all } => {
109                    Ok((ledger, is_all))
110                }
111                _ => Err(ActorError::UnexpectedResponse {
112                    expected: "TrackerResponse::Ledger".to_owned(),
113                    path,
114                }),
115            }
116        }
117    }
118
119    async fn authorized_subj(
120        &self,
121        ctx: &ActorContext<Self>,
122        subject_id: &DigestIdentifier,
123    ) -> Result<(bool, Option<SubjectData>), ActorError> {
124        let node_path = ActorPath::from("/user/node");
125        let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
126
127        let response = node_actor
128            .ask(NodeMessage::AuthData(subject_id.to_owned()))
129            .await?;
130        match response {
131            NodeResponse::AuthData { auth, subject_data } => {
132                Ok((auth, subject_data))
133            }
134            _ => Err(ActorError::UnexpectedResponse {
135                expected: "NodeResponse::AuthData".to_owned(),
136                path: node_path,
137            }),
138        }
139    }
140
141    async fn check_auth(
142        &self,
143        ctx: &mut ActorContext<Self>,
144        signer: PublicKey,
145        ledger: Ledger,
146    ) -> Result<(bool, bool), ActorError> {
147        let subject_id = ledger.get_subject_id();
148        // Si está auth o si soy el dueño del sujeto.
149        let (auth, subject_data) =
150            self.authorized_subj(ctx, &subject_id).await?;
151
152        // Extraer schema_id, namespace y governance_id según si conocemos el sujeto o no
153        let (schema_id, namespace, governance_id) = if let Some(ref data) =
154            subject_data
155        {
156            // Lo conozco
157            match data {
158                SubjectData::Tracker {
159                    governance_id,
160                    schema_id,
161                    namespace,
162                    ..
163                } => {
164                    let namespace = Namespace::from(namespace.clone());
165                    (schema_id.clone(), namespace, Some(governance_id.clone()))
166                }
167                SubjectData::Governance { .. } => {
168                    (SchemaType::Governance, Namespace::new(), None)
169                }
170            }
171        } else {
172            // No lo conozco - debe ser evento Create
173            if let EventRequest::Create(create) = ledger.event_request.content()
174            {
175                if !create.schema_id.is_gov() && create.governance_id.is_empty()
176                {
177                    return Err(
178                        DistributorError::MissingGovernanceIdInCreate {
179                            subject_id: subject_id.clone(),
180                        }
181                        .into(),
182                    );
183                }
184
185                let gov_id = if create.schema_id.is_gov() {
186                    None
187                } else {
188                    Some(create.governance_id.clone())
189                };
190
191                (create.schema_id.clone(), create.namespace.clone(), gov_id)
192            } else {
193                // No es el primer evento, necesito el primero
194                try_to_update(ctx, subject_id, Some(signer)).await?;
195                return Err(DistributorError::UpdatingSubject.into());
196            }
197        };
198
199        let is_gov = schema_id.is_gov();
200        // Verificar autorización
201        if is_gov {
202            // Es una gobernanza
203            if !auth {
204                return Err(DistributorError::GovernanceNotAuthorized.into());
205            }
206        } else {
207            // Es un Tracker - verificar rol de witness si no está autorizado
208            if !auth {
209                let governance_id =
210                    governance_id.expect("governance_id is Some for Trackers");
211                let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
212                    DistributorError::GetGovernanceFailed {
213                        details: e.to_string(),
214                    }
215                })?;
216
217                match gov.version.cmp(&ledger.gov_version) {
218                    std::cmp::Ordering::Less => {
219                        return Err(
220                            DistributorError::GovernanceVersionMismatch {
221                                our_version: gov.version,
222                                their_version: ledger.gov_version,
223                            }
224                            .into(),
225                        );
226                    }
227                    std::cmp::Ordering::Equal => {}
228                    std::cmp::Ordering::Greater => {}
229                };
230
231                if !gov.has_this_role(HashThisRole::SchemaWitness {
232                    who: (*self.our_key).clone(),
233                    creator: signer,
234                    schema_id,
235                    namespace,
236                }) {
237                    return Err(DistributorError::NotWitness.into());
238                }
239            }
240        }
241
242        Ok((is_gov, subject_data.is_some()))
243    }
244
245    async fn check_witness(
246        &self,
247        ctx: &mut ActorContext<Self>,
248        subject_id: &DigestIdentifier,
249        sender: PublicKey,
250    ) -> Result<(u64, bool), ActorError> {
251        let data = get_subject_data(ctx, subject_id).await?;
252
253        let Some(data) = data else {
254            return Err(DistributorError::SubjectNotFound.into());
255        };
256
257        match data {
258            SubjectData::Tracker {
259                governance_id,
260                schema_id,
261                namespace,
262                ..
263            } => {
264                let Some(sn) = check_witness_access(
265                    ctx,
266                    &governance_id,
267                    subject_id,
268                    sender,
269                    namespace,
270                    schema_id,
271                )
272                .await?
273                else {
274                    return Err(DistributorError::SenderNoAccess.into());
275                };
276
277                Ok((sn, false))
278            }
279            SubjectData::Governance { .. } => {
280                let gov = get_gov(ctx, subject_id).await.map_err(|e| {
281                    DistributorError::GetGovernanceFailed {
282                        details: e.to_string(),
283                    }
284                })?;
285
286                if !gov.has_this_role(HashThisRole::Gov {
287                    who: sender.clone(),
288                    role: RoleTypes::Witness,
289                }) {
290                    return Err(DistributorError::SenderNotMember {
291                        sender: sender.to_string(),
292                    }
293                    .into());
294                }
295
296                Ok((get_gov_sn(ctx, subject_id).await?, true))
297            }
298        }
299    }
300
301    pub async fn up_tracker(
302        ctx: &mut ActorContext<Self>,
303        subject_id: &DigestIdentifier,
304        light: bool,
305    ) -> Result<(), ActorError> {
306        let node_path = ActorPath::from("/user/node");
307        let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
308
309        // We obtain the validator
310        let response = node_actor
311            .ask(NodeMessage::UpSubject {
312                subject_id: subject_id.to_owned(),
313                light,
314            })
315            .await?;
316
317        match response {
318            NodeResponse::Ok => Ok(()),
319            _ => Err(ActorError::UnexpectedResponse {
320                expected: "NodeResponse::Ok".to_owned(),
321                path: node_path,
322            }),
323        }
324    }
325}
326
327#[async_trait]
328impl Actor for DistriWorker {
329    type Event = ();
330    type Message = DistriWorkerMessage;
331    type Response = ();
332
333    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
334        parent_span.map_or_else(
335            || info_span!("DistriWorker", id),
336            |parent_span| info_span!(parent: parent_span, "DistriWorker", id),
337        )
338    }
339}
340
341#[derive(Debug, Clone)]
342pub enum DistriWorkerMessage {
343    GetLastSn {
344        subject_id: DigestIdentifier,
345        info: ComunicateInfo,
346        sender: PublicKey,
347        receiver_actor: String,
348    },
349    // Un nodo nos solicitó la copia del ledger.
350    SendDistribution {
351        actual_sn: Option<u64>,
352        subject_id: DigestIdentifier,
353        info: ComunicateInfo,
354        sender: PublicKey,
355    },
356    // Nos llega una replica, guardarla en informar que la hemos recivido
357    LastEventDistribution {
358        ledger: Box<SignedLedger>,
359        info: ComunicateInfo,
360        sender: PublicKey,
361    },
362    LedgerDistribution {
363        ledger: Vec<SignedLedger>,
364        is_all: bool,
365        info: ComunicateInfo,
366        sender: PublicKey,
367    },
368}
369
370impl Message for DistriWorkerMessage {}
371
372impl NotPersistentActor for DistriWorker {}
373
374#[async_trait]
375impl Handler<Self> for DistriWorker {
376    async fn handle_message(
377        &mut self,
378        _sender: ActorPath,
379        msg: DistriWorkerMessage,
380        ctx: &mut ActorContext<Self>,
381    ) -> Result<(), ActorError> {
382        match msg {
383            DistriWorkerMessage::GetLastSn {
384                subject_id,
385                info,
386                sender,
387                receiver_actor,
388            } => {
389                let (sn, ..) = match self
390                    .check_witness(ctx, &subject_id, sender.clone())
391                    .await
392                {
393                    Ok(sn) => sn,
394                    Err(e) => {
395                        if let ActorError::Functional { .. } = e {
396                            warn!(
397                                msg_type = "GetLastSn",
398                                subject_id = %subject_id,
399                                sender = %sender,
400                                error = %e,
401                                "Witness check failed"
402                            );
403                            return Err(e);
404                        } else {
405                            error!(
406                                msg_type = "GetLastSn",
407                                subject_id = %subject_id,
408                                sender = %sender,
409                                error = %e,
410                                "Witness check failed"
411                            );
412                            return Err(emit_fail(ctx, e).await);
413                        }
414                    }
415                };
416
417                let new_info = ComunicateInfo {
418                    receiver: sender.clone(),
419                    request_id: info.request_id,
420                    version: info.version,
421                    receiver_actor,
422                };
423
424                if let Err(e) = self
425                    .network
426                    .send_command(network::CommandHelper::SendMessage {
427                        message: NetworkMessage {
428                            info: new_info,
429                            message: ActorMessage::AuthLastSn { sn },
430                        },
431                    })
432                    .await
433                {
434                    error!(
435                        msg_type = "GetLastSn",
436                        subject_id = %subject_id,
437                        sn = sn,
438                        error = %e,
439                        "Failed to send last SN response to network"
440                    );
441                    return Err(emit_fail(ctx, e).await);
442                };
443
444                debug!(
445                    msg_type = "GetLastSn",
446                    subject_id = %subject_id,
447                    sn = sn,
448                    sender = %sender,
449                    "Last SN response sent successfully"
450                );
451            }
452            DistriWorkerMessage::SendDistribution {
453                actual_sn,
454                info,
455                subject_id,
456                sender,
457            } => {
458                let (hi_sn, is_gov) = match self
459                    .check_witness(ctx, &subject_id, sender.clone())
460                    .await
461                {
462                    Ok(sn) => sn,
463                    Err(e) => {
464                        if let ActorError::Functional { .. } = e {
465                            warn!(
466                                msg_type = "SendDistribution",
467                                subject_id = %subject_id,
468                                sender = %sender,
469                                error = %e,
470                                "Witness check failed"
471                            );
472                            return Err(e);
473                        } else {
474                            error!(
475                                msg_type = "SendDistribution",
476                                subject_id = %subject_id,
477                                sender = %sender,
478                                error = %e,
479                                "Witness check failed"
480                            );
481                            return Err(emit_fail(ctx, e).await);
482                        }
483                    }
484                };
485
486                if let Some(actual_sn) = actual_sn
487                    && actual_sn >= hi_sn
488                {
489                    warn!(
490                        msg_type = "SendDistribution",
491                        subject_id = %subject_id,
492                        actual_sn = actual_sn,
493                        witness_sn = hi_sn,
494                        "Requester SN is >= witness SN, nothing to send"
495                    );
496                    return Err(DistributorError::ActualSnBiggerThanWitness {
497                        actual_sn,
498                        witness_sn: hi_sn,
499                    }
500                    .into());
501                };
502
503                let (ledger, is_all) = match self
504                    .get_ledger(ctx, &subject_id, hi_sn, actual_sn, is_gov)
505                    .await
506                {
507                    Ok(res) => res,
508                    Err(e) => {
509                        error!(
510                            msg_type = "SendDistribution",
511                            subject_id = %subject_id,
512                            hi_sn = hi_sn,
513                            actual_sn = ?actual_sn,
514                            is_gov = is_gov,
515                            error = %e,
516                            "Failed to obtain ledger"
517                        );
518                        return Err(emit_fail(ctx, e).await);
519                    }
520                };
521
522                let new_info = ComunicateInfo {
523                    receiver: sender.clone(),
524                    request_id: info.request_id,
525                    version: info.version,
526                    receiver_actor: format!(
527                        "/user/node/distributor_{}",
528                        subject_id
529                    ),
530                };
531
532                if let Err(e) = self
533                    .network
534                    .send_command(network::CommandHelper::SendMessage {
535                        message: NetworkMessage {
536                            info: new_info,
537                            message: ActorMessage::DistributionLedgerRes {
538                                ledger: ledger.clone(),
539                                is_all,
540                            },
541                        },
542                    })
543                    .await
544                {
545                    error!(
546                        msg_type = "SendDistribution",
547                        subject_id = %subject_id,
548                        ledger_count = ledger.len(),
549                        is_all = is_all,
550                        error = %e,
551                        "Failed to send ledger response to network"
552                    );
553                    return Err(emit_fail(ctx, e).await);
554                };
555
556                debug!(
557                    msg_type = "SendDistribution",
558                    subject_id = %subject_id,
559                    sender = %sender,
560                    ledger_count = ledger.len(),
561                    is_all = is_all,
562                    hi_sn = hi_sn,
563                    actual_sn = ?actual_sn,
564                    "Ledger distribution sent successfully"
565                );
566            }
567            DistriWorkerMessage::LastEventDistribution {
568                ledger,
569                info,
570                sender,
571            } => {
572                let subject_id = ledger.content().get_subject_id();
573                let sn = ledger.content().sn;
574
575                let (is_gov, ..) = match self
576                    .check_auth(ctx, sender.clone(), ledger.content().clone())
577                    .await
578                {
579                    Ok(is_gov) => is_gov,
580                    Err(e) => {
581                        if let ActorError::Functional { .. } = e {
582                            warn!(
583                                msg_type = "LastEventDistribution",
584                                subject_id = %subject_id,
585                                sn = sn,
586                                sender = %sender,
587                                error = %e,
588                                "Authorization check failed"
589                            );
590                            return Err(e);
591                        } else {
592                            error!(
593                                msg_type = "LastEventDistribution",
594                                subject_id = %subject_id,
595                                sn = sn,
596                                sender = %sender,
597                                error = %e,
598                                "Authorization check failed"
599                            );
600                            return Err(emit_fail(ctx, e).await);
601                        }
602                    }
603                };
604
605                let (owner, new_owner) = if ledger
606                    .content()
607                    .event_request
608                    .content()
609                    .is_create_event()
610                {
611                    if let Err(e) = create_subject(ctx, *ledger.clone()).await {
612                        if let ActorError::Functional { .. } = e {
613                            warn!(
614                                msg_type = "LastEventDistribution",
615                                subject_id = %subject_id,
616                                sn = sn,
617                                error = %e,
618                                "Failed to create subject from create event"
619                            );
620                            return Err(e);
621                        } else {
622                            error!(
623                                msg_type = "LastEventDistribution",
624                                subject_id = %subject_id,
625                                sn = sn,
626                                error = %e,
627                                "Failed to create subject from create event"
628                            );
629                            return Err(emit_fail(ctx, e).await);
630                        }
631                    };
632
633                    (ledger.signature().signer.clone(), None)
634                } else {
635                    let (i_owner, i_new_owner) =
636                        match i_owner_new_owner(ctx, &subject_id).await {
637                            Ok(res) => res,
638                            Err(e) => {
639                                error!(
640                                    msg_type = "LastEventDistribution",
641                                    subject_id = %subject_id,
642                                    error = %e,
643                                    "Failed to check owner status"
644                                );
645                                return Err(emit_fail(ctx, e).await);
646                            }
647                        };
648
649                    if !i_new_owner.unwrap_or_default()
650                        && !i_owner
651                        && !is_gov
652                        && let Err(e) =
653                            Self::up_tracker(ctx, &subject_id, false).await
654                    {
655                        error!(
656                            msg_type = "LastEventDistribution",
657                            subject_id = %subject_id,
658                            error = %e,
659                            "Failed to bring up tracker for witness subject"
660                        );
661                        let error = DistributorError::UpTrackerFailed {
662                            details: e.to_string(),
663                        };
664                        return Err(emit_fail(ctx, error.into()).await);
665                    }
666
667                    match update_ledger(ctx, &subject_id, vec![*ledger.clone()])
668                        .await
669                    {
670                        Ok((last_sn, owner, new_owner))
671                            if last_sn < ledger.content().sn =>
672                        {
673                            debug!(
674                                msg_type = "LastEventDistribution",
675                                subject_id = %subject_id,
676                                last_sn = last_sn,
677                                received_sn = sn,
678                                "SN gap detected, requesting full ledger"
679                            );
680
681                            let new_info = ComunicateInfo {
682                                receiver: sender,
683                                request_id: info.request_id,
684                                version: info.version,
685                                receiver_actor: format!(
686                                    "/user/node/distributor_{}",
687                                    subject_id
688                                ),
689                            };
690
691                            if let Err(e) = self.network.send_command(network::CommandHelper::SendMessage {
692                                    message: NetworkMessage {
693                                        info: new_info,
694                                        message: ActorMessage::DistributionLedgerReq {
695                                            actual_sn: Some(last_sn),
696                                            subject_id: subject_id.clone(),
697                                        },
698                                    },
699                                }).await {
700                                    error!(
701                                        msg_type = "LastEventDistribution",
702                                        subject_id = %subject_id,
703                                        last_sn = last_sn,
704                                        error = %e,
705                                        "Failed to request ledger from network"
706                                    );
707                                    return Err(emit_fail(ctx, e).await);
708                                };
709
710                            let i_new_owner = if let Some(new_owner) = new_owner
711                            {
712                                new_owner == *self.our_key
713                            } else {
714                                false
715                            };
716
717                            if !is_gov
718                                && owner != *self.our_key
719                                && !i_new_owner
720                                && let Err(e) =
721                                    self.down_tracker(ctx, &subject_id).await
722                            {
723                                error!(
724                                    msg_type = "LastEventDistribution",
725                                    subject_id = %subject_id,
726                                    error = %e,
727                                    "Failed to stop tracker after ledger request"
728                                );
729                                return Err(e);
730                            }
731
732                            return Ok(());
733                        }
734                        Ok((.., owner, new_owner)) => (owner, new_owner),
735                        Err(e) => {
736                            if let ActorError::Functional { .. } = e.clone() {
737                                warn!(
738                                    msg_type = "LastEventDistribution",
739                                    subject_id = %subject_id,
740                                    sn = sn,
741                                    error = %e,
742                                    "Failed to update subject ledger"
743                                );
744                                return Err(e);
745                            } else {
746                                error!(
747                                    msg_type = "LastEventDistribution",
748                                    subject_id = %subject_id,
749                                    sn = sn,
750                                    error = %e,
751                                    "Failed to update subject ledger"
752                                );
753                                return Err(emit_fail(ctx, e).await);
754                            }
755                        }
756                    }
757                };
758
759                let new_info = ComunicateInfo {
760                    receiver: sender.clone(),
761                    receiver_actor: format!(
762                        "/user/{}/{}",
763                        info.request_id,
764                        info.receiver.clone()
765                    ),
766                    request_id: info.request_id,
767                    version: info.version,
768                };
769
770                if let Err(e) = self
771                    .network
772                    .send_command(network::CommandHelper::SendMessage {
773                        message: NetworkMessage {
774                            info: new_info,
775                            message: ActorMessage::DistributionLastEventRes,
776                        },
777                    })
778                    .await
779                {
780                    error!(
781                        msg_type = "LastEventDistribution",
782                        subject_id = %subject_id,
783                        sn = sn,
784                        error = %e,
785                        "Failed to send distribution acknowledgment"
786                    );
787                    return Err(emit_fail(ctx, e).await);
788                };
789
790                let i_new_owner = if let Some(ref new_owner) = new_owner {
791                    *new_owner == *self.our_key
792                } else {
793                    false
794                };
795
796                if !is_gov
797                    && owner != *self.our_key
798                    && !i_new_owner
799                    && let Err(e) = self.down_tracker(ctx, &subject_id).await
800                {
801                    error!(
802                        msg_type = "LastEventDistribution",
803                        subject_id = %subject_id,
804                        error = %e,
805                        "Failed to stop tracker after processing"
806                    );
807                    return Err(e);
808                }
809
810                debug!(
811                    msg_type = "LastEventDistribution",
812                    subject_id = %subject_id,
813                    sn = sn,
814                    sender = %sender,
815                    is_gov = is_gov,
816                    "Last event distribution processed successfully"
817                );
818            }
819            DistriWorkerMessage::LedgerDistribution {
820                mut ledger,
821                is_all,
822                info,
823                sender,
824            } => {
825                if ledger.is_empty() {
826                    warn!(
827                        msg_type = "LedgerDistribution",
828                        sender = %sender,
829                        "Received empty ledger distribution"
830                    );
831                    return Err(DistributorError::EmptyEvents.into());
832                }
833
834                let subject_id = ledger[0].content().get_subject_id();
835                let ledger_count = ledger.len();
836                let first_sn = ledger[0].content().sn;
837
838                let (is_gov, is_register) = match self
839                    .check_auth(
840                        ctx,
841                        sender.clone(),
842                        ledger[0].content().clone(),
843                    )
844                    .await
845                {
846                    Ok(data) => data,
847                    Err(e) => {
848                        if let ActorError::Functional { .. } = e {
849                            warn!(
850                                msg_type = "LedgerDistribution",
851                                subject_id = %subject_id,
852                                sender = %sender,
853                                ledger_count = ledger_count,
854                                error = %e,
855                                "Authorization check failed"
856                            );
857                            return Err(e);
858                        } else {
859                            error!(
860                                msg_type = "LedgerDistribution",
861                                subject_id = %subject_id,
862                                sender = %sender,
863                                ledger_count = ledger_count,
864                                error = %e,
865                                "Authorization check failed"
866                            );
867                            return Err(emit_fail(ctx, e).await);
868                        }
869                    }
870                };
871
872                let (i_owner, i_new_owner) = if ledger[0]
873                    .content()
874                    .event_request
875                    .content()
876                    .is_create_event()
877                    && !is_register
878                {
879                    if let Err(e) = create_subject(ctx, ledger[0].clone()).await
880                    {
881                        if let ActorError::Functional { .. } = e {
882                            warn!(
883                                msg_type = "LedgerDistribution",
884                                subject_id = %subject_id,
885                                error = %e,
886                                "Failed to create subject from ledger"
887                            );
888                            return Err(e);
889                        } else {
890                            error!(
891                                msg_type = "LedgerDistribution",
892                                subject_id = %subject_id,
893                                error = %e,
894                                "Failed to create subject from ledger"
895                            );
896                            return Err(emit_fail(ctx, e).await);
897                        }
898                    };
899
900                    let event = ledger.remove(0);
901                    (event.signature().signer == *self.our_key, false)
902                } else {
903                    // TODO en un futuro mejorar esto
904                    if ledger[0]
905                        .content()
906                        .event_request
907                        .content()
908                        .is_create_event()
909                        && is_register
910                    {
911                        let _event = ledger.remove(0);
912                    }
913
914                    let (i_owner, i_new_owner) =
915                        match i_owner_new_owner(ctx, &subject_id).await {
916                            Ok(res) => res,
917                            Err(e) => {
918                                error!(
919                                    msg_type = "LedgerDistribution",
920                                    subject_id = %subject_id,
921                                    error = %e,
922                                    "Failed to check owner status"
923                                );
924                                return Err(emit_fail(ctx, e).await);
925                            }
926                        };
927
928                    let i_new_owner = i_new_owner.unwrap_or_default();
929                    if !i_new_owner
930                        && !i_owner
931                        && !is_gov
932                        && let Err(e) =
933                            Self::up_tracker(ctx, &subject_id, false).await
934                    {
935                        error!(
936                            msg_type = "LedgerDistribution",
937                            subject_id = %subject_id,
938                            error = %e,
939                            "Failed to bring up tracker for witness subject"
940                        );
941                        let error = DistributorError::UpTrackerFailed {
942                            details: e.to_string(),
943                        };
944                        return Err(emit_fail(ctx, error.into()).await);
945                    }
946
947                    (i_owner, i_new_owner)
948                };
949
950                let (i_owner, i_new_owner) = if !ledger.is_empty() {
951                    match update_ledger(ctx, &subject_id, ledger).await {
952                        Ok((last_sn, owner, new_owner)) => {
953                            let i_new_owner = if let Some(new_owner) = new_owner
954                            {
955                                new_owner == *self.our_key
956                            } else {
957                                false
958                            };
959
960                            if !is_all {
961                                debug!(
962                                    msg_type = "LedgerDistribution",
963                                    subject_id = %subject_id,
964                                    last_sn = last_sn,
965                                    "Partial ledger received, requesting more"
966                                );
967
968                                let new_info = ComunicateInfo {
969                                    receiver: sender.clone(),
970                                    request_id: info.request_id,
971                                    version: info.version,
972                                    receiver_actor: format!(
973                                        "/user/node/distributor_{}",
974                                        subject_id
975                                    ),
976                                };
977
978                                if let Err(e) = self
979                                    .network
980                                    .send_command(network::CommandHelper::SendMessage {
981                                        message: NetworkMessage {
982                                            info: new_info,
983                                            message: ActorMessage::DistributionLedgerReq {
984                                                actual_sn: Some(last_sn),
985                                                subject_id: subject_id.clone(),
986                                            },
987                                        },
988                                    })
989                                    .await
990                                {
991                                    error!(
992                                        msg_type = "LedgerDistribution",
993                                        subject_id = %subject_id,
994                                        last_sn = last_sn,
995                                        error = %e,
996                                        "Failed to request more ledger entries"
997                                    );
998                                    return Err(emit_fail(ctx, e).await);
999                                };
1000                            }
1001
1002                            (owner == *self.our_key, i_new_owner)
1003                        }
1004                        Err(e) => {
1005                            if let ActorError::Functional { .. } = e.clone() {
1006                                warn!(
1007                                    msg_type = "LedgerDistribution",
1008                                    subject_id = %subject_id,
1009                                    first_sn = first_sn,
1010                                    ledger_count = ledger_count,
1011                                    error = %e,
1012                                    "Failed to update subject ledger"
1013                                );
1014                                return Err(e);
1015                            } else {
1016                                error!(
1017                                    msg_type = "LedgerDistribution",
1018                                    subject_id = %subject_id,
1019                                    first_sn = first_sn,
1020                                    ledger_count = ledger_count,
1021                                    error = %e,
1022                                    "Failed to update subject ledger"
1023                                );
1024                                return Err(emit_fail(ctx, e).await);
1025                            }
1026                        }
1027                    }
1028                } else {
1029                    (i_owner, i_new_owner)
1030                };
1031
1032                if !is_gov
1033                    && !i_owner
1034                    && !i_new_owner
1035                    && let Err(e) = self.down_tracker(ctx, &subject_id).await
1036                {
1037                    error!(
1038                        msg_type = "LedgerDistribution",
1039                        subject_id = %subject_id,
1040                        error = %e,
1041                        "Failed to stop tracker after processing"
1042                    );
1043                    return Err(e);
1044                }
1045
1046                debug!(
1047                    msg_type = "LedgerDistribution",
1048                    subject_id = %subject_id,
1049                    sender = %sender,
1050                    ledger_count = ledger_count,
1051                    is_all = is_all,
1052                    is_gov = is_gov,
1053                    "Ledger distribution processed successfully"
1054                );
1055            }
1056        };
1057
1058        Ok(())
1059    }
1060}