Skip to main content

ave_core/request/
manager.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::bridge::request::EventRequestType;
8use ave_common::identity::{
9    DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
10};
11use ave_common::request::EventRequest;
12use ave_common::response::RequestState;
13use ave_common::{Namespace, SchemaType, ValueWrapper};
14use borsh::{BorshDeserialize, BorshSerialize};
15use network::ComunicateInfo;
16use serde::{Deserialize, Serialize};
17use std::collections::HashSet;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tracing::{Span, debug, error, info, info_span, warn};
21
22use crate::approval::request::ApprovalReq;
23use crate::distribution::{
24    Distribution, DistributionMessage, DistributionType,
25};
26use crate::evaluation::request::EvaluateData;
27use crate::evaluation::response::EvaluatorResponse;
28use crate::governance::data::GovernanceData;
29use crate::governance::model::{
30    HashThisRole, ProtocolTypes, Quorum, RoleTypes, WitnessesData,
31};
32use crate::governance::role_register::RoleDataRegister;
33use crate::helpers::network::service::NetworkSender;
34use crate::metrics::try_core_metrics;
35use crate::model::common::node::{SignTypesNode, get_sign, get_subject_data};
36use crate::model::common::subject::{
37    acquire_subject, create_subject, get_gov, get_gov_sn,
38    get_last_ledger_event, get_metadata, make_obsolete, update_ledger,
39};
40use crate::model::common::{purge_storage, send_to_tracking};
41use crate::model::event::{
42    ApprovalData, EvaluationData, Ledger, Protocols, ValidationData,
43};
44use crate::node::SubjectData;
45use crate::request::error::RequestManagerError;
46use crate::request::tracking::RequestTrackingMessage;
47use crate::request::{RequestHandler, RequestHandlerMessage};
48use crate::subject::{Metadata, SignedLedger};
49
50use crate::validation::request::{ActualProtocols, LastData, ValidationReq};
51use crate::validation::worker::CurrentRequestRoles;
52use crate::{
53    ActorMessage, NetworkMessage, Validation, ValidationMessage,
54    approval::{Approval, ApprovalMessage},
55    auth::{Auth, AuthMessage, AuthResponse},
56    db::Storable,
57    evaluation::{Evaluation, EvaluationMessage, request::EvaluationReq},
58    model::common::emit_fail,
59    update::{Update, UpdateMessage, UpdateNew, UpdateType},
60};
61
62use super::{
63    reboot::{Reboot, RebootMessage},
64    types::{ReqManInitMessage, RequestManagerState},
65};
66
/// Actor state that drives one event request through the evaluation,
/// approval and validation steps implemented in this file.
///
/// Fields marked `#[serde(skip)]` are left out of the serde representation.
/// The hand-written Borsh implementations in this file likewise persist only
/// `command`, `state`, `version` and `request`; the remaining fields are
/// reset to defaults when the value is deserialized.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestManager {
    /// Hash algorithm and network sender handed to the child protocol actors.
    /// `None` after deserialization until it is set again.
    #[serde(skip)]
    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
    /// Public key used as the `signer` of the requests built by this manager.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Identifier sent as `request_id` in protocol and tracking messages.
    #[serde(skip)]
    id: DigestIdentifier,
    /// Subject the managed request operates on.
    #[serde(skip)]
    subject_id: DigestIdentifier,
    /// Governance identifier; when `None`, `get_governance_data` falls back
    /// to `subject_id`.
    #[serde(skip)]
    governance_id: Option<DigestIdentifier>,
    // NOTE(review): the retry counters are not used in this part of the file;
    // presumably they track reboot retries — confirm against the reboot logic.
    #[serde(skip)]
    retry_timeout: u64,
    #[serde(skip)]
    retry_diff: u64,
    /// Instant at which metrics for the current request started.
    #[serde(skip)]
    request_started_at: Option<Instant>,
    /// Label of the phase currently being timed, if any.
    #[serde(skip)]
    current_phase: Option<&'static str>,
    /// Instant at which `current_phase` started.
    #[serde(skip)]
    current_phase_started_at: Option<Instant>,
    /// Persisted init message for this manager.
    command: ReqManInitMessage,
    /// Persisted signed event request; `None` until a request has been set.
    request: Option<Signed<EventRequest>>,
    /// Persisted state of the request workflow.
    state: RequestManagerState,
    /// Persisted version number, forwarded to child actors in their `Create`
    /// messages.
    version: u64,
}
94
/// Kind of reboot requested for a request manager.
// NOTE(review): the variants are not consumed in this part of the file;
// presumably `Diff` and `TimeOut` correspond to the `retry_diff` and
// `retry_timeout` counters on `RequestManager` — confirm at the use sites.
#[derive(Debug, Clone)]
pub enum RebootType {
    Normal,
    Diff,
    TimeOut,
}
101
/// Runtime-only values for a `RequestManager`.
///
/// Each field has a counterpart among the non-persisted fields of
/// `RequestManager` (`our_key`, `subject_id`, `governance_id`, `helpers`).
// NOTE(review): presumably used to re-inject those values after the manager
// is restored from storage — confirm at the construction site.
pub struct InitRequestManager {
    /// Public key for the manager.
    pub our_key: Arc<PublicKey>,
    /// Subject the manager works on.
    pub subject_id: DigestIdentifier,
    /// Governance identifier, if any.
    pub governance_id: Option<DigestIdentifier>,
    /// Hash algorithm and network sender used by the protocol actors.
    pub helpers: (HashAlgorithm, Arc<NetworkSender>),
}
108
109impl BorshSerialize for RequestManager {
110    fn serialize<W: std::io::Write>(
111        &self,
112        writer: &mut W,
113    ) -> std::io::Result<()> {
114        BorshSerialize::serialize(&self.command, writer)?;
115        BorshSerialize::serialize(&self.state, writer)?;
116        BorshSerialize::serialize(&self.version, writer)?;
117        BorshSerialize::serialize(&self.request, writer)?;
118
119        Ok(())
120    }
121}
122
123impl BorshDeserialize for RequestManager {
124    fn deserialize_reader<R: std::io::Read>(
125        reader: &mut R,
126    ) -> std::io::Result<Self> {
127        // Deserialize the persisted fields
128        let command = ReqManInitMessage::deserialize_reader(reader)?;
129        let state = RequestManagerState::deserialize_reader(reader)?;
130        let version = u64::deserialize_reader(reader)?;
131        let request =
132            Option::<Signed<EventRequest>>::deserialize_reader(reader)?;
133
134        let our_key = Arc::new(PublicKey::default());
135        let subject_id = DigestIdentifier::default();
136        let id = DigestIdentifier::default();
137
138        Ok(Self {
139            retry_diff: 0,
140            retry_timeout: 0,
141            request_started_at: None,
142            current_phase: None,
143            current_phase_started_at: None,
144            helpers: None,
145            our_key,
146            id,
147            subject_id,
148            governance_id: None,
149            command,
150            request,
151            state,
152            version,
153        })
154    }
155}
156
157impl RequestManager {
158    fn begin_request_metrics(&mut self) {
159        self.request_started_at = Some(Instant::now());
160        self.current_phase = None;
161        self.current_phase_started_at = None;
162
163        if let Some(metrics) = try_core_metrics() {
164            metrics.observe_request_started();
165        }
166    }
167
168    fn ensure_request_metrics_started(&mut self) {
169        if self.request_started_at.is_none() {
170            self.request_started_at = Some(Instant::now());
171        }
172    }
173
174    fn start_phase_metrics(&mut self, phase: &'static str) {
175        self.ensure_request_metrics_started();
176        self.finish_phase_metrics();
177        self.current_phase = Some(phase);
178        self.current_phase_started_at = Some(Instant::now());
179    }
180
181    fn finish_phase_metrics(&mut self) {
182        let Some(phase) = self.current_phase.take() else {
183            self.current_phase_started_at = None;
184            return;
185        };
186        let Some(started_at) = self.current_phase_started_at.take() else {
187            return;
188        };
189
190        if let Some(metrics) = try_core_metrics() {
191            metrics.observe_request_phase(phase, started_at.elapsed());
192        }
193    }
194
195    fn finish_request_metrics(&mut self, result: &'static str) {
196        self.finish_phase_metrics();
197
198        if let Some(started_at) = self.request_started_at.take()
199            && let Some(metrics) = try_core_metrics()
200        {
201            metrics.observe_request_terminal(result, started_at.elapsed());
202        }
203    }
204
205    //////// EVAL
206    ////////////////////////////////////////////////
    // Reviewed
208    async fn build_evaluation(
209        &mut self,
210        ctx: &mut ActorContext<Self>,
211    ) -> Result<(), RequestManagerError> {
212        let Some(request) = self.request.clone() else {
213            return Err(RequestManagerError::RequestNotSet);
214        };
215
216        self.on_event(
217            RequestManagerEvent::UpdateState {
218                state: Box::new(RequestManagerState::Evaluation),
219            },
220            ctx,
221        )
222        .await;
223
224        let metadata = self.check_data_eval(ctx, &request).await?;
225
226        let (signed_evaluation_req, quorum, signers, init_state) =
227            self.build_request_eval(ctx, &metadata, &request).await?;
228
229        if signers.is_empty() {
230            warn!(
231                request_id = %self.id,
232                schema_id = %metadata.schema_id,
233                "No evaluators available for schema"
234            );
235
236            return Err(RequestManagerError::NoEvaluatorsAvailable {
237                schema_id: metadata.schema_id.to_string(),
238                governance_id: signed_evaluation_req
239                    .content()
240                    .governance_id
241                    .clone(),
242            });
243        }
244
245        self.run_evaluation(
246            ctx,
247            signed_evaluation_req.clone(),
248            quorum,
249            init_state,
250            signers,
251        )
252        .await
253    }
254
    // Reviewed
    /// Returns `true` when a governance id is set on this manager.
    ///
    /// The result is forwarded as the last argument of `acquire_subject`.
    const fn needs_subject_manager(&self) -> bool {
        self.governance_id.is_some()
    }
259
    /// Returns the manager id rendered as a string; it is passed to
    /// `acquire_subject` as the requester.
    fn requester_id(&self) -> String {
        self.id.to_string()
    }
263
264    async fn check_data_eval(
265        &self,
266        ctx: &mut ActorContext<Self>,
267        request: &Signed<EventRequest>,
268    ) -> Result<Metadata, RequestManagerError> {
269        let (subject_id, confirm) = match request.content().clone() {
270            EventRequest::Fact(event) => (event.subject_id, false),
271            EventRequest::Transfer(event) => (event.subject_id, false),
272            EventRequest::Confirm(event) => (event.subject_id, true),
273            _ => {
274                return Err(
275                    RequestManagerError::InvalidEventRequestForEvaluation,
276                );
277            }
278        };
279
280        let lease = acquire_subject(
281            ctx,
282            &self.subject_id,
283            self.requester_id(),
284            None,
285            self.needs_subject_manager(),
286        )
287        .await?;
288        let metadata = get_metadata(ctx, &subject_id).await;
289        lease.finish(ctx).await?;
290        let metadata = metadata?;
291
292        if confirm && !metadata.schema_id.is_gov() {
293            return Err(RequestManagerError::ConfirmNotEvaluableForTracker);
294        }
295
296        Ok(metadata)
297    }
298
299    async fn get_governance_data(
300        &self,
301        ctx: &mut ActorContext<Self>,
302    ) -> Result<GovernanceData, RequestManagerError> {
303        let governance_id =
304            self.governance_id.as_ref().unwrap_or(&self.subject_id);
305        Ok(get_gov(ctx, governance_id).await?)
306    }
307
308    async fn build_request_eval(
309        &self,
310        ctx: &mut ActorContext<Self>,
311        metadata: &Metadata,
312        request: &Signed<EventRequest>,
313    ) -> Result<
314        (
315            Signed<EvaluationReq>,
316            Quorum,
317            HashSet<PublicKey>,
318            Option<ValueWrapper>,
319        ),
320        RequestManagerError,
321    > {
322        let is_gov = metadata.schema_id.is_gov();
323
324        let request_type = EventRequestType::from(request.content());
325        let (evaluate_data, governance_data, init_state) = match (
326            is_gov,
327            request_type,
328        ) {
329            (true, EventRequestType::Fact) => {
330                let state =
331                    GovernanceData::try_from(metadata.properties.clone())?;
332
333                (
334                    EvaluateData::GovFact {
335                        state: state.clone(),
336                    },
337                    state,
338                    None,
339                )
340            }
341            (true, EventRequestType::Transfer) => {
342                let state =
343                    GovernanceData::try_from(metadata.properties.clone())?;
344
345                (
346                    EvaluateData::GovTransfer {
347                        state: state.clone(),
348                    },
349                    state,
350                    None,
351                )
352            }
353            (true, EventRequestType::Confirm) => {
354                let state =
355                    GovernanceData::try_from(metadata.properties.clone())?;
356
357                (
358                    EvaluateData::GovConfirm {
359                        state: state.clone(),
360                    },
361                    state,
362                    None,
363                )
364            }
365            (false, EventRequestType::Fact) => {
366                let governance_data =
367                    get_gov(ctx, &metadata.governance_id).await?;
368
369                let init_state =
370                    governance_data.get_init_state(&metadata.schema_id)?;
371
372                (
373                    EvaluateData::TrackerSchemasFact {
374                        contract: format!(
375                            "{}_{}",
376                            metadata.governance_id, metadata.schema_id
377                        ),
378                        state: metadata.properties.clone(),
379                    },
380                    governance_data,
381                    Some(init_state),
382                )
383            }
384            (false, EventRequestType::Transfer) => {
385                let governance_data =
386                    get_gov(ctx, &metadata.governance_id).await?;
387                (
388                    EvaluateData::TrackerSchemasTransfer {
389                        governance_data: governance_data.clone(),
390                        namespace: metadata.namespace.clone(),
391                        schema_id: metadata.schema_id.clone(),
392                        state: metadata.properties.clone(),
393                    },
394                    governance_data,
395                    None,
396                )
397            }
398            _ => unreachable!(
399                "It was previously verified that the matched cases are the only possible ones"
400            ),
401        };
402
403        let (signers, quorum) = governance_data.get_quorum_and_signers(
404            ProtocolTypes::Evaluation,
405            &metadata.schema_id,
406            metadata.namespace.clone(),
407        )?;
408
409        let eval_req = EvaluationReq {
410            event_request: request.clone(),
411            data: evaluate_data,
412            sn: metadata.sn + 1,
413            gov_version: governance_data.version,
414            namespace: metadata.namespace.clone(),
415            schema_id: metadata.schema_id.clone(),
416            signer: (*self.our_key).clone(),
417            signer_is_owner: *self.our_key == request.signature().signer,
418            governance_id: metadata.governance_id.clone(),
419        };
420
421        let signature =
422            get_sign(ctx, SignTypesNode::EvaluationReq(eval_req.clone()))
423                .await?;
424
425        let signed_evaluation_req: Signed<EvaluationReq> =
426            Signed::from_parts(eval_req, signature);
427        Ok((signed_evaluation_req, quorum, signers, init_state))
428    }
429
430    async fn run_evaluation(
431        &mut self,
432        ctx: &mut ActorContext<Self>,
433        request: Signed<EvaluationReq>,
434        quorum: Quorum,
435        init_state: Option<ValueWrapper>,
436        signers: HashSet<PublicKey>,
437    ) -> Result<(), RequestManagerError> {
438        let Some((hash, network)) = self.helpers.clone() else {
439            return Err(RequestManagerError::HelpersNotInitialized);
440        };
441
442        self.start_phase_metrics("evaluation");
443        info!("Init evaluation {}", self.id);
444        let child = ctx
445            .create_child(
446                "evaluation",
447                Evaluation::new(
448                    self.our_key.clone(),
449                    request,
450                    quorum,
451                    init_state,
452                    hash,
453                    network,
454                ),
455            )
456            .await?;
457
458        child
459            .tell(EvaluationMessage::Create {
460                request_id: self.id.clone(),
461                version: self.version,
462                signers,
463            })
464            .await?;
465
466        send_to_tracking(
467            ctx,
468            RequestTrackingMessage::UpdateState {
469                request_id: self.id.clone(),
470                state: RequestState::Evaluation,
471            },
472        )
473        .await?;
474
475        Ok(())
476    }
477    //////// APPROVE
478    ////////////////////////////////////////////////
479    async fn build_request_appro(
480        &self,
481        ctx: &mut ActorContext<Self>,
482        eval_req: EvaluationReq,
483        evaluator_res: EvaluatorResponse,
484    ) -> Result<Signed<ApprovalReq>, RequestManagerError> {
485        let request = ApprovalReq {
486            subject_id: self.subject_id.clone(),
487            sn: eval_req.sn,
488            gov_version: eval_req.gov_version,
489            patch: evaluator_res.patch,
490            signer: eval_req.signer,
491        };
492
493        let signature =
494            get_sign(ctx, SignTypesNode::ApprovalReq(request.clone())).await?;
495
496        let signed_approval_req: Signed<ApprovalReq> =
497            Signed::from_parts(request, signature);
498
499        Ok(signed_approval_req)
500    }
501
502    async fn build_approval(
503        &mut self,
504        ctx: &mut ActorContext<Self>,
505        eval_req: EvaluationReq,
506        eval_res: EvaluatorResponse,
507    ) -> Result<(), RequestManagerError> {
508        let request = self.build_request_appro(ctx, eval_req, eval_res).await?;
509
510        let governance_data =
511            get_gov(ctx, &request.content().subject_id).await?;
512
513        let (signers, quorum) = governance_data.get_quorum_and_signers(
514            ProtocolTypes::Approval,
515            &SchemaType::Governance,
516            Namespace::new(),
517        )?;
518
519        if signers.is_empty() {
520            warn!(
521                request_id = %self.id,
522                schema_id = %SchemaType::Governance,
523                "No approvers available for schema"
524            );
525
526            return Err(RequestManagerError::NoApproversAvailable {
527                schema_id: SchemaType::Governance.to_string(),
528                governance_id: self
529                    .governance_id
530                    .clone()
531                    .unwrap_or_else(|| self.subject_id.clone()),
532            });
533        }
534
535        self.run_approval(ctx, request, quorum, signers).await
536    }
537
538    async fn run_approval(
539        &mut self,
540        ctx: &mut ActorContext<Self>,
541        request: Signed<ApprovalReq>,
542        quorum: Quorum,
543        signers: HashSet<PublicKey>,
544    ) -> Result<(), RequestManagerError> {
545        let Some((hash, network)) = self.helpers.clone() else {
546            return Err(RequestManagerError::HelpersNotInitialized);
547        };
548
549        self.start_phase_metrics("approval");
550        info!("Init approval {}", self.id);
551        let child = ctx
552            .create_child(
553                "approval",
554                Approval::new(
555                    self.our_key.clone(),
556                    request,
557                    quorum,
558                    signers,
559                    hash,
560                    network,
561                ),
562            )
563            .await?;
564
565        child
566            .tell(ApprovalMessage::Create {
567                request_id: self.id.clone(),
568                version: self.version,
569            })
570            .await?;
571
572        send_to_tracking(
573            ctx,
574            RequestTrackingMessage::UpdateState {
575                request_id: self.id.clone(),
576                state: RequestState::Approval,
577            },
578        )
579        .await?;
580
581        Ok(())
582    }
583
584    //////// VALI
585    ////////////////////////////////////////////////
586    async fn build_validation_req(
587        &mut self,
588        ctx: &mut ActorContext<Self>,
589        eval: Option<(EvaluationReq, EvaluationData)>,
590        appro_data: Option<ApprovalData>,
591    ) -> Result<
592        (
593            Signed<ValidationReq>,
594            Quorum,
595            HashSet<PublicKey>,
596            Option<ValueWrapper>,
597            CurrentRequestRoles,
598        ),
599        RequestManagerError,
600    > {
601        let (
602            vali_req,
603            quorum,
604            signers,
605            init_state,
606            current_request_roles,
607            schema_id,
608        ) = self.build_validation_data(ctx, eval, appro_data).await?;
609
610        if signers.is_empty() {
611            warn!(
612                request_id = %self.id,
613                schema_id = %schema_id,
614                "No validators available for schema"
615            );
616
617            return Err(RequestManagerError::NoValidatorsAvailable {
618                schema_id: schema_id.to_string(),
619                governance_id: vali_req.get_governance_id().expect("The build process verified that the event request is valid")
620            });
621        }
622
623        let signature = get_sign(
624            ctx,
625            SignTypesNode::ValidationReq(Box::new(vali_req.clone())),
626        )
627        .await?;
628
629        let signed_validation_req: Signed<ValidationReq> =
630            Signed::from_parts(vali_req, signature);
631
632        self.on_event(
633            RequestManagerEvent::UpdateState {
634                state: Box::new(RequestManagerState::Validation {
635                    request: Box::new(signed_validation_req.clone()),
636                    quorum: quorum.clone(),
637                    init_state: init_state.clone(),
638                    current_request_roles: current_request_roles.clone(),
639                    signers: signers.clone(),
640                }),
641            },
642            ctx,
643        )
644        .await;
645
646        Ok((
647            signed_validation_req,
648            quorum,
649            signers,
650            init_state,
651            current_request_roles,
652        ))
653    }
654
655    async fn build_validation_data(
656        &self,
657        ctx: &mut ActorContext<Self>,
658        eval: Option<(EvaluationReq, EvaluationData)>,
659        appro_data: Option<ApprovalData>,
660    ) -> Result<
661        (
662            ValidationReq,
663            Quorum,
664            HashSet<PublicKey>,
665            Option<ValueWrapper>,
666            CurrentRequestRoles,
667            SchemaType,
668        ),
669        RequestManagerError,
670    > {
671        let Some(request) = self.request.clone() else {
672            return Err(RequestManagerError::RequestNotSet);
673        };
674
675        if let EventRequest::Create(create) = request.content() {
676            if create.schema_id.is_gov() {
677                let governance_data =
678                    GovernanceData::new((*self.our_key).clone());
679                let (signers, quorum) = governance_data
680                    .get_quorum_and_signers(
681                        ProtocolTypes::Validation,
682                        &SchemaType::Governance,
683                        Namespace::new(),
684                    )?;
685
686                Ok((
687                    ValidationReq::Create {
688                        event_request: request.clone(),
689                        gov_version: 0,
690                        subject_id: self.subject_id.clone(),
691                    },
692                    quorum,
693                    signers,
694                    None,
695                    CurrentRequestRoles {
696                        evaluation: RoleDataRegister {
697                            workers: HashSet::new(),
698                            quorum: Quorum::default(),
699                        },
700                        approval: RoleDataRegister {
701                            workers: HashSet::new(),
702                            quorum: Quorum::default(),
703                        },
704                    },
705                    SchemaType::Governance,
706                ))
707            } else {
708                let governance_data =
709                    get_gov(ctx, &create.governance_id).await?;
710
711                let (signers, quorum) = governance_data
712                    .get_quorum_and_signers(
713                        ProtocolTypes::Validation,
714                        &create.schema_id,
715                        create.namespace.clone(),
716                    )?;
717
718                let init_state =
719                    governance_data.get_init_state(&create.schema_id)?;
720
721                Ok((
722                    ValidationReq::Create {
723                        event_request: request.clone(),
724                        gov_version: governance_data.version,
725                        subject_id: self.subject_id.clone(),
726                    },
727                    quorum,
728                    signers,
729                    Some(init_state),
730                    CurrentRequestRoles {
731                        evaluation: RoleDataRegister {
732                            workers: HashSet::new(),
733                            quorum: Quorum::default(),
734                        },
735                        approval: RoleDataRegister {
736                            workers: HashSet::new(),
737                            quorum: Quorum::default(),
738                        },
739                    },
740                    create.schema_id.clone(),
741                ))
742            }
743        } else {
744            let Some((hash, ..)) = self.helpers else {
745                return Err(RequestManagerError::HelpersNotInitialized);
746            };
747
748            let governance_data = self.get_governance_data(ctx).await?;
749
750            let (actual_protocols, gov_version, sn) =
751                if let Some((eval_req, eval_data)) = eval {
752                    if let Some(approval_data) = appro_data.clone() {
753                        (
754                            ActualProtocols::EvalApprove {
755                                eval_data,
756                                approval_data,
757                            },
758                            eval_req.gov_version,
759                            Some(eval_req.sn),
760                        )
761                    } else {
762                        (
763                            ActualProtocols::Eval { eval_data },
764                            eval_req.gov_version,
765                            Some(eval_req.sn),
766                        )
767                    }
768                } else {
769                    (ActualProtocols::None, governance_data.version, None)
770                };
771
772            let lease = acquire_subject(
773                ctx,
774                &self.subject_id,
775                self.requester_id(),
776                None,
777                self.needs_subject_manager(),
778            )
779            .await?;
780            let metadata = get_metadata(ctx, &self.subject_id).await;
781            let last_ledger_event = match metadata {
782                Ok(metadata) => {
783                    let last_ledger_event =
784                        get_last_ledger_event(ctx, &self.subject_id).await;
785                    lease.finish(ctx).await?;
786                    (metadata, last_ledger_event?)
787                }
788                Err(error) => {
789                    lease.finish(ctx).await?;
790                    return Err(error.into());
791                }
792            };
793
794            let (metadata, last_ledger_event) = last_ledger_event;
795            let sn = if let Some(sn) = sn {
796                sn
797            } else {
798                metadata.sn + 1
799            };
800
801            let (signers, quorum) = governance_data.get_quorum_and_signers(
802                ProtocolTypes::Validation,
803                &metadata.schema_id,
804                metadata.namespace.clone(),
805            )?;
806
807            let Some(last_ledger_event) = last_ledger_event else {
808                return Err(RequestManagerError::LastLedgerEventNotFound);
809            };
810
811            let ledger_hash = hash_borsh(&*hash.hasher(), &last_ledger_event.0)
812                .map_err(|e| RequestManagerError::LedgerHashFailed {
813                    details: e.to_string(),
814                })?;
815
816            let schema_id = metadata.schema_id.clone();
817
818            let current_request_roles =
819                if gov_version == governance_data.version {
820                    let (evaluation_workers, evaluation_quorum) =
821                        governance_data.get_quorum_and_signers(
822                            ProtocolTypes::Evaluation,
823                            &metadata.schema_id,
824                            metadata.namespace.clone(),
825                        )?;
826
827                    let (approval_workers, approval_quorum) =
828                        if appro_data.is_some() {
829                            governance_data.get_quorum_and_signers(
830                                ProtocolTypes::Approval,
831                                &SchemaType::Governance,
832                                Namespace::new(),
833                            )?
834                        } else {
835                            (HashSet::new(), Quorum::default())
836                        };
837
838                    CurrentRequestRoles {
839                        evaluation: RoleDataRegister {
840                            workers: evaluation_workers,
841                            quorum: evaluation_quorum,
842                        },
843                        approval: RoleDataRegister {
844                            workers: approval_workers,
845                            quorum: approval_quorum,
846                        },
847                    }
848                } else {
849                    CurrentRequestRoles {
850                        evaluation: RoleDataRegister {
851                            workers: HashSet::new(),
852                            quorum: Quorum::default(),
853                        },
854                        approval: RoleDataRegister {
855                            workers: HashSet::new(),
856                            quorum: Quorum::default(),
857                        },
858                    }
859                };
860
861            Ok((
862                ValidationReq::Event {
863                    actual_protocols: Box::new(actual_protocols),
864                    event_request: request.clone(),
865                    metadata: Box::new(metadata),
866                    last_data: Box::new(LastData {
867                        vali_data: last_ledger_event
868                            .content()
869                            .protocols
870                            .get_validation_data(),
871                        gov_version: last_ledger_event.content().gov_version,
872                    }),
873                    gov_version,
874                    ledger_hash,
875                    sn,
876                },
877                quorum,
878                signers,
879                None,
880                current_request_roles,
881                schema_id,
882            ))
883        }
884    }
885
886    async fn run_validation(
887        &mut self,
888        ctx: &mut ActorContext<Self>,
889        request: Signed<ValidationReq>,
890        quorum: Quorum,
891        signers: HashSet<PublicKey>,
892        init_state: Option<ValueWrapper>,
893        current_request_roles: CurrentRequestRoles,
894    ) -> Result<(), RequestManagerError> {
895        let Some((hash, network)) = self.helpers.clone() else {
896            return Err(RequestManagerError::HelpersNotInitialized);
897        };
898
899        self.start_phase_metrics("validation");
900        info!("Init validation {}", self.id);
901        let child = ctx
902            .create_child(
903                "validation",
904                Validation::new(
905                    self.our_key.clone(),
906                    request,
907                    init_state,
908                    current_request_roles,
909                    quorum,
910                    hash,
911                    network,
912                ),
913            )
914            .await?;
915
916        child
917            .tell(ValidationMessage::Create {
918                request_id: self.id.clone(),
919                version: self.version,
920                signers,
921            })
922            .await?;
923
924        send_to_tracking(
925            ctx,
926            RequestTrackingMessage::UpdateState {
927                request_id: self.id.clone(),
928                state: RequestState::Validation,
929            },
930        )
931        .await?;
932
933        Ok(())
934    }
935    //////// Distribution
936    ////////////////////////////////////////////////
937    async fn build_ledger(
938        &mut self,
939        ctx: &mut ActorContext<Self>,
940        val_req: ValidationReq,
941        val_res: ValidationData,
942    ) -> Result<SignedLedger, RequestManagerError> {
943        let ledger = match val_req {
944            ValidationReq::Create {
945                event_request,
946                gov_version,
947                ..
948            } => Ledger {
949                event_request,
950                gov_version,
951                sn: 0,
952                prev_ledger_event_hash: DigestIdentifier::default(),
953                protocols: Protocols::Create {
954                    validation: val_res,
955                },
956            },
957            ValidationReq::Event {
958                actual_protocols,
959                event_request,
960                metadata,
961                gov_version,
962                sn,
963                ledger_hash,
964                ..
965            } => Ledger {
966                gov_version,
967                sn,
968                prev_ledger_event_hash: ledger_hash,
969                protocols: Protocols::build(
970                    metadata.schema_id.is_gov(),
971                    EventRequestType::from(event_request.content()),
972                    *actual_protocols,
973                    val_res,
974                )?,
975                event_request,
976            },
977        };
978
979        let signature =
980            get_sign(ctx, SignTypesNode::Ledger(ledger.clone())).await?;
981
982        let ledger = SignedLedger(Signed::from_parts(ledger, signature));
983
984        self.on_event(
985            RequestManagerEvent::UpdateState {
986                state: Box::new(RequestManagerState::UpdateSubject {
987                    ledger: ledger.clone(),
988                }),
989            },
990            ctx,
991        )
992        .await;
993
994        Ok(ledger)
995    }
996
997    async fn update_subject(
998        &mut self,
999        ctx: &mut ActorContext<Self>,
1000        ledger: SignedLedger,
1001    ) -> Result<(), RequestManagerError> {
1002        if ledger.content().event_request.content().is_create_event() {
1003            if let Err(e) = create_subject(ctx, ledger.clone()).await {
1004                if let ActorError::Functional { .. } = e {
1005                    return Err(RequestManagerError::CheckLimit);
1006                }
1007                return Err(e.into());
1008            }
1009        } else {
1010            let lease = acquire_subject(
1011                ctx,
1012                &self.subject_id,
1013                self.requester_id(),
1014                None,
1015                self.needs_subject_manager(),
1016            )
1017            .await?;
1018            let update_result =
1019                update_ledger(ctx, &self.subject_id, vec![ledger.clone()])
1020                    .await;
1021            lease.finish(ctx).await?;
1022            update_result?;
1023        }
1024
1025        self.on_event(
1026            RequestManagerEvent::UpdateState {
1027                state: Box::new(RequestManagerState::Distribution { ledger }),
1028            },
1029            ctx,
1030        )
1031        .await;
1032
1033        Ok(())
1034    }
1035
1036    async fn build_distribution(
1037        &mut self,
1038        ctx: &mut ActorContext<Self>,
1039        ledger: SignedLedger,
1040    ) -> Result<bool, RequestManagerError> {
1041        let witnesses = self
1042            .build_distribution_data(ctx, ledger.signature().signer.clone())
1043            .await?;
1044
1045        let Some(mut witnesses) = witnesses else {
1046            return Ok(false);
1047        };
1048
1049        witnesses.remove(&self.our_key);
1050
1051        if witnesses.is_empty() {
1052            warn!(
1053                request_id = %self.id,
1054                "No witnesses available for distribution"
1055            );
1056            return Ok(false);
1057        }
1058
1059        self.run_distribution(ctx, witnesses, ledger).await?;
1060
1061        Ok(true)
1062    }
1063
1064    async fn build_distribution_data(
1065        &self,
1066        ctx: &mut ActorContext<Self>,
1067        creator: PublicKey,
1068    ) -> Result<Option<HashSet<PublicKey>>, RequestManagerError> {
1069        let Some(request) = self.request.clone() else {
1070            return Err(RequestManagerError::RequestNotSet);
1071        };
1072
1073        let witnesses = if let EventRequest::Create(create) = request.content()
1074        {
1075            if create.schema_id == SchemaType::Governance {
1076                None
1077            } else {
1078                let governance_data = self.get_governance_data(ctx).await?;
1079
1080                let witnesses =
1081                    governance_data.get_witnesses(WitnessesData::Schema {
1082                        creator,
1083                        schema_id: create.schema_id.clone(),
1084                        namespace: create.namespace.clone(),
1085                    })?;
1086
1087                Some(witnesses)
1088            }
1089        } else {
1090            let data = get_subject_data(ctx, &self.subject_id).await?;
1091
1092            let Some(data) = data else {
1093                return Err(RequestManagerError::SubjectDataNotFound {
1094                    subject_id: self.subject_id.to_string(),
1095                });
1096            };
1097
1098            let governance_data = self.get_governance_data(ctx).await?;
1099
1100            let witnesses = match data {
1101                SubjectData::Governance { .. } => {
1102                    governance_data.get_witnesses(WitnessesData::Gov)?
1103                }
1104                SubjectData::Tracker {
1105                    schema_id,
1106                    namespace,
1107                    ..
1108                } => governance_data.get_witnesses(WitnessesData::Schema {
1109                    creator,
1110                    schema_id,
1111                    namespace: Namespace::from(namespace),
1112                })?,
1113            };
1114
1115            Some(witnesses)
1116        };
1117
1118        Ok(witnesses)
1119    }
1120
1121    async fn run_distribution(
1122        &mut self,
1123        ctx: &mut ActorContext<Self>,
1124        witnesses: HashSet<PublicKey>,
1125        ledger: SignedLedger,
1126    ) -> Result<(), RequestManagerError> {
1127        let Some((.., network)) = self.helpers.clone() else {
1128            return Err(RequestManagerError::HelpersNotInitialized);
1129        };
1130
1131        self.start_phase_metrics("distribution");
1132        info!("Init distribution {}", self.id);
1133        let child = ctx
1134            .create_child(
1135                "distribution",
1136                Distribution::new(
1137                    network,
1138                    DistributionType::Request,
1139                    self.id.clone(),
1140                ),
1141            )
1142            .await?;
1143
1144        child
1145            .tell(DistributionMessage::Create {
1146                ledger: Box::new(ledger),
1147                witnesses,
1148            })
1149            .await?;
1150
1151        send_to_tracking(
1152            ctx,
1153            RequestTrackingMessage::UpdateState {
1154                request_id: self.id.clone(),
1155                state: RequestState::Distribution,
1156            },
1157        )
1158        .await?;
1159
1160        Ok(())
1161    }
1162
1163    //////// Reboot
1164    ////////////////////////////////////////////////
1165    async fn init_wait(
1166        &self,
1167        ctx: &mut ActorContext<Self>,
1168        governance_id: &DigestIdentifier,
1169    ) -> Result<(), RequestManagerError> {
1170        let actor = ctx
1171            .create_child(
1172                "reboot",
1173                Reboot::new(governance_id.clone(), self.id.clone()),
1174            )
1175            .await?;
1176
1177        actor.tell(RebootMessage::Init).await?;
1178
1179        Ok(())
1180    }
1181
1182    async fn init_update(
1183        &self,
1184        ctx: &mut ActorContext<Self>,
1185        governance_id: &DigestIdentifier,
1186    ) -> Result<(), RequestManagerError> {
1187        let Some((.., network)) = self.helpers.clone() else {
1188            return Err(RequestManagerError::HelpersNotInitialized);
1189        };
1190
1191        let gov_sn = get_gov_sn(ctx, governance_id).await?;
1192
1193        let governance_data = get_gov(ctx, governance_id).await?;
1194
1195        let mut witnesses = {
1196            let gov_witnesses =
1197                governance_data.get_witnesses(WitnessesData::Gov)?;
1198
1199            let auth_witnesses =
1200                Self::get_witnesses_auth(ctx, governance_id.clone())
1201                    .await
1202                    .unwrap_or_default();
1203
1204            gov_witnesses
1205                .union(&auth_witnesses)
1206                .cloned()
1207                .collect::<HashSet<PublicKey>>()
1208        };
1209
1210        witnesses.remove(&self.our_key);
1211
1212        if witnesses.is_empty() {
1213            if let Ok(actor) = ctx.reference().await {
1214                actor
1215                    .tell(RequestManagerMessage::FinishReboot {
1216                        request_id: self.id.clone(),
1217                    })
1218                    .await?;
1219            };
1220        } else if witnesses.len() == 1 {
1221            let objetive = witnesses.iter().next().expect("len is 1");
1222            let info = ComunicateInfo {
1223                receiver: objetive.clone(),
1224                request_id: String::default(),
1225                version: 0,
1226                receiver_actor: format!(
1227                    "/user/node/distributor_{}",
1228                    governance_id
1229                ),
1230            };
1231
1232            network
1233                .send_command(network::CommandHelper::SendMessage {
1234                    message: NetworkMessage {
1235                        info,
1236                        message: ActorMessage::DistributionLedgerReq {
1237                            actual_sn: Some(gov_sn),
1238                            subject_id: governance_id.clone(),
1239                        },
1240                    },
1241                })
1242                .await?;
1243
1244            let Ok(actor) = ctx.reference().await else {
1245                return Ok(());
1246            };
1247
1248            actor
1249                .tell(RequestManagerMessage::RebootWait {
1250                    request_id: self.id.clone(),
1251                    governance_id: governance_id.clone(),
1252                })
1253                .await?;
1254        } else {
1255            let data = UpdateNew {
1256                network,
1257                subject_id: governance_id.clone(),
1258                our_sn: Some(gov_sn),
1259                witnesses,
1260                update_type: UpdateType::Request {
1261                    subject_id: self.subject_id.clone(),
1262                    id: self.id.clone(),
1263                },
1264            };
1265
1266            let updater = Update::new(data);
1267            let Ok(child) = ctx.create_child("update", updater).await else {
1268                let Ok(actor) = ctx.reference().await else {
1269                    return Ok(());
1270                };
1271
1272                actor
1273                    .tell(RequestManagerMessage::RebootWait {
1274                        request_id: self.id.clone(),
1275                        governance_id: governance_id.clone(),
1276                    })
1277                    .await?;
1278
1279                return Ok(());
1280            };
1281
1282            child.tell(UpdateMessage::Run).await?;
1283        }
1284
1285        Ok(())
1286    }
1287
1288    async fn get_witnesses_auth(
1289        ctx: &ActorContext<Self>,
1290        governance_id: DigestIdentifier,
1291    ) -> Result<HashSet<PublicKey>, RequestManagerError> {
1292        let path = ActorPath::from("/user/node/auth");
1293        let actor = ctx.system().get_actor::<Auth>(&path).await?;
1294
1295        let response = actor
1296            .ask(AuthMessage::GetAuth {
1297                subject_id: governance_id,
1298            })
1299            .await?;
1300
1301        match response {
1302            AuthResponse::Witnesses(witnesses) => Ok(witnesses),
1303            _ => Err(RequestManagerError::ActorError(
1304                ActorError::UnexpectedResponse {
1305                    path,
1306                    expected: "AuthResponse::Witnesses".to_owned(),
1307                },
1308            )),
1309        }
1310    }
1311
1312    //////// General
1313    ////////////////////////////////////////////////
1314    async fn send_reboot(
1315        &self,
1316        ctx: &ActorContext<Self>,
1317        governance_id: DigestIdentifier,
1318    ) -> Result<(), ActorError> {
1319        let Ok(actor) = ctx.reference().await else {
1320            return Ok(());
1321        };
1322
1323        actor
1324            .tell(RequestManagerMessage::Reboot {
1325                request_id: self.id.clone(),
1326                governance_id,
1327                reboot_type: RebootType::TimeOut,
1328            })
1329            .await
1330    }
1331
1332    async fn match_error(
1333        &mut self,
1334        ctx: &mut ActorContext<Self>,
1335        error: RequestManagerError,
1336    ) {
1337        match error {
1338            RequestManagerError::NoEvaluatorsAvailable {
1339                governance_id,
1340                ..
1341            }
1342            | RequestManagerError::NoApproversAvailable {
1343                governance_id, ..
1344            }
1345            | RequestManagerError::NoValidatorsAvailable {
1346                governance_id,
1347                ..
1348            } => {
1349                if let Err(e) = self.send_reboot(ctx, governance_id).await {
1350                    emit_fail(ctx, e).await;
1351                }
1352            }
1353            RequestManagerError::CheckLimit
1354            | RequestManagerError::Governance(..) => {
1355                if let Err(e) = self
1356                    .abort_request(
1357                        ctx,
1358                        error.to_string(),
1359                        None,
1360                        (*self.our_key).clone(),
1361                    )
1362                    .await
1363                {
1364                    emit_fail(
1365                        ctx,
1366                        ActorError::FunctionalCritical {
1367                            description: e.to_string(),
1368                        },
1369                    )
1370                    .await;
1371                }
1372            }
1373            _ => {
1374                emit_fail(
1375                    ctx,
1376                    ActorError::FunctionalCritical {
1377                        description: error.to_string(),
1378                    },
1379                )
1380                .await;
1381            }
1382        }
1383    }
1384
1385    async fn finish_request(
1386        &mut self,
1387        ctx: &mut ActorContext<Self>,
1388    ) -> Result<(), RequestManagerError> {
1389        self.finish_request_metrics("finished");
1390        info!("Ending {}", self.id);
1391        send_to_tracking(
1392            ctx,
1393            RequestTrackingMessage::UpdateState {
1394                request_id: self.id.clone(),
1395                state: RequestState::Finish,
1396            },
1397        )
1398        .await?;
1399
1400        self.on_event(RequestManagerEvent::Finish, ctx).await;
1401
1402        self.end_request(ctx).await?;
1403
1404        Ok(())
1405    }
1406
1407    async fn reboot(
1408        &mut self,
1409        ctx: &mut ActorContext<Self>,
1410        reboot_type: RebootType,
1411        governance_id: DigestIdentifier,
1412    ) -> Result<(), RequestManagerError> {
1413        self.start_phase_metrics("reboot");
1414        self.on_event(
1415            RequestManagerEvent::UpdateState {
1416                state: Box::new(RequestManagerState::Reboot),
1417            },
1418            ctx,
1419        )
1420        .await;
1421
1422        let Ok(actor) = ctx.reference().await else {
1423            return Ok(());
1424        };
1425
1426        let request_id = self.id.clone();
1427
1428        match reboot_type {
1429            RebootType::Normal => {
1430                info!("Launching Normal reboot {}", self.id);
1431                send_to_tracking(
1432                    ctx,
1433                    RequestTrackingMessage::UpdateState {
1434                        request_id: self.id.clone(),
1435                        state: RequestState::Reboot,
1436                    },
1437                )
1438                .await?;
1439
1440                actor
1441                    .tell(RequestManagerMessage::RebootUpdate {
1442                        request_id,
1443                        governance_id,
1444                    })
1445                    .await?;
1446            }
1447            RebootType::Diff => {
1448                info!("Launching Diff reboot {}", self.id);
1449                self.retry_diff += 1;
1450
1451                let seconds = match self.retry_diff {
1452                    1 => 10,
1453                    2 => 20,
1454                    3 => 30,
1455                    _ => 60,
1456                };
1457
1458                info!(
1459                    "Launching Diff reboot {}, try: {}, seconds: {}",
1460                    self.id, self.retry_diff, seconds
1461                );
1462
1463                send_to_tracking(
1464                    ctx,
1465                    RequestTrackingMessage::UpdateState {
1466                        request_id: self.id.clone(),
1467                        state: RequestState::RebootDiff {
1468                            seconds,
1469                            count: self.retry_diff,
1470                        },
1471                    },
1472                )
1473                .await?;
1474
1475                tokio::spawn(async move {
1476                    tokio::time::sleep(Duration::from_secs(seconds)).await;
1477                    let _ = actor
1478                        .tell(RequestManagerMessage::RebootUpdate {
1479                            request_id,
1480                            governance_id,
1481                        })
1482                        .await;
1483                });
1484            }
1485            RebootType::TimeOut => {
1486                self.retry_timeout += 1;
1487
1488                #[cfg(not(any(test, feature = "test")))]
1489                let seconds = match self.retry_timeout {
1490                    1 => 30,
1491                    2 => 60,
1492                    3 => 120,
1493                    _ => 300,
1494                };
1495
1496                #[cfg(any(test, feature = "test"))]
1497                let seconds = match self.retry_timeout {
1498                    1 => 5,
1499                    2 => 5,
1500                    3 => 5,
1501                    _ => 5,
1502                };
1503
1504                info!(
1505                    "Launching TimeOut reboot {}, try: {}, seconds: {}",
1506                    self.id, self.retry_timeout, seconds
1507                );
1508                send_to_tracking(
1509                    ctx,
1510                    RequestTrackingMessage::UpdateState {
1511                        request_id: self.id.clone(),
1512                        state: RequestState::RebootTimeOut {
1513                            seconds,
1514                            count: self.retry_timeout,
1515                        },
1516                    },
1517                )
1518                .await?;
1519
1520                tokio::spawn(async move {
1521                    tokio::time::sleep(Duration::from_secs(seconds)).await;
1522                    let _ = actor
1523                        .tell(RequestManagerMessage::RebootUpdate {
1524                            request_id,
1525                            governance_id,
1526                        })
1527                        .await;
1528                });
1529            }
1530        }
1531
1532        Ok(())
1533    }
1534
1535    async fn match_command(
1536        &mut self,
1537        ctx: &mut ActorContext<Self>,
1538    ) -> Result<(), RequestManagerError> {
1539        match self.command {
1540            ReqManInitMessage::Evaluate => self.build_evaluation(ctx).await,
1541            ReqManInitMessage::Validate => {
1542                let (
1543                    request,
1544                    quorum,
1545                    signers,
1546                    init_state,
1547                    current_request_roles,
1548                ) = self.build_validation_req(ctx, None, None).await?;
1549
1550                self.run_validation(
1551                    ctx,
1552                    request,
1553                    quorum,
1554                    signers,
1555                    init_state,
1556                    current_request_roles,
1557                )
1558                .await
1559            }
1560        }
1561    }
1562
1563    async fn check_signature(
1564        &self,
1565        ctx: &mut ActorContext<Self>,
1566    ) -> Result<(), RequestManagerError> {
1567        let Some(request) = self.request.clone() else {
1568            return Err(RequestManagerError::RequestNotSet);
1569        };
1570
1571        if let EventRequest::Fact { .. } = request.content() {
1572            let subject_data = get_subject_data(ctx, &self.subject_id).await?;
1573            let Some(subject_data) = subject_data else {
1574                return Err(RequestManagerError::SubjecData);
1575            };
1576
1577            let gov = self.get_governance_data(ctx).await?;
1578            match subject_data {
1579                SubjectData::Tracker {
1580                    schema_id,
1581                    namespace,
1582                    ..
1583                } => {
1584                    if !gov.has_this_role(HashThisRole::Schema {
1585                        who: request.signature().signer.clone(),
1586                        role: RoleTypes::Issuer,
1587                        schema_id,
1588                        namespace: Namespace::from(namespace),
1589                    }) {
1590                        return Err(RequestManagerError::NotIssuer);
1591                    }
1592                }
1593                SubjectData::Governance { .. } => {
1594                    if !gov.has_this_role(HashThisRole::Gov {
1595                        who: request.signature().signer.clone(),
1596                        role: RoleTypes::Issuer,
1597                    }) {
1598                        return Err(RequestManagerError::NotIssuer);
1599                    }
1600                }
1601            }
1602        }
1603
1604        Ok(())
1605    }
1606
1607    async fn stops_childs(
1608        &self,
1609        ctx: &mut ActorContext<Self>,
1610    ) -> Result<(), RequestManagerError> {
1611        match self.state {
1612            RequestManagerState::Reboot => {
1613                if let Ok(actor) = ctx.get_child::<Update>("update").await {
1614                    actor.ask_stop().await?;
1615                };
1616                if let Ok(actor) = ctx.get_child::<Reboot>("reboot").await {
1617                    actor.ask_stop().await?;
1618                };
1619            }
1620            RequestManagerState::Evaluation => {
1621                if let Ok(actor) =
1622                    ctx.get_child::<Evaluation>("evaluation").await
1623                {
1624                    actor.ask_stop().await?;
1625                };
1626            }
1627            RequestManagerState::Approval { .. } => {
1628                if let Ok(actor) = ctx.get_child::<Approval>("approval").await {
1629                    actor.ask_stop().await?;
1630                };
1631                let _ = make_obsolete(ctx, &self.subject_id).await;
1632            }
1633            RequestManagerState::Validation { .. } => {
1634                if let Ok(actor) =
1635                    ctx.get_child::<Validation>("validation").await
1636                {
1637                    actor.ask_stop().await?;
1638                };
1639            }
1640            RequestManagerState::Distribution { .. } => {
1641                if let Ok(actor) =
1642                    ctx.get_child::<Distribution>("distribution").await
1643                {
1644                    actor.ask_stop().await?;
1645                };
1646            }
1647            _ => {}
1648        }
1649
1650        Ok(())
1651    }
1652
1653    async fn abort_request(
1654        &mut self,
1655        ctx: &mut ActorContext<Self>,
1656        error: String,
1657        sn: Option<u64>,
1658        who: PublicKey,
1659    ) -> Result<(), RequestManagerError> {
1660        self.stops_childs(ctx).await?;
1661
1662        self.finish_request_metrics("aborted");
1663        info!("Aborting {}", self.id);
1664        send_to_tracking(
1665            ctx,
1666            RequestTrackingMessage::UpdateState {
1667                request_id: self.id.clone(),
1668                state: RequestState::Abort {
1669                    subject_id: self.subject_id.to_string(),
1670                    error,
1671                    sn,
1672                    who: who.to_string(),
1673                },
1674            },
1675        )
1676        .await?;
1677
1678        self.on_event(RequestManagerEvent::Finish, ctx).await;
1679
1680        self.end_request(ctx).await?;
1681
1682        Ok(())
1683    }
1684
1685    async fn end_request(
1686        &self,
1687        ctx: &ActorContext<Self>,
1688    ) -> Result<(), RequestManagerError> {
1689        let actor = ctx.get_parent::<RequestHandler>().await?;
1690        actor
1691            .tell(RequestHandlerMessage::EndHandling {
1692                subject_id: self.subject_id.clone(),
1693            })
1694            .await?;
1695
1696        Ok(())
1697    }
1698}
1699
/// Messages accepted by the `RequestManager` actor.
///
/// Most variants carry the `request_id` they refer to; the reboot handlers
/// visible in this file ignore a message whose `request_id` differs from
/// the manager's own id.
#[derive(Debug, Clone)]
pub enum RequestManagerMessage {
    // NOTE(review): presumably resumes an already stored request — confirm
    // against the handler.
    Run {
        request_id: DigestIdentifier,
    },
    // NOTE(review): presumably the first execution of a new request, with
    // the initial command and the signed request — confirm against the
    // handler.
    FirstRun {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
        request_id: DigestIdentifier,
    },
    // NOTE(review): presumably an abort requested by `who` for the given
    // reason and sequence number — confirm against the handler.
    Abort {
        request_id: DigestIdentifier,
        who: PublicKey,
        reason: String,
        sn: u64,
    },
    ManualAbort,
    PurgeStorage,
    /// Requests a reboot of the given type; `send_reboot` sends it with
    /// `RebootType::TimeOut`.
    Reboot {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
        reboot_type: RebootType,
    },
    /// Sent by `reboot` (immediately or after a delay); handled by running
    /// `init_update` for the governance.
    RebootUpdate {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Sent by `init_update`; handled by running `init_wait` for the
    /// governance.
    RebootWait {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Sent by `init_update` when no witness other than this node is
    /// available.
    FinishReboot {
        request_id: DigestIdentifier,
    },
    // NOTE(review): the three `*Res` variants look like the results reported
    // by the evaluation, approval and validation phases — confirm against
    // the senders.
    EvaluationRes {
        request_id: DigestIdentifier,
        eval_req: Box<EvaluationReq>,
        eval_res: EvaluationData,
    },
    ApprovalRes {
        request_id: DigestIdentifier,
        appro_res: ApprovalData,
    },
    ValidationRes {
        request_id: DigestIdentifier,
        val_req: Box<ValidationReq>,
        val_res: ValidationData,
    },
    FinishRequest {
        request_id: DigestIdentifier,
    },
}

// Marker impl: lets `RequestManagerMessage` be used as an actor message.
impl Message for RequestManagerMessage {}
1754
/// Events applied to the `RequestManager` through `on_event`.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum RequestManagerEvent {
    /// Emitted by `finish_request` and `abort_request` when the request
    /// ends.
    Finish,
    /// Replaces the manager state; emitted by `build_ledger`,
    /// `update_subject` and `reboot`.
    UpdateState {
        state: Box<RequestManagerState>,
    },
    // NOTE(review): presumably bumps the request version passed to the
    // validation child — confirm where it is emitted.
    UpdateVersion {
        version: u64,
    },
    // NOTE(review): presumably stores the initial command and request —
    // confirm where it is emitted.
    SafeState {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
    },
}

// Marker impl: lets `RequestManagerEvent` be used as an actor event.
impl Event for RequestManagerEvent {}
1773
1774#[async_trait]
1775impl Actor for RequestManager {
1776    type Event = RequestManagerEvent;
1777    type Message = RequestManagerMessage;
1778    type Response = ();
1779
1780    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
1781        parent_span.map_or_else(
1782            || info_span!("RequestManager", id),
1783            |parent_span| info_span!(parent: parent_span, "RequestManager", id),
1784        )
1785    }
1786
1787    async fn pre_start(
1788        &mut self,
1789        ctx: &mut ActorContext<Self>,
1790    ) -> Result<(), ActorError> {
1791        if let Err(e) =
1792            self.init_store("request_manager", None, false, ctx).await
1793        {
1794            error!(
1795                error = %e,
1796                subject_id = %self.subject_id,
1797                "Failed to initialize store"
1798            );
1799            return Err(e);
1800        }
1801
1802        if self.governance_id.is_none()
1803            && let Some(request) = &self.request
1804            && let EventRequest::Create(create) = request.content()
1805            && !create.schema_id.is_gov()
1806        {
1807            self.governance_id = Some(create.governance_id.clone());
1808        }
1809
1810        Ok(())
1811    }
1812}
1813
1814#[async_trait]
1815impl Handler<Self> for RequestManager {
1816    // The async state machine inlines all sub-futures and exceeds the default
1817    // threshold; a proper fix would require boxing every large sub-future.
1818    #[allow(clippy::large_stack_frames)]
1819    async fn handle_message(
1820        &mut self,
1821        _sender: ActorPath,
1822        msg: RequestManagerMessage,
1823        ctx: &mut ave_actors::ActorContext<Self>,
1824    ) -> Result<(), ActorError> {
1825        match msg {
1826            RequestManagerMessage::RebootUpdate {
1827                governance_id,
1828                request_id,
1829            } => {
1830                if request_id == self.id {
1831                    info!("Init reboot update {}", self.id);
1832                    debug!(
1833                        msg_type = "RebootUpdate",
1834                        request_id = %self.id,
1835                        governance_id = %governance_id,
1836                        "Initializing reboot update"
1837                    );
1838
1839                    if let Err(e) = self.init_update(ctx, &governance_id).await
1840                    {
1841                        error!(
1842                            msg_type = "RebootUpdate",
1843                            request_id = %self.id,
1844                            governance_id = %governance_id,
1845                            error = %e,
1846                            "Failed to initialize reboot update"
1847                        );
1848                        self.match_error(ctx, e).await;
1849                        return Ok(());
1850                    }
1851                }
1852            }
1853            RequestManagerMessage::RebootWait {
1854                governance_id,
1855                request_id,
1856            } => {
1857                if request_id == self.id {
1858                    info!("Init reboot wait {}", self.id);
1859                    debug!(
1860                        msg_type = "RebootWait",
1861                        request_id = %self.id,
1862                        governance_id = %governance_id,
1863                        "Initializing reboot wait"
1864                    );
1865
1866                    if let Err(e) = self.init_wait(ctx, &governance_id).await {
1867                        error!(
1868                            msg_type = "RebootWait",
1869                            request_id = %self.id,
1870                            governance_id = %governance_id,
1871                            error = %e,
1872                            "Failed to initialize reboot wait"
1873                        );
1874                        self.match_error(ctx, e).await;
1875                        return Ok(());
1876                    }
1877                }
1878            }
1879            RequestManagerMessage::Reboot {
1880                governance_id,
1881                request_id,
1882                reboot_type,
1883            } => {
1884                if request_id == self.id {
1885                    if matches!(self.state, RequestManagerState::Reboot) {
1886                        debug!(
1887                            msg_type = "Reboot",
1888                            request_id = %self.id,
1889                            governance_id = %governance_id,
1890                            reboot_type = ?reboot_type,
1891                            "Already in reboot state, ignoring"
1892                        );
1893                    } else {
1894                        debug!(
1895                            msg_type = "Reboot",
1896                            request_id = %self.id,
1897                            governance_id = %governance_id,
1898                            reboot_type = ?reboot_type,
1899                            "Initiating reboot"
1900                        );
1901                        if let Err(e) = self.stops_childs(ctx).await {
1902                            error!(
1903                                msg_type = "Reboot",
1904                                request_id = %self.id,
1905                                governance_id = %governance_id,
1906                                error = %e,
1907                                "Failed to stop childs"
1908                            );
1909                            self.match_error(ctx, e).await;
1910                            return Ok(());
1911                        };
1912                        if let Err(e) = self
1913                            .reboot(ctx, reboot_type, governance_id.clone())
1914                            .await
1915                        {
1916                            error!(
1917                                msg_type = "Reboot",
1918                                request_id = %self.id,
1919                                governance_id = %governance_id,
1920                                error = %e,
1921                                "Failed to initiate reboot"
1922                            );
1923                            self.match_error(ctx, e).await;
1924                            return Ok(());
1925                        }
1926                    }
1927                }
1928            }
1929            RequestManagerMessage::FinishReboot { request_id } => {
1930                if request_id == self.id {
1931                    info!("Init reboot finish {}", self.id);
1932                    debug!(
1933                        msg_type = "FinishReboot",
1934                        request_id = %self.id,
1935                        version = self.version,
1936                        "Reboot completed, resuming request"
1937                    );
1938                    self.on_event(
1939                        RequestManagerEvent::UpdateVersion {
1940                            version: self.version + 1,
1941                        },
1942                        ctx,
1943                    )
1944                    .await;
1945
1946                    if let Err(e) = send_to_tracking(
1947                        ctx,
1948                        RequestTrackingMessage::UpdateVersion {
1949                            request_id: self.id.clone(),
1950                            version: self.version,
1951                        },
1952                    )
1953                    .await
1954                    {
1955                        error!(
1956                            msg_type = "FinishReboot",
1957                            request_id = %self.id,
1958                            version = self.version,
1959                            error = %e,
1960                            "Failed to send version update to tracking"
1961                        );
1962                        return Err(emit_fail(ctx, e).await);
1963                    }
1964
1965                    if let Err(e) = self.check_signature(ctx).await {
1966                        error!(
1967                            msg_type = "FinishReboot",
1968                            request_id = %self.id,
1969                            error = %e,
1970                            "Failed to check signatures after reboot"
1971                        );
1972                        self.match_error(ctx, e).await;
1973                        return Ok(());
1974                    }
1975
1976                    if let Err(e) = self.match_command(ctx).await {
1977                        error!(
1978                            msg_type = "FinishReboot",
1979                            request_id = %self.id,
1980                            error = %e,
1981                            "Failed to execute command after reboot"
1982                        );
1983                        self.match_error(ctx, e).await;
1984                        return Ok(());
1985                    }
1986                }
1987            }
1988            RequestManagerMessage::Abort {
1989                request_id,
1990                who,
1991                reason,
1992                sn,
1993            } => {
1994                if request_id == self.id {
1995                    warn!(
1996                        msg_type = "Abort",
1997                        state = %self.state,
1998                        request_id = %self.id,
1999                        who = %who,
2000                        reason = %reason,
2001                        sn = sn,
2002                        "Request abort received"
2003                    );
2004                    if let Err(e) =
2005                        self.abort_request(ctx, reason, Some(sn), who).await
2006                    {
2007                        error!(
2008                            msg_type = "Abort",
2009                            request_id = %self.id,
2010                            error = %e,
2011                            "Failed to abort request"
2012                        );
2013                        self.match_error(ctx, e).await;
2014                        return Ok(());
2015                    }
2016                }
2017            }
2018            RequestManagerMessage::ManualAbort => {
2019                match &self.state {
2020                    RequestManagerState::Reboot
2021                    | RequestManagerState::Starting
2022                    | RequestManagerState::Evaluation
2023                    | RequestManagerState::Approval { .. }
2024                    | RequestManagerState::Validation { .. } => {
2025                        if let Err(e) = self
2026                            .abort_request(
2027                                ctx,
2028                                "The user manually aborted the request"
2029                                    .to_owned(),
2030                                None,
2031                                (*self.our_key).clone(),
2032                            )
2033                            .await
2034                        {
2035                            error!(
2036                                msg_type = "Abort",
2037                                request_id = %self.id,
2038                                error = %e,
2039                                "Failed to abort request"
2040                            );
2041                            self.match_error(ctx, e).await;
2042                        }
2043                    }
2044                    _ => {
2045                        info!(
2046                            "The request is in a state that cannot be aborted {}, state: {}",
2047                            self.id, self.state
2048                        );
2049                    }
2050                }
2051
2052                return Ok(());
2053            }
2054            RequestManagerMessage::PurgeStorage => {
2055                purge_storage(ctx).await?;
2056
2057                debug!(
2058                    msg_type = "PurgeStorage",
2059                    subject_id = %self.subject_id,
2060                    "Purged request manager storage"
2061                );
2062
2063                return Ok(());
2064            }
2065            RequestManagerMessage::FirstRun {
2066                command,
2067                request,
2068                request_id,
2069            } => {
2070                self.id = request_id.clone();
2071                self.begin_request_metrics();
2072                debug!(
2073                    msg_type = "FirstRun",
2074                    request_id = %request_id,
2075                    command = ?command,
2076                    "First run of request manager"
2077                );
2078                self.on_event(
2079                    RequestManagerEvent::SafeState { command, request },
2080                    ctx,
2081                )
2082                .await;
2083
2084                if let Err(e) = self.match_command(ctx).await {
2085                    error!(
2086                        msg_type = "FirstRun",
2087                        request_id = %self.id,
2088                        error = %e,
2089                        "Failed to execute initial command"
2090                    );
2091                    self.match_error(ctx, e).await;
2092                    return Ok(());
2093                };
2094            }
2095            RequestManagerMessage::Run { request_id } => {
2096                self.id = request_id;
2097                self.ensure_request_metrics_started();
2098
2099                debug!(
2100                    msg_type = "Run",
2101                    request_id = %self.id,
2102                    state = ?self.state,
2103                    version = self.version,
2104                    "Running request manager"
2105                );
2106                match self.state.clone() {
2107                    RequestManagerState::Starting
2108                    | RequestManagerState::Reboot => {
2109                        if let Err(e) = self.match_command(ctx).await {
2110                            error!(
2111                                msg_type = "Run",
2112                                request_id = %self.id,
2113                                state = "Starting/Reboot",
2114                                error = %e,
2115                                "Failed to execute command"
2116                            );
2117                            self.match_error(ctx, e).await;
2118                        return Ok(())
2119                        };
2120                    }
2121                    RequestManagerState::Evaluation => {
2122                        if let Err(e) = self.build_evaluation(ctx).await {
2123                            error!(
2124                                msg_type = "Run",
2125                                request_id = %self.id,
2126                                state = "Evaluation",
2127                                error = %e,
2128                                "Failed to build evaluation"
2129                            );
2130                            self.match_error(ctx, e).await;
2131                        return Ok(())
2132                        }
2133                    }
2134
2135                    RequestManagerState::Approval {
2136                        eval_req,
2137                        eval_res,
2138                    } => {
2139                        if let Err(e) = self
2140                                .build_approval(ctx, eval_req, eval_res.evaluator_res().expect("If the status is approval, it means that the evaluator's response is valid"))
2141                                .await
2142                            {
2143                                error!(
2144                                    msg_type = "Run",
2145                                    request_id = %self.id,
2146                                    state = "Approval",
2147                                    error = %e,
2148                                    "Failed to build approval"
2149                                );
2150                                self.match_error(ctx, e).await;
2151                        return Ok(())
2152                            }
2153                    }
2154                    RequestManagerState::Validation {
2155                        request,
2156                        quorum,
2157                        init_state,
2158                        current_request_roles,
2159                        signers,
2160                    } => {
2161                        if let Err(e) = self
2162                            .run_validation(
2163                                ctx,
2164                                *request,
2165                                quorum,
2166                                signers,
2167                                init_state,
2168                                current_request_roles,
2169                            )
2170                            .await
2171                        {
2172                            error!(
2173                                msg_type = "Run",
2174                                request_id = %self.id,
2175                                state = "Validation",
2176                                error = %e,
2177                                "Failed to run validation"
2178                            );
2179                            self.match_error(ctx, e).await;
2180                        return Ok(())
2181                        };
2182                    }
2183                    RequestManagerState::UpdateSubject { ledger } => {
2184                        if let Err(e) =
2185                            self.update_subject(ctx, ledger.clone()).await
2186                        {
2187                            error!(
2188                                msg_type = "Run",
2189                                request_id = %self.id,
2190                                state = "UpdateSubject",
2191                                error = %e,
2192                                "Failed to update subject"
2193                            );
2194                            self.match_error(ctx, e).await;
2195                        return Ok(())
2196                        };
2197
2198                        match self.build_distribution(ctx, ledger).await {
2199                            Ok(in_distribution) => {
2200                                if !in_distribution
2201                                    && let Err(e) =
2202                                        self.finish_request(ctx).await
2203                                    {
2204                                        error!(
2205                                            msg_type = "Run",
2206                                            request_id = %self.id,
2207                                            state = "UpdateSubject",
2208                                            error = %e,
2209                                            "Failed to finish request after build distribution"
2210                                        );
2211                                        self.match_error(ctx, e).await;
2212                        return Ok(())
2213                                }
2214                            }
2215                            Err(e) => {
2216                                error!(
2217                                    msg_type = "Run",
2218                                    request_id = %self.id,
2219                                    state = "UpdateSubject",
2220                                    error = %e,
2221                                    "Failed to build distribution"
2222                                );
2223                                self.match_error(ctx, e).await;
2224                        return Ok(())
2225                            }
2226                        };
2227                    }
2228                    RequestManagerState::Distribution { ledger } => {
2229                        match self.build_distribution(ctx, ledger).await {
2230                            Ok(in_distribution) => {
2231                                if !in_distribution
2232                                    && let Err(e) =
2233                                        self.finish_request(ctx).await
2234                                    {
2235                                        error!(
2236                                            msg_type = "Run",
2237                                            request_id = %self.id,
2238                                            state = "Distribution",
2239                                            error = %e,
2240                                            "Failed to finish request after build distribution"
2241                                        );
2242                                        self.match_error(ctx, e).await;
2243                        return Ok(())
2244                                    }
2245                            }
2246                            Err(e) => {
2247                                error!(
2248                                    msg_type = "Run",
2249                                    request_id = %self.id,
2250                                    state = "Distribution",
2251                                    error = %e,
2252                                    "Failed to build distribution"
2253                                );
2254                                self.match_error(ctx, e).await;
2255                        return Ok(())
2256                            }
2257                        };
2258                    }
2259                    RequestManagerState::End => {
2260                        if let Err(e) = self.end_request(ctx).await {
2261                            error!(
2262                                msg_type = "Run",
2263                                request_id = %self.id,
2264                                state = "End",
2265                                error = %e,
2266                                "Failed to end request"
2267                            );
2268                            self.match_error(ctx, e).await;
2269                            return Ok(())
2270                        }
2271                    }
2272                };
2273            }
2274            RequestManagerMessage::EvaluationRes {
2275                eval_req,
2276                eval_res,
2277                request_id,
2278            } => {
2279                if request_id == self.id {
2280                    debug!(
2281                        msg_type = "EvaluationRes",
2282                        request_id = %self.id,
2283                        version = self.version,
2284                        "Evaluation result received"
2285                    );
2286                    if let Err(e) = self.stops_childs(ctx).await {
2287                        error!(
2288                            msg_type = "EvaluationRes",
2289                            request_id = %self.id,
2290                            error = %e,
2291                            "Failed to stop childs"
2292                        );
2293                        self.match_error(ctx, e).await;
2294                        return Ok(());
2295                    };
2296
2297                    if let Some(evaluator_res) = eval_res.evaluator_res()
2298                        && evaluator_res.appr_required
2299                    {
2300                        debug!(
2301                            msg_type = "EvaluationRes",
2302                            request_id = %self.id,
2303                            "Approval required, proceeding to approval phase"
2304                        );
2305                        self.on_event(
2306                            RequestManagerEvent::UpdateState {
2307                                state: Box::new(
2308                                    RequestManagerState::Approval {
2309                                        eval_req: *eval_req.clone(),
2310                                        eval_res: eval_res.clone(),
2311                                    },
2312                                ),
2313                            },
2314                            ctx,
2315                        )
2316                        .await;
2317
2318                        if let Err(e) = self
2319                            .build_approval(ctx, *eval_req, evaluator_res)
2320                            .await
2321                        {
2322                            error!(
2323                                msg_type = "EvaluationRes",
2324                                request_id = %self.id,
2325                                error = %e,
2326                                "Failed to build approval"
2327                            );
2328                            self.match_error(ctx, e).await;
2329                            return Ok(());
2330                        }
2331                    } else {
2332                        debug!(
2333                            msg_type = "EvaluationRes",
2334                            request_id = %self.id,
2335                            "Approval not required, proceeding to validation phase"
2336                        );
2337                        let (
2338                            request,
2339                            quorum,
2340                            signers,
2341                            init_state,
2342                            current_request_roles,
2343                        ) = match self
2344                            .build_validation_req(
2345                                ctx,
2346                                Some((*eval_req, eval_res)),
2347                                None,
2348                            )
2349                            .await
2350                        {
2351                            Ok(data) => data,
2352                            Err(e) => {
2353                                error!(
2354                                    msg_type = "EvaluationRes",
2355                                    request_id = %self.id,
2356                                    error = %e,
2357                                    "Failed to build validation request"
2358                                );
2359                                self.match_error(ctx, e).await;
2360                                return Ok(());
2361                            }
2362                        };
2363
2364                        if let Err(e) = self
2365                            .run_validation(
2366                                ctx,
2367                                request,
2368                                quorum,
2369                                signers,
2370                                init_state,
2371                                current_request_roles,
2372                            )
2373                            .await
2374                        {
2375                            error!(
2376                                msg_type = "EvaluationRes",
2377                                request_id = %self.id,
2378                                error = %e,
2379                                "Failed to run validation"
2380                            );
2381                            self.match_error(ctx, e).await;
2382                            return Ok(());
2383                        };
2384                    }
2385                }
2386            }
2387            RequestManagerMessage::ApprovalRes {
2388                appro_res,
2389                request_id,
2390            } => {
2391                if request_id == self.id {
2392                    let _ = make_obsolete(ctx, &self.subject_id).await;
2393                    debug!(
2394                        msg_type = "ApprovalRes",
2395                        request_id = %self.id,
2396                        version = self.version,
2397                        "Approval result received"
2398                    );
2399                    if let Err(e) = self.stops_childs(ctx).await {
2400                        error!(
2401                            msg_type = "ApprovalRes",
2402                            request_id = %self.id,
2403                            error = %e,
2404                            "Failed to stop childs"
2405                        );
2406                        self.match_error(ctx, e).await;
2407                        return Ok(());
2408                    };
2409
2410                    let RequestManagerState::Approval { eval_req, eval_res } =
2411                        self.state.clone()
2412                    else {
2413                        error!(
2414                            msg_type = "ApprovalRes",
2415                            request_id = %self.id,
2416                            state = ?self.state,
2417                            "Invalid state for approval response"
2418                        );
2419                        let e = ActorError::FunctionalCritical {
2420                            description: "Invalid request state".to_owned(),
2421                        };
2422                        return Err(emit_fail(ctx, e).await);
2423                    };
2424                    let (
2425                        request,
2426                        quorum,
2427                        signers,
2428                        init_state,
2429                        current_request_roles,
2430                    ) = match self
2431                        .build_validation_req(
2432                            ctx,
2433                            Some((eval_req, eval_res)),
2434                            Some(appro_res),
2435                        )
2436                        .await
2437                    {
2438                        Ok(data) => data,
2439                        Err(e) => {
2440                            error!(
2441                                msg_type = "ApprovalRes",
2442                                request_id = %self.id,
2443                                error = %e,
2444                                "Failed to build validation request"
2445                            );
2446                            self.match_error(ctx, e).await;
2447                            return Ok(());
2448                        }
2449                    };
2450
2451                    if let Err(e) = self
2452                        .run_validation(
2453                            ctx,
2454                            request,
2455                            quorum,
2456                            signers,
2457                            init_state,
2458                            current_request_roles,
2459                        )
2460                        .await
2461                    {
2462                        error!(
2463                            msg_type = "ApprovalRes",
2464                            request_id = %self.id,
2465                            error = %e,
2466                            "Failed to run validation"
2467                        );
2468                        self.match_error(ctx, e).await;
2469                        return Ok(());
2470                    };
2471                }
2472            }
2473            RequestManagerMessage::ValidationRes {
2474                val_res,
2475                val_req,
2476                request_id,
2477            } => {
2478                if request_id == self.id {
2479                    debug!(
2480                        msg_type = "ValidationRes",
2481                        request_id = %self.id,
2482                        version = self.version,
2483                        "Validation result received"
2484                    );
2485                    if let Err(e) = self.stops_childs(ctx).await {
2486                        error!(
2487                                msg_type = "ValidationRes",
2488                                request_id = %self.id,
2489                                error = %e,
2490                                "Failed to stop childs"
2491                        );
2492                        self.match_error(ctx, e).await;
2493                        return Ok(());
2494                    };
2495
2496                    let signed_ledger =
2497                        match self.build_ledger(ctx, *val_req, val_res).await {
2498                            Ok(signed_ledger) => signed_ledger,
2499                            Err(e) => {
2500                                error!(
2501                                    msg_type = "ValidationRes",
2502                                    request_id = %self.id,
2503                                    error = %e,
2504                                    "Failed to build ledger"
2505                                );
2506                                self.match_error(ctx, e).await;
2507                                return Ok(());
2508                            }
2509                        };
2510
2511                    if let Err(e) =
2512                        self.update_subject(ctx, signed_ledger.clone()).await
2513                    {
2514                        error!(
2515                            msg_type = "ValidationRes",
2516                            request_id = %self.id,
2517                            error = %e,
2518                            "Failed to update subject"
2519                        );
2520                        self.match_error(ctx, e).await;
2521                        return Ok(());
2522                    };
2523
2524                    match self.build_distribution(ctx, signed_ledger).await {
2525                        Ok(in_distribution) => {
2526                            if !in_distribution
2527                                && let Err(e) = self.finish_request(ctx).await
2528                            {
2529                                error!(
2530                                    msg_type = "ValidationRes",
2531                                    request_id = %self.id,
2532                                    error = %e,
2533                                    "Failed to finish request after build distribution"
2534                                );
2535                                self.match_error(ctx, e).await;
2536                                return Ok(());
2537                            }
2538                        }
2539                        Err(e) => {
2540                            error!(
2541                                msg_type = "ValidationRes",
2542                                request_id = %self.id,
2543                                error = %e,
2544                                "Failed to build distribution"
2545                            );
2546                            self.match_error(ctx, e).await;
2547                            return Ok(());
2548                        }
2549                    };
2550                }
2551            }
2552            RequestManagerMessage::FinishRequest { request_id } => {
2553                if request_id == self.id {
2554                    debug!(
2555                        msg_type = "FinishRequest",
2556                        request_id = %self.id,
2557                        version = self.version,
2558                        "Finishing request"
2559                    );
2560
2561                    if let Err(e) = self.stops_childs(ctx).await {
2562                        error!(
2563                            msg_type = "FinishRequest",
2564                            request_id = %self.id,
2565                            error = %e,
2566                            "Failed to stop childs"
2567                        );
2568                        self.match_error(ctx, e).await;
2569                        return Ok(());
2570                    };
2571
2572                    if let Err(e) = self.finish_request(ctx).await {
2573                        error!(
2574                            msg_type = "FinishRequest",
2575                            request_id = %self.id,
2576                            error = %e,
2577                            "Failed to finish request"
2578                        );
2579                        self.match_error(ctx, e).await;
2580                        return Ok(());
2581                    }
2582                }
2583            }
2584        }
2585
2586        Ok(())
2587    }
2588
2589    async fn on_event(
2590        &mut self,
2591        event: RequestManagerEvent,
2592        ctx: &mut ActorContext<Self>,
2593    ) {
2594        let event_type = match &event {
2595            RequestManagerEvent::Finish => "Finish",
2596            RequestManagerEvent::UpdateState { .. } => "UpdateState",
2597            RequestManagerEvent::UpdateVersion { .. } => "UpdateVersion",
2598            RequestManagerEvent::SafeState { .. } => "SafeState",
2599        };
2600
2601        if let Err(e) = self.persist(&event, ctx).await {
2602            error!(
2603                event_type = event_type,
2604                request_id = %self.id,
2605                error = %e,
2606                "Failed to persist event"
2607            );
2608            emit_fail(ctx, e).await;
2609        };
2610    }
2611
2612    async fn on_child_fault(
2613        &mut self,
2614        error: ActorError,
2615        ctx: &mut ActorContext<Self>,
2616    ) -> ChildAction {
2617        error!(
2618            request_id = %self.id,
2619            version = self.version,
2620            state = ?self.state,
2621            error = %error,
2622            "Child fault in request manager"
2623        );
2624        emit_fail(ctx, error).await;
2625        ChildAction::Stop
2626    }
2627}
2628
2629#[async_trait]
2630impl PersistentActor for RequestManager {
2631    type Persistence = LightPersistence;
2632    type InitParams = InitRequestManager;
2633
2634    fn update(&mut self, state: Self) {
2635        self.command = state.command;
2636        self.request = state.request;
2637        self.state = state.state;
2638        self.version = state.version;
2639    }
2640
2641    fn create_initial(params: Self::InitParams) -> Self {
2642        Self {
2643            retry_diff: 0,
2644            retry_timeout: 0,
2645            request_started_at: None,
2646            current_phase: None,
2647            current_phase_started_at: None,
2648            our_key: params.our_key,
2649            id: DigestIdentifier::default(),
2650            subject_id: params.subject_id,
2651            governance_id: params.governance_id,
2652            command: ReqManInitMessage::Evaluate,
2653            request: None,
2654            state: RequestManagerState::Starting,
2655            version: 0,
2656            helpers: Some(params.helpers),
2657        }
2658    }
2659
2660    /// Change node state.
2661    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
2662        match event {
2663            RequestManagerEvent::Finish => {
2664                debug!(
2665                    event_type = "Finish",
2666                    request_id = %self.id,
2667                    "Applying finish event"
2668                );
2669                self.state = RequestManagerState::End;
2670                self.request = None;
2671                self.id = DigestIdentifier::default();
2672            }
2673            RequestManagerEvent::UpdateState { state } => {
2674                debug!(
2675                    event_type = "UpdateState",
2676                    request_id = %self.id,
2677                    old_state = ?self.state,
2678                    new_state = ?state,
2679                    "Applying state update"
2680                );
2681                self.state = *state.clone()
2682            }
2683            RequestManagerEvent::UpdateVersion { version } => {
2684                debug!(
2685                    event_type = "UpdateVersion",
2686                    request_id = %self.id,
2687                    old_version = self.version,
2688                    new_version = version,
2689                    "Applying version update"
2690                );
2691                self.state = RequestManagerState::Starting;
2692                self.version = *version
2693            }
2694            RequestManagerEvent::SafeState { command, request } => {
2695                debug!(
2696                    event_type = "SafeState",
2697                    request_id = %self.id,
2698                    command = ?command,
2699                    "Applying safe state"
2700                );
2701                self.version = 0;
2702                self.retry_diff = 0;
2703                self.retry_timeout = 0;
2704                self.state = RequestManagerState::Starting;
2705                self.request = Some(request.clone());
2706                self.command = command.clone();
2707            }
2708        };
2709
2710        Ok(())
2711    }
2712}
2713
// `RequestManager` implements `Storable` without overriding any of its items
// (the impl body is intentionally empty).
#[async_trait]
impl Storable for RequestManager {}