Skip to main content

ave_core/request/
manager.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::bridge::request::EventRequestType;
8use ave_common::identity::{
9    DigestIdentifier, HashAlgorithm, PublicKey, Signed,
10};
11use ave_common::request::EventRequest;
12use ave_common::response::RequestState;
13use ave_common::{Namespace, SchemaType, ValueWrapper};
14use ave_network::ComunicateInfo;
15use borsh::{BorshDeserialize, BorshSerialize};
16use serde::{Deserialize, Serialize};
17use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tracing::{Span, debug, error, info, info_span, warn};
21
22use crate::approval::request::ApprovalReq;
23use crate::distribution::{
24    Distribution, DistributionMessage, DistributionType,
25};
26use crate::evaluation::request::{EvalWorkerContext, EvaluateData};
27use crate::evaluation::response::EvaluatorResponse;
28use crate::governance::data::GovernanceData;
29use crate::governance::model::{
30    HashThisRole, ProtocolTypes, Quorum, RoleTypes, WitnessesData,
31};
32use crate::governance::role_register::RoleDataRegister;
33use crate::helpers::network::service::NetworkSender;
34use crate::metrics::try_core_metrics;
35use crate::model::common::distribution_plan::build_tracker_event_distribution_plan;
36use crate::model::common::node::{SignTypesNode, get_sign, get_subject_data};
37use crate::model::common::subject::{
38    acquire_subject, create_subject, get_gov, get_gov_sn,
39    get_last_ledger_event, get_metadata, make_obsolete, update_ledger,
40};
41use crate::model::common::{purge_storage, send_to_tracking};
42use crate::model::event::{
43    ApprovalData, EvaluationData, Ledger, LedgerSeal, Protocols, ValidationData,
44};
45use crate::node::SubjectData;
46use crate::request::error::RequestManagerError;
47use crate::request::tracking::RequestTrackingMessage;
48use crate::request::{RequestHandler, RequestHandlerMessage};
49use crate::subject::Metadata;
50use crate::system::ConfigHelper;
51
52use crate::validation::request::{ActualProtocols, LastData, ValidationReq};
53use crate::validation::worker::CurrentRequestRoles;
54use crate::{
55    ActorMessage, NetworkMessage, Validation, ValidationMessage,
56    approval::{Approval, ApprovalMessage},
57    auth::{Auth, AuthMessage, AuthResponse},
58    db::Storable,
59    evaluation::{Evaluation, EvaluationMessage, request::EvaluationReq},
60    model::common::emit_fail,
61    update::{Update, UpdateMessage, UpdateNew, UpdateType},
62};
63
64use super::{
65    reboot::{Reboot, RebootMessage},
66    types::{
67        DistributionPlanEntry, DistributionPlanMode, ReqManInitMessage,
68        RequestManagerState,
69    },
70};
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
73pub struct RequestManager {
74    #[serde(skip)]
75    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
76    #[serde(skip)]
77    our_key: Arc<PublicKey>,
78    #[serde(skip)]
79    id: DigestIdentifier,
80    #[serde(skip)]
81    subject_id: DigestIdentifier,
82    #[serde(skip)]
83    governance_id: Option<DigestIdentifier>,
84    #[serde(skip)]
85    retry_timeout: u64,
86    #[serde(skip)]
87    retry_diff: u64,
88    #[serde(skip)]
89    request_started_at: Option<Instant>,
90    #[serde(skip)]
91    current_phase: Option<&'static str>,
92    #[serde(skip)]
93    current_phase_started_at: Option<Instant>,
94    command: ReqManInitMessage,
95    request: Option<Signed<EventRequest>>,
96    state: RequestManagerState,
97    version: u64,
98}
99
100#[derive(Debug, Clone)]
101pub enum RebootType {
102    Normal,
103    Diff,
104    TimeOut,
105}
106
107pub struct InitRequestManager {
108    pub our_key: Arc<PublicKey>,
109    pub subject_id: DigestIdentifier,
110    pub governance_id: Option<DigestIdentifier>,
111    pub helpers: (HashAlgorithm, Arc<NetworkSender>),
112}
113
114impl BorshSerialize for RequestManager {
115    fn serialize<W: std::io::Write>(
116        &self,
117        writer: &mut W,
118    ) -> std::io::Result<()> {
119        BorshSerialize::serialize(&self.command, writer)?;
120        BorshSerialize::serialize(&self.state, writer)?;
121        BorshSerialize::serialize(&self.version, writer)?;
122        BorshSerialize::serialize(&self.request, writer)?;
123
124        Ok(())
125    }
126}
127
128impl BorshDeserialize for RequestManager {
129    fn deserialize_reader<R: std::io::Read>(
130        reader: &mut R,
131    ) -> std::io::Result<Self> {
132        // Deserialize the persisted fields
133        let command = ReqManInitMessage::deserialize_reader(reader)?;
134        let state = RequestManagerState::deserialize_reader(reader)?;
135        let version = u64::deserialize_reader(reader)?;
136        let request =
137            Option::<Signed<EventRequest>>::deserialize_reader(reader)?;
138
139        let our_key = Arc::new(PublicKey::default());
140        let subject_id = DigestIdentifier::default();
141        let id = DigestIdentifier::default();
142
143        Ok(Self {
144            retry_diff: 0,
145            retry_timeout: 0,
146            request_started_at: None,
147            current_phase: None,
148            current_phase_started_at: None,
149            helpers: None,
150            our_key,
151            id,
152            subject_id,
153            governance_id: None,
154            command,
155            request,
156            state,
157            version,
158        })
159    }
160}
161
162impl RequestManager {
163    fn retry_seconds_for_attempt(schedule: &[u64], attempt: u64) -> u64 {
164        let default = schedule.last().copied().unwrap_or(1).max(1);
165        let idx = attempt.saturating_sub(1) as usize;
166        schedule.get(idx).copied().unwrap_or(default).max(1)
167    }
168
169    fn begin_request_metrics(&mut self) {
170        self.request_started_at = Some(Instant::now());
171        self.current_phase = None;
172        self.current_phase_started_at = None;
173
174        if let Some(metrics) = try_core_metrics() {
175            metrics.observe_request_started();
176        }
177    }
178
179    fn ensure_request_metrics_started(&mut self) {
180        if self.request_started_at.is_none() {
181            self.request_started_at = Some(Instant::now());
182        }
183    }
184
185    fn start_phase_metrics(&mut self, phase: &'static str) {
186        self.ensure_request_metrics_started();
187        self.finish_phase_metrics();
188        self.current_phase = Some(phase);
189        self.current_phase_started_at = Some(Instant::now());
190    }
191
192    fn finish_phase_metrics(&mut self) {
193        let Some(phase) = self.current_phase.take() else {
194            self.current_phase_started_at = None;
195            return;
196        };
197        let Some(started_at) = self.current_phase_started_at.take() else {
198            return;
199        };
200
201        if let Some(metrics) = try_core_metrics() {
202            metrics.observe_request_phase(phase, started_at.elapsed());
203        }
204    }
205
206    fn finish_request_metrics(&mut self, result: &'static str) {
207        self.finish_phase_metrics();
208
209        if let Some(started_at) = self.request_started_at.take()
210            && let Some(metrics) = try_core_metrics()
211        {
212            metrics.observe_request_terminal(result, started_at.elapsed());
213        }
214    }
215
216    //////// EVAL
217    ////////////////////////////////////////////////
218    //Revisado
219    fn tracker_evaluation_context(
220        governance_data: &GovernanceData,
221        schema_id: &SchemaType,
222        namespace: Namespace,
223        request_type: &EventRequestType,
224    ) -> Result<EvalWorkerContext, RequestManagerError> {
225        match request_type {
226            EventRequestType::Fact => {
227                let (issuers, issuer_any) = governance_data.get_signers(
228                    RoleTypes::Issuer,
229                    schema_id,
230                    namespace,
231                );
232                let schema_viewpoints = governance_data
233                    .schemas
234                    .get(schema_id)
235                    .ok_or_else(|| {
236                        crate::governance::error::GovernanceError::SchemaDoesNotExist {
237                            schema_id: schema_id.to_string(),
238                        }
239                    })?
240                    .viewpoints
241                    .clone();
242
243                Ok(EvalWorkerContext::TrackerFact {
244                    issuers: issuers.into_iter().collect(),
245                    issuer_any,
246                    schema_viewpoints,
247                })
248            }
249            EventRequestType::Transfer => {
250                let members = governance_data
251                    .members
252                    .values()
253                    .cloned()
254                    .collect::<BTreeSet<PublicKey>>();
255                let (creator_signers, _) = governance_data.get_signers(
256                    RoleTypes::Creator,
257                    schema_id,
258                    namespace.clone(),
259                );
260                let creators = creator_signers
261                    .into_iter()
262                    .map(|creator| {
263                        (creator, BTreeSet::from([namespace.clone()]))
264                    })
265                    .collect::<BTreeMap<PublicKey, BTreeSet<Namespace>>>();
266
267                Ok(EvalWorkerContext::TrackerTransfer { members, creators })
268            }
269            _ => Ok(EvalWorkerContext::Empty),
270        }
271    }
272
273    async fn build_evaluation(
274        &mut self,
275        ctx: &mut ActorContext<Self>,
276    ) -> Result<(), RequestManagerError> {
277        let Some(request) = self.request.clone() else {
278            return Err(RequestManagerError::RequestNotSet);
279        };
280
281        self.on_event(
282            RequestManagerEvent::UpdateState {
283                state: Box::new(RequestManagerState::Evaluation),
284            },
285            ctx,
286        )
287        .await;
288
289        let metadata = self.check_data_eval(ctx, &request).await?;
290
291        let (
292            signed_evaluation_req,
293            quorum,
294            signers,
295            init_state,
296            tracker_context,
297        ) = self.build_request_eval(ctx, &metadata, &request).await?;
298
299        if signers.is_empty() {
300            warn!(
301                request_id = %self.id,
302                schema_id = %metadata.schema_id,
303                "No evaluators available for schema"
304            );
305
306            return Err(RequestManagerError::NoEvaluatorsAvailable {
307                schema_id: metadata.schema_id.to_string(),
308                governance_id: signed_evaluation_req
309                    .content()
310                    .governance_id
311                    .clone(),
312            });
313        }
314
315        self.run_evaluation(
316            ctx,
317            signed_evaluation_req.clone(),
318            quorum,
319            init_state,
320            tracker_context,
321            signers,
322        )
323        .await
324    }
325
326    // revisado
327    const fn needs_subject_manager(&self) -> bool {
328        self.governance_id.is_some()
329    }
330
331    fn requester_id(&self) -> String {
332        self.id.to_string()
333    }
334
335    fn build_distribution_plan(
336        &self,
337        validation_req: &ValidationReq,
338        governance_data: Option<&GovernanceData>,
339    ) -> Result<Vec<DistributionPlanEntry>, RequestManagerError> {
340        let mut plan: HashMap<PublicKey, DistributionPlanMode> = HashMap::new();
341
342        match validation_req {
343            ValidationReq::Create { event_request, .. } => {
344                let EventRequest::Create(create) = event_request.content()
345                else {
346                    return Err(
347                        RequestManagerError::InvalidEventRequestForEvaluation,
348                    );
349                };
350
351                if create.schema_id.is_gov() {
352                    return Ok(Vec::new());
353                }
354
355                let Some(governance_data) = governance_data else {
356                    return Err(RequestManagerError::ActorError(
357                        ActorError::FunctionalCritical {
358                            description:
359                                "Missing governance data for distribution plan"
360                                    .to_owned(),
361                        },
362                    ));
363                };
364                let witnesses =
365                    governance_data.get_witnesses(WitnessesData::Schema {
366                        creator: event_request.signature().signer.clone(),
367                        schema_id: create.schema_id.clone(),
368                        namespace: create.namespace.clone(),
369                    })?;
370
371                for witness in witnesses {
372                    plan.insert(witness, DistributionPlanMode::Clear);
373                }
374            }
375            ValidationReq::Event {
376                actual_protocols,
377                event_request,
378                metadata,
379                ..
380            } => {
381                if metadata.schema_id.is_gov() {
382                    return Ok(Vec::new());
383                }
384
385                let Some(governance_data) = governance_data else {
386                    return Err(RequestManagerError::ActorError(
387                        ActorError::FunctionalCritical {
388                            description:
389                                "Missing governance data for distribution plan"
390                                    .to_owned(),
391                        },
392                    ));
393                };
394                let protocols_success = actual_protocols.is_success();
395
396                return build_tracker_event_distribution_plan(
397                    governance_data,
398                    event_request.content(),
399                    metadata,
400                    protocols_success,
401                )
402                .map_err(|description| {
403                    RequestManagerError::ActorError(
404                        ActorError::FunctionalCritical { description },
405                    )
406                });
407            }
408        }
409
410        Ok(plan
411            .into_iter()
412            .map(|(node, mode)| DistributionPlanEntry { node, mode })
413            .collect())
414    }
415
416    async fn check_data_eval(
417        &self,
418        ctx: &mut ActorContext<Self>,
419        request: &Signed<EventRequest>,
420    ) -> Result<Metadata, RequestManagerError> {
421        let (subject_id, confirm) = match request.content().clone() {
422            EventRequest::Fact(event) => (event.subject_id, false),
423            EventRequest::Transfer(event) => (event.subject_id, false),
424            EventRequest::Confirm(event) => (event.subject_id, true),
425            _ => {
426                return Err(
427                    RequestManagerError::InvalidEventRequestForEvaluation,
428                );
429            }
430        };
431
432        let lease = acquire_subject(
433            ctx,
434            &self.subject_id,
435            self.requester_id(),
436            None,
437            self.needs_subject_manager(),
438        )
439        .await?;
440        let metadata = get_metadata(ctx, &subject_id).await;
441        lease.finish(ctx).await?;
442        let metadata = metadata?;
443
444        if confirm && !metadata.schema_id.is_gov() {
445            return Err(RequestManagerError::ConfirmNotEvaluableForTracker);
446        }
447
448        Ok(metadata)
449    }
450
451    async fn get_governance_data(
452        &self,
453        ctx: &mut ActorContext<Self>,
454    ) -> Result<GovernanceData, RequestManagerError> {
455        let governance_id =
456            self.governance_id.as_ref().unwrap_or(&self.subject_id);
457        Ok(get_gov(ctx, governance_id).await?)
458    }
459
460    async fn build_request_eval(
461        &self,
462        ctx: &mut ActorContext<Self>,
463        metadata: &Metadata,
464        request: &Signed<EventRequest>,
465    ) -> Result<
466        (
467            Signed<EvaluationReq>,
468            Quorum,
469            HashSet<PublicKey>,
470            Option<ValueWrapper>,
471            EvalWorkerContext,
472        ),
473        RequestManagerError,
474    > {
475        let is_gov = metadata.schema_id.is_gov();
476
477        let request_type = EventRequestType::from(request.content());
478        let (evaluate_data, governance_data, init_state, tracker_context) =
479            match (is_gov, request_type.clone()) {
480                (true, EventRequestType::Fact) => {
481                    let state =
482                        GovernanceData::try_from(metadata.properties.clone())?;
483
484                    (
485                        EvaluateData::GovFact {
486                            state: state.clone(),
487                        },
488                        state,
489                        None,
490                        EvalWorkerContext::default(),
491                    )
492                }
493                (true, EventRequestType::Transfer) => {
494                    let state =
495                        GovernanceData::try_from(metadata.properties.clone())?;
496
497                    (
498                        EvaluateData::GovTransfer {
499                            state: state.clone(),
500                        },
501                        state,
502                        None,
503                        EvalWorkerContext::default(),
504                    )
505                }
506                (true, EventRequestType::Confirm) => {
507                    let state =
508                        GovernanceData::try_from(metadata.properties.clone())?;
509
510                    (
511                        EvaluateData::GovConfirm {
512                            state: state.clone(),
513                        },
514                        state,
515                        None,
516                        EvalWorkerContext::default(),
517                    )
518                }
519                (false, EventRequestType::Fact) => {
520                    let governance_data =
521                        get_gov(ctx, &metadata.governance_id).await?;
522
523                    let init_state =
524                        governance_data.get_init_state(&metadata.schema_id)?;
525                    let tracker_context = Self::tracker_evaluation_context(
526                        &governance_data,
527                        &metadata.schema_id,
528                        metadata.namespace.clone(),
529                        &request_type,
530                    )?;
531
532                    (
533                        EvaluateData::TrackerSchemasFact {
534                            state: metadata.properties.clone(),
535                        },
536                        governance_data,
537                        Some(init_state),
538                        tracker_context,
539                    )
540                }
541                (false, EventRequestType::Transfer) => {
542                    let governance_data =
543                        get_gov(ctx, &metadata.governance_id).await?;
544                    let tracker_context = Self::tracker_evaluation_context(
545                        &governance_data,
546                        &metadata.schema_id,
547                        metadata.namespace.clone(),
548                        &request_type,
549                    )?;
550                    (
551                        EvaluateData::TrackerSchemasTransfer {
552                            state: metadata.properties.clone(),
553                        },
554                        governance_data,
555                        None,
556                        tracker_context,
557                    )
558                }
559                _ => {
560                    error!(
561                        request_id = %self.id,
562                        is_gov = is_gov,
563                        request_type = ?request_type,
564                        "Invalid event request type for evaluation state"
565                    );
566                    return Err(
567                        RequestManagerError::InvalidEventRequestForEvaluation,
568                    );
569                }
570            };
571
572        let (signers, quorum) = governance_data.get_quorum_and_signers(
573            ProtocolTypes::Evaluation,
574            &metadata.schema_id,
575            metadata.namespace.clone(),
576        )?;
577
578        let eval_req = EvaluationReq {
579            event_request: request.clone(),
580            data: evaluate_data,
581            sn: metadata.sn + 1,
582            gov_version: governance_data.version,
583            namespace: metadata.namespace.clone(),
584            schema_id: metadata.schema_id.clone(),
585            signer: (*self.our_key).clone(),
586            signer_is_owner: *self.our_key == request.signature().signer,
587            governance_id: metadata.governance_id.clone(),
588        };
589
590        let signature = get_sign(
591            ctx,
592            SignTypesNode::EvaluationReq(Box::new(eval_req.clone())),
593        )
594        .await?;
595
596        let signed_evaluation_req: Signed<EvaluationReq> =
597            Signed::from_parts(eval_req, signature);
598        Ok((
599            signed_evaluation_req,
600            quorum,
601            signers,
602            init_state,
603            tracker_context,
604        ))
605    }
606
607    async fn run_evaluation(
608        &mut self,
609        ctx: &mut ActorContext<Self>,
610        request: Signed<EvaluationReq>,
611        quorum: Quorum,
612        init_state: Option<ValueWrapper>,
613        context: EvalWorkerContext,
614        signers: HashSet<PublicKey>,
615    ) -> Result<(), RequestManagerError> {
616        let Some((hash, network)) = self.helpers.clone() else {
617            return Err(RequestManagerError::HelpersNotInitialized);
618        };
619
620        self.start_phase_metrics("evaluation");
621        info!("Init evaluation {}", self.id);
622        let child = ctx
623            .create_child(
624                "evaluation",
625                Evaluation::new(
626                    self.our_key.clone(),
627                    request,
628                    quorum,
629                    init_state,
630                    context,
631                    hash,
632                    network,
633                ),
634            )
635            .await?;
636
637        child
638            .tell(EvaluationMessage::Create {
639                request_id: self.id.clone(),
640                version: self.version,
641                signers,
642            })
643            .await?;
644
645        send_to_tracking(
646            ctx,
647            RequestTrackingMessage::UpdateState {
648                request_id: self.id.clone(),
649                state: RequestState::Evaluation,
650            },
651        )
652        .await?;
653
654        Ok(())
655    }
656    //////// APPROVE
657    ////////////////////////////////////////////////
658    async fn build_request_appro(
659        &self,
660        ctx: &mut ActorContext<Self>,
661        eval_req: EvaluationReq,
662        evaluator_res: EvaluatorResponse,
663    ) -> Result<Signed<ApprovalReq>, RequestManagerError> {
664        let request = ApprovalReq {
665            subject_id: self.subject_id.clone(),
666            sn: eval_req.sn,
667            gov_version: eval_req.gov_version,
668            patch: evaluator_res.patch,
669            signer: eval_req.signer,
670        };
671
672        let signature =
673            get_sign(ctx, SignTypesNode::ApprovalReq(request.clone())).await?;
674
675        let signed_approval_req: Signed<ApprovalReq> =
676            Signed::from_parts(request, signature);
677
678        Ok(signed_approval_req)
679    }
680
681    async fn build_approval(
682        &mut self,
683        ctx: &mut ActorContext<Self>,
684        eval_req: EvaluationReq,
685        eval_res: EvaluatorResponse,
686    ) -> Result<(), RequestManagerError> {
687        let request = self.build_request_appro(ctx, eval_req, eval_res).await?;
688
689        let governance_data =
690            get_gov(ctx, &request.content().subject_id).await?;
691
692        let (signers, quorum) = governance_data.get_quorum_and_signers(
693            ProtocolTypes::Approval,
694            &SchemaType::Governance,
695            Namespace::new(),
696        )?;
697
698        if signers.is_empty() {
699            warn!(
700                request_id = %self.id,
701                schema_id = %SchemaType::Governance,
702                "No approvers available for schema"
703            );
704
705            return Err(RequestManagerError::NoApproversAvailable {
706                schema_id: SchemaType::Governance.to_string(),
707                governance_id: self
708                    .governance_id
709                    .clone()
710                    .unwrap_or_else(|| self.subject_id.clone()),
711            });
712        }
713
714        self.run_approval(ctx, request, quorum, signers).await
715    }
716
717    async fn run_approval(
718        &mut self,
719        ctx: &mut ActorContext<Self>,
720        request: Signed<ApprovalReq>,
721        quorum: Quorum,
722        signers: HashSet<PublicKey>,
723    ) -> Result<(), RequestManagerError> {
724        let Some((hash, network)) = self.helpers.clone() else {
725            return Err(RequestManagerError::HelpersNotInitialized);
726        };
727
728        self.start_phase_metrics("approval");
729        info!("Init approval {}", self.id);
730        let child = ctx
731            .create_child(
732                "approval",
733                Approval::new(
734                    self.our_key.clone(),
735                    request,
736                    quorum,
737                    signers,
738                    hash,
739                    network,
740                ),
741            )
742            .await?;
743
744        child
745            .tell(ApprovalMessage::Create {
746                request_id: self.id.clone(),
747                version: self.version,
748            })
749            .await?;
750
751        send_to_tracking(
752            ctx,
753            RequestTrackingMessage::UpdateState {
754                request_id: self.id.clone(),
755                state: RequestState::Approval,
756            },
757        )
758        .await?;
759
760        Ok(())
761    }
762
763    //////// VALI
764    ////////////////////////////////////////////////
765    async fn build_validation_req(
766        &mut self,
767        ctx: &mut ActorContext<Self>,
768        eval: Option<(EvaluationReq, EvaluationData)>,
769        appro_data: Option<ApprovalData>,
770    ) -> Result<
771        (
772            Signed<ValidationReq>,
773            Quorum,
774            HashSet<PublicKey>,
775            Option<ValueWrapper>,
776            CurrentRequestRoles,
777        ),
778        RequestManagerError,
779    > {
780        let (
781            vali_req,
782            quorum,
783            signers,
784            init_state,
785            current_request_roles,
786            schema_id,
787            governance_data,
788        ) = self.build_validation_data(ctx, eval, appro_data).await?;
789
790        if signers.is_empty() {
791            let governance_id = vali_req.get_governance_id().map_err(|error| {
792                error!(
793                    request_id = %self.id,
794                    schema_id = %schema_id,
795                    error = %error,
796                    "Validation request has invalid governance_id"
797                );
798                RequestManagerError::ActorError(
799                    ActorError::FunctionalCritical {
800                        description: format!(
801                            "Validation request has invalid governance_id: {}",
802                            error
803                        ),
804                    },
805                )
806            })?;
807
808            warn!(
809                request_id = %self.id,
810                schema_id = %schema_id,
811                "No validators available for schema"
812            );
813
814            return Err(RequestManagerError::NoValidatorsAvailable {
815                schema_id: schema_id.to_string(),
816                governance_id,
817            });
818        }
819
820        let signature = get_sign(
821            ctx,
822            SignTypesNode::ValidationReq(Box::new(vali_req.clone())),
823        )
824        .await?;
825
826        let distribution_plan =
827            self.build_distribution_plan(&vali_req, governance_data.as_ref())?;
828
829        let signed_validation_req: Signed<ValidationReq> =
830            Signed::from_parts(vali_req, signature);
831
832        self.on_event(
833            RequestManagerEvent::UpdateState {
834                state: Box::new(RequestManagerState::Validation {
835                    request: Box::new(signed_validation_req.clone()),
836                    quorum: quorum.clone(),
837                    init_state: init_state.clone(),
838                    current_request_roles: current_request_roles.clone(),
839                    signers: signers.clone(),
840                    distribution_plan: distribution_plan.clone(),
841                }),
842            },
843            ctx,
844        )
845        .await;
846
847        Ok((
848            signed_validation_req,
849            quorum,
850            signers,
851            init_state,
852            current_request_roles,
853        ))
854    }
855
856    async fn build_validation_data(
857        &self,
858        ctx: &mut ActorContext<Self>,
859        eval: Option<(EvaluationReq, EvaluationData)>,
860        appro_data: Option<ApprovalData>,
861    ) -> Result<
862        (
863            ValidationReq,
864            Quorum,
865            HashSet<PublicKey>,
866            Option<ValueWrapper>,
867            CurrentRequestRoles,
868            SchemaType,
869            Option<GovernanceData>,
870        ),
871        RequestManagerError,
872    > {
873        let Some(request) = self.request.clone() else {
874            return Err(RequestManagerError::RequestNotSet);
875        };
876
877        if let EventRequest::Create(create) = request.content() {
878            if create.schema_id.is_gov() {
879                let governance_data =
880                    GovernanceData::new((*self.our_key).clone());
881                let (signers, quorum) = governance_data
882                    .get_quorum_and_signers(
883                        ProtocolTypes::Validation,
884                        &SchemaType::Governance,
885                        Namespace::new(),
886                    )?;
887
888                Ok((
889                    ValidationReq::Create {
890                        event_request: request.clone(),
891                        gov_version: 0,
892                        subject_id: self.subject_id.clone(),
893                    },
894                    quorum,
895                    signers,
896                    None,
897                    CurrentRequestRoles {
898                        evaluation: RoleDataRegister {
899                            workers: HashSet::new(),
900                            quorum: Quorum::default(),
901                        },
902                        approval: RoleDataRegister {
903                            workers: HashSet::new(),
904                            quorum: Quorum::default(),
905                        },
906                    },
907                    SchemaType::Governance,
908                    None,
909                ))
910            } else {
911                let governance_data =
912                    get_gov(ctx, &create.governance_id).await?;
913
914                let (signers, quorum) = governance_data
915                    .get_quorum_and_signers(
916                        ProtocolTypes::Validation,
917                        &create.schema_id,
918                        create.namespace.clone(),
919                    )?;
920
921                let init_state =
922                    governance_data.get_init_state(&create.schema_id)?;
923
924                Ok((
925                    ValidationReq::Create {
926                        event_request: request.clone(),
927                        gov_version: governance_data.version,
928                        subject_id: self.subject_id.clone(),
929                    },
930                    quorum,
931                    signers,
932                    Some(init_state),
933                    CurrentRequestRoles {
934                        evaluation: RoleDataRegister {
935                            workers: HashSet::new(),
936                            quorum: Quorum::default(),
937                        },
938                        approval: RoleDataRegister {
939                            workers: HashSet::new(),
940                            quorum: Quorum::default(),
941                        },
942                    },
943                    create.schema_id.clone(),
944                    Some(governance_data),
945                ))
946            }
947        } else {
948            let Some((hash, ..)) = self.helpers else {
949                return Err(RequestManagerError::HelpersNotInitialized);
950            };
951
952            let governance_data = self.get_governance_data(ctx).await?;
953
954            let (actual_protocols, gov_version, sn) =
955                if let Some((eval_req, eval_data)) = eval {
956                    if let Some(approval_data) = appro_data.clone() {
957                        (
958                            ActualProtocols::EvalApprove {
959                                eval_data,
960                                approval_data,
961                            },
962                            eval_req.gov_version,
963                            Some(eval_req.sn),
964                        )
965                    } else {
966                        (
967                            ActualProtocols::Eval { eval_data },
968                            eval_req.gov_version,
969                            Some(eval_req.sn),
970                        )
971                    }
972                } else {
973                    (ActualProtocols::None, governance_data.version, None)
974                };
975
976            let lease = acquire_subject(
977                ctx,
978                &self.subject_id,
979                self.requester_id(),
980                None,
981                self.needs_subject_manager(),
982            )
983            .await?;
984            let metadata = get_metadata(ctx, &self.subject_id).await;
985            let last_ledger_event = match metadata {
986                Ok(metadata) => {
987                    let last_ledger_event =
988                        get_last_ledger_event(ctx, &self.subject_id).await;
989                    lease.finish(ctx).await?;
990                    (metadata, last_ledger_event?)
991                }
992                Err(error) => {
993                    lease.finish(ctx).await?;
994                    return Err(error.into());
995                }
996            };
997
998            let (metadata, last_ledger_event) = last_ledger_event;
999
1000            if gov_version != governance_data.version {
1001                return Err(RequestManagerError::GovernanceVersionChanged {
1002                    governance_id: metadata.governance_id,
1003                    expected: gov_version,
1004                    current: governance_data.version,
1005                });
1006            }
1007
1008            let sn = if let Some(sn) = sn {
1009                sn
1010            } else {
1011                metadata.sn + 1
1012            };
1013
1014            let (signers, quorum) = governance_data.get_quorum_and_signers(
1015                ProtocolTypes::Validation,
1016                &metadata.schema_id,
1017                metadata.namespace.clone(),
1018            )?;
1019
1020            let Some(last_ledger_event) = last_ledger_event else {
1021                return Err(RequestManagerError::LastLedgerEventNotFound);
1022            };
1023
1024            let ledger_hash = last_ledger_event.ledger_hash(hash)?;
1025            let schema_id = metadata.schema_id.clone();
1026
1027            let current_request_roles =
1028                if gov_version == governance_data.version {
1029                    let (evaluation_workers, evaluation_quorum) =
1030                        governance_data.get_quorum_and_signers(
1031                            ProtocolTypes::Evaluation,
1032                            &metadata.schema_id,
1033                            metadata.namespace.clone(),
1034                        )?;
1035
1036                    let (approval_workers, approval_quorum) =
1037                        if appro_data.is_some() {
1038                            governance_data.get_quorum_and_signers(
1039                                ProtocolTypes::Approval,
1040                                &SchemaType::Governance,
1041                                Namespace::new(),
1042                            )?
1043                        } else {
1044                            (HashSet::new(), Quorum::default())
1045                        };
1046
1047                    CurrentRequestRoles {
1048                        evaluation: RoleDataRegister {
1049                            workers: evaluation_workers,
1050                            quorum: evaluation_quorum,
1051                        },
1052                        approval: RoleDataRegister {
1053                            workers: approval_workers,
1054                            quorum: approval_quorum,
1055                        },
1056                    }
1057                } else {
1058                    CurrentRequestRoles {
1059                        evaluation: RoleDataRegister {
1060                            workers: HashSet::new(),
1061                            quorum: Quorum::default(),
1062                        },
1063                        approval: RoleDataRegister {
1064                            workers: HashSet::new(),
1065                            quorum: Quorum::default(),
1066                        },
1067                    }
1068                };
1069
1070            Ok((
1071                ValidationReq::Event {
1072                    actual_protocols: Box::new(actual_protocols),
1073                    event_request: request.clone(),
1074                    metadata: Box::new(metadata),
1075                    last_data: Box::new(LastData {
1076                        vali_data: last_ledger_event
1077                            .protocols
1078                            .get_validation_data(),
1079                        gov_version: last_ledger_event.gov_version,
1080                    }),
1081                    gov_version,
1082                    ledger_hash,
1083                    sn,
1084                },
1085                quorum,
1086                signers,
1087                None,
1088                current_request_roles,
1089                schema_id,
1090                Some(governance_data),
1091            ))
1092        }
1093    }
1094
1095    async fn run_validation(
1096        &mut self,
1097        ctx: &mut ActorContext<Self>,
1098        request: Signed<ValidationReq>,
1099        quorum: Quorum,
1100        signers: HashSet<PublicKey>,
1101        init_state: Option<ValueWrapper>,
1102        current_request_roles: CurrentRequestRoles,
1103    ) -> Result<(), RequestManagerError> {
1104        let Some((hash, network)) = self.helpers.clone() else {
1105            return Err(RequestManagerError::HelpersNotInitialized);
1106        };
1107
1108        self.start_phase_metrics("validation");
1109        info!("Init validation {}", self.id);
1110        let child = ctx
1111            .create_child(
1112                "validation",
1113                Validation::new(
1114                    self.our_key.clone(),
1115                    request,
1116                    init_state,
1117                    current_request_roles,
1118                    quorum,
1119                    hash,
1120                    network,
1121                ),
1122            )
1123            .await?;
1124
1125        child
1126            .tell(ValidationMessage::Create {
1127                request_id: self.id.clone(),
1128                version: self.version,
1129                signers,
1130            })
1131            .await?;
1132
1133        send_to_tracking(
1134            ctx,
1135            RequestTrackingMessage::UpdateState {
1136                request_id: self.id.clone(),
1137                state: RequestState::Validation,
1138            },
1139        )
1140        .await?;
1141
1142        Ok(())
1143    }
1144    //////// Distribution
1145    ////////////////////////////////////////////////
1146    async fn build_ledger(
1147        &mut self,
1148        ctx: &mut ActorContext<Self>,
1149        val_req: ValidationReq,
1150        val_res: ValidationData,
1151        distribution_plan: Vec<DistributionPlanEntry>,
1152    ) -> Result<Ledger, RequestManagerError> {
1153        let Some((hash, ..)) = self.helpers else {
1154            return Err(RequestManagerError::HelpersNotInitialized);
1155        };
1156
1157        let (protocols, ledger_seal) = match val_req {
1158            ValidationReq::Create {
1159                event_request,
1160                gov_version,
1161                ..
1162            } => {
1163                let protocols = Protocols::Create {
1164                    event_request,
1165                    validation: val_res,
1166                };
1167
1168                let protocols_hash = protocols.hash_for_ledger(&hash)?;
1169
1170                let ledger_seal = LedgerSeal {
1171                    gov_version,
1172                    sn: 0,
1173                    prev_ledger_event_hash: DigestIdentifier::default(),
1174                    protocols_hash,
1175                };
1176
1177                (protocols, ledger_seal)
1178            }
1179            ValidationReq::Event {
1180                actual_protocols,
1181                event_request,
1182                ledger_hash,
1183                metadata,
1184                gov_version,
1185                sn,
1186                ..
1187            } => {
1188                let protocols = Protocols::build(
1189                    metadata.schema_id.is_gov(),
1190                    event_request,
1191                    *actual_protocols,
1192                    val_res,
1193                )?;
1194
1195                let protocols_hash = protocols.hash_for_ledger(&hash)?;
1196
1197                let ledger_seal = LedgerSeal {
1198                    gov_version,
1199                    sn,
1200                    prev_ledger_event_hash: ledger_hash,
1201                    protocols_hash,
1202                };
1203
1204                (protocols, ledger_seal)
1205            }
1206        };
1207
1208        let signature =
1209            get_sign(ctx, SignTypesNode::LedgerSeal(ledger_seal.clone()))
1210                .await?;
1211
1212        let ledger = Ledger {
1213            gov_version: ledger_seal.gov_version,
1214            sn: ledger_seal.sn,
1215            prev_ledger_event_hash: ledger_seal.prev_ledger_event_hash,
1216            ledger_seal_signature: signature,
1217            protocols,
1218        };
1219
1220        self.on_event(
1221            RequestManagerEvent::UpdateState {
1222                state: Box::new(RequestManagerState::UpdateSubject {
1223                    ledger: ledger.clone(),
1224                    distribution_plan: distribution_plan.clone(),
1225                }),
1226            },
1227            ctx,
1228        )
1229        .await;
1230
1231        Ok(ledger)
1232    }
1233
1234    async fn update_subject(
1235        &mut self,
1236        ctx: &mut ActorContext<Self>,
1237        ledger: Ledger,
1238        distribution_plan: Vec<DistributionPlanEntry>,
1239    ) -> Result<(), RequestManagerError> {
1240        if ledger.get_event_request_type().is_create_event() {
1241            if let Err(e) = create_subject(ctx, ledger.clone()).await {
1242                if let ActorError::Functional { .. } = e {
1243                    return Err(RequestManagerError::CheckLimit);
1244                }
1245                return Err(e.into());
1246            }
1247        } else {
1248            let lease = acquire_subject(
1249                ctx,
1250                &self.subject_id,
1251                self.requester_id(),
1252                None,
1253                self.needs_subject_manager(),
1254            )
1255            .await?;
1256            let update_result =
1257                update_ledger(ctx, &self.subject_id, vec![ledger.clone()])
1258                    .await;
1259            lease.finish(ctx).await?;
1260            update_result?;
1261        }
1262
1263        self.on_event(
1264            RequestManagerEvent::UpdateState {
1265                state: Box::new(RequestManagerState::Distribution {
1266                    ledger,
1267                    distribution_plan,
1268                }),
1269            },
1270            ctx,
1271        )
1272        .await;
1273
1274        Ok(())
1275    }
1276
1277    async fn build_distribution(
1278        &mut self,
1279        ctx: &mut ActorContext<Self>,
1280        ledger: Ledger,
1281        mut distribution_plan: Vec<DistributionPlanEntry>,
1282    ) -> Result<bool, RequestManagerError> {
1283        let is_governance = match ledger.get_event_request() {
1284            Some(EventRequest::Create(create)) => create.schema_id.is_gov(),
1285            Some(_) => self.governance_id.is_none(),
1286            None => false,
1287        };
1288
1289        if is_governance {
1290            let governance_id = ledger.get_subject_id();
1291            let governance_data = get_gov(ctx, &governance_id).await?;
1292            distribution_plan = governance_data
1293                .get_witnesses(WitnessesData::Gov)?
1294                .into_iter()
1295                .map(|node| DistributionPlanEntry {
1296                    node,
1297                    mode: DistributionPlanMode::Clear,
1298                })
1299                .collect();
1300        }
1301
1302        if distribution_plan.is_empty() {
1303            return Ok(false);
1304        }
1305
1306        distribution_plan.retain(|entry| entry.node != *self.our_key);
1307
1308        if distribution_plan.is_empty() {
1309            warn!(
1310                request_id = %self.id,
1311                "No witnesses available for distribution"
1312            );
1313            return Ok(false);
1314        }
1315
1316        self.run_distribution(ctx, distribution_plan, ledger)
1317            .await?;
1318
1319        Ok(true)
1320    }
1321
1322    async fn run_distribution(
1323        &mut self,
1324        ctx: &mut ActorContext<Self>,
1325        distribution_plan: Vec<DistributionPlanEntry>,
1326        ledger: Ledger,
1327    ) -> Result<(), RequestManagerError> {
1328        let Some((.., network)) = self.helpers.clone() else {
1329            return Err(RequestManagerError::HelpersNotInitialized);
1330        };
1331
1332        self.start_phase_metrics("distribution");
1333        info!("Init distribution {}", self.id);
1334        let child = ctx
1335            .create_child(
1336                "distribution",
1337                Distribution::new(
1338                    network,
1339                    DistributionType::Request,
1340                    self.id.clone(),
1341                ),
1342            )
1343            .await?;
1344
1345        child
1346            .tell(DistributionMessage::Create {
1347                ledger: Box::new(ledger),
1348                distribution_plan,
1349            })
1350            .await?;
1351
1352        send_to_tracking(
1353            ctx,
1354            RequestTrackingMessage::UpdateState {
1355                request_id: self.id.clone(),
1356                state: RequestState::Distribution,
1357            },
1358        )
1359        .await?;
1360
1361        Ok(())
1362    }
1363
1364    //////// Reboot
1365    ////////////////////////////////////////////////
1366    async fn init_wait(
1367        &self,
1368        ctx: &mut ActorContext<Self>,
1369        governance_id: &DigestIdentifier,
1370    ) -> Result<(), RequestManagerError> {
1371        let Some(config): Option<ConfigHelper> =
1372            ctx.system().get_helper("config").await
1373        else {
1374            return Err(RequestManagerError::ActorError(ActorError::Helper {
1375                name: "config".to_owned(),
1376                reason: "Not found".to_owned(),
1377            }));
1378        };
1379        let actor = ctx
1380            .create_child(
1381                "reboot",
1382                Reboot::new(
1383                    governance_id.clone(),
1384                    self.id.clone(),
1385                    config.sync_reboot.stability_check_interval_secs,
1386                    config.sync_reboot.stability_check_max_retries,
1387                ),
1388            )
1389            .await?;
1390
1391        actor.tell(RebootMessage::Init).await?;
1392
1393        Ok(())
1394    }
1395
1396    async fn init_update(
1397        &self,
1398        ctx: &mut ActorContext<Self>,
1399        governance_id: &DigestIdentifier,
1400    ) -> Result<(), RequestManagerError> {
1401        let Some((.., network)) = self.helpers.clone() else {
1402            return Err(RequestManagerError::HelpersNotInitialized);
1403        };
1404
1405        let gov_sn = get_gov_sn(ctx, governance_id).await?;
1406
1407        let governance_data = get_gov(ctx, governance_id).await?;
1408
1409        let mut witnesses = {
1410            let gov_witnesses =
1411                governance_data.get_witnesses(WitnessesData::Gov)?;
1412
1413            let auth_witnesses =
1414                Self::get_witnesses_auth(ctx, governance_id.clone())
1415                    .await
1416                    .unwrap_or_default();
1417
1418            gov_witnesses
1419                .union(&auth_witnesses)
1420                .cloned()
1421                .collect::<HashSet<PublicKey>>()
1422        };
1423
1424        witnesses.remove(&self.our_key);
1425
1426        if witnesses.is_empty() {
1427            if let Ok(actor) = ctx.reference().await {
1428                actor
1429                    .tell(RequestManagerMessage::FinishReboot {
1430                        request_id: self.id.clone(),
1431                    })
1432                    .await?;
1433            };
1434        } else if witnesses.len() == 1 {
1435            let Some(objetive) = witnesses.iter().next() else {
1436                error!(
1437                    request_id = %self.id,
1438                    governance_id = %governance_id,
1439                    "Witness set became empty while selecting single reboot target"
1440                );
1441                return Err(RequestManagerError::ActorError(
1442                    ActorError::FunctionalCritical {
1443                        description:
1444                            "Witness set became empty while selecting single reboot target"
1445                                .to_owned(),
1446                    },
1447                ));
1448            };
1449            let info = ComunicateInfo {
1450                receiver: objetive.clone(),
1451                request_id: String::default(),
1452                version: 0,
1453                receiver_actor: format!(
1454                    "/user/node/distributor_{}",
1455                    governance_id
1456                ),
1457            };
1458
1459            network
1460                .send_command(ave_network::CommandHelper::SendMessage {
1461                    message: NetworkMessage {
1462                        info,
1463                        message: ActorMessage::DistributionLedgerReq {
1464                            actual_sn: Some(gov_sn),
1465                            target_sn: None,
1466                            subject_id: governance_id.clone(),
1467                        },
1468                    },
1469                })
1470                .await?;
1471
1472            let Ok(actor) = ctx.reference().await else {
1473                return Ok(());
1474            };
1475
1476            actor
1477                .tell(RequestManagerMessage::RebootWait {
1478                    request_id: self.id.clone(),
1479                    governance_id: governance_id.clone(),
1480                })
1481                .await?;
1482        } else {
1483            let Some(config): Option<ConfigHelper> =
1484                ctx.system().get_helper("config").await
1485            else {
1486                return Ok(());
1487            };
1488            let data = UpdateNew {
1489                network,
1490                subject_id: governance_id.clone(),
1491                our_sn: Some(gov_sn),
1492                witnesses,
1493                update_type: UpdateType::Request {
1494                    subject_id: self.subject_id.clone(),
1495                    id: self.id.clone(),
1496                },
1497                subject_kind_hint: Some(
1498                    crate::update::UpdateSubjectKind::Governance,
1499                ),
1500                round_retry_interval_secs: config
1501                    .sync_update
1502                    .round_retry_interval_secs,
1503                max_round_retries: config.sync_update.max_round_retries,
1504                witness_retry_count: config.sync_update.witness_retry_count,
1505                witness_retry_interval_secs: config
1506                    .sync_update
1507                    .witness_retry_interval_secs,
1508            };
1509
1510            let updater = Update::new(data);
1511            let Ok(child) = ctx.create_child("update", updater).await else {
1512                let Ok(actor) = ctx.reference().await else {
1513                    return Ok(());
1514                };
1515
1516                actor
1517                    .tell(RequestManagerMessage::RebootWait {
1518                        request_id: self.id.clone(),
1519                        governance_id: governance_id.clone(),
1520                    })
1521                    .await?;
1522
1523                return Ok(());
1524            };
1525
1526            child.tell(UpdateMessage::Run).await?;
1527        }
1528
1529        Ok(())
1530    }
1531
1532    async fn get_witnesses_auth(
1533        ctx: &ActorContext<Self>,
1534        governance_id: DigestIdentifier,
1535    ) -> Result<HashSet<PublicKey>, RequestManagerError> {
1536        let path = ActorPath::from("/user/node/auth");
1537        let actor = ctx.system().get_actor::<Auth>(&path).await?;
1538
1539        let response = actor
1540            .ask(AuthMessage::GetAuth {
1541                subject_id: governance_id,
1542            })
1543            .await?;
1544
1545        match response {
1546            AuthResponse::Witnesses(witnesses) => Ok(witnesses),
1547            _ => Err(RequestManagerError::ActorError(
1548                ActorError::UnexpectedResponse {
1549                    path,
1550                    expected: "AuthResponse::Witnesses".to_owned(),
1551                },
1552            )),
1553        }
1554    }
1555
1556    //////// General
1557    ////////////////////////////////////////////////
1558    async fn send_reboot(
1559        &self,
1560        ctx: &ActorContext<Self>,
1561        governance_id: DigestIdentifier,
1562    ) -> Result<(), ActorError> {
1563        let Ok(actor) = ctx.reference().await else {
1564            return Ok(());
1565        };
1566
1567        actor
1568            .tell(RequestManagerMessage::Reboot {
1569                request_id: self.id.clone(),
1570                governance_id,
1571                reboot_type: RebootType::TimeOut,
1572            })
1573            .await
1574    }
1575
1576    async fn match_error(
1577        &mut self,
1578        ctx: &mut ActorContext<Self>,
1579        error: RequestManagerError,
1580    ) {
1581        match error {
1582            RequestManagerError::NoEvaluatorsAvailable {
1583                governance_id,
1584                ..
1585            }
1586            | RequestManagerError::NoApproversAvailable {
1587                governance_id, ..
1588            }
1589            | RequestManagerError::NoValidatorsAvailable {
1590                governance_id,
1591                ..
1592            }
1593            | RequestManagerError::GovernanceVersionChanged {
1594                governance_id,
1595                ..
1596            } => {
1597                if let Err(e) = self.send_reboot(ctx, governance_id).await {
1598                    emit_fail(ctx, e).await;
1599                }
1600            }
1601            RequestManagerError::CheckLimit
1602            | RequestManagerError::Governance(..)
1603            | RequestManagerError::NotIssuer
1604            | RequestManagerError::NotCreator => {
1605                if let Err(e) = self
1606                    .abort_request(
1607                        ctx,
1608                        error.to_string(),
1609                        None,
1610                        (*self.our_key).clone(),
1611                    )
1612                    .await
1613                {
1614                    emit_fail(
1615                        ctx,
1616                        ActorError::FunctionalCritical {
1617                            description: e.to_string(),
1618                        },
1619                    )
1620                    .await;
1621                }
1622            }
1623            _ => {
1624                emit_fail(
1625                    ctx,
1626                    ActorError::FunctionalCritical {
1627                        description: error.to_string(),
1628                    },
1629                )
1630                .await;
1631            }
1632        }
1633    }
1634
1635    async fn finish_request(
1636        &mut self,
1637        ctx: &mut ActorContext<Self>,
1638    ) -> Result<(), RequestManagerError> {
1639        self.finish_request_metrics("finished");
1640        info!("Ending {}", self.id);
1641        send_to_tracking(
1642            ctx,
1643            RequestTrackingMessage::UpdateState {
1644                request_id: self.id.clone(),
1645                state: RequestState::Finish,
1646            },
1647        )
1648        .await?;
1649
1650        self.on_event(RequestManagerEvent::Finish, ctx).await;
1651
1652        self.end_request(ctx).await?;
1653
1654        Ok(())
1655    }
1656
1657    async fn reboot(
1658        &mut self,
1659        ctx: &mut ActorContext<Self>,
1660        reboot_type: RebootType,
1661        governance_id: DigestIdentifier,
1662    ) -> Result<(), RequestManagerError> {
1663        let Some(config): Option<ConfigHelper> =
1664            ctx.system().get_helper("config").await
1665        else {
1666            return Err(ActorError::Helper {
1667                name: "config".to_owned(),
1668                reason: "Not found".to_owned(),
1669            }
1670            .into());
1671        };
1672        self.start_phase_metrics("reboot");
1673        self.on_event(
1674            RequestManagerEvent::UpdateState {
1675                state: Box::new(RequestManagerState::Reboot),
1676            },
1677            ctx,
1678        )
1679        .await;
1680
1681        let Ok(actor) = ctx.reference().await else {
1682            return Ok(());
1683        };
1684
1685        let request_id = self.id.clone();
1686
1687        match reboot_type {
1688            RebootType::Normal => {
1689                info!("Launching Normal reboot {}", self.id);
1690                send_to_tracking(
1691                    ctx,
1692                    RequestTrackingMessage::UpdateState {
1693                        request_id: self.id.clone(),
1694                        state: RequestState::Reboot,
1695                    },
1696                )
1697                .await?;
1698
1699                actor
1700                    .tell(RequestManagerMessage::RebootUpdate {
1701                        request_id,
1702                        governance_id,
1703                    })
1704                    .await?;
1705            }
1706            RebootType::Diff => {
1707                info!("Launching Diff reboot {}", self.id);
1708                self.retry_diff += 1;
1709
1710                let seconds = Self::retry_seconds_for_attempt(
1711                    &config.sync_reboot.diff_retry_schedule_secs,
1712                    self.retry_diff,
1713                );
1714
1715                info!(
1716                    "Launching Diff reboot {}, try: {}, seconds: {}",
1717                    self.id, self.retry_diff, seconds
1718                );
1719
1720                send_to_tracking(
1721                    ctx,
1722                    RequestTrackingMessage::UpdateState {
1723                        request_id: self.id.clone(),
1724                        state: RequestState::RebootDiff {
1725                            seconds,
1726                            count: self.retry_diff,
1727                        },
1728                    },
1729                )
1730                .await?;
1731
1732                tokio::spawn(async move {
1733                    tokio::time::sleep(Duration::from_secs(seconds)).await;
1734                    let _ = actor
1735                        .tell(RequestManagerMessage::RebootUpdate {
1736                            request_id,
1737                            governance_id,
1738                        })
1739                        .await;
1740                });
1741            }
1742            RebootType::TimeOut => {
1743                self.retry_timeout += 1;
1744
1745                let seconds = Self::retry_seconds_for_attempt(
1746                    &config.sync_reboot.timeout_retry_schedule_secs,
1747                    self.retry_timeout,
1748                );
1749
1750                info!(
1751                    "Launching TimeOut reboot {}, try: {}, seconds: {}",
1752                    self.id, self.retry_timeout, seconds
1753                );
1754                send_to_tracking(
1755                    ctx,
1756                    RequestTrackingMessage::UpdateState {
1757                        request_id: self.id.clone(),
1758                        state: RequestState::RebootTimeOut {
1759                            seconds,
1760                            count: self.retry_timeout,
1761                        },
1762                    },
1763                )
1764                .await?;
1765
1766                tokio::spawn(async move {
1767                    tokio::time::sleep(Duration::from_secs(seconds)).await;
1768                    let _ = actor
1769                        .tell(RequestManagerMessage::RebootUpdate {
1770                            request_id,
1771                            governance_id,
1772                        })
1773                        .await;
1774                });
1775            }
1776        }
1777
1778        Ok(())
1779    }
1780
1781    async fn match_command(
1782        &mut self,
1783        ctx: &mut ActorContext<Self>,
1784    ) -> Result<(), RequestManagerError> {
1785        match self.command {
1786            ReqManInitMessage::Evaluate => self.build_evaluation(ctx).await,
1787            ReqManInitMessage::Validate => {
1788                let (
1789                    request,
1790                    quorum,
1791                    signers,
1792                    init_state,
1793                    current_request_roles,
1794                ) = self.build_validation_req(ctx, None, None).await?;
1795
1796                self.run_validation(
1797                    ctx,
1798                    request,
1799                    quorum,
1800                    signers,
1801                    init_state,
1802                    current_request_roles,
1803                )
1804                .await
1805            }
1806        }
1807    }
1808
1809    async fn check_request_roles_after_reboot(
1810        &self,
1811        ctx: &mut ActorContext<Self>,
1812    ) -> Result<(), RequestManagerError> {
1813        let Some(request) = self.request.clone() else {
1814            return Err(RequestManagerError::RequestNotSet);
1815        };
1816
1817        let gov = self.get_governance_data(ctx).await?;
1818        let subject_data = match request.content() {
1819            EventRequest::Create(..) => None,
1820            _ => get_subject_data(ctx, &self.subject_id).await?,
1821        };
1822
1823        let creator_scope =
1824            match request.content() {
1825                EventRequest::Create(create) if !create.schema_id.is_gov() => {
1826                    Some((create.schema_id.clone(), create.namespace.clone()))
1827                }
1828                _ => subject_data.as_ref().and_then(|subject_data| {
1829                    match subject_data {
1830                        SubjectData::Tracker {
1831                            schema_id,
1832                            namespace,
1833                            ..
1834                        } => Some((
1835                            schema_id.clone(),
1836                            Namespace::from(namespace.clone()),
1837                        )),
1838                        _ => None,
1839                    }
1840                }),
1841            };
1842
1843        if let Some((schema_id, namespace)) = creator_scope
1844            && !gov.has_this_role(HashThisRole::Schema {
1845                who: (*self.our_key).clone(),
1846                role: RoleTypes::Creator,
1847                schema_id,
1848                namespace,
1849            })
1850        {
1851            return Err(RequestManagerError::NotCreator);
1852        }
1853
1854        if let EventRequest::Fact { .. } = request.content() {
1855            let Some(subject_data) = subject_data else {
1856                return Err(RequestManagerError::SubjecData);
1857            };
1858
1859            match subject_data {
1860                SubjectData::Tracker {
1861                    schema_id,
1862                    namespace,
1863                    ..
1864                } => {
1865                    if !gov.has_this_role(HashThisRole::Schema {
1866                        who: request.signature().signer.clone(),
1867                        role: RoleTypes::Issuer,
1868                        schema_id,
1869                        namespace: Namespace::from(namespace),
1870                    }) {
1871                        return Err(RequestManagerError::NotIssuer);
1872                    }
1873                }
1874                SubjectData::Governance { .. } => {
1875                    if !gov.has_this_role(HashThisRole::Gov {
1876                        who: request.signature().signer.clone(),
1877                        role: RoleTypes::Issuer,
1878                    }) {
1879                        return Err(RequestManagerError::NotIssuer);
1880                    }
1881                }
1882            }
1883        }
1884
1885        Ok(())
1886    }
1887
1888    async fn stops_childs(
1889        &self,
1890        ctx: &mut ActorContext<Self>,
1891    ) -> Result<(), RequestManagerError> {
1892        match self.state {
1893            RequestManagerState::Reboot => {
1894                if let Ok(actor) = ctx.get_child::<Update>("update").await {
1895                    actor.ask_stop().await?;
1896                };
1897                if let Ok(actor) = ctx.get_child::<Reboot>("reboot").await {
1898                    actor.ask_stop().await?;
1899                };
1900            }
1901            RequestManagerState::Evaluation => {
1902                if let Ok(actor) =
1903                    ctx.get_child::<Evaluation>("evaluation").await
1904                {
1905                    actor.ask_stop().await?;
1906                };
1907            }
1908            RequestManagerState::Approval { .. } => {
1909                if let Ok(actor) = ctx.get_child::<Approval>("approval").await {
1910                    actor.ask_stop().await?;
1911                };
1912                let _ = make_obsolete(ctx, &self.subject_id).await;
1913            }
1914            RequestManagerState::Validation { .. } => {
1915                if let Ok(actor) =
1916                    ctx.get_child::<Validation>("validation").await
1917                {
1918                    actor.ask_stop().await?;
1919                };
1920            }
1921            RequestManagerState::Distribution { .. } => {
1922                if let Ok(actor) =
1923                    ctx.get_child::<Distribution>("distribution").await
1924                {
1925                    actor.ask_stop().await?;
1926                };
1927            }
1928            _ => {}
1929        }
1930
1931        Ok(())
1932    }
1933
1934    async fn abort_request(
1935        &mut self,
1936        ctx: &mut ActorContext<Self>,
1937        error: String,
1938        sn: Option<u64>,
1939        who: PublicKey,
1940    ) -> Result<(), RequestManagerError> {
1941        self.stops_childs(ctx).await?;
1942
1943        self.finish_request_metrics("aborted");
1944        info!("Aborting {}", self.id);
1945        send_to_tracking(
1946            ctx,
1947            RequestTrackingMessage::UpdateState {
1948                request_id: self.id.clone(),
1949                state: RequestState::Abort {
1950                    subject_id: self.subject_id.to_string(),
1951                    error,
1952                    sn,
1953                    who: who.to_string(),
1954                },
1955            },
1956        )
1957        .await?;
1958
1959        self.on_event(RequestManagerEvent::Finish, ctx).await;
1960
1961        self.end_request(ctx).await?;
1962
1963        Ok(())
1964    }
1965
1966    async fn end_request(
1967        &self,
1968        ctx: &ActorContext<Self>,
1969    ) -> Result<(), RequestManagerError> {
1970        let actor = ctx.get_parent::<RequestHandler>().await?;
1971        actor
1972            .tell(RequestHandlerMessage::EndHandling {
1973                subject_id: self.subject_id.clone(),
1974            })
1975            .await?;
1976
1977        Ok(())
1978    }
1979}
1980
1981#[derive(Debug, Clone)]
1982pub enum RequestManagerMessage {
1983    Run {
1984        request_id: DigestIdentifier,
1985    },
1986    FirstRun {
1987        command: ReqManInitMessage,
1988        request: Signed<EventRequest>,
1989        request_id: DigestIdentifier,
1990    },
1991    Abort {
1992        request_id: DigestIdentifier,
1993        who: PublicKey,
1994        reason: String,
1995        sn: u64,
1996    },
1997    ManualAbort,
1998    PurgeStorage,
1999    Reboot {
2000        request_id: DigestIdentifier,
2001        governance_id: DigestIdentifier,
2002        reboot_type: RebootType,
2003    },
2004    RebootUpdate {
2005        request_id: DigestIdentifier,
2006        governance_id: DigestIdentifier,
2007    },
2008    RebootWait {
2009        request_id: DigestIdentifier,
2010        governance_id: DigestIdentifier,
2011    },
2012    FinishReboot {
2013        request_id: DigestIdentifier,
2014    },
2015    EvaluationRes {
2016        request_id: DigestIdentifier,
2017        eval_req: Box<EvaluationReq>,
2018        eval_res: EvaluationData,
2019    },
2020    ApprovalRes {
2021        request_id: DigestIdentifier,
2022        appro_res: ApprovalData,
2023    },
2024    ValidationRes {
2025        request_id: DigestIdentifier,
2026        val_req: Box<ValidationReq>,
2027        val_res: ValidationData,
2028    },
2029    FinishRequest {
2030        request_id: DigestIdentifier,
2031    },
2032}
2033
2034impl Message for RequestManagerMessage {}
2035
2036#[derive(
2037    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
2038)]
2039pub enum RequestManagerEvent {
2040    Finish,
2041    UpdateState {
2042        state: Box<RequestManagerState>,
2043    },
2044    UpdateVersion {
2045        version: u64,
2046    },
2047    SafeState {
2048        command: ReqManInitMessage,
2049        request: Signed<EventRequest>,
2050    },
2051}
2052
2053impl Event for RequestManagerEvent {}
2054
2055#[async_trait]
2056impl Actor for RequestManager {
2057    type Event = RequestManagerEvent;
2058    type Message = RequestManagerMessage;
2059    type Response = ();
2060
2061    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
2062        parent_span.map_or_else(
2063            || info_span!("RequestManager", id),
2064            |parent_span| info_span!(parent: parent_span, "RequestManager", id),
2065        )
2066    }
2067
2068    async fn pre_start(
2069        &mut self,
2070        ctx: &mut ActorContext<Self>,
2071    ) -> Result<(), ActorError> {
2072        if let Err(e) =
2073            self.init_store("request_manager", None, false, ctx).await
2074        {
2075            error!(
2076                error = %e,
2077                subject_id = %self.subject_id,
2078                "Failed to initialize store"
2079            );
2080            return Err(e);
2081        }
2082
2083        if self.governance_id.is_none()
2084            && let Some(request) = &self.request
2085            && let EventRequest::Create(create) = request.content()
2086            && !create.schema_id.is_gov()
2087        {
2088            self.governance_id = Some(create.governance_id.clone());
2089        }
2090
2091        Ok(())
2092    }
2093}
2094
2095#[async_trait]
2096impl Handler<Self> for RequestManager {
2097    // The async state machine inlines all sub-futures and exceeds the default
2098    // threshold; a proper fix would require boxing every large sub-future.
2099    #[allow(clippy::large_stack_frames)]
2100    async fn handle_message(
2101        &mut self,
2102        _sender: ActorPath,
2103        msg: RequestManagerMessage,
2104        ctx: &mut ave_actors::ActorContext<Self>,
2105    ) -> Result<(), ActorError> {
2106        match msg {
2107            RequestManagerMessage::RebootUpdate {
2108                governance_id,
2109                request_id,
2110            } => {
2111                if request_id == self.id {
2112                    info!("Init reboot update {}", self.id);
2113                    debug!(
2114                        msg_type = "RebootUpdate",
2115                        request_id = %self.id,
2116                        governance_id = %governance_id,
2117                        "Initializing reboot update"
2118                    );
2119
2120                    if let Err(e) = self.init_update(ctx, &governance_id).await
2121                    {
2122                        error!(
2123                            msg_type = "RebootUpdate",
2124                            request_id = %self.id,
2125                            governance_id = %governance_id,
2126                            error = %e,
2127                            "Failed to initialize reboot update"
2128                        );
2129                        self.match_error(ctx, e).await;
2130                        return Ok(());
2131                    }
2132                }
2133            }
2134            RequestManagerMessage::RebootWait {
2135                governance_id,
2136                request_id,
2137            } => {
2138                if request_id == self.id {
2139                    info!("Init reboot wait {}", self.id);
2140                    debug!(
2141                        msg_type = "RebootWait",
2142                        request_id = %self.id,
2143                        governance_id = %governance_id,
2144                        "Initializing reboot wait"
2145                    );
2146
2147                    if let Err(e) = self.init_wait(ctx, &governance_id).await {
2148                        error!(
2149                            msg_type = "RebootWait",
2150                            request_id = %self.id,
2151                            governance_id = %governance_id,
2152                            error = %e,
2153                            "Failed to initialize reboot wait"
2154                        );
2155                        self.match_error(ctx, e).await;
2156                        return Ok(());
2157                    }
2158                }
2159            }
2160            RequestManagerMessage::Reboot {
2161                governance_id,
2162                request_id,
2163                reboot_type,
2164            } => {
2165                if request_id == self.id {
2166                    if matches!(self.state, RequestManagerState::Reboot) {
2167                        debug!(
2168                            msg_type = "Reboot",
2169                            request_id = %self.id,
2170                            governance_id = %governance_id,
2171                            reboot_type = ?reboot_type,
2172                            "Already in reboot state, ignoring"
2173                        );
2174                    } else {
2175                        debug!(
2176                            msg_type = "Reboot",
2177                            request_id = %self.id,
2178                            governance_id = %governance_id,
2179                            reboot_type = ?reboot_type,
2180                            "Initiating reboot"
2181                        );
2182                        if let Err(e) = self.stops_childs(ctx).await {
2183                            error!(
2184                                msg_type = "Reboot",
2185                                request_id = %self.id,
2186                                governance_id = %governance_id,
2187                                error = %e,
2188                                "Failed to stop childs"
2189                            );
2190                            self.match_error(ctx, e).await;
2191                            return Ok(());
2192                        };
2193                        if let Err(e) = self
2194                            .reboot(ctx, reboot_type, governance_id.clone())
2195                            .await
2196                        {
2197                            error!(
2198                                msg_type = "Reboot",
2199                                request_id = %self.id,
2200                                governance_id = %governance_id,
2201                                error = %e,
2202                                "Failed to initiate reboot"
2203                            );
2204                            self.match_error(ctx, e).await;
2205                            return Ok(());
2206                        }
2207                    }
2208                }
2209            }
2210            RequestManagerMessage::FinishReboot { request_id } => {
2211                if request_id == self.id {
2212                    info!("Init reboot finish {}", self.id);
2213                    debug!(
2214                        msg_type = "FinishReboot",
2215                        request_id = %self.id,
2216                        version = self.version,
2217                        "Reboot completed, resuming request"
2218                    );
2219                    self.on_event(
2220                        RequestManagerEvent::UpdateVersion {
2221                            version: self.version + 1,
2222                        },
2223                        ctx,
2224                    )
2225                    .await;
2226
2227                    if let Err(e) = send_to_tracking(
2228                        ctx,
2229                        RequestTrackingMessage::UpdateVersion {
2230                            request_id: self.id.clone(),
2231                            version: self.version,
2232                        },
2233                    )
2234                    .await
2235                    {
2236                        error!(
2237                            msg_type = "FinishReboot",
2238                            request_id = %self.id,
2239                            version = self.version,
2240                            error = %e,
2241                            "Failed to send version update to tracking"
2242                        );
2243                        return Err(emit_fail(ctx, e).await);
2244                    }
2245
2246                    if let Err(e) =
2247                        self.check_request_roles_after_reboot(ctx).await
2248                    {
2249                        error!(
2250                            msg_type = "FinishReboot",
2251                            request_id = %self.id,
2252                            error = %e,
2253                            "Failed to check signatures after reboot"
2254                        );
2255                        self.match_error(ctx, e).await;
2256                        return Ok(());
2257                    }
2258
2259                    if let Err(e) = self.match_command(ctx).await {
2260                        error!(
2261                            msg_type = "FinishReboot",
2262                            request_id = %self.id,
2263                            error = %e,
2264                            "Failed to execute command after reboot"
2265                        );
2266                        self.match_error(ctx, e).await;
2267                        return Ok(());
2268                    }
2269                }
2270            }
2271            RequestManagerMessage::Abort {
2272                request_id,
2273                who,
2274                reason,
2275                sn,
2276            } => {
2277                if request_id == self.id {
2278                    warn!(
2279                        msg_type = "Abort",
2280                        state = %self.state,
2281                        request_id = %self.id,
2282                        who = %who,
2283                        reason = %reason,
2284                        sn = sn,
2285                        "Request abort received"
2286                    );
2287                    if let Err(e) =
2288                        self.abort_request(ctx, reason, Some(sn), who).await
2289                    {
2290                        error!(
2291                            msg_type = "Abort",
2292                            request_id = %self.id,
2293                            error = %e,
2294                            "Failed to abort request"
2295                        );
2296                        self.match_error(ctx, e).await;
2297                        return Ok(());
2298                    }
2299                }
2300            }
2301            RequestManagerMessage::ManualAbort => {
2302                match &self.state {
2303                    RequestManagerState::Reboot
2304                    | RequestManagerState::Starting
2305                    | RequestManagerState::Evaluation
2306                    | RequestManagerState::Approval { .. }
2307                    | RequestManagerState::Validation { .. } => {
2308                        if let Err(e) = self
2309                            .abort_request(
2310                                ctx,
2311                                "The user manually aborted the request"
2312                                    .to_owned(),
2313                                None,
2314                                (*self.our_key).clone(),
2315                            )
2316                            .await
2317                        {
2318                            error!(
2319                                msg_type = "Abort",
2320                                request_id = %self.id,
2321                                error = %e,
2322                                "Failed to abort request"
2323                            );
2324                            self.match_error(ctx, e).await;
2325                        }
2326                    }
2327                    _ => {
2328                        info!(
2329                            "The request is in a state that cannot be aborted {}, state: {}",
2330                            self.id, self.state
2331                        );
2332                    }
2333                }
2334
2335                return Ok(());
2336            }
2337            RequestManagerMessage::PurgeStorage => {
2338                purge_storage(ctx).await?;
2339
2340                debug!(
2341                    msg_type = "PurgeStorage",
2342                    subject_id = %self.subject_id,
2343                    "Purged request manager storage"
2344                );
2345
2346                return Ok(());
2347            }
2348            RequestManagerMessage::FirstRun {
2349                command,
2350                request,
2351                request_id,
2352            } => {
2353                self.id = request_id.clone();
2354                self.begin_request_metrics();
2355                debug!(
2356                    msg_type = "FirstRun",
2357                    request_id = %request_id,
2358                    command = ?command,
2359                    "First run of request manager"
2360                );
2361                self.on_event(
2362                    RequestManagerEvent::SafeState { command, request },
2363                    ctx,
2364                )
2365                .await;
2366
2367                if let Err(e) = self.match_command(ctx).await {
2368                    error!(
2369                        msg_type = "FirstRun",
2370                        request_id = %self.id,
2371                        error = %e,
2372                        "Failed to execute initial command"
2373                    );
2374                    self.match_error(ctx, e).await;
2375                    return Ok(());
2376                };
2377            }
2378            RequestManagerMessage::Run { request_id } => {
2379                self.id = request_id;
2380                self.ensure_request_metrics_started();
2381
2382                debug!(
2383                    msg_type = "Run",
2384                    request_id = %self.id,
2385                    state = ?self.state,
2386                    version = self.version,
2387                    "Running request manager"
2388                );
2389                match self.state.clone() {
2390                    RequestManagerState::Starting
2391                    | RequestManagerState::Reboot => {
2392                        if let Err(e) = self.match_command(ctx).await {
2393                            error!(
2394                                msg_type = "Run",
2395                                request_id = %self.id,
2396                                state = "Starting/Reboot",
2397                                error = %e,
2398                                "Failed to execute command"
2399                            );
2400                            self.match_error(ctx, e).await;
2401                            return Ok(());
2402                        };
2403                    }
2404                    RequestManagerState::Evaluation => {
2405                        if let Err(e) = self.build_evaluation(ctx).await {
2406                            error!(
2407                                msg_type = "Run",
2408                                request_id = %self.id,
2409                                state = "Evaluation",
2410                                error = %e,
2411                                "Failed to build evaluation"
2412                            );
2413                            self.match_error(ctx, e).await;
2414                            return Ok(());
2415                        }
2416                    }
2417
2418                    RequestManagerState::Approval { eval_req, eval_res } => {
2419                        let Some(evaluator_res) =
2420                            eval_res.evaluator_response_ok()
2421                        else {
2422                            error!(
2423                                msg_type = "Run",
2424                                request_id = %self.id,
2425                                state = "Approval",
2426                                "Approval state is missing a successful evaluator response"
2427                            );
2428                            self.match_error(
2429                                ctx,
2430                                RequestManagerError::InvalidRequestState {
2431                                    expected:
2432                                        "approval state with successful evaluator response",
2433                                    got:
2434                                        "approval state without successful evaluator response",
2435                                },
2436                            )
2437                            .await;
2438                            return Ok(());
2439                        };
2440
2441                        if let Err(e) = self
2442                            .build_approval(ctx, eval_req, evaluator_res)
2443                            .await
2444                        {
2445                            error!(
2446                                msg_type = "Run",
2447                                request_id = %self.id,
2448                                state = "Approval",
2449                                error = %e,
2450                                "Failed to build approval"
2451                            );
2452                            self.match_error(ctx, e).await;
2453                            return Ok(());
2454                        }
2455                    }
2456                    RequestManagerState::Validation {
2457                        request,
2458                        quorum,
2459                        init_state,
2460                        current_request_roles,
2461                        signers,
2462                        distribution_plan: _,
2463                    } => {
2464                        if let Err(e) = self
2465                            .run_validation(
2466                                ctx,
2467                                *request,
2468                                quorum,
2469                                signers,
2470                                init_state,
2471                                current_request_roles,
2472                            )
2473                            .await
2474                        {
2475                            error!(
2476                                msg_type = "Run",
2477                                request_id = %self.id,
2478                                state = "Validation",
2479                                error = %e,
2480                                "Failed to run validation"
2481                            );
2482                            self.match_error(ctx, e).await;
2483                            return Ok(());
2484                        };
2485                    }
2486                    RequestManagerState::UpdateSubject {
2487                        ledger,
2488                        distribution_plan,
2489                    } => {
2490                        if let Err(e) = self
2491                            .update_subject(
2492                                ctx,
2493                                ledger.clone(),
2494                                distribution_plan.clone(),
2495                            )
2496                            .await
2497                        {
2498                            error!(
2499                                msg_type = "Run",
2500                                request_id = %self.id,
2501                                state = "UpdateSubject",
2502                                error = %e,
2503                                "Failed to update subject"
2504                            );
2505                            self.match_error(ctx, e).await;
2506                            return Ok(());
2507                        };
2508
2509                        match self
2510                            .build_distribution(ctx, ledger, distribution_plan)
2511                            .await
2512                        {
2513                            Ok(in_distribution) => {
2514                                if !in_distribution
2515                                    && let Err(e) =
2516                                        self.finish_request(ctx).await
2517                                {
2518                                    error!(
2519                                        msg_type = "Run",
2520                                        request_id = %self.id,
2521                                        state = "UpdateSubject",
2522                                        error = %e,
2523                                        "Failed to finish request after build distribution"
2524                                    );
2525                                    self.match_error(ctx, e).await;
2526                                    return Ok(());
2527                                }
2528                            }
2529                            Err(e) => {
2530                                error!(
2531                                    msg_type = "Run",
2532                                    request_id = %self.id,
2533                                    state = "UpdateSubject",
2534                                    error = %e,
2535                                    "Failed to build distribution"
2536                                );
2537                                self.match_error(ctx, e).await;
2538                                return Ok(());
2539                            }
2540                        };
2541                    }
2542                    RequestManagerState::Distribution {
2543                        ledger,
2544                        distribution_plan,
2545                    } => {
2546                        match self
2547                            .build_distribution(ctx, ledger, distribution_plan)
2548                            .await
2549                        {
2550                            Ok(in_distribution) => {
2551                                if !in_distribution
2552                                    && let Err(e) =
2553                                        self.finish_request(ctx).await
2554                                {
2555                                    error!(
2556                                        msg_type = "Run",
2557                                        request_id = %self.id,
2558                                        state = "Distribution",
2559                                        error = %e,
2560                                        "Failed to finish request after build distribution"
2561                                    );
2562                                    self.match_error(ctx, e).await;
2563                                    return Ok(());
2564                                }
2565                            }
2566                            Err(e) => {
2567                                error!(
2568                                    msg_type = "Run",
2569                                    request_id = %self.id,
2570                                    state = "Distribution",
2571                                    error = %e,
2572                                    "Failed to build distribution"
2573                                );
2574                                self.match_error(ctx, e).await;
2575                                return Ok(());
2576                            }
2577                        };
2578                    }
2579                    RequestManagerState::End => {
2580                        if let Err(e) = self.end_request(ctx).await {
2581                            error!(
2582                                msg_type = "Run",
2583                                request_id = %self.id,
2584                                state = "End",
2585                                error = %e,
2586                                "Failed to end request"
2587                            );
2588                            self.match_error(ctx, e).await;
2589                            return Ok(());
2590                        }
2591                    }
2592                };
2593            }
2594            RequestManagerMessage::EvaluationRes {
2595                eval_req,
2596                eval_res,
2597                request_id,
2598            } => {
2599                if request_id == self.id {
2600                    debug!(
2601                        msg_type = "EvaluationRes",
2602                        request_id = %self.id,
2603                        version = self.version,
2604                        "Evaluation result received"
2605                    );
2606                    if let Err(e) = self.stops_childs(ctx).await {
2607                        error!(
2608                            msg_type = "EvaluationRes",
2609                            request_id = %self.id,
2610                            error = %e,
2611                            "Failed to stop childs"
2612                        );
2613                        self.match_error(ctx, e).await;
2614                        return Ok(());
2615                    };
2616
2617                    if let Some(evaluator_res) =
2618                        eval_res.evaluator_response_ok()
2619                        && evaluator_res.appr_required
2620                    {
2621                        debug!(
2622                            msg_type = "EvaluationRes",
2623                            request_id = %self.id,
2624                            "Approval required, proceeding to approval phase"
2625                        );
2626                        self.on_event(
2627                            RequestManagerEvent::UpdateState {
2628                                state: Box::new(
2629                                    RequestManagerState::Approval {
2630                                        eval_req: *eval_req.clone(),
2631                                        eval_res: eval_res.clone(),
2632                                    },
2633                                ),
2634                            },
2635                            ctx,
2636                        )
2637                        .await;
2638
2639                        if let Err(e) = self
2640                            .build_approval(ctx, *eval_req, evaluator_res)
2641                            .await
2642                        {
2643                            error!(
2644                                msg_type = "EvaluationRes",
2645                                request_id = %self.id,
2646                                error = %e,
2647                                "Failed to build approval"
2648                            );
2649                            self.match_error(ctx, e).await;
2650                            return Ok(());
2651                        }
2652                    } else {
2653                        debug!(
2654                            msg_type = "EvaluationRes",
2655                            request_id = %self.id,
2656                            "Approval not required, proceeding to validation phase"
2657                        );
2658                        let (
2659                            request,
2660                            quorum,
2661                            signers,
2662                            init_state,
2663                            current_request_roles,
2664                        ) = match self
2665                            .build_validation_req(
2666                                ctx,
2667                                Some((*eval_req, eval_res)),
2668                                None,
2669                            )
2670                            .await
2671                        {
2672                            Ok(data) => data,
2673                            Err(e) => {
2674                                error!(
2675                                    msg_type = "EvaluationRes",
2676                                    request_id = %self.id,
2677                                    error = %e,
2678                                    "Failed to build validation request"
2679                                );
2680                                self.match_error(ctx, e).await;
2681                                return Ok(());
2682                            }
2683                        };
2684
2685                        if let Err(e) = self
2686                            .run_validation(
2687                                ctx,
2688                                request,
2689                                quorum,
2690                                signers,
2691                                init_state,
2692                                current_request_roles,
2693                            )
2694                            .await
2695                        {
2696                            error!(
2697                                msg_type = "EvaluationRes",
2698                                request_id = %self.id,
2699                                error = %e,
2700                                "Failed to run validation"
2701                            );
2702                            self.match_error(ctx, e).await;
2703                            return Ok(());
2704                        };
2705                    }
2706                }
2707            }
2708            RequestManagerMessage::ApprovalRes {
2709                appro_res,
2710                request_id,
2711            } => {
2712                if request_id == self.id {
2713                    let _ = make_obsolete(ctx, &self.subject_id).await;
2714                    debug!(
2715                        msg_type = "ApprovalRes",
2716                        request_id = %self.id,
2717                        version = self.version,
2718                        "Approval result received"
2719                    );
2720                    if let Err(e) = self.stops_childs(ctx).await {
2721                        error!(
2722                            msg_type = "ApprovalRes",
2723                            request_id = %self.id,
2724                            error = %e,
2725                            "Failed to stop childs"
2726                        );
2727                        self.match_error(ctx, e).await;
2728                        return Ok(());
2729                    };
2730
2731                    let RequestManagerState::Approval { eval_req, eval_res } =
2732                        self.state.clone()
2733                    else {
2734                        error!(
2735                            msg_type = "ApprovalRes",
2736                            request_id = %self.id,
2737                            state = ?self.state,
2738                            "Invalid state for approval response"
2739                        );
2740                        let e = ActorError::FunctionalCritical {
2741                            description: "Invalid request state".to_owned(),
2742                        };
2743                        return Err(emit_fail(ctx, e).await);
2744                    };
2745                    let (
2746                        request,
2747                        quorum,
2748                        signers,
2749                        init_state,
2750                        current_request_roles,
2751                    ) = match self
2752                        .build_validation_req(
2753                            ctx,
2754                            Some((eval_req, eval_res)),
2755                            Some(appro_res),
2756                        )
2757                        .await
2758                    {
2759                        Ok(data) => data,
2760                        Err(e) => {
2761                            error!(
2762                                msg_type = "ApprovalRes",
2763                                request_id = %self.id,
2764                                error = %e,
2765                                "Failed to build validation request"
2766                            );
2767                            self.match_error(ctx, e).await;
2768                            return Ok(());
2769                        }
2770                    };
2771
2772                    if let Err(e) = self
2773                        .run_validation(
2774                            ctx,
2775                            request,
2776                            quorum,
2777                            signers,
2778                            init_state,
2779                            current_request_roles,
2780                        )
2781                        .await
2782                    {
2783                        error!(
2784                            msg_type = "ApprovalRes",
2785                            request_id = %self.id,
2786                            error = %e,
2787                            "Failed to run validation"
2788                        );
2789                        self.match_error(ctx, e).await;
2790                        return Ok(());
2791                    };
2792                }
2793            }
2794            RequestManagerMessage::ValidationRes {
2795                val_res,
2796                val_req,
2797                request_id,
2798            } => {
2799                if request_id == self.id {
2800                    debug!(
2801                        msg_type = "ValidationRes",
2802                        request_id = %self.id,
2803                        version = self.version,
2804                        "Validation result received"
2805                    );
2806                    if let Err(e) = self.stops_childs(ctx).await {
2807                        error!(
2808                                msg_type = "ValidationRes",
2809                                request_id = %self.id,
2810                                error = %e,
2811                                "Failed to stop childs"
2812                        );
2813                        self.match_error(ctx, e).await;
2814                        return Ok(());
2815                    };
2816
2817                    let distribution_plan = match &self.state {
2818                        RequestManagerState::Validation {
2819                            distribution_plan,
2820                            ..
2821                        } => distribution_plan.clone(),
2822                        _ => {
2823                            error!(
2824                                msg_type = "ValidationRes",
2825                                request_id = %self.id,
2826                                state = ?self.state,
2827                                "Invalid state for validation response"
2828                            );
2829                            self.match_error(
2830                                ctx,
2831                                RequestManagerError::InvalidRequestState {
2832                                    expected: "Validation",
2833                                    got: "Other",
2834                                },
2835                            )
2836                            .await;
2837                            return Ok(());
2838                        }
2839                    };
2840
2841                    let signed_ledger = match self
2842                        .build_ledger(
2843                            ctx,
2844                            *val_req,
2845                            val_res,
2846                            distribution_plan.clone(),
2847                        )
2848                        .await
2849                    {
2850                        Ok(signed_ledger) => signed_ledger,
2851                        Err(e) => {
2852                            error!(
2853                                msg_type = "ValidationRes",
2854                                request_id = %self.id,
2855                                error = %e,
2856                                "Failed to build ledger"
2857                            );
2858                            self.match_error(ctx, e).await;
2859                            return Ok(());
2860                        }
2861                    };
2862
2863                    if let Err(e) = self
2864                        .update_subject(
2865                            ctx,
2866                            signed_ledger.clone(),
2867                            distribution_plan.clone(),
2868                        )
2869                        .await
2870                    {
2871                        error!(
2872                            msg_type = "ValidationRes",
2873                            request_id = %self.id,
2874                            error = %e,
2875                            "Failed to update subject"
2876                        );
2877                        self.match_error(ctx, e).await;
2878                        return Ok(());
2879                    };
2880
2881                    match self
2882                        .build_distribution(
2883                            ctx,
2884                            signed_ledger,
2885                            distribution_plan,
2886                        )
2887                        .await
2888                    {
2889                        Ok(in_distribution) => {
2890                            if !in_distribution
2891                                && let Err(e) = self.finish_request(ctx).await
2892                            {
2893                                error!(
2894                                    msg_type = "ValidationRes",
2895                                    request_id = %self.id,
2896                                    error = %e,
2897                                    "Failed to finish request after build distribution"
2898                                );
2899                                self.match_error(ctx, e).await;
2900                                return Ok(());
2901                            }
2902                        }
2903                        Err(e) => {
2904                            error!(
2905                                msg_type = "ValidationRes",
2906                                request_id = %self.id,
2907                                error = %e,
2908                                "Failed to build distribution"
2909                            );
2910                            self.match_error(ctx, e).await;
2911                            return Ok(());
2912                        }
2913                    };
2914                }
2915            }
2916            RequestManagerMessage::FinishRequest { request_id } => {
2917                if request_id == self.id {
2918                    debug!(
2919                        msg_type = "FinishRequest",
2920                        request_id = %self.id,
2921                        version = self.version,
2922                        "Finishing request"
2923                    );
2924
2925                    if let Err(e) = self.stops_childs(ctx).await {
2926                        error!(
2927                            msg_type = "FinishRequest",
2928                            request_id = %self.id,
2929                            error = %e,
2930                            "Failed to stop childs"
2931                        );
2932                        self.match_error(ctx, e).await;
2933                        return Ok(());
2934                    };
2935
2936                    if let Err(e) = self.finish_request(ctx).await {
2937                        error!(
2938                            msg_type = "FinishRequest",
2939                            request_id = %self.id,
2940                            error = %e,
2941                            "Failed to finish request"
2942                        );
2943                        self.match_error(ctx, e).await;
2944                        return Ok(());
2945                    }
2946                }
2947            }
2948        }
2949
2950        Ok(())
2951    }
2952
2953    async fn on_event(
2954        &mut self,
2955        event: RequestManagerEvent,
2956        ctx: &mut ActorContext<Self>,
2957    ) {
2958        let event_type = match &event {
2959            RequestManagerEvent::Finish => "Finish",
2960            RequestManagerEvent::UpdateState { .. } => "UpdateState",
2961            RequestManagerEvent::UpdateVersion { .. } => "UpdateVersion",
2962            RequestManagerEvent::SafeState { .. } => "SafeState",
2963        };
2964
2965        if let Err(e) = self.persist(&event, ctx).await {
2966            error!(
2967                event_type = event_type,
2968                request_id = %self.id,
2969                error = %e,
2970                "Failed to persist event"
2971            );
2972            emit_fail(ctx, e).await;
2973        };
2974    }
2975
2976    async fn on_child_fault(
2977        &mut self,
2978        error: ActorError,
2979        ctx: &mut ActorContext<Self>,
2980    ) -> ChildAction {
2981        error!(
2982            request_id = %self.id,
2983            version = self.version,
2984            state = ?self.state,
2985            error = %error,
2986            "Child fault in request manager"
2987        );
2988        emit_fail(ctx, error).await;
2989        ChildAction::Stop
2990    }
2991}
2992
2993#[async_trait]
2994impl PersistentActor for RequestManager {
2995    type Persistence = LightPersistence;
2996    type InitParams = InitRequestManager;
2997
2998    fn update(&mut self, state: Self) {
2999        self.command = state.command;
3000        self.request = state.request;
3001        self.state = state.state;
3002        self.version = state.version;
3003    }
3004
3005    fn create_initial(params: Self::InitParams) -> Self {
3006        Self {
3007            retry_diff: 0,
3008            retry_timeout: 0,
3009            request_started_at: None,
3010            current_phase: None,
3011            current_phase_started_at: None,
3012            our_key: params.our_key,
3013            id: DigestIdentifier::default(),
3014            subject_id: params.subject_id,
3015            governance_id: params.governance_id,
3016            command: ReqManInitMessage::Evaluate,
3017            request: None,
3018            state: RequestManagerState::Starting,
3019            version: 0,
3020            helpers: Some(params.helpers),
3021        }
3022    }
3023
3024    /// Change node state.
3025    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
3026        match event {
3027            RequestManagerEvent::Finish => {
3028                debug!(
3029                    event_type = "Finish",
3030                    request_id = %self.id,
3031                    "Applying finish event"
3032                );
3033                self.state = RequestManagerState::End;
3034                self.request = None;
3035                self.id = DigestIdentifier::default();
3036            }
3037            RequestManagerEvent::UpdateState { state } => {
3038                debug!(
3039                    event_type = "UpdateState",
3040                    request_id = %self.id,
3041                    old_state = ?self.state,
3042                    new_state = ?state,
3043                    "Applying state update"
3044                );
3045                self.state = *state.clone()
3046            }
3047            RequestManagerEvent::UpdateVersion { version } => {
3048                debug!(
3049                    event_type = "UpdateVersion",
3050                    request_id = %self.id,
3051                    old_version = self.version,
3052                    new_version = version,
3053                    "Applying version update"
3054                );
3055                self.state = RequestManagerState::Starting;
3056                self.version = *version
3057            }
3058            RequestManagerEvent::SafeState { command, request } => {
3059                debug!(
3060                    event_type = "SafeState",
3061                    request_id = %self.id,
3062                    command = ?command,
3063                    "Applying safe state"
3064                );
3065                self.version = 0;
3066                self.retry_diff = 0;
3067                self.retry_timeout = 0;
3068                self.state = RequestManagerState::Starting;
3069                self.request = Some(request.clone());
3070                self.command = command.clone();
3071            }
3072        };
3073
3074        Ok(())
3075    }
3076}
3077
3078#[async_trait]
3079impl Storable for RequestManager {}