Skip to main content

ave_core/request/
manager.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::bridge::request::EventRequestType;
8use ave_common::identity::{
9    DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
10};
11use ave_common::request::EventRequest;
12use ave_common::response::RequestState;
13use ave_common::{Namespace, SchemaType, ValueWrapper};
14use borsh::{BorshDeserialize, BorshSerialize};
15use network::ComunicateInfo;
16use serde::{Deserialize, Serialize};
17use std::collections::HashSet;
18use std::sync::Arc;
19use std::time::Duration;
20use tracing::{Span, debug, error, info, info_span, warn};
21
22use crate::approval::request::ApprovalReq;
23use crate::distribution::{
24    Distribution, DistributionMessage, DistributionType,
25};
26use crate::evaluation::request::EvaluateData;
27use crate::evaluation::response::EvaluatorResponse;
28use crate::governance::data::GovernanceData;
29use crate::governance::model::{
30    HashThisRole, ProtocolTypes, Quorum, RoleTypes, WitnessesData,
31};
32use crate::helpers::network::service::NetworkSender;
33use crate::model::common::node::{SignTypesNode, get_sign, get_subject_data};
34use crate::model::common::send_to_tracking;
35use crate::model::common::subject::{
36    create_subject, get_gov, get_gov_sn, get_last_ledger_event, get_metadata,
37    make_obsolete, update_ledger,
38};
39use crate::model::event::{
40    ApprovalData, EvaluationData, Ledger, Protocols, ValidationData,
41};
42use crate::node::SubjectData;
43use crate::request::error::RequestManagerError;
44use crate::request::tracking::RequestTrackingMessage;
45use crate::request::{RequestHandler, RequestHandlerMessage};
46use crate::subject::{Metadata, SignedLedger};
47
48use crate::validation::request::{ActualProtocols, LastData, ValidationReq};
49use crate::{
50    ActorMessage, NetworkMessage, Validation, ValidationMessage,
51    approval::{Approval, ApprovalMessage},
52    auth::{Auth, AuthMessage, AuthResponse},
53    db::Storable,
54    evaluation::{Evaluation, EvaluationMessage, request::EvaluationReq},
55    model::common::emit_fail,
56    update::{Update, UpdateMessage, UpdateNew, UpdateType},
57};
58
59use super::{
60    reboot::{Reboot, RebootMessage},
61    types::{ReqManInitMessage, RequestManagerState},
62};
63
64#[derive(Clone, Debug, Serialize, Deserialize)]
65pub struct RequestManager {
66    #[serde(skip)]
67    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
68    #[serde(skip)]
69    our_key: Arc<PublicKey>,
70    #[serde(skip)]
71    id: DigestIdentifier,
72    #[serde(skip)]
73    subject_id: DigestIdentifier,
74    #[serde(skip)]
75    retry_timeout: u64,
76    #[serde(skip)]
77    retry_diff: u64,
78    command: ReqManInitMessage,
79    request: Option<Signed<EventRequest>>,
80    state: RequestManagerState,
81    version: u64,
82}
83
84#[derive(Debug, Clone)]
85pub enum RebootType {
86    Normal,
87    Diff,
88    TimeOut,
89}
90
91pub struct InitRequestManager {
92    pub our_key: Arc<PublicKey>,
93    pub subject_id: DigestIdentifier,
94    pub helpers: (HashAlgorithm, Arc<NetworkSender>),
95}
96
97impl BorshSerialize for RequestManager {
98    fn serialize<W: std::io::Write>(
99        &self,
100        writer: &mut W,
101    ) -> std::io::Result<()> {
102        BorshSerialize::serialize(&self.command, writer)?;
103        BorshSerialize::serialize(&self.state, writer)?;
104        BorshSerialize::serialize(&self.version, writer)?;
105        BorshSerialize::serialize(&self.request, writer)?;
106
107        Ok(())
108    }
109}
110
111impl BorshDeserialize for RequestManager {
112    fn deserialize_reader<R: std::io::Read>(
113        reader: &mut R,
114    ) -> std::io::Result<Self> {
115        // Deserialize the persisted fields
116        let command = ReqManInitMessage::deserialize_reader(reader)?;
117        let state = RequestManagerState::deserialize_reader(reader)?;
118        let version = u64::deserialize_reader(reader)?;
119        let request =
120            Option::<Signed<EventRequest>>::deserialize_reader(reader)?;
121
122        let our_key = Arc::new(PublicKey::default());
123        let subject_id = DigestIdentifier::default();
124        let id = DigestIdentifier::default();
125
126        Ok(Self {
127            retry_diff: 0,
128            retry_timeout: 0,
129            helpers: None,
130            our_key,
131            id,
132            subject_id,
133            command,
134            request,
135            state,
136            version,
137        })
138    }
139}
140
141impl RequestManager {
142    //////// EVAL
143    ////////////////////////////////////////////////
144    //Revisado
145    async fn build_evaluation(
146        &mut self,
147        ctx: &mut ActorContext<Self>,
148    ) -> Result<(), RequestManagerError> {
149        let Some(request) = self.request.clone() else {
150            return Err(RequestManagerError::RequestNotSet);
151        };
152
153        self.on_event(
154            RequestManagerEvent::UpdateState {
155                state: Box::new(RequestManagerState::Evaluation),
156            },
157            ctx,
158        )
159        .await;
160
161        let metadata = Self::check_data_eval(ctx, &request).await?;
162
163        let (signed_evaluation_req, quorum, signers, init_state) =
164            self.build_request_eval(ctx, &metadata, &request).await?;
165
166        if signers.is_empty() {
167            warn!(
168                request_id = %self.id,
169                schema_id = %metadata.schema_id,
170                "No evaluators available for schema"
171            );
172
173            return Err(RequestManagerError::NoEvaluatorsAvailable {
174                schema_id: metadata.schema_id.to_string(),
175                governance_id: signed_evaluation_req
176                    .content()
177                    .governance_id
178                    .clone(),
179            });
180        }
181
182        self.run_evaluation(
183            ctx,
184            signed_evaluation_req.clone(),
185            quorum,
186            init_state,
187            signers,
188        )
189        .await
190    }
191
192    // revisado
193    async fn check_data_eval(
194        ctx: &mut ActorContext<Self>,
195        request: &Signed<EventRequest>,
196    ) -> Result<Metadata, RequestManagerError> {
197        let (subject_id, confirm) = match request.content().clone() {
198            EventRequest::Fact(event) => (event.subject_id, false),
199            EventRequest::Transfer(event) => (event.subject_id, false),
200            EventRequest::Confirm(event) => (event.subject_id, true),
201            _ => {
202                return Err(
203                    RequestManagerError::InvalidEventRequestForEvaluation,
204                );
205            }
206        };
207
208        let metadata = get_metadata(ctx, &subject_id).await?;
209
210        if confirm && !metadata.schema_id.is_gov() {
211            return Err(RequestManagerError::ConfirmNotEvaluableForTracker);
212        }
213
214        Ok(metadata)
215    }
216
217    async fn build_request_eval(
218        &self,
219        ctx: &mut ActorContext<Self>,
220        metadata: &Metadata,
221        request: &Signed<EventRequest>,
222    ) -> Result<
223        (
224            Signed<EvaluationReq>,
225            Quorum,
226            HashSet<PublicKey>,
227            Option<ValueWrapper>,
228        ),
229        RequestManagerError,
230    > {
231        let is_gov = metadata.schema_id.is_gov();
232
233        let request_type = EventRequestType::from(request.content());
234        let (evaluate_data, governance_data, init_state) = match (
235            is_gov,
236            request_type,
237        ) {
238            (true, EventRequestType::Fact) => {
239                let state =
240                    GovernanceData::try_from(metadata.properties.clone())?;
241
242                (
243                    EvaluateData::GovFact {
244                        state: state.clone(),
245                    },
246                    state,
247                    None,
248                )
249            }
250            (true, EventRequestType::Transfer) => {
251                let state =
252                    GovernanceData::try_from(metadata.properties.clone())?;
253
254                (
255                    EvaluateData::GovTransfer {
256                        state: state.clone(),
257                    },
258                    state,
259                    None,
260                )
261            }
262            (true, EventRequestType::Confirm) => {
263                let state =
264                    GovernanceData::try_from(metadata.properties.clone())?;
265
266                (
267                    EvaluateData::GovConfirm {
268                        state: state.clone(),
269                    },
270                    state,
271                    None,
272                )
273            }
274            (false, EventRequestType::Fact) => {
275                let governance_data =
276                    get_gov(ctx, &metadata.governance_id).await?;
277
278                let init_state =
279                    governance_data.get_init_state(&metadata.schema_id)?;
280
281                (
282                    EvaluateData::TrackerSchemasFact {
283                        contract: format!(
284                            "{}_{}",
285                            metadata.governance_id, metadata.schema_id
286                        ),
287                        state: metadata.properties.clone(),
288                    },
289                    governance_data,
290                    Some(init_state),
291                )
292            }
293            (false, EventRequestType::Transfer) => {
294                let governance_data =
295                    get_gov(ctx, &metadata.governance_id).await?;
296                (
297                    EvaluateData::TrackerSchemasTransfer {
298                        governance_data: governance_data.clone(),
299                        namespace: metadata.namespace.clone(),
300                        schema_id: metadata.schema_id.clone(),
301                        state: metadata.properties.clone(),
302                    },
303                    governance_data,
304                    None,
305                )
306            }
307            _ => unreachable!(
308                "It was previously verified that the matched cases are the only possible ones"
309            ),
310        };
311
312        let (signers, quorum) = governance_data.get_quorum_and_signers(
313            ProtocolTypes::Evaluation,
314            &metadata.schema_id,
315            metadata.namespace.clone(),
316        )?;
317
318        let eval_req = EvaluationReq {
319            event_request: request.clone(),
320            data: evaluate_data,
321            sn: metadata.sn + 1,
322            gov_version: governance_data.version,
323            namespace: metadata.namespace.clone(),
324            schema_id: metadata.schema_id.clone(),
325            signer: (*self.our_key).clone(),
326            signer_is_owner: *self.our_key == request.signature().signer,
327            governance_id: metadata.governance_id.clone(),
328        };
329
330        let signature =
331            get_sign(ctx, SignTypesNode::EvaluationReq(eval_req.clone()))
332                .await?;
333
334        let signed_evaluation_req: Signed<EvaluationReq> =
335            Signed::from_parts(eval_req, signature);
336        Ok((signed_evaluation_req, quorum, signers, init_state))
337    }
338
339    async fn run_evaluation(
340        &self,
341        ctx: &mut ActorContext<Self>,
342        request: Signed<EvaluationReq>,
343        quorum: Quorum,
344        init_state: Option<ValueWrapper>,
345        signers: HashSet<PublicKey>,
346    ) -> Result<(), RequestManagerError> {
347        let Some((hash, network)) = self.helpers.clone() else {
348            return Err(RequestManagerError::HelpersNotInitialized);
349        };
350
351        info!("Init evaluation {}", self.id);
352        let child = ctx
353            .create_child(
354                "evaluation",
355                Evaluation::new(
356                    self.our_key.clone(),
357                    request,
358                    quorum,
359                    init_state,
360                    hash,
361                    network,
362                ),
363            )
364            .await?;
365
366        child
367            .tell(EvaluationMessage::Create {
368                request_id: self.id.clone(),
369                version: self.version,
370                signers,
371            })
372            .await?;
373
374        send_to_tracking(
375            ctx,
376            RequestTrackingMessage::UpdateState {
377                request_id: self.id.clone(),
378                state: RequestState::Evaluation,
379            },
380        )
381        .await?;
382
383        Ok(())
384    }
385    //////// APPROVE
386    ////////////////////////////////////////////////
387    async fn build_request_appro(
388        &self,
389        ctx: &mut ActorContext<Self>,
390        eval_req: EvaluationReq,
391        evaluator_res: EvaluatorResponse,
392    ) -> Result<Signed<ApprovalReq>, RequestManagerError> {
393        let request = ApprovalReq {
394            subject_id: self.subject_id.clone(),
395            sn: eval_req.sn,
396            gov_version: eval_req.gov_version,
397            patch: evaluator_res.patch,
398            signer: eval_req.signer,
399        };
400
401        let signature =
402            get_sign(ctx, SignTypesNode::ApprovalReq(request.clone())).await?;
403
404        let signed_approval_req: Signed<ApprovalReq> =
405            Signed::from_parts(request, signature);
406
407        Ok(signed_approval_req)
408    }
409
410    async fn build_approval(
411        &self,
412        ctx: &mut ActorContext<Self>,
413        eval_req: EvaluationReq,
414        eval_res: EvaluatorResponse,
415    ) -> Result<(), RequestManagerError> {
416        let request = self.build_request_appro(ctx, eval_req, eval_res).await?;
417
418        let governance_data =
419            get_gov(ctx, &request.content().subject_id).await?;
420
421        let (signers, quorum) = governance_data.get_quorum_and_signers(
422            ProtocolTypes::Approval,
423            &SchemaType::Governance,
424            Namespace::new(),
425        )?;
426
427        if signers.is_empty() {
428            warn!(
429                request_id = %self.id,
430                schema_id = %SchemaType::Governance,
431                "No approvers available for schema"
432            );
433
434            return Err(RequestManagerError::NoApproversAvailable {
435                schema_id: SchemaType::Governance.to_string(),
436                governance_id: self.subject_id.clone(),
437            });
438        }
439
440        self.run_approval(ctx, request, quorum, signers).await
441    }
442
443    async fn run_approval(
444        &self,
445        ctx: &mut ActorContext<Self>,
446        request: Signed<ApprovalReq>,
447        quorum: Quorum,
448        signers: HashSet<PublicKey>,
449    ) -> Result<(), RequestManagerError> {
450        let Some((hash, network)) = self.helpers.clone() else {
451            return Err(RequestManagerError::HelpersNotInitialized);
452        };
453
454        info!("Init approval {}", self.id);
455        let child = ctx
456            .create_child(
457                "approval",
458                Approval::new(
459                    self.our_key.clone(),
460                    request,
461                    quorum,
462                    signers,
463                    hash,
464                    network,
465                ),
466            )
467            .await?;
468
469        child
470            .tell(ApprovalMessage::Create {
471                request_id: self.id.clone(),
472                version: self.version,
473            })
474            .await?;
475
476        send_to_tracking(
477            ctx,
478            RequestTrackingMessage::UpdateState {
479                request_id: self.id.clone(),
480                state: RequestState::Approval,
481            },
482        )
483        .await?;
484
485        Ok(())
486    }
487
488    //////// VALI
489    ////////////////////////////////////////////////
490    async fn build_validation_req(
491        &mut self,
492        ctx: &mut ActorContext<Self>,
493        eval: Option<(EvaluationReq, EvaluationData)>,
494        appro_data: Option<ApprovalData>,
495    ) -> Result<
496        (
497            Signed<ValidationReq>,
498            Quorum,
499            HashSet<PublicKey>,
500            Option<ValueWrapper>,
501        ),
502        RequestManagerError,
503    > {
504        let (vali_req, quorum, signers, init_state, schema_id) =
505            self.build_validation_data(ctx, eval, appro_data).await?;
506
507        if signers.is_empty() {
508            warn!(
509                request_id = %self.id,
510                schema_id = %schema_id,
511                "No validators available for schema"
512            );
513
514            return Err(RequestManagerError::NoValidatorsAvailable {
515                schema_id: schema_id.to_string(),
516                governance_id: vali_req.get_governance_id().expect("The build process verified that the event request is valid")
517            });
518        }
519
520        let signature = get_sign(
521            ctx,
522            SignTypesNode::ValidationReq(Box::new(vali_req.clone())),
523        )
524        .await?;
525
526        let signed_validation_req: Signed<ValidationReq> =
527            Signed::from_parts(vali_req, signature);
528
529        self.on_event(
530            RequestManagerEvent::UpdateState {
531                state: Box::new(RequestManagerState::Validation {
532                    request: Box::new(signed_validation_req.clone()),
533                    quorum: quorum.clone(),
534                    init_state: init_state.clone(),
535                    signers: signers.clone(),
536                }),
537            },
538            ctx,
539        )
540        .await;
541
542        Ok((signed_validation_req, quorum, signers, init_state))
543    }
544
545    async fn build_validation_data(
546        &self,
547        ctx: &mut ActorContext<Self>,
548        eval: Option<(EvaluationReq, EvaluationData)>,
549        appro_data: Option<ApprovalData>,
550    ) -> Result<
551        (
552            ValidationReq,
553            Quorum,
554            HashSet<PublicKey>,
555            Option<ValueWrapper>,
556            SchemaType,
557        ),
558        RequestManagerError,
559    > {
560        let Some(request) = self.request.clone() else {
561            return Err(RequestManagerError::RequestNotSet);
562        };
563
564        if let EventRequest::Create(create) = request.content() {
565            if create.schema_id.is_gov() {
566                let governance_data =
567                    GovernanceData::new((*self.our_key).clone());
568                let (signers, quorum) = governance_data
569                    .get_quorum_and_signers(
570                        ProtocolTypes::Validation,
571                        &SchemaType::Governance,
572                        Namespace::new(),
573                    )?;
574
575                Ok((
576                    ValidationReq::Create {
577                        event_request: request.clone(),
578                        gov_version: 0,
579                        subject_id: self.subject_id.clone(),
580                    },
581                    quorum,
582                    signers,
583                    None,
584                    SchemaType::Governance,
585                ))
586            } else {
587                let governance_data =
588                    get_gov(ctx, &create.governance_id).await?;
589
590                let (signers, quorum) = governance_data
591                    .get_quorum_and_signers(
592                        ProtocolTypes::Validation,
593                        &create.schema_id,
594                        create.namespace.clone(),
595                    )?;
596
597                let init_state =
598                    governance_data.get_init_state(&create.schema_id)?;
599
600                Ok((
601                    ValidationReq::Create {
602                        event_request: request.clone(),
603                        gov_version: governance_data.version,
604                        subject_id: self.subject_id.clone(),
605                    },
606                    quorum,
607                    signers,
608                    Some(init_state),
609                    create.schema_id.clone(),
610                ))
611            }
612        } else {
613            let Some((hash, ..)) = self.helpers else {
614                return Err(RequestManagerError::HelpersNotInitialized);
615            };
616
617            let governance_data = get_gov(ctx, &self.subject_id).await?;
618
619            let (actual_protocols, gov_version, sn) =
620                if let Some((eval_req, eval_data)) = eval {
621                    if let Some(approval_data) = appro_data {
622                        (
623                            ActualProtocols::EvalApprove {
624                                eval_data,
625                                approval_data,
626                            },
627                            eval_req.gov_version,
628                            Some(eval_req.sn),
629                        )
630                    } else {
631                        (
632                            ActualProtocols::Eval { eval_data },
633                            eval_req.gov_version,
634                            Some(eval_req.sn),
635                        )
636                    }
637                } else {
638                    (ActualProtocols::None, governance_data.version, None)
639                };
640
641            let metadata = get_metadata(ctx, &self.subject_id).await?;
642            let sn = if let Some(sn) = sn {
643                sn
644            } else {
645                metadata.sn + 1
646            };
647
648            let (signers, quorum) = governance_data.get_quorum_and_signers(
649                ProtocolTypes::Validation,
650                &metadata.schema_id,
651                metadata.namespace.clone(),
652            )?;
653
654            let last_ledger_event =
655                get_last_ledger_event(ctx, &self.subject_id).await?;
656
657            let Some(last_ledger_event) = last_ledger_event else {
658                return Err(RequestManagerError::LastLedgerEventNotFound);
659            };
660
661            let ledger_hash = hash_borsh(&*hash.hasher(), &last_ledger_event.0)
662                .map_err(|e| RequestManagerError::LedgerHashFailed {
663                    details: e.to_string(),
664                })?;
665
666            let schema_id = metadata.schema_id.clone();
667
668            Ok((
669                ValidationReq::Event {
670                    actual_protocols: Box::new(actual_protocols),
671                    event_request: request.clone(),
672                    metadata: Box::new(metadata),
673                    last_data: Box::new(LastData {
674                        vali_data: last_ledger_event
675                            .content()
676                            .protocols
677                            .get_validation_data(),
678                        gov_version: last_ledger_event.content().gov_version,
679                    }),
680                    gov_version,
681                    ledger_hash,
682                    sn,
683                },
684                quorum,
685                signers,
686                None,
687                schema_id,
688            ))
689        }
690    }
691
692    async fn run_validation(
693        &self,
694        ctx: &mut ActorContext<Self>,
695        request: Signed<ValidationReq>,
696        quorum: Quorum,
697        signers: HashSet<PublicKey>,
698        init_state: Option<ValueWrapper>,
699    ) -> Result<(), RequestManagerError> {
700        let Some((hash, network)) = self.helpers.clone() else {
701            return Err(RequestManagerError::HelpersNotInitialized);
702        };
703
704        info!("Init validation {}", self.id);
705        let child = ctx
706            .create_child(
707                "validation",
708                Validation::new(
709                    self.our_key.clone(),
710                    request,
711                    init_state,
712                    quorum,
713                    hash,
714                    network,
715                ),
716            )
717            .await?;
718
719        child
720            .tell(ValidationMessage::Create {
721                request_id: self.id.clone(),
722                version: self.version,
723                signers,
724            })
725            .await?;
726
727        send_to_tracking(
728            ctx,
729            RequestTrackingMessage::UpdateState {
730                request_id: self.id.clone(),
731                state: RequestState::Validation,
732            },
733        )
734        .await?;
735
736        Ok(())
737    }
738    //////// Distribution
739    ////////////////////////////////////////////////
740    async fn build_ledger(
741        &mut self,
742        ctx: &mut ActorContext<Self>,
743        val_req: ValidationReq,
744        val_res: ValidationData,
745    ) -> Result<SignedLedger, RequestManagerError> {
746        let ledger = match val_req {
747            ValidationReq::Create {
748                event_request,
749                gov_version,
750                ..
751            } => Ledger {
752                event_request,
753                gov_version,
754                sn: 0,
755                prev_ledger_event_hash: DigestIdentifier::default(),
756                protocols: Protocols::Create {
757                    validation: val_res,
758                },
759            },
760            ValidationReq::Event {
761                actual_protocols,
762                event_request,
763                metadata,
764                gov_version,
765                sn,
766                ledger_hash,
767                ..
768            } => Ledger {
769                gov_version,
770                sn,
771                prev_ledger_event_hash: ledger_hash,
772                protocols: Protocols::build(
773                    metadata.schema_id.is_gov(),
774                    EventRequestType::from(event_request.content()),
775                    *actual_protocols,
776                    val_res,
777                )?,
778                event_request,
779            },
780        };
781
782        let signature =
783            get_sign(ctx, SignTypesNode::Ledger(ledger.clone())).await?;
784
785        let ledger = SignedLedger(Signed::from_parts(ledger, signature));
786
787        self.on_event(
788            RequestManagerEvent::UpdateState {
789                state: Box::new(RequestManagerState::UpdateSubject {
790                    ledger: ledger.clone(),
791                }),
792            },
793            ctx,
794        )
795        .await;
796
797        Ok(ledger)
798    }
799
800    async fn update_subject(
801        &mut self,
802        ctx: &mut ActorContext<Self>,
803        ledger: SignedLedger,
804    ) -> Result<(), RequestManagerError> {
805        if ledger.content().event_request.content().is_create_event() {
806            if let Err(e) = create_subject(ctx, ledger.clone()).await {
807                if let ActorError::Functional { .. } = e {
808                    return Err(RequestManagerError::CheckLimit);
809                } else {
810                    return Err(RequestManagerError::ActorError(e));
811                }
812            };
813        } else {
814            update_ledger(ctx, &self.subject_id, vec![ledger.clone()]).await?;
815        }
816
817        self.on_event(
818            RequestManagerEvent::UpdateState {
819                state: Box::new(RequestManagerState::Distribution { ledger }),
820            },
821            ctx,
822        )
823        .await;
824
825        Ok(())
826    }
827
828    async fn build_distribution(
829        &self,
830        ctx: &mut ActorContext<Self>,
831        ledger: SignedLedger,
832    ) -> Result<bool, RequestManagerError> {
833        let witnesses = self
834            .build_distribution_data(ctx, ledger.signature().signer.clone())
835            .await?;
836
837        let Some(mut witnesses) = witnesses else {
838            return Ok(false);
839        };
840
841        witnesses.remove(&self.our_key);
842
843        if witnesses.is_empty() {
844            warn!(
845                request_id = %self.id,
846                "No witnesses available for distribution"
847            );
848            return Ok(false);
849        }
850
851        self.run_distribution(ctx, witnesses, ledger).await?;
852
853        Ok(true)
854    }
855
856    async fn build_distribution_data(
857        &self,
858        ctx: &mut ActorContext<Self>,
859        creator: PublicKey,
860    ) -> Result<Option<HashSet<PublicKey>>, RequestManagerError> {
861        let Some(request) = self.request.clone() else {
862            return Err(RequestManagerError::RequestNotSet);
863        };
864
865        let witnesses = if let EventRequest::Create(create) = request.content()
866        {
867            if create.schema_id == SchemaType::Governance {
868                None
869            } else {
870                let governance_data = get_gov(ctx, &self.subject_id).await?;
871
872                let witnesses =
873                    governance_data.get_witnesses(WitnessesData::Schema {
874                        creator,
875                        schema_id: create.schema_id.clone(),
876                        namespace: create.namespace.clone(),
877                    })?;
878
879                Some(witnesses)
880            }
881        } else {
882            let data = get_subject_data(ctx, &self.subject_id).await?;
883
884            let Some(data) = data else {
885                return Err(RequestManagerError::SubjectDataNotFound {
886                    subject_id: self.subject_id.to_string(),
887                });
888            };
889
890            let governance_data = get_gov(ctx, &self.subject_id).await?;
891
892            let witnesses = match data {
893                SubjectData::Governance { .. } => {
894                    governance_data.get_witnesses(WitnessesData::Gov)?
895                }
896                SubjectData::Tracker {
897                    schema_id,
898                    namespace,
899                    ..
900                } => governance_data.get_witnesses(WitnessesData::Schema {
901                    creator,
902                    schema_id,
903                    namespace: Namespace::from(namespace),
904                })?,
905            };
906
907            Some(witnesses)
908        };
909
910        Ok(witnesses)
911    }
912
913    async fn run_distribution(
914        &self,
915        ctx: &mut ActorContext<Self>,
916        witnesses: HashSet<PublicKey>,
917        ledger: SignedLedger,
918    ) -> Result<(), RequestManagerError> {
919        let Some((.., network)) = self.helpers.clone() else {
920            return Err(RequestManagerError::HelpersNotInitialized);
921        };
922
923        info!("Init distribution {}", self.id);
924        let child = ctx
925            .create_child(
926                "distribution",
927                Distribution::new(
928                    network,
929                    DistributionType::Request,
930                    self.id.clone(),
931                ),
932            )
933            .await?;
934
935        child
936            .tell(DistributionMessage::Create {
937                ledger: Box::new(ledger),
938                witnesses,
939            })
940            .await?;
941
942        send_to_tracking(
943            ctx,
944            RequestTrackingMessage::UpdateState {
945                request_id: self.id.clone(),
946                state: RequestState::Distribution,
947            },
948        )
949        .await?;
950
951        Ok(())
952    }
953
954    //////// Reboot
955    ////////////////////////////////////////////////
956    async fn init_wait(
957        &self,
958        ctx: &mut ActorContext<Self>,
959        governance_id: &DigestIdentifier,
960    ) -> Result<(), RequestManagerError> {
961        let actor = ctx
962            .create_child(
963                "reboot",
964                Reboot::new(governance_id.clone(), self.id.clone()),
965            )
966            .await?;
967
968        actor.tell(RebootMessage::Init).await?;
969
970        Ok(())
971    }
972
973    async fn init_update(
974        &self,
975        ctx: &mut ActorContext<Self>,
976        governance_id: &DigestIdentifier,
977    ) -> Result<(), RequestManagerError> {
978        let Some((.., network)) = self.helpers.clone() else {
979            return Err(RequestManagerError::HelpersNotInitialized);
980        };
981
982        let gov_sn = get_gov_sn(ctx, governance_id).await?;
983
984        let governance_data = get_gov(ctx, governance_id).await?;
985
986        let mut witnesses = {
987            let gov_witnesses =
988                governance_data.get_witnesses(WitnessesData::Gov)?;
989
990            let auth_witnesses =
991                Self::get_witnesses_auth(ctx, governance_id.clone())
992                    .await
993                    .unwrap_or_default();
994
995            gov_witnesses
996                .union(&auth_witnesses)
997                .cloned()
998                .collect::<HashSet<PublicKey>>()
999        };
1000
1001        witnesses.remove(&self.our_key);
1002
1003        if witnesses.is_empty() {
1004            if let Ok(actor) = ctx.reference().await {
1005                actor
1006                    .tell(RequestManagerMessage::FinishReboot {
1007                        request_id: self.id.clone(),
1008                    })
1009                    .await?;
1010            };
1011        } else if witnesses.len() == 1 {
1012            let objetive = witnesses.iter().next().expect("len is 1");
1013            let info = ComunicateInfo {
1014                receiver: objetive.clone(),
1015                request_id: String::default(),
1016                version: 0,
1017                receiver_actor: format!(
1018                    "/user/node/distributor_{}",
1019                    governance_id
1020                ),
1021            };
1022
1023            network
1024                .send_command(network::CommandHelper::SendMessage {
1025                    message: NetworkMessage {
1026                        info,
1027                        message: ActorMessage::DistributionLedgerReq {
1028                            actual_sn: Some(gov_sn),
1029                            subject_id: governance_id.clone(),
1030                        },
1031                    },
1032                })
1033                .await?;
1034
1035            let Ok(actor) = ctx.reference().await else {
1036                return Ok(());
1037            };
1038
1039            actor
1040                .tell(RequestManagerMessage::RebootWait {
1041                    request_id: self.id.clone(),
1042                    governance_id: governance_id.clone(),
1043                })
1044                .await?;
1045        } else {
1046            let data = UpdateNew {
1047                network,
1048                subject_id: governance_id.clone(),
1049                our_sn: Some(gov_sn),
1050                witnesses,
1051                update_type: UpdateType::Request {
1052                    subject_id: self.subject_id.clone(),
1053                    id: self.id.clone(),
1054                },
1055            };
1056
1057            let updater = Update::new(data);
1058            let Ok(child) = ctx.create_child("update", updater).await else {
1059                let Ok(actor) = ctx.reference().await else {
1060                    return Ok(());
1061                };
1062
1063                actor
1064                    .tell(RequestManagerMessage::RebootWait {
1065                        request_id: self.id.clone(),
1066                        governance_id: governance_id.clone(),
1067                    })
1068                    .await?;
1069
1070                return Ok(());
1071            };
1072
1073            child.tell(UpdateMessage::Run).await?;
1074        }
1075
1076        Ok(())
1077    }
1078
1079    async fn get_witnesses_auth(
1080        ctx: &ActorContext<Self>,
1081        governance_id: DigestIdentifier,
1082    ) -> Result<HashSet<PublicKey>, RequestManagerError> {
1083        let path = ActorPath::from("/user/node/auth");
1084        let actor = ctx.system().get_actor::<Auth>(&path).await?;
1085
1086        let response = actor
1087            .ask(AuthMessage::GetAuth {
1088                subject_id: governance_id,
1089            })
1090            .await?;
1091
1092        match response {
1093            AuthResponse::Witnesses(witnesses) => Ok(witnesses),
1094            _ => Err(RequestManagerError::ActorError(
1095                ActorError::UnexpectedResponse {
1096                    path,
1097                    expected: "AuthResponse::Witnesses".to_owned(),
1098                },
1099            )),
1100        }
1101    }
1102
1103    //////// General
1104    ////////////////////////////////////////////////
1105    async fn send_reboot(
1106        &self,
1107        ctx: &ActorContext<Self>,
1108        governance_id: DigestIdentifier,
1109    ) -> Result<(), ActorError> {
1110        let Ok(actor) = ctx.reference().await else {
1111            return Ok(());
1112        };
1113
1114        actor
1115            .tell(RequestManagerMessage::Reboot {
1116                request_id: self.id.clone(),
1117                governance_id,
1118                reboot_type: RebootType::TimeOut,
1119            })
1120            .await
1121    }
1122
1123    async fn match_error(
1124        &mut self,
1125        ctx: &mut ActorContext<Self>,
1126        error: RequestManagerError,
1127    ) {
1128        match error {
1129            RequestManagerError::NoEvaluatorsAvailable {
1130                governance_id,
1131                ..
1132            }
1133            | RequestManagerError::NoApproversAvailable {
1134                governance_id, ..
1135            }
1136            | RequestManagerError::NoValidatorsAvailable {
1137                governance_id,
1138                ..
1139            } => {
1140                if let Err(e) = self.send_reboot(ctx, governance_id).await {
1141                    emit_fail(ctx, e).await;
1142                }
1143            }
1144            RequestManagerError::CheckLimit
1145            | RequestManagerError::Governance(..) => {
1146                if let Err(e) = self
1147                    .abort_request(
1148                        ctx,
1149                        error.to_string(),
1150                        None,
1151                        (*self.our_key).clone(),
1152                    )
1153                    .await
1154                {
1155                    emit_fail(
1156                        ctx,
1157                        ActorError::FunctionalCritical {
1158                            description: e.to_string(),
1159                        },
1160                    )
1161                    .await;
1162                }
1163            }
1164            _ => {
1165                emit_fail(
1166                    ctx,
1167                    ActorError::FunctionalCritical {
1168                        description: error.to_string(),
1169                    },
1170                )
1171                .await;
1172            }
1173        }
1174    }
1175
1176    async fn finish_request(
1177        &mut self,
1178        ctx: &mut ActorContext<Self>,
1179    ) -> Result<(), RequestManagerError> {
1180        info!("Ending {}", self.id);
1181        send_to_tracking(
1182            ctx,
1183            RequestTrackingMessage::UpdateState {
1184                request_id: self.id.clone(),
1185                state: RequestState::Finish,
1186            },
1187        )
1188        .await?;
1189
1190        self.on_event(RequestManagerEvent::Finish, ctx).await;
1191
1192        self.end_request(ctx).await?;
1193
1194        Ok(())
1195    }
1196
1197    async fn reboot(
1198        &mut self,
1199        ctx: &mut ActorContext<Self>,
1200        reboot_type: RebootType,
1201        governance_id: DigestIdentifier,
1202    ) -> Result<(), RequestManagerError> {
1203        self.on_event(
1204            RequestManagerEvent::UpdateState {
1205                state: Box::new(RequestManagerState::Reboot),
1206            },
1207            ctx,
1208        )
1209        .await;
1210
1211        let Ok(actor) = ctx.reference().await else {
1212            return Ok(());
1213        };
1214
1215        let request_id = self.id.clone();
1216
1217        match reboot_type {
1218            RebootType::Normal => {
1219                info!("Launching Normal reboot {}", self.id);
1220                send_to_tracking(
1221                    ctx,
1222                    RequestTrackingMessage::UpdateState {
1223                        request_id: self.id.clone(),
1224                        state: RequestState::Reboot,
1225                    },
1226                )
1227                .await?;
1228
1229                actor
1230                    .tell(RequestManagerMessage::RebootUpdate {
1231                        request_id,
1232                        governance_id,
1233                    })
1234                    .await?;
1235            }
1236            RebootType::Diff => {
1237                info!("Launching Diff reboot {}", self.id);
1238                self.retry_diff += 1;
1239
1240                let seconds = match self.retry_diff {
1241                    1 => 10,
1242                    2 => 20,
1243                    3 => 30,
1244                    _ => 60,
1245                };
1246
1247                info!(
1248                    "Launching Diff reboot {}, try: {}, seconds: {}",
1249                    self.id, self.retry_diff, seconds
1250                );
1251
1252                send_to_tracking(
1253                    ctx,
1254                    RequestTrackingMessage::UpdateState {
1255                        request_id: self.id.clone(),
1256                        state: RequestState::RebootDiff {
1257                            seconds,
1258                            count: self.retry_diff,
1259                        },
1260                    },
1261                )
1262                .await?;
1263
1264                tokio::spawn(async move {
1265                    tokio::time::sleep(Duration::from_secs(seconds)).await;
1266                    let _ = actor
1267                        .tell(RequestManagerMessage::RebootUpdate {
1268                            request_id,
1269                            governance_id,
1270                        })
1271                        .await;
1272                });
1273            }
1274            RebootType::TimeOut => {
1275                self.retry_timeout += 1;
1276
1277                let seconds = match self.retry_timeout {
1278                    1 => 30,
1279                    2 => 60,
1280                    3 => 120,
1281                    _ => 300,
1282                };
1283
1284                info!(
1285                    "Launching TimeOut reboot {}, try: {}, seconds: {}",
1286                    self.id, self.retry_timeout, seconds
1287                );
1288                send_to_tracking(
1289                    ctx,
1290                    RequestTrackingMessage::UpdateState {
1291                        request_id: self.id.clone(),
1292                        state: RequestState::RebootTimeOut {
1293                            seconds,
1294                            count: self.retry_timeout,
1295                        },
1296                    },
1297                )
1298                .await?;
1299
1300                tokio::spawn(async move {
1301                    tokio::time::sleep(Duration::from_secs(seconds)).await;
1302                    let _ = actor
1303                        .tell(RequestManagerMessage::RebootUpdate {
1304                            request_id,
1305                            governance_id,
1306                        })
1307                        .await;
1308                });
1309            }
1310        }
1311
1312        Ok(())
1313    }
1314
1315    async fn match_command(
1316        &mut self,
1317        ctx: &mut ActorContext<Self>,
1318    ) -> Result<(), RequestManagerError> {
1319        match self.command {
1320            ReqManInitMessage::Evaluate => self.build_evaluation(ctx).await,
1321            ReqManInitMessage::Validate => {
1322                let (request, quorum, signers, init_state) =
1323                    self.build_validation_req(ctx, None, None).await?;
1324
1325                self.run_validation(ctx, request, quorum, signers, init_state)
1326                    .await
1327            }
1328        }
1329    }
1330
1331    async fn check_signature(
1332        &self,
1333        ctx: &mut ActorContext<Self>,
1334    ) -> Result<(), RequestManagerError> {
1335        let Some(request) = self.request.clone() else {
1336            return Err(RequestManagerError::RequestNotSet);
1337        };
1338
1339        if let EventRequest::Fact { .. } = request.content() {
1340            let subject_data = get_subject_data(ctx, &self.subject_id).await?;
1341            let Some(subject_data) = subject_data else {
1342                return Err(RequestManagerError::SubjecData);
1343            };
1344
1345            let gov = get_gov(ctx, &self.subject_id).await?;
1346            match subject_data {
1347                SubjectData::Tracker {
1348                    schema_id,
1349                    namespace,
1350                    ..
1351                } => {
1352                    if !gov.has_this_role(HashThisRole::Schema {
1353                        who: request.signature().signer.clone(),
1354                        role: RoleTypes::Issuer,
1355                        schema_id,
1356                        namespace: Namespace::from(namespace),
1357                    }) {
1358                        return Err(RequestManagerError::NotIssuer);
1359                    }
1360                }
1361                SubjectData::Governance { .. } => {
1362                    if !gov.has_this_role(HashThisRole::Gov {
1363                        who: request.signature().signer.clone(),
1364                        role: RoleTypes::Issuer,
1365                    }) {
1366                        return Err(RequestManagerError::NotIssuer);
1367                    }
1368                }
1369            }
1370        }
1371
1372        Ok(())
1373    }
1374
1375    async fn stops_childs(
1376        &self,
1377        ctx: &mut ActorContext<Self>,
1378    ) -> Result<(), RequestManagerError> {
1379        match self.state {
1380            RequestManagerState::Reboot => {
1381                if let Ok(actor) = ctx.get_child::<Update>("update").await {
1382                    actor.ask_stop().await?;
1383                };
1384                if let Ok(actor) = ctx.get_child::<Reboot>("reboot").await {
1385                    actor.ask_stop().await?;
1386                };
1387            }
1388            RequestManagerState::Evaluation => {
1389                if let Ok(actor) =
1390                    ctx.get_child::<Evaluation>("evaluation").await
1391                {
1392                    actor.ask_stop().await?;
1393                };
1394            }
1395            RequestManagerState::Approval { .. } => {
1396                if let Ok(actor) = ctx.get_child::<Approval>("approval").await {
1397                    actor.ask_stop().await?;
1398                };
1399                let _ = make_obsolete(ctx, &self.subject_id).await;
1400            }
1401            RequestManagerState::Validation { .. } => {
1402                if let Ok(actor) =
1403                    ctx.get_child::<Validation>("validation").await
1404                {
1405                    actor.ask_stop().await?;
1406                };
1407            }
1408            RequestManagerState::Distribution { .. } => {
1409                if let Ok(actor) =
1410                    ctx.get_child::<Distribution>("distribution").await
1411                {
1412                    actor.ask_stop().await?;
1413                };
1414            }
1415            _ => {}
1416        }
1417
1418        Ok(())
1419    }
1420
1421    async fn abort_request(
1422        &mut self,
1423        ctx: &mut ActorContext<Self>,
1424        error: String,
1425        sn: Option<u64>,
1426        who: PublicKey,
1427    ) -> Result<(), RequestManagerError> {
1428        self.stops_childs(ctx).await?;
1429
1430        info!("Aborting {}", self.id);
1431        send_to_tracking(
1432            ctx,
1433            RequestTrackingMessage::UpdateState {
1434                request_id: self.id.clone(),
1435                state: RequestState::Abort {
1436                    subject_id: self.subject_id.to_string(),
1437                    error,
1438                    sn,
1439                    who: who.to_string(),
1440                },
1441            },
1442        )
1443        .await?;
1444
1445        self.on_event(RequestManagerEvent::Finish, ctx).await;
1446
1447        self.end_request(ctx).await?;
1448
1449        Ok(())
1450    }
1451
1452    async fn end_request(
1453        &self,
1454        ctx: &ActorContext<Self>,
1455    ) -> Result<(), RequestManagerError> {
1456        let actor = ctx.get_parent::<RequestHandler>().await?;
1457        actor
1458            .tell(RequestHandlerMessage::EndHandling {
1459                subject_id: self.subject_id.clone(),
1460            })
1461            .await?;
1462
1463        Ok(())
1464    }
1465}
1466
1467#[derive(Debug, Clone)]
1468pub enum RequestManagerMessage {
1469    Run {
1470        request_id: DigestIdentifier,
1471    },
1472    FirstRun {
1473        command: ReqManInitMessage,
1474        request: Signed<EventRequest>,
1475        request_id: DigestIdentifier,
1476    },
1477    Abort {
1478        request_id: DigestIdentifier,
1479        who: PublicKey,
1480        reason: String,
1481        sn: u64,
1482    },
1483    ManualAbort,
1484    Reboot {
1485        request_id: DigestIdentifier,
1486        governance_id: DigestIdentifier,
1487        reboot_type: RebootType,
1488    },
1489    RebootUpdate {
1490        request_id: DigestIdentifier,
1491        governance_id: DigestIdentifier,
1492    },
1493    RebootWait {
1494        request_id: DigestIdentifier,
1495        governance_id: DigestIdentifier,
1496    },
1497    FinishReboot {
1498        request_id: DigestIdentifier,
1499    },
1500    EvaluationRes {
1501        request_id: DigestIdentifier,
1502        eval_req: Box<EvaluationReq>,
1503        eval_res: EvaluationData,
1504    },
1505    ApprovalRes {
1506        request_id: DigestIdentifier,
1507        appro_res: ApprovalData,
1508    },
1509    ValidationRes {
1510        request_id: DigestIdentifier,
1511        val_req: Box<ValidationReq>,
1512        val_res: ValidationData,
1513    },
1514    FinishRequest {
1515        request_id: DigestIdentifier,
1516    },
1517}
1518
1519impl Message for RequestManagerMessage {}
1520
1521#[derive(
1522    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
1523)]
1524pub enum RequestManagerEvent {
1525    Finish,
1526    UpdateState {
1527        state: Box<RequestManagerState>,
1528    },
1529    UpdateVersion {
1530        version: u64,
1531    },
1532    SafeState {
1533        command: ReqManInitMessage,
1534        request: Signed<EventRequest>,
1535    },
1536}
1537
1538impl Event for RequestManagerEvent {}
1539
1540#[async_trait]
1541impl Actor for RequestManager {
1542    type Event = RequestManagerEvent;
1543    type Message = RequestManagerMessage;
1544    type Response = ();
1545
1546    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
1547        parent_span.map_or_else(
1548            || info_span!("RequestManager", id),
1549            |parent_span| info_span!(parent: parent_span, "RequestManager", id),
1550        )
1551    }
1552
1553    async fn pre_start(
1554        &mut self,
1555        ctx: &mut ActorContext<Self>,
1556    ) -> Result<(), ActorError> {
1557        if let Err(e) =
1558            self.init_store("request_manager", None, false, ctx).await
1559        {
1560            error!(
1561                error = %e,
1562                subject_id = %self.subject_id,
1563                "Failed to initialize store"
1564            );
1565            return Err(e);
1566        }
1567        Ok(())
1568    }
1569}
1570
1571#[async_trait]
1572impl Handler<Self> for RequestManager {
1573    // The async state machine inlines all sub-futures and exceeds the default
1574    // threshold; a proper fix would require boxing every large sub-future.
1575    #[allow(clippy::large_stack_frames)]
1576    async fn handle_message(
1577        &mut self,
1578        _sender: ActorPath,
1579        msg: RequestManagerMessage,
1580        ctx: &mut ave_actors::ActorContext<Self>,
1581    ) -> Result<(), ActorError> {
1582        match msg {
1583            RequestManagerMessage::RebootUpdate {
1584                governance_id,
1585                request_id,
1586            } => {
1587                if request_id == self.id {
1588                    info!("Init reboot update {}", self.id);
1589                    debug!(
1590                        msg_type = "RebootUpdate",
1591                        request_id = %self.id,
1592                        governance_id = %governance_id,
1593                        "Initializing reboot update"
1594                    );
1595
1596                    if let Err(e) = self.init_update(ctx, &governance_id).await
1597                    {
1598                        error!(
1599                            msg_type = "RebootUpdate",
1600                            request_id = %self.id,
1601                            governance_id = %governance_id,
1602                            error = %e,
1603                            "Failed to initialize reboot update"
1604                        );
1605                        self.match_error(ctx, e).await;
1606                        return Ok(());
1607                    }
1608                }
1609            }
1610            RequestManagerMessage::RebootWait {
1611                governance_id,
1612                request_id,
1613            } => {
1614                if request_id == self.id {
1615                    info!("Init reboot wait {}", self.id);
1616                    debug!(
1617                        msg_type = "RebootWait",
1618                        request_id = %self.id,
1619                        governance_id = %governance_id,
1620                        "Initializing reboot wait"
1621                    );
1622
1623                    if let Err(e) = self.init_wait(ctx, &governance_id).await {
1624                        error!(
1625                            msg_type = "RebootWait",
1626                            request_id = %self.id,
1627                            governance_id = %governance_id,
1628                            error = %e,
1629                            "Failed to initialize reboot wait"
1630                        );
1631                        self.match_error(ctx, e).await;
1632                        return Ok(());
1633                    }
1634                }
1635            }
1636            RequestManagerMessage::Reboot {
1637                governance_id,
1638                request_id,
1639                reboot_type,
1640            } => {
1641                if request_id == self.id {
1642                    if matches!(self.state, RequestManagerState::Reboot) {
1643                        debug!(
1644                            msg_type = "Reboot",
1645                            request_id = %self.id,
1646                            governance_id = %governance_id,
1647                            reboot_type = ?reboot_type,
1648                            "Already in reboot state, ignoring"
1649                        );
1650                    } else {
1651                        debug!(
1652                            msg_type = "Reboot",
1653                            request_id = %self.id,
1654                            governance_id = %governance_id,
1655                            reboot_type = ?reboot_type,
1656                            "Initiating reboot"
1657                        );
1658                        if let Err(e) = self.stops_childs(ctx).await {
1659                            error!(
1660                                msg_type = "Reboot",
1661                                request_id = %self.id,
1662                                governance_id = %governance_id,
1663                                error = %e,
1664                                "Failed to stop childs"
1665                            );
1666                            self.match_error(ctx, e).await;
1667                            return Ok(());
1668                        };
1669                        if let Err(e) = self
1670                            .reboot(ctx, reboot_type, governance_id.clone())
1671                            .await
1672                        {
1673                            error!(
1674                                msg_type = "Reboot",
1675                                request_id = %self.id,
1676                                governance_id = %governance_id,
1677                                error = %e,
1678                                "Failed to initiate reboot"
1679                            );
1680                            self.match_error(ctx, e).await;
1681                            return Ok(());
1682                        }
1683                    }
1684                }
1685            }
1686            RequestManagerMessage::FinishReboot { request_id } => {
1687                if request_id == self.id {
1688                    info!("Init reboot finish {}", self.id);
1689                    debug!(
1690                        msg_type = "FinishReboot",
1691                        request_id = %self.id,
1692                        version = self.version,
1693                        "Reboot completed, resuming request"
1694                    );
1695                    self.on_event(
1696                        RequestManagerEvent::UpdateVersion {
1697                            version: self.version + 1,
1698                        },
1699                        ctx,
1700                    )
1701                    .await;
1702
1703                    if let Err(e) = send_to_tracking(
1704                        ctx,
1705                        RequestTrackingMessage::UpdateVersion {
1706                            request_id: self.id.clone(),
1707                            version: self.version,
1708                        },
1709                    )
1710                    .await
1711                    {
1712                        error!(
1713                            msg_type = "FinishReboot",
1714                            request_id = %self.id,
1715                            version = self.version,
1716                            error = %e,
1717                            "Failed to send version update to tracking"
1718                        );
1719                        return Err(emit_fail(ctx, e).await);
1720                    }
1721
1722                    if let Err(e) = self.check_signature(ctx).await {
1723                        error!(
1724                            msg_type = "FinishReboot",
1725                            request_id = %self.id,
1726                            error = %e,
1727                            "Failed to check signatures after reboot"
1728                        );
1729                        self.match_error(ctx, e).await;
1730                        return Ok(());
1731                    }
1732
1733                    if let Err(e) = self.match_command(ctx).await {
1734                        error!(
1735                            msg_type = "FinishReboot",
1736                            request_id = %self.id,
1737                            error = %e,
1738                            "Failed to execute command after reboot"
1739                        );
1740                        self.match_error(ctx, e).await;
1741                        return Ok(());
1742                    }
1743                }
1744            }
1745            RequestManagerMessage::Abort {
1746                request_id,
1747                who,
1748                reason,
1749                sn,
1750            } => {
1751                if request_id == self.id {
1752                    warn!(
1753                        msg_type = "Abort",
1754                        state = %self.state,
1755                        request_id = %self.id,
1756                        who = %who,
1757                        reason = %reason,
1758                        sn = sn,
1759                        "Request abort received"
1760                    );
1761                    if let Err(e) =
1762                        self.abort_request(ctx, reason, Some(sn), who).await
1763                    {
1764                        error!(
1765                            msg_type = "Abort",
1766                            request_id = %self.id,
1767                            error = %e,
1768                            "Failed to abort request"
1769                        );
1770                        self.match_error(ctx, e).await;
1771                        return Ok(());
1772                    }
1773                }
1774            }
1775            RequestManagerMessage::ManualAbort => {
1776                match &self.state {
1777                    RequestManagerState::Reboot
1778                    | RequestManagerState::Starting
1779                    | RequestManagerState::Evaluation
1780                    | RequestManagerState::Approval { .. }
1781                    | RequestManagerState::Validation { .. } => {
1782                        if let Err(e) = self
1783                            .abort_request(
1784                                ctx,
1785                                "The user manually aborted the request"
1786                                    .to_owned(),
1787                                None,
1788                                (*self.our_key).clone(),
1789                            )
1790                            .await
1791                        {
1792                            error!(
1793                                msg_type = "Abort",
1794                                request_id = %self.id,
1795                                error = %e,
1796                                "Failed to abort request"
1797                            );
1798                            self.match_error(ctx, e).await;
1799                        }
1800                    }
1801                    _ => {
1802                        info!(
1803                            "The request is in a state that cannot be aborted {}, state: {}",
1804                            self.id, self.state
1805                        );
1806                    }
1807                }
1808
1809                return Ok(());
1810            }
1811            RequestManagerMessage::FirstRun {
1812                command,
1813                request,
1814                request_id,
1815            } => {
1816                self.id = request_id.clone();
1817                debug!(
1818                    msg_type = "FirstRun",
1819                    request_id = %request_id,
1820                    command = ?command,
1821                    "First run of request manager"
1822                );
1823                self.on_event(
1824                    RequestManagerEvent::SafeState { command, request },
1825                    ctx,
1826                )
1827                .await;
1828
1829                if let Err(e) = self.match_command(ctx).await {
1830                    error!(
1831                        msg_type = "FirstRun",
1832                        request_id = %self.id,
1833                        error = %e,
1834                        "Failed to execute initial command"
1835                    );
1836                    self.match_error(ctx, e).await;
1837                    return Ok(());
1838                };
1839            }
1840            RequestManagerMessage::Run { request_id } => {
1841                self.id = request_id;
1842
1843                debug!(
1844                    msg_type = "Run",
1845                    request_id = %self.id,
1846                    state = ?self.state,
1847                    version = self.version,
1848                    "Running request manager"
1849                );
1850                match self.state.clone() {
1851                    RequestManagerState::Starting
1852                    | RequestManagerState::Reboot => {
1853                        if let Err(e) = self.match_command(ctx).await {
1854                            error!(
1855                                msg_type = "Run",
1856                                request_id = %self.id,
1857                                state = "Starting/Reboot",
1858                                error = %e,
1859                                "Failed to execute command"
1860                            );
1861                            self.match_error(ctx, e).await;
1862                        return Ok(())
1863                        };
1864                    }
1865                    RequestManagerState::Evaluation => {
1866                        if let Err(e) = self.build_evaluation(ctx).await {
1867                            error!(
1868                                msg_type = "Run",
1869                                request_id = %self.id,
1870                                state = "Evaluation",
1871                                error = %e,
1872                                "Failed to build evaluation"
1873                            );
1874                            self.match_error(ctx, e).await;
1875                        return Ok(())
1876                        }
1877                    }
1878
1879                    RequestManagerState::Approval {
1880                        eval_req,
1881                        eval_res,
1882                    } => {
1883                        if let Err(e) = self
1884                                .build_approval(ctx, eval_req, eval_res.evaluator_res().expect("If the status is approval, it means that the evaluator's response is valid"))
1885                                .await
1886                            {
1887                                error!(
1888                                    msg_type = "Run",
1889                                    request_id = %self.id,
1890                                    state = "Approval",
1891                                    error = %e,
1892                                    "Failed to build approval"
1893                                );
1894                                self.match_error(ctx, e).await;
1895                        return Ok(())
1896                            }
1897                    }
1898                    RequestManagerState::Validation {
1899                        request,
1900                        quorum,
1901                        init_state,
1902                        signers,
1903                    } => {
1904                        if let Err(e) = self
1905                            .run_validation(
1906                                ctx, *request, quorum, signers, init_state,
1907                            )
1908                            .await
1909                        {
1910                            error!(
1911                                msg_type = "Run",
1912                                request_id = %self.id,
1913                                state = "Validation",
1914                                error = %e,
1915                                "Failed to run validation"
1916                            );
1917                            self.match_error(ctx, e).await;
1918                        return Ok(())
1919                        };
1920                    }
1921                    RequestManagerState::UpdateSubject { ledger } => {
1922                        if let Err(e) =
1923                            self.update_subject(ctx, ledger.clone()).await
1924                        {
1925                            error!(
1926                                msg_type = "Run",
1927                                request_id = %self.id,
1928                                state = "UpdateSubject",
1929                                error = %e,
1930                                "Failed to update subject"
1931                            );
1932                            self.match_error(ctx, e).await;
1933                        return Ok(())
1934                        };
1935
1936                        match self.build_distribution(ctx, ledger).await {
1937                            Ok(in_distribution) => {
1938                                if !in_distribution
1939                                    && let Err(e) =
1940                                        self.finish_request(ctx).await
1941                                    {
1942                                        error!(
1943                                            msg_type = "Run",
1944                                            request_id = %self.id,
1945                                            state = "UpdateSubject",
1946                                            error = %e,
1947                                            "Failed to finish request after build distribution"
1948                                        );
1949                                        self.match_error(ctx, e).await;
1950                        return Ok(())
1951                                }
1952                            }
1953                            Err(e) => {
1954                                error!(
1955                                    msg_type = "Run",
1956                                    request_id = %self.id,
1957                                    state = "UpdateSubject",
1958                                    error = %e,
1959                                    "Failed to build distribution"
1960                                );
1961                                self.match_error(ctx, e).await;
1962                        return Ok(())
1963                            }
1964                        };
1965                    }
1966                    RequestManagerState::Distribution { ledger } => {
1967                        match self.build_distribution(ctx, ledger).await {
1968                            Ok(in_distribution) => {
1969                                if !in_distribution
1970                                    && let Err(e) =
1971                                        self.finish_request(ctx).await
1972                                    {
1973                                        error!(
1974                                            msg_type = "Run",
1975                                            request_id = %self.id,
1976                                            state = "Distribution",
1977                                            error = %e,
1978                                            "Failed to finish request after build distribution"
1979                                        );
1980                                        self.match_error(ctx, e).await;
1981                        return Ok(())
1982                                    }
1983                            }
1984                            Err(e) => {
1985                                error!(
1986                                    msg_type = "Run",
1987                                    request_id = %self.id,
1988                                    state = "Distribution",
1989                                    error = %e,
1990                                    "Failed to build distribution"
1991                                );
1992                                self.match_error(ctx, e).await;
1993                        return Ok(())
1994                            }
1995                        };
1996                    }
1997                    RequestManagerState::End => {
1998                        if let Err(e) = self.end_request(ctx).await {
1999                            error!(
2000                                msg_type = "Run",
2001                                request_id = %self.id,
2002                                state = "End",
2003                                error = %e,
2004                                "Failed to end request"
2005                            );
2006                            self.match_error(ctx, e).await;
2007                            return Ok(())
2008                        }
2009                    }
2010                };
2011            }
2012            RequestManagerMessage::EvaluationRes {
2013                eval_req,
2014                eval_res,
2015                request_id,
2016            } => {
2017                if request_id == self.id {
2018                    debug!(
2019                        msg_type = "EvaluationRes",
2020                        request_id = %self.id,
2021                        version = self.version,
2022                        "Evaluation result received"
2023                    );
2024                    if let Err(e) = self.stops_childs(ctx).await {
2025                        error!(
2026                            msg_type = "EvaluationRes",
2027                            request_id = %self.id,
2028                            error = %e,
2029                            "Failed to stop childs"
2030                        );
2031                        self.match_error(ctx, e).await;
2032                        return Ok(());
2033                    };
2034
2035                    if let Some(evaluator_res) = eval_res.evaluator_res()
2036                        && evaluator_res.appr_required
2037                    {
2038                        debug!(
2039                            msg_type = "EvaluationRes",
2040                            request_id = %self.id,
2041                            "Approval required, proceeding to approval phase"
2042                        );
2043                        self.on_event(
2044                            RequestManagerEvent::UpdateState {
2045                                state: Box::new(
2046                                    RequestManagerState::Approval {
2047                                        eval_req: *eval_req.clone(),
2048                                        eval_res: eval_res.clone(),
2049                                    },
2050                                ),
2051                            },
2052                            ctx,
2053                        )
2054                        .await;
2055
2056                        if let Err(e) = self
2057                            .build_approval(ctx, *eval_req, evaluator_res)
2058                            .await
2059                        {
2060                            error!(
2061                                msg_type = "EvaluationRes",
2062                                request_id = %self.id,
2063                                error = %e,
2064                                "Failed to build approval"
2065                            );
2066                            self.match_error(ctx, e).await;
2067                            return Ok(());
2068                        }
2069                    } else {
2070                        debug!(
2071                            msg_type = "EvaluationRes",
2072                            request_id = %self.id,
2073                            "Approval not required, proceeding to validation phase"
2074                        );
2075                        let (request, quorum, signers, init_state) = match self
2076                            .build_validation_req(
2077                                ctx,
2078                                Some((*eval_req, eval_res)),
2079                                None,
2080                            )
2081                            .await
2082                        {
2083                            Ok(data) => data,
2084                            Err(e) => {
2085                                error!(
2086                                    msg_type = "EvaluationRes",
2087                                    request_id = %self.id,
2088                                    error = %e,
2089                                    "Failed to build validation request"
2090                                );
2091                                self.match_error(ctx, e).await;
2092                                return Ok(());
2093                            }
2094                        };
2095
2096                        if let Err(e) = self
2097                            .run_validation(
2098                                ctx, request, quorum, signers, init_state,
2099                            )
2100                            .await
2101                        {
2102                            error!(
2103                                msg_type = "EvaluationRes",
2104                                request_id = %self.id,
2105                                error = %e,
2106                                "Failed to run validation"
2107                            );
2108                            self.match_error(ctx, e).await;
2109                            return Ok(());
2110                        };
2111                    }
2112                }
2113            }
2114            RequestManagerMessage::ApprovalRes {
2115                appro_res,
2116                request_id,
2117            } => {
2118                if request_id == self.id {
2119                    let _ = make_obsolete(ctx, &self.subject_id).await;
2120                    debug!(
2121                        msg_type = "ApprovalRes",
2122                        request_id = %self.id,
2123                        version = self.version,
2124                        "Approval result received"
2125                    );
2126                    if let Err(e) = self.stops_childs(ctx).await {
2127                        error!(
2128                            msg_type = "ApprovalRes",
2129                            request_id = %self.id,
2130                            error = %e,
2131                            "Failed to stop childs"
2132                        );
2133                        self.match_error(ctx, e).await;
2134                        return Ok(());
2135                    };
2136
2137                    let RequestManagerState::Approval { eval_req, eval_res } =
2138                        self.state.clone()
2139                    else {
2140                        error!(
2141                            msg_type = "ApprovalRes",
2142                            request_id = %self.id,
2143                            state = ?self.state,
2144                            "Invalid state for approval response"
2145                        );
2146                        let e = ActorError::FunctionalCritical {
2147                            description: "Invalid request state".to_owned(),
2148                        };
2149                        return Err(emit_fail(ctx, e).await);
2150                    };
2151                    let (request, quorum, signers, init_state) = match self
2152                        .build_validation_req(
2153                            ctx,
2154                            Some((eval_req, eval_res)),
2155                            Some(appro_res),
2156                        )
2157                        .await
2158                    {
2159                        Ok(data) => data,
2160                        Err(e) => {
2161                            error!(
2162                                msg_type = "ApprovalRes",
2163                                request_id = %self.id,
2164                                error = %e,
2165                                "Failed to build validation request"
2166                            );
2167                            self.match_error(ctx, e).await;
2168                            return Ok(());
2169                        }
2170                    };
2171
2172                    if let Err(e) = self
2173                        .run_validation(
2174                            ctx, request, quorum, signers, init_state,
2175                        )
2176                        .await
2177                    {
2178                        error!(
2179                            msg_type = "ApprovalRes",
2180                            request_id = %self.id,
2181                            error = %e,
2182                            "Failed to run validation"
2183                        );
2184                        self.match_error(ctx, e).await;
2185                        return Ok(());
2186                    };
2187                }
2188            }
2189            RequestManagerMessage::ValidationRes {
2190                val_res,
2191                val_req,
2192                request_id,
2193            } => {
2194                if request_id == self.id {
2195                    debug!(
2196                        msg_type = "ValidationRes",
2197                        request_id = %self.id,
2198                        version = self.version,
2199                        "Validation result received"
2200                    );
2201                    if let Err(e) = self.stops_childs(ctx).await {
2202                        error!(
2203                                msg_type = "ValidationRes",
2204                                request_id = %self.id,
2205                                error = %e,
2206                                "Failed to stop childs"
2207                        );
2208                        self.match_error(ctx, e).await;
2209                        return Ok(());
2210                    };
2211
2212                    let signed_ledger =
2213                        match self.build_ledger(ctx, *val_req, val_res).await {
2214                            Ok(signed_ledger) => signed_ledger,
2215                            Err(e) => {
2216                                error!(
2217                                    msg_type = "ValidationRes",
2218                                    request_id = %self.id,
2219                                    error = %e,
2220                                    "Failed to build ledger"
2221                                );
2222                                self.match_error(ctx, e).await;
2223                                return Ok(());
2224                            }
2225                        };
2226
2227                    if let Err(e) =
2228                        self.update_subject(ctx, signed_ledger.clone()).await
2229                    {
2230                        error!(
2231                            msg_type = "ValidationRes",
2232                            request_id = %self.id,
2233                            error = %e,
2234                            "Failed to update subject"
2235                        );
2236                        self.match_error(ctx, e).await;
2237                        return Ok(());
2238                    };
2239
2240                    match self.build_distribution(ctx, signed_ledger).await {
2241                        Ok(in_distribution) => {
2242                            if !in_distribution
2243                                && let Err(e) = self.finish_request(ctx).await
2244                            {
2245                                error!(
2246                                    msg_type = "ValidationRes",
2247                                    request_id = %self.id,
2248                                    error = %e,
2249                                    "Failed to finish request after build distribution"
2250                                );
2251                                self.match_error(ctx, e).await;
2252                                return Ok(());
2253                            }
2254                        }
2255                        Err(e) => {
2256                            error!(
2257                                msg_type = "ValidationRes",
2258                                request_id = %self.id,
2259                                error = %e,
2260                                "Failed to build distribution"
2261                            );
2262                            self.match_error(ctx, e).await;
2263                            return Ok(());
2264                        }
2265                    };
2266                }
2267            }
2268            RequestManagerMessage::FinishRequest { request_id } => {
2269                if request_id == self.id {
2270                    debug!(
2271                        msg_type = "FinishRequest",
2272                        request_id = %self.id,
2273                        version = self.version,
2274                        "Finishing request"
2275                    );
2276
2277                    if let Err(e) = self.stops_childs(ctx).await {
2278                        error!(
2279                            msg_type = "FinishRequest",
2280                            request_id = %self.id,
2281                            error = %e,
2282                            "Failed to stop childs"
2283                        );
2284                        self.match_error(ctx, e).await;
2285                        return Ok(());
2286                    };
2287
2288                    if let Err(e) = self.finish_request(ctx).await {
2289                        error!(
2290                            msg_type = "FinishRequest",
2291                            request_id = %self.id,
2292                            error = %e,
2293                            "Failed to finish request"
2294                        );
2295                        self.match_error(ctx, e).await;
2296                        return Ok(());
2297                    }
2298                }
2299            }
2300        }
2301
2302        Ok(())
2303    }
2304
2305    async fn on_event(
2306        &mut self,
2307        event: RequestManagerEvent,
2308        ctx: &mut ActorContext<Self>,
2309    ) {
2310        let event_type = match &event {
2311            RequestManagerEvent::Finish => "Finish",
2312            RequestManagerEvent::UpdateState { .. } => "UpdateState",
2313            RequestManagerEvent::UpdateVersion { .. } => "UpdateVersion",
2314            RequestManagerEvent::SafeState { .. } => "SafeState",
2315        };
2316
2317        if let Err(e) = self.persist(&event, ctx).await {
2318            error!(
2319                event_type = event_type,
2320                request_id = %self.id,
2321                error = %e,
2322                "Failed to persist event"
2323            );
2324            emit_fail(ctx, e).await;
2325        };
2326    }
2327
2328    async fn on_child_fault(
2329        &mut self,
2330        error: ActorError,
2331        ctx: &mut ActorContext<Self>,
2332    ) -> ChildAction {
2333        error!(
2334            request_id = %self.id,
2335            version = self.version,
2336            state = ?self.state,
2337            error = %error,
2338            "Child fault in request manager"
2339        );
2340        emit_fail(ctx, error).await;
2341        ChildAction::Stop
2342    }
2343}
2344
2345#[async_trait]
2346impl PersistentActor for RequestManager {
2347    type Persistence = LightPersistence;
2348    type InitParams = InitRequestManager;
2349
2350    fn update(&mut self, state: Self) {
2351        self.command = state.command;
2352        self.request = state.request;
2353        self.state = state.state;
2354        self.version = state.version;
2355    }
2356
2357    fn create_initial(params: Self::InitParams) -> Self {
2358        Self {
2359            retry_diff: 0,
2360            retry_timeout: 0,
2361            our_key: params.our_key,
2362            id: DigestIdentifier::default(),
2363            subject_id: params.subject_id,
2364            command: ReqManInitMessage::Evaluate,
2365            request: None,
2366            state: RequestManagerState::Starting,
2367            version: 0,
2368            helpers: Some(params.helpers),
2369        }
2370    }
2371
2372    /// Change node state.
2373    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
2374        match event {
2375            RequestManagerEvent::Finish => {
2376                debug!(
2377                    event_type = "Finish",
2378                    request_id = %self.id,
2379                    "Applying finish event"
2380                );
2381                self.state = RequestManagerState::End;
2382                self.request = None;
2383                self.id = DigestIdentifier::default();
2384            }
2385            RequestManagerEvent::UpdateState { state } => {
2386                debug!(
2387                    event_type = "UpdateState",
2388                    request_id = %self.id,
2389                    old_state = ?self.state,
2390                    new_state = ?state,
2391                    "Applying state update"
2392                );
2393                self.state = *state.clone()
2394            }
2395            RequestManagerEvent::UpdateVersion { version } => {
2396                debug!(
2397                    event_type = "UpdateVersion",
2398                    request_id = %self.id,
2399                    old_version = self.version,
2400                    new_version = version,
2401                    "Applying version update"
2402                );
2403                self.state = RequestManagerState::Starting;
2404                self.version = *version
2405            }
2406            RequestManagerEvent::SafeState { command, request } => {
2407                debug!(
2408                    event_type = "SafeState",
2409                    request_id = %self.id,
2410                    command = ?command,
2411                    "Applying safe state"
2412                );
2413                self.version = 0;
2414                self.retry_diff = 0;
2415                self.retry_timeout = 0;
2416                self.state = RequestManagerState::Starting;
2417                self.request = Some(request.clone());
2418                self.command = command.clone();
2419            }
2420        };
2421
2422        Ok(())
2423    }
2424}
2425
2426#[async_trait]
2427impl Storable for RequestManager {}