Skip to main content

ave_core/request/
manager.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::bridge::request::EventRequestType;
8use ave_common::identity::{
9    DigestIdentifier, HashAlgorithm, PublicKey, Signed,
10};
11use ave_common::request::EventRequest;
12use ave_common::response::RequestState;
13use ave_common::{Namespace, SchemaType, ValueWrapper};
14use borsh::{BorshDeserialize, BorshSerialize};
15use ave_network::ComunicateInfo;
16use serde::{Deserialize, Serialize};
17use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tracing::{Span, debug, error, info, info_span, warn};
21
22use crate::approval::request::ApprovalReq;
23use crate::distribution::{
24    Distribution, DistributionMessage, DistributionType,
25};
26use crate::evaluation::request::{EvalWorkerContext, EvaluateData};
27use crate::evaluation::response::EvaluatorResponse;
28use crate::governance::data::GovernanceData;
29use crate::governance::model::{
30    HashThisRole, ProtocolTypes, Quorum, RoleTypes, WitnessesData,
31};
32use crate::governance::role_register::RoleDataRegister;
33use crate::helpers::network::service::NetworkSender;
34use crate::metrics::try_core_metrics;
35use crate::model::common::distribution_plan::build_tracker_event_distribution_plan;
36use crate::model::common::node::{SignTypesNode, get_sign, get_subject_data};
37use crate::model::common::subject::{
38    acquire_subject, create_subject, get_gov, get_gov_sn,
39    get_last_ledger_event, get_metadata, make_obsolete, update_ledger,
40};
41use crate::model::common::{purge_storage, send_to_tracking};
42use crate::model::event::{
43    ApprovalData, EvaluationData, Ledger, LedgerSeal, Protocols, ValidationData,
44};
45use crate::node::SubjectData;
46use crate::request::error::RequestManagerError;
47use crate::request::tracking::RequestTrackingMessage;
48use crate::request::{RequestHandler, RequestHandlerMessage};
49use crate::subject::Metadata;
50use crate::system::ConfigHelper;
51
52use crate::validation::request::{ActualProtocols, LastData, ValidationReq};
53use crate::validation::worker::CurrentRequestRoles;
54use crate::{
55    ActorMessage, NetworkMessage, Validation, ValidationMessage,
56    approval::{Approval, ApprovalMessage},
57    auth::{Auth, AuthMessage, AuthResponse},
58    db::Storable,
59    evaluation::{Evaluation, EvaluationMessage, request::EvaluationReq},
60    model::common::emit_fail,
61    update::{Update, UpdateMessage, UpdateNew, UpdateType},
62};
63
64use super::{
65    reboot::{Reboot, RebootMessage},
66    types::{
67        DistributionPlanEntry, DistributionPlanMode, ReqManInitMessage,
68        RequestManagerState,
69    },
70};
71
/// Actor state that drives a single request through its protocol phases.
///
/// Only `command`, `request`, `state` and `version` are persisted (see the
/// manual Borsh implementations for this type). Every field marked
/// `#[serde(skip)]` is runtime-only and is reset to a default value when the
/// struct is rebuilt from storage.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestManager {
    /// Hash algorithm and network sender handed to the phase actors.
    /// `None` after deserialization, until they are provided again.
    #[serde(skip)]
    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
    /// Public key of this node; used as the signer of the requests built here.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Identifier of the request handled by this manager.
    #[serde(skip)]
    id: DigestIdentifier,
    /// Subject the request operates on.
    #[serde(skip)]
    subject_id: DigestIdentifier,
    /// Governance of the subject. When `None`, `subject_id` is used as the
    /// governance identifier (see `get_governance_data`).
    #[serde(skip)]
    governance_id: Option<DigestIdentifier>,
    // NOTE(review): presumably a retry counter for timeout reboots — confirm
    // against the reboot handling code.
    #[serde(skip)]
    retry_timeout: u64,
    // NOTE(review): presumably a retry counter for "diff" reboots — confirm
    // against the reboot handling code.
    #[serde(skip)]
    retry_diff: u64,
    /// Instant at which the request started being measured for metrics.
    #[serde(skip)]
    request_started_at: Option<Instant>,
    /// Label of the phase currently being timed, if any.
    #[serde(skip)]
    current_phase: Option<&'static str>,
    /// Instant at which the current phase started being timed.
    #[serde(skip)]
    current_phase_started_at: Option<Instant>,
    /// Persisted initialization command of this manager.
    command: ReqManInitMessage,
    /// Persisted signed event request, once it has been set.
    request: Option<Signed<EventRequest>>,
    /// Persisted protocol state of the request.
    state: RequestManagerState,
    /// Persisted version counter, forwarded to the phase actors on creation.
    version: u64,
}
99
/// Kind of reboot a request can go through.
///
/// NOTE(review): the variants are not used in this part of the file; their
/// exact semantics should be confirmed against the reboot handling code.
#[derive(Debug, Clone)]
pub enum RebootType {
    Normal,
    Diff,
    TimeOut,
}
106
/// Runtime-only values for a [`RequestManager`].
///
/// The fields mirror the non-persisted `our_key`, `subject_id`,
/// `governance_id` and `helpers` fields of the manager.
/// NOTE(review): presumably used to re-hydrate a manager after it has been
/// loaded from storage — confirm against the actor start-up code.
pub struct InitRequestManager {
    /// Public key of this node.
    pub our_key: Arc<PublicKey>,
    /// Subject the request operates on.
    pub subject_id: DigestIdentifier,
    /// Governance of the subject, if it differs from the subject itself.
    pub governance_id: Option<DigestIdentifier>,
    /// Hash algorithm and network sender shared with the phase actors.
    pub helpers: (HashAlgorithm, Arc<NetworkSender>),
}
113
114impl BorshSerialize for RequestManager {
115    fn serialize<W: std::io::Write>(
116        &self,
117        writer: &mut W,
118    ) -> std::io::Result<()> {
119        BorshSerialize::serialize(&self.command, writer)?;
120        BorshSerialize::serialize(&self.state, writer)?;
121        BorshSerialize::serialize(&self.version, writer)?;
122        BorshSerialize::serialize(&self.request, writer)?;
123
124        Ok(())
125    }
126}
127
128impl BorshDeserialize for RequestManager {
129    fn deserialize_reader<R: std::io::Read>(
130        reader: &mut R,
131    ) -> std::io::Result<Self> {
132        // Deserialize the persisted fields
133        let command = ReqManInitMessage::deserialize_reader(reader)?;
134        let state = RequestManagerState::deserialize_reader(reader)?;
135        let version = u64::deserialize_reader(reader)?;
136        let request =
137            Option::<Signed<EventRequest>>::deserialize_reader(reader)?;
138
139        let our_key = Arc::new(PublicKey::default());
140        let subject_id = DigestIdentifier::default();
141        let id = DigestIdentifier::default();
142
143        Ok(Self {
144            retry_diff: 0,
145            retry_timeout: 0,
146            request_started_at: None,
147            current_phase: None,
148            current_phase_started_at: None,
149            helpers: None,
150            our_key,
151            id,
152            subject_id,
153            governance_id: None,
154            command,
155            request,
156            state,
157            version,
158        })
159    }
160}
161
162impl RequestManager {
163    fn retry_seconds_for_attempt(schedule: &[u64], attempt: u64) -> u64 {
164        let default = schedule.last().copied().unwrap_or(1).max(1);
165        let idx = attempt.saturating_sub(1) as usize;
166        schedule.get(idx).copied().unwrap_or(default).max(1)
167    }
168
169    fn begin_request_metrics(&mut self) {
170        self.request_started_at = Some(Instant::now());
171        self.current_phase = None;
172        self.current_phase_started_at = None;
173
174        if let Some(metrics) = try_core_metrics() {
175            metrics.observe_request_started();
176        }
177    }
178
179    fn ensure_request_metrics_started(&mut self) {
180        if self.request_started_at.is_none() {
181            self.request_started_at = Some(Instant::now());
182        }
183    }
184
185    fn start_phase_metrics(&mut self, phase: &'static str) {
186        self.ensure_request_metrics_started();
187        self.finish_phase_metrics();
188        self.current_phase = Some(phase);
189        self.current_phase_started_at = Some(Instant::now());
190    }
191
192    fn finish_phase_metrics(&mut self) {
193        let Some(phase) = self.current_phase.take() else {
194            self.current_phase_started_at = None;
195            return;
196        };
197        let Some(started_at) = self.current_phase_started_at.take() else {
198            return;
199        };
200
201        if let Some(metrics) = try_core_metrics() {
202            metrics.observe_request_phase(phase, started_at.elapsed());
203        }
204    }
205
206    fn finish_request_metrics(&mut self, result: &'static str) {
207        self.finish_phase_metrics();
208
209        if let Some(started_at) = self.request_started_at.take()
210            && let Some(metrics) = try_core_metrics()
211        {
212            metrics.observe_request_terminal(result, started_at.elapsed());
213        }
214    }
215
216    //////// EVAL
217    ////////////////////////////////////////////////
    // Reviewed
219    fn tracker_evaluation_context(
220        governance_data: &GovernanceData,
221        schema_id: &SchemaType,
222        namespace: Namespace,
223        request_type: &EventRequestType,
224    ) -> Result<EvalWorkerContext, RequestManagerError> {
225        match request_type {
226            EventRequestType::Fact => {
227                let (issuers, issuer_any) = governance_data.get_signers(
228                    RoleTypes::Issuer,
229                    schema_id,
230                    namespace,
231                );
232                let schema_viewpoints = governance_data
233                    .schemas
234                    .get(schema_id)
235                    .ok_or_else(|| {
236                        crate::governance::error::GovernanceError::SchemaDoesNotExist {
237                            schema_id: schema_id.to_string(),
238                        }
239                    })?
240                    .viewpoints
241                    .clone();
242
243                Ok(EvalWorkerContext::TrackerFact {
244                    issuers: issuers.into_iter().collect(),
245                    issuer_any,
246                    schema_viewpoints,
247                })
248            }
249            EventRequestType::Transfer => {
250                let members = governance_data
251                    .members
252                    .values()
253                    .cloned()
254                    .collect::<BTreeSet<PublicKey>>();
255                let (creator_signers, _) = governance_data.get_signers(
256                    RoleTypes::Creator,
257                    schema_id,
258                    namespace.clone(),
259                );
260                let creators = creator_signers
261                    .into_iter()
262                    .map(|creator| (creator, BTreeSet::from([namespace.clone()])))
263                    .collect::<BTreeMap<PublicKey, BTreeSet<Namespace>>>();
264
265                Ok(EvalWorkerContext::TrackerTransfer { members, creators })
266            }
267            _ => Ok(EvalWorkerContext::Empty),
268        }
269    }
270
271    async fn build_evaluation(
272        &mut self,
273        ctx: &mut ActorContext<Self>,
274    ) -> Result<(), RequestManagerError> {
275        let Some(request) = self.request.clone() else {
276            return Err(RequestManagerError::RequestNotSet);
277        };
278
279        self.on_event(
280            RequestManagerEvent::UpdateState {
281                state: Box::new(RequestManagerState::Evaluation),
282            },
283            ctx,
284        )
285        .await;
286
287        let metadata = self.check_data_eval(ctx, &request).await?;
288
289        let (
290            signed_evaluation_req,
291            quorum,
292            signers,
293            init_state,
294            tracker_context,
295        ) =
296            self.build_request_eval(ctx, &metadata, &request).await?;
297
298        if signers.is_empty() {
299            warn!(
300                request_id = %self.id,
301                schema_id = %metadata.schema_id,
302                "No evaluators available for schema"
303            );
304
305            return Err(RequestManagerError::NoEvaluatorsAvailable {
306                schema_id: metadata.schema_id.to_string(),
307                governance_id: signed_evaluation_req
308                    .content()
309                    .governance_id
310                    .clone(),
311            });
312        }
313
314        self.run_evaluation(
315            ctx,
316            signed_evaluation_req.clone(),
317            quorum,
318            init_state,
319            tracker_context,
320            signers,
321        )
322        .await
323    }
324
    // Reviewed
    /// Returns `true` when a governance id is set for this request.
    ///
    /// The value is forwarded to `acquire_subject` when the subject lease is
    /// taken in `check_data_eval`.
    const fn needs_subject_manager(&self) -> bool {
        self.governance_id.is_some()
    }
329
    /// Returns the request id rendered as a string; it identifies this
    /// manager as the requester when a subject lease is acquired.
    fn requester_id(&self) -> String {
        self.id.to_string()
    }
333
334    fn build_distribution_plan(
335        &self,
336        validation_req: &ValidationReq,
337        governance_data: Option<&GovernanceData>,
338    ) -> Result<Vec<DistributionPlanEntry>, RequestManagerError> {
339        let mut plan: HashMap<PublicKey, DistributionPlanMode> = HashMap::new();
340
341        match validation_req {
342            ValidationReq::Create { event_request, .. } => {
343                let EventRequest::Create(create) = event_request.content()
344                else {
345                    return Err(
346                        RequestManagerError::InvalidEventRequestForEvaluation,
347                    );
348                };
349
350                if create.schema_id.is_gov() {
351                    return Ok(Vec::new());
352                }
353
354                let Some(governance_data) = governance_data else {
355                    return Err(RequestManagerError::ActorError(
356                        ActorError::FunctionalCritical {
357                            description:
358                                "Missing governance data for distribution plan"
359                                    .to_owned(),
360                        },
361                    ));
362                };
363                let witnesses =
364                    governance_data.get_witnesses(WitnessesData::Schema {
365                        creator: event_request.signature().signer.clone(),
366                        schema_id: create.schema_id.clone(),
367                        namespace: create.namespace.clone(),
368                    })?;
369
370                for witness in witnesses {
371                    plan.insert(witness, DistributionPlanMode::Clear);
372                }
373            }
374            ValidationReq::Event {
375                actual_protocols,
376                event_request,
377                metadata,
378                ..
379            } => {
380                if metadata.schema_id.is_gov() {
381                    return Ok(Vec::new());
382                }
383
384                let Some(governance_data) = governance_data else {
385                    return Err(RequestManagerError::ActorError(
386                        ActorError::FunctionalCritical {
387                            description:
388                                "Missing governance data for distribution plan"
389                                    .to_owned(),
390                        },
391                    ));
392                };
393                let protocols_success = actual_protocols.is_success();
394
395                return build_tracker_event_distribution_plan(
396                    governance_data,
397                    event_request.content(),
398                    metadata,
399                    protocols_success,
400                )
401                .map_err(|description| {
402                    RequestManagerError::ActorError(
403                        ActorError::FunctionalCritical { description },
404                    )
405                });
406            }
407        }
408
409        Ok(plan
410            .into_iter()
411            .map(|(node, mode)| DistributionPlanEntry { node, mode })
412            .collect())
413    }
414
415    async fn check_data_eval(
416        &self,
417        ctx: &mut ActorContext<Self>,
418        request: &Signed<EventRequest>,
419    ) -> Result<Metadata, RequestManagerError> {
420        let (subject_id, confirm) = match request.content().clone() {
421            EventRequest::Fact(event) => (event.subject_id, false),
422            EventRequest::Transfer(event) => (event.subject_id, false),
423            EventRequest::Confirm(event) => (event.subject_id, true),
424            _ => {
425                return Err(
426                    RequestManagerError::InvalidEventRequestForEvaluation,
427                );
428            }
429        };
430
431        let lease = acquire_subject(
432            ctx,
433            &self.subject_id,
434            self.requester_id(),
435            None,
436            self.needs_subject_manager(),
437        )
438        .await?;
439        let metadata = get_metadata(ctx, &subject_id).await;
440        lease.finish(ctx).await?;
441        let metadata = metadata?;
442
443        if confirm && !metadata.schema_id.is_gov() {
444            return Err(RequestManagerError::ConfirmNotEvaluableForTracker);
445        }
446
447        Ok(metadata)
448    }
449
450    async fn get_governance_data(
451        &self,
452        ctx: &mut ActorContext<Self>,
453    ) -> Result<GovernanceData, RequestManagerError> {
454        let governance_id =
455            self.governance_id.as_ref().unwrap_or(&self.subject_id);
456        Ok(get_gov(ctx, governance_id).await?)
457    }
458
459    async fn build_request_eval(
460        &self,
461        ctx: &mut ActorContext<Self>,
462        metadata: &Metadata,
463        request: &Signed<EventRequest>,
464    ) -> Result<
465        (
466            Signed<EvaluationReq>,
467            Quorum,
468            HashSet<PublicKey>,
469            Option<ValueWrapper>,
470            EvalWorkerContext,
471        ),
472        RequestManagerError,
473    > {
474        let is_gov = metadata.schema_id.is_gov();
475
476        let request_type = EventRequestType::from(request.content());
477        let (evaluate_data, governance_data, init_state, tracker_context) = match (
478            is_gov,
479            request_type.clone(),
480        ) {
481            (true, EventRequestType::Fact) => {
482                let state =
483                    GovernanceData::try_from(metadata.properties.clone())?;
484
485                (
486                    EvaluateData::GovFact {
487                        state: state.clone(),
488                    },
489                    state,
490                    None,
491                    EvalWorkerContext::default(),
492                )
493            }
494            (true, EventRequestType::Transfer) => {
495                let state =
496                    GovernanceData::try_from(metadata.properties.clone())?;
497
498                (
499                    EvaluateData::GovTransfer {
500                        state: state.clone(),
501                    },
502                    state,
503                    None,
504                    EvalWorkerContext::default(),
505                )
506            }
507            (true, EventRequestType::Confirm) => {
508                let state =
509                    GovernanceData::try_from(metadata.properties.clone())?;
510
511                (
512                    EvaluateData::GovConfirm {
513                        state: state.clone(),
514                    },
515                    state,
516                    None,
517                    EvalWorkerContext::default(),
518                )
519            }
520            (false, EventRequestType::Fact) => {
521                let governance_data =
522                    get_gov(ctx, &metadata.governance_id).await?;
523
524                let init_state =
525                    governance_data.get_init_state(&metadata.schema_id)?;
526                let tracker_context = Self::tracker_evaluation_context(
527                    &governance_data,
528                    &metadata.schema_id,
529                    metadata.namespace.clone(),
530                    &request_type,
531                )?;
532
533                (
534                    EvaluateData::TrackerSchemasFact {
535                        state: metadata.properties.clone(),
536                    },
537                    governance_data,
538                    Some(init_state),
539                    tracker_context,
540                )
541            }
542            (false, EventRequestType::Transfer) => {
543                let governance_data =
544                    get_gov(ctx, &metadata.governance_id).await?;
545                let tracker_context = Self::tracker_evaluation_context(
546                    &governance_data,
547                    &metadata.schema_id,
548                    metadata.namespace.clone(),
549                    &request_type,
550                )?;
551                (
552                    EvaluateData::TrackerSchemasTransfer {
553                        state: metadata.properties.clone(),
554                    },
555                    governance_data,
556                    None,
557                    tracker_context,
558                )
559            }
560            _ => {
561                error!(
562                    request_id = %self.id,
563                    is_gov = is_gov,
564                    request_type = ?request_type,
565                    "Invalid event request type for evaluation state"
566                );
567                return Err(
568                    RequestManagerError::InvalidEventRequestForEvaluation,
569                );
570            }
571        };
572
573        let (signers, quorum) = governance_data.get_quorum_and_signers(
574            ProtocolTypes::Evaluation,
575            &metadata.schema_id,
576            metadata.namespace.clone(),
577        )?;
578
579        let eval_req = EvaluationReq {
580            event_request: request.clone(),
581            data: evaluate_data,
582            sn: metadata.sn + 1,
583            gov_version: governance_data.version,
584            namespace: metadata.namespace.clone(),
585            schema_id: metadata.schema_id.clone(),
586            signer: (*self.our_key).clone(),
587            signer_is_owner: *self.our_key == request.signature().signer,
588            governance_id: metadata.governance_id.clone(),
589        };
590
591        let signature = get_sign(
592            ctx,
593            SignTypesNode::EvaluationReq(Box::new(eval_req.clone())),
594        )
595        .await?;
596
597        let signed_evaluation_req: Signed<EvaluationReq> =
598            Signed::from_parts(eval_req, signature);
599        Ok((
600            signed_evaluation_req,
601            quorum,
602            signers,
603            init_state,
604            tracker_context,
605        ))
606    }
607
608    async fn run_evaluation(
609        &mut self,
610        ctx: &mut ActorContext<Self>,
611        request: Signed<EvaluationReq>,
612        quorum: Quorum,
613        init_state: Option<ValueWrapper>,
614        context: EvalWorkerContext,
615        signers: HashSet<PublicKey>,
616    ) -> Result<(), RequestManagerError> {
617        let Some((hash, network)) = self.helpers.clone() else {
618            return Err(RequestManagerError::HelpersNotInitialized);
619        };
620
621        self.start_phase_metrics("evaluation");
622        info!("Init evaluation {}", self.id);
623        let child = ctx
624            .create_child(
625                "evaluation",
626                Evaluation::new(
627                    self.our_key.clone(),
628                    request,
629                    quorum,
630                    init_state,
631                    context,
632                    hash,
633                    network,
634                ),
635            )
636            .await?;
637
638        child
639            .tell(EvaluationMessage::Create {
640                request_id: self.id.clone(),
641                version: self.version,
642                signers,
643            })
644            .await?;
645
646        send_to_tracking(
647            ctx,
648            RequestTrackingMessage::UpdateState {
649                request_id: self.id.clone(),
650                state: RequestState::Evaluation,
651            },
652        )
653        .await?;
654
655        Ok(())
656    }
657    //////// APPROVE
658    ////////////////////////////////////////////////
659    async fn build_request_appro(
660        &self,
661        ctx: &mut ActorContext<Self>,
662        eval_req: EvaluationReq,
663        evaluator_res: EvaluatorResponse,
664    ) -> Result<Signed<ApprovalReq>, RequestManagerError> {
665        let request = ApprovalReq {
666            subject_id: self.subject_id.clone(),
667            sn: eval_req.sn,
668            gov_version: eval_req.gov_version,
669            patch: evaluator_res.patch,
670            signer: eval_req.signer,
671        };
672
673        let signature =
674            get_sign(ctx, SignTypesNode::ApprovalReq(request.clone())).await?;
675
676        let signed_approval_req: Signed<ApprovalReq> =
677            Signed::from_parts(request, signature);
678
679        Ok(signed_approval_req)
680    }
681
682    async fn build_approval(
683        &mut self,
684        ctx: &mut ActorContext<Self>,
685        eval_req: EvaluationReq,
686        eval_res: EvaluatorResponse,
687    ) -> Result<(), RequestManagerError> {
688        let request = self.build_request_appro(ctx, eval_req, eval_res).await?;
689
690        let governance_data =
691            get_gov(ctx, &request.content().subject_id).await?;
692
693        let (signers, quorum) = governance_data.get_quorum_and_signers(
694            ProtocolTypes::Approval,
695            &SchemaType::Governance,
696            Namespace::new(),
697        )?;
698
699        if signers.is_empty() {
700            warn!(
701                request_id = %self.id,
702                schema_id = %SchemaType::Governance,
703                "No approvers available for schema"
704            );
705
706            return Err(RequestManagerError::NoApproversAvailable {
707                schema_id: SchemaType::Governance.to_string(),
708                governance_id: self
709                    .governance_id
710                    .clone()
711                    .unwrap_or_else(|| self.subject_id.clone()),
712            });
713        }
714
715        self.run_approval(ctx, request, quorum, signers).await
716    }
717
718    async fn run_approval(
719        &mut self,
720        ctx: &mut ActorContext<Self>,
721        request: Signed<ApprovalReq>,
722        quorum: Quorum,
723        signers: HashSet<PublicKey>,
724    ) -> Result<(), RequestManagerError> {
725        let Some((hash, network)) = self.helpers.clone() else {
726            return Err(RequestManagerError::HelpersNotInitialized);
727        };
728
729        self.start_phase_metrics("approval");
730        info!("Init approval {}", self.id);
731        let child = ctx
732            .create_child(
733                "approval",
734                Approval::new(
735                    self.our_key.clone(),
736                    request,
737                    quorum,
738                    signers,
739                    hash,
740                    network,
741                ),
742            )
743            .await?;
744
745        child
746            .tell(ApprovalMessage::Create {
747                request_id: self.id.clone(),
748                version: self.version,
749            })
750            .await?;
751
752        send_to_tracking(
753            ctx,
754            RequestTrackingMessage::UpdateState {
755                request_id: self.id.clone(),
756                state: RequestState::Approval,
757            },
758        )
759        .await?;
760
761        Ok(())
762    }
763
764    //////// VALI
765    ////////////////////////////////////////////////
766    async fn build_validation_req(
767        &mut self,
768        ctx: &mut ActorContext<Self>,
769        eval: Option<(EvaluationReq, EvaluationData)>,
770        appro_data: Option<ApprovalData>,
771    ) -> Result<
772        (
773            Signed<ValidationReq>,
774            Quorum,
775            HashSet<PublicKey>,
776            Option<ValueWrapper>,
777            CurrentRequestRoles,
778        ),
779        RequestManagerError,
780    > {
781        let (
782            vali_req,
783            quorum,
784            signers,
785            init_state,
786            current_request_roles,
787            schema_id,
788            governance_data,
789        ) = self.build_validation_data(ctx, eval, appro_data).await?;
790
791        if signers.is_empty() {
792            let governance_id = vali_req.get_governance_id().map_err(|error| {
793                error!(
794                    request_id = %self.id,
795                    schema_id = %schema_id,
796                    error = %error,
797                    "Validation request has invalid governance_id"
798                );
799                RequestManagerError::ActorError(
800                    ActorError::FunctionalCritical {
801                        description: format!(
802                            "Validation request has invalid governance_id: {}",
803                            error
804                        ),
805                    },
806                )
807            })?;
808
809            warn!(
810                request_id = %self.id,
811                schema_id = %schema_id,
812                "No validators available for schema"
813            );
814
815            return Err(RequestManagerError::NoValidatorsAvailable {
816                schema_id: schema_id.to_string(),
817                governance_id,
818            });
819        }
820
821        let signature = get_sign(
822            ctx,
823            SignTypesNode::ValidationReq(Box::new(vali_req.clone())),
824        )
825        .await?;
826
827        let distribution_plan =
828            self.build_distribution_plan(&vali_req, governance_data.as_ref())?;
829
830        let signed_validation_req: Signed<ValidationReq> =
831            Signed::from_parts(vali_req, signature);
832
833        self.on_event(
834            RequestManagerEvent::UpdateState {
835                state: Box::new(RequestManagerState::Validation {
836                    request: Box::new(signed_validation_req.clone()),
837                    quorum: quorum.clone(),
838                    init_state: init_state.clone(),
839                    current_request_roles: current_request_roles.clone(),
840                    signers: signers.clone(),
841                    distribution_plan: distribution_plan.clone(),
842                }),
843            },
844            ctx,
845        )
846        .await;
847
848        Ok((
849            signed_validation_req,
850            quorum,
851            signers,
852            init_state,
853            current_request_roles,
854        ))
855    }
856
857    async fn build_validation_data(
858        &self,
859        ctx: &mut ActorContext<Self>,
860        eval: Option<(EvaluationReq, EvaluationData)>,
861        appro_data: Option<ApprovalData>,
862    ) -> Result<
863        (
864            ValidationReq,
865            Quorum,
866            HashSet<PublicKey>,
867            Option<ValueWrapper>,
868            CurrentRequestRoles,
869            SchemaType,
870            Option<GovernanceData>,
871        ),
872        RequestManagerError,
873    > {
874        let Some(request) = self.request.clone() else {
875            return Err(RequestManagerError::RequestNotSet);
876        };
877
878        if let EventRequest::Create(create) = request.content() {
879            if create.schema_id.is_gov() {
880                let governance_data =
881                    GovernanceData::new((*self.our_key).clone());
882                let (signers, quorum) = governance_data
883                    .get_quorum_and_signers(
884                        ProtocolTypes::Validation,
885                        &SchemaType::Governance,
886                        Namespace::new(),
887                    )?;
888
889                Ok((
890                    ValidationReq::Create {
891                        event_request: request.clone(),
892                        gov_version: 0,
893                        subject_id: self.subject_id.clone(),
894                    },
895                    quorum,
896                    signers,
897                    None,
898                    CurrentRequestRoles {
899                        evaluation: RoleDataRegister {
900                            workers: HashSet::new(),
901                            quorum: Quorum::default(),
902                        },
903                        approval: RoleDataRegister {
904                            workers: HashSet::new(),
905                            quorum: Quorum::default(),
906                        },
907                    },
908                    SchemaType::Governance,
909                    None,
910                ))
911            } else {
912                let governance_data =
913                    get_gov(ctx, &create.governance_id).await?;
914
915                let (signers, quorum) = governance_data
916                    .get_quorum_and_signers(
917                        ProtocolTypes::Validation,
918                        &create.schema_id,
919                        create.namespace.clone(),
920                    )?;
921
922                let init_state =
923                    governance_data.get_init_state(&create.schema_id)?;
924
925                Ok((
926                    ValidationReq::Create {
927                        event_request: request.clone(),
928                        gov_version: governance_data.version,
929                        subject_id: self.subject_id.clone(),
930                    },
931                    quorum,
932                    signers,
933                    Some(init_state),
934                    CurrentRequestRoles {
935                        evaluation: RoleDataRegister {
936                            workers: HashSet::new(),
937                            quorum: Quorum::default(),
938                        },
939                        approval: RoleDataRegister {
940                            workers: HashSet::new(),
941                            quorum: Quorum::default(),
942                        },
943                    },
944                    create.schema_id.clone(),
945                    Some(governance_data),
946                ))
947            }
948        } else {
949            let Some((hash, ..)) = self.helpers else {
950                return Err(RequestManagerError::HelpersNotInitialized);
951            };
952
953            let governance_data = self.get_governance_data(ctx).await?;
954
955            let (actual_protocols, gov_version, sn) =
956                if let Some((eval_req, eval_data)) = eval {
957                    if let Some(approval_data) = appro_data.clone() {
958                        (
959                            ActualProtocols::EvalApprove {
960                                eval_data,
961                                approval_data,
962                            },
963                            eval_req.gov_version,
964                            Some(eval_req.sn),
965                        )
966                    } else {
967                        (
968                            ActualProtocols::Eval { eval_data },
969                            eval_req.gov_version,
970                            Some(eval_req.sn),
971                        )
972                    }
973                } else {
974                    (ActualProtocols::None, governance_data.version, None)
975                };
976
977            let lease = acquire_subject(
978                ctx,
979                &self.subject_id,
980                self.requester_id(),
981                None,
982                self.needs_subject_manager(),
983            )
984            .await?;
985            let metadata = get_metadata(ctx, &self.subject_id).await;
986            let last_ledger_event = match metadata {
987                Ok(metadata) => {
988                    let last_ledger_event =
989                        get_last_ledger_event(ctx, &self.subject_id).await;
990                    lease.finish(ctx).await?;
991                    (metadata, last_ledger_event?)
992                }
993                Err(error) => {
994                    lease.finish(ctx).await?;
995                    return Err(error.into());
996                }
997            };
998
999            let (metadata, last_ledger_event) = last_ledger_event;
1000
1001            if gov_version != governance_data.version {
1002                return Err(RequestManagerError::GovernanceVersionChanged {
1003                    governance_id: metadata.governance_id,
1004                    expected: gov_version,
1005                    current: governance_data.version,
1006                });
1007            }
1008
1009            let sn = if let Some(sn) = sn {
1010                sn
1011            } else {
1012                metadata.sn + 1
1013            };
1014
1015            let (signers, quorum) = governance_data.get_quorum_and_signers(
1016                ProtocolTypes::Validation,
1017                &metadata.schema_id,
1018                metadata.namespace.clone(),
1019            )?;
1020
1021            let Some(last_ledger_event) = last_ledger_event else {
1022                return Err(RequestManagerError::LastLedgerEventNotFound);
1023            };
1024
1025            let ledger_hash = last_ledger_event.ledger_hash(hash)?;
1026            let schema_id = metadata.schema_id.clone();
1027
1028            let current_request_roles =
1029                if gov_version == governance_data.version {
1030                    let (evaluation_workers, evaluation_quorum) =
1031                        governance_data.get_quorum_and_signers(
1032                            ProtocolTypes::Evaluation,
1033                            &metadata.schema_id,
1034                            metadata.namespace.clone(),
1035                        )?;
1036
1037                    let (approval_workers, approval_quorum) =
1038                        if appro_data.is_some() {
1039                            governance_data.get_quorum_and_signers(
1040                                ProtocolTypes::Approval,
1041                                &SchemaType::Governance,
1042                                Namespace::new(),
1043                            )?
1044                        } else {
1045                            (HashSet::new(), Quorum::default())
1046                        };
1047
1048                    CurrentRequestRoles {
1049                        evaluation: RoleDataRegister {
1050                            workers: evaluation_workers,
1051                            quorum: evaluation_quorum,
1052                        },
1053                        approval: RoleDataRegister {
1054                            workers: approval_workers,
1055                            quorum: approval_quorum,
1056                        },
1057                    }
1058                } else {
1059                    CurrentRequestRoles {
1060                        evaluation: RoleDataRegister {
1061                            workers: HashSet::new(),
1062                            quorum: Quorum::default(),
1063                        },
1064                        approval: RoleDataRegister {
1065                            workers: HashSet::new(),
1066                            quorum: Quorum::default(),
1067                        },
1068                    }
1069                };
1070
1071            Ok((
1072                ValidationReq::Event {
1073                    actual_protocols: Box::new(actual_protocols),
1074                    event_request: request.clone(),
1075                    metadata: Box::new(metadata),
1076                    last_data: Box::new(LastData {
1077                        vali_data: last_ledger_event
1078                            .protocols
1079                            .get_validation_data(),
1080                        gov_version: last_ledger_event.gov_version,
1081                    }),
1082                    gov_version,
1083                    ledger_hash,
1084                    sn,
1085                },
1086                quorum,
1087                signers,
1088                None,
1089                current_request_roles,
1090                schema_id,
1091                Some(governance_data),
1092            ))
1093        }
1094    }
1095
1096    async fn run_validation(
1097        &mut self,
1098        ctx: &mut ActorContext<Self>,
1099        request: Signed<ValidationReq>,
1100        quorum: Quorum,
1101        signers: HashSet<PublicKey>,
1102        init_state: Option<ValueWrapper>,
1103        current_request_roles: CurrentRequestRoles,
1104    ) -> Result<(), RequestManagerError> {
1105        let Some((hash, network)) = self.helpers.clone() else {
1106            return Err(RequestManagerError::HelpersNotInitialized);
1107        };
1108
1109        self.start_phase_metrics("validation");
1110        info!("Init validation {}", self.id);
1111        let child = ctx
1112            .create_child(
1113                "validation",
1114                Validation::new(
1115                    self.our_key.clone(),
1116                    request,
1117                    init_state,
1118                    current_request_roles,
1119                    quorum,
1120                    hash,
1121                    network,
1122                ),
1123            )
1124            .await?;
1125
1126        child
1127            .tell(ValidationMessage::Create {
1128                request_id: self.id.clone(),
1129                version: self.version,
1130                signers,
1131            })
1132            .await?;
1133
1134        send_to_tracking(
1135            ctx,
1136            RequestTrackingMessage::UpdateState {
1137                request_id: self.id.clone(),
1138                state: RequestState::Validation,
1139            },
1140        )
1141        .await?;
1142
1143        Ok(())
1144    }
1145    //////// Distribution
1146    ////////////////////////////////////////////////
1147    async fn build_ledger(
1148        &mut self,
1149        ctx: &mut ActorContext<Self>,
1150        val_req: ValidationReq,
1151        val_res: ValidationData,
1152        distribution_plan: Vec<DistributionPlanEntry>,
1153    ) -> Result<Ledger, RequestManagerError> {
1154        let Some((hash, ..)) = self.helpers else {
1155            return Err(RequestManagerError::HelpersNotInitialized);
1156        };
1157
1158        let (protocols, ledger_seal) = match val_req {
1159            ValidationReq::Create {
1160                event_request,
1161                gov_version,
1162                ..
1163            } => {
1164                let protocols = Protocols::Create {
1165                    event_request,
1166                    validation: val_res,
1167                };
1168
1169                let protocols_hash = protocols.hash_for_ledger(&hash)?;
1170
1171                let ledger_seal = LedgerSeal {
1172                    gov_version,
1173                    sn: 0,
1174                    prev_ledger_event_hash: DigestIdentifier::default(),
1175                    protocols_hash,
1176                };
1177
1178                (protocols, ledger_seal)
1179            }
1180            ValidationReq::Event {
1181                actual_protocols,
1182                event_request,
1183                ledger_hash,
1184                metadata,
1185                gov_version,
1186                sn,
1187                ..
1188            } => {
1189                let protocols = Protocols::build(
1190                    metadata.schema_id.is_gov(),
1191                    event_request,
1192                    *actual_protocols,
1193                    val_res,
1194                )?;
1195
1196                let protocols_hash = protocols.hash_for_ledger(&hash)?;
1197
1198                let ledger_seal = LedgerSeal {
1199                    gov_version,
1200                    sn,
1201                    prev_ledger_event_hash: ledger_hash,
1202                    protocols_hash,
1203                };
1204
1205                (protocols, ledger_seal)
1206            }
1207        };
1208
1209        let signature =
1210            get_sign(ctx, SignTypesNode::LedgerSeal(ledger_seal.clone()))
1211                .await?;
1212
1213        let ledger = Ledger {
1214            gov_version: ledger_seal.gov_version,
1215            sn: ledger_seal.sn,
1216            prev_ledger_event_hash: ledger_seal.prev_ledger_event_hash,
1217            ledger_seal_signature: signature,
1218            protocols,
1219        };
1220
1221        self.on_event(
1222            RequestManagerEvent::UpdateState {
1223                state: Box::new(RequestManagerState::UpdateSubject {
1224                    ledger: ledger.clone(),
1225                    distribution_plan: distribution_plan.clone(),
1226                }),
1227            },
1228            ctx,
1229        )
1230        .await;
1231
1232        Ok(ledger)
1233    }
1234
1235    async fn update_subject(
1236        &mut self,
1237        ctx: &mut ActorContext<Self>,
1238        ledger: Ledger,
1239        distribution_plan: Vec<DistributionPlanEntry>,
1240    ) -> Result<(), RequestManagerError> {
1241        if ledger.get_event_request_type().is_create_event() {
1242            if let Err(e) = create_subject(ctx, ledger.clone()).await {
1243                if let ActorError::Functional { .. } = e {
1244                    return Err(RequestManagerError::CheckLimit);
1245                }
1246                return Err(e.into());
1247            }
1248        } else {
1249            let lease = acquire_subject(
1250                ctx,
1251                &self.subject_id,
1252                self.requester_id(),
1253                None,
1254                self.needs_subject_manager(),
1255            )
1256            .await?;
1257            let update_result =
1258                update_ledger(ctx, &self.subject_id, vec![ledger.clone()])
1259                    .await;
1260            lease.finish(ctx).await?;
1261            update_result?;
1262        }
1263
1264        self.on_event(
1265            RequestManagerEvent::UpdateState {
1266                state: Box::new(RequestManagerState::Distribution {
1267                    ledger,
1268                    distribution_plan,
1269                }),
1270            },
1271            ctx,
1272        )
1273        .await;
1274
1275        Ok(())
1276    }
1277
1278    async fn build_distribution(
1279        &mut self,
1280        ctx: &mut ActorContext<Self>,
1281        ledger: Ledger,
1282        mut distribution_plan: Vec<DistributionPlanEntry>,
1283    ) -> Result<bool, RequestManagerError> {
1284        let is_governance = match ledger.get_event_request() {
1285            Some(EventRequest::Create(create)) => create.schema_id.is_gov(),
1286            Some(_) => self.governance_id.is_none(),
1287            None => false,
1288        };
1289
1290        if is_governance {
1291            let governance_id = ledger.get_subject_id();
1292            let governance_data = get_gov(ctx, &governance_id).await?;
1293            distribution_plan = governance_data
1294                .get_witnesses(WitnessesData::Gov)?
1295                .into_iter()
1296                .map(|node| DistributionPlanEntry {
1297                    node,
1298                    mode: DistributionPlanMode::Clear,
1299                })
1300                .collect();
1301        }
1302
1303        if distribution_plan.is_empty() {
1304            return Ok(false);
1305        }
1306
1307        distribution_plan.retain(|entry| entry.node != *self.our_key);
1308
1309        if distribution_plan.is_empty() {
1310            warn!(
1311                request_id = %self.id,
1312                "No witnesses available for distribution"
1313            );
1314            return Ok(false);
1315        }
1316
1317        self.run_distribution(ctx, distribution_plan, ledger)
1318            .await?;
1319
1320        Ok(true)
1321    }
1322
1323    async fn run_distribution(
1324        &mut self,
1325        ctx: &mut ActorContext<Self>,
1326        distribution_plan: Vec<DistributionPlanEntry>,
1327        ledger: Ledger,
1328    ) -> Result<(), RequestManagerError> {
1329        let Some((.., network)) = self.helpers.clone() else {
1330            return Err(RequestManagerError::HelpersNotInitialized);
1331        };
1332
1333        self.start_phase_metrics("distribution");
1334        info!("Init distribution {}", self.id);
1335        let child = ctx
1336            .create_child(
1337                "distribution",
1338                Distribution::new(
1339                    network,
1340                    DistributionType::Request,
1341                    self.id.clone(),
1342                ),
1343            )
1344            .await?;
1345
1346        child
1347            .tell(DistributionMessage::Create {
1348                ledger: Box::new(ledger),
1349                distribution_plan,
1350            })
1351            .await?;
1352
1353        send_to_tracking(
1354            ctx,
1355            RequestTrackingMessage::UpdateState {
1356                request_id: self.id.clone(),
1357                state: RequestState::Distribution,
1358            },
1359        )
1360        .await?;
1361
1362        Ok(())
1363    }
1364
1365    //////// Reboot
1366    ////////////////////////////////////////////////
1367    async fn init_wait(
1368        &self,
1369        ctx: &mut ActorContext<Self>,
1370        governance_id: &DigestIdentifier,
1371    ) -> Result<(), RequestManagerError> {
1372        let Some(config): Option<ConfigHelper> =
1373            ctx.system().get_helper("config").await
1374        else {
1375            return Err(RequestManagerError::ActorError(ActorError::Helper {
1376                name: "config".to_owned(),
1377                reason: "Not found".to_owned(),
1378            }));
1379        };
1380        let actor = ctx
1381            .create_child(
1382                "reboot",
1383                Reboot::new(
1384                    governance_id.clone(),
1385                    self.id.clone(),
1386                    config.sync_reboot.stability_check_interval_secs,
1387                    config.sync_reboot.stability_check_max_retries,
1388                ),
1389            )
1390            .await?;
1391
1392        actor.tell(RebootMessage::Init).await?;
1393
1394        Ok(())
1395    }
1396
1397    async fn init_update(
1398        &self,
1399        ctx: &mut ActorContext<Self>,
1400        governance_id: &DigestIdentifier,
1401    ) -> Result<(), RequestManagerError> {
1402        let Some((.., network)) = self.helpers.clone() else {
1403            return Err(RequestManagerError::HelpersNotInitialized);
1404        };
1405
1406        let gov_sn = get_gov_sn(ctx, governance_id).await?;
1407
1408        let governance_data = get_gov(ctx, governance_id).await?;
1409
1410        let mut witnesses = {
1411            let gov_witnesses =
1412                governance_data.get_witnesses(WitnessesData::Gov)?;
1413
1414            let auth_witnesses =
1415                Self::get_witnesses_auth(ctx, governance_id.clone())
1416                    .await
1417                    .unwrap_or_default();
1418
1419            gov_witnesses
1420                .union(&auth_witnesses)
1421                .cloned()
1422                .collect::<HashSet<PublicKey>>()
1423        };
1424
1425        witnesses.remove(&self.our_key);
1426
1427        if witnesses.is_empty() {
1428            if let Ok(actor) = ctx.reference().await {
1429                actor
1430                    .tell(RequestManagerMessage::FinishReboot {
1431                        request_id: self.id.clone(),
1432                    })
1433                    .await?;
1434            };
1435        } else if witnesses.len() == 1 {
1436            let Some(objetive) = witnesses.iter().next() else {
1437                error!(
1438                    request_id = %self.id,
1439                    governance_id = %governance_id,
1440                    "Witness set became empty while selecting single reboot target"
1441                );
1442                return Err(RequestManagerError::ActorError(
1443                    ActorError::FunctionalCritical {
1444                        description:
1445                            "Witness set became empty while selecting single reboot target"
1446                                .to_owned(),
1447                    },
1448                ));
1449            };
1450            let info = ComunicateInfo {
1451                receiver: objetive.clone(),
1452                request_id: String::default(),
1453                version: 0,
1454                receiver_actor: format!(
1455                    "/user/node/distributor_{}",
1456                    governance_id
1457                ),
1458            };
1459
1460            network
1461                .send_command(ave_network::CommandHelper::SendMessage {
1462                    message: NetworkMessage {
1463                        info,
1464                        message: ActorMessage::DistributionLedgerReq {
1465                            actual_sn: Some(gov_sn),
1466                            target_sn: None,
1467                            subject_id: governance_id.clone(),
1468                        },
1469                    },
1470                })
1471                .await?;
1472
1473            let Ok(actor) = ctx.reference().await else {
1474                return Ok(());
1475            };
1476
1477            actor
1478                .tell(RequestManagerMessage::RebootWait {
1479                    request_id: self.id.clone(),
1480                    governance_id: governance_id.clone(),
1481                })
1482                .await?;
1483        } else {
1484            let Some(config): Option<ConfigHelper> =
1485                ctx.system().get_helper("config").await
1486            else {
1487                return Ok(());
1488            };
1489            let data = UpdateNew {
1490                network,
1491                subject_id: governance_id.clone(),
1492                our_sn: Some(gov_sn),
1493                witnesses,
1494                update_type: UpdateType::Request {
1495                    subject_id: self.subject_id.clone(),
1496                    id: self.id.clone(),
1497                },
1498                subject_kind_hint: Some(
1499                    crate::update::UpdateSubjectKind::Governance,
1500                ),
1501                round_retry_interval_secs: config
1502                    .sync_update
1503                    .round_retry_interval_secs,
1504                max_round_retries: config.sync_update.max_round_retries,
1505                witness_retry_count: config.sync_update.witness_retry_count,
1506                witness_retry_interval_secs: config
1507                    .sync_update
1508                    .witness_retry_interval_secs,
1509            };
1510
1511            let updater = Update::new(data);
1512            let Ok(child) = ctx.create_child("update", updater).await else {
1513                let Ok(actor) = ctx.reference().await else {
1514                    return Ok(());
1515                };
1516
1517                actor
1518                    .tell(RequestManagerMessage::RebootWait {
1519                        request_id: self.id.clone(),
1520                        governance_id: governance_id.clone(),
1521                    })
1522                    .await?;
1523
1524                return Ok(());
1525            };
1526
1527            child.tell(UpdateMessage::Run).await?;
1528        }
1529
1530        Ok(())
1531    }
1532
1533    async fn get_witnesses_auth(
1534        ctx: &ActorContext<Self>,
1535        governance_id: DigestIdentifier,
1536    ) -> Result<HashSet<PublicKey>, RequestManagerError> {
1537        let path = ActorPath::from("/user/node/auth");
1538        let actor = ctx.system().get_actor::<Auth>(&path).await?;
1539
1540        let response = actor
1541            .ask(AuthMessage::GetAuth {
1542                subject_id: governance_id,
1543            })
1544            .await?;
1545
1546        match response {
1547            AuthResponse::Witnesses(witnesses) => Ok(witnesses),
1548            _ => Err(RequestManagerError::ActorError(
1549                ActorError::UnexpectedResponse {
1550                    path,
1551                    expected: "AuthResponse::Witnesses".to_owned(),
1552                },
1553            )),
1554        }
1555    }
1556
1557    //////// General
1558    ////////////////////////////////////////////////
1559    async fn send_reboot(
1560        &self,
1561        ctx: &ActorContext<Self>,
1562        governance_id: DigestIdentifier,
1563    ) -> Result<(), ActorError> {
1564        let Ok(actor) = ctx.reference().await else {
1565            return Ok(());
1566        };
1567
1568        actor
1569            .tell(RequestManagerMessage::Reboot {
1570                request_id: self.id.clone(),
1571                governance_id,
1572                reboot_type: RebootType::TimeOut,
1573            })
1574            .await
1575    }
1576
1577    async fn match_error(
1578        &mut self,
1579        ctx: &mut ActorContext<Self>,
1580        error: RequestManagerError,
1581    ) {
1582        match error {
1583            RequestManagerError::NoEvaluatorsAvailable {
1584                governance_id,
1585                ..
1586            }
1587            | RequestManagerError::NoApproversAvailable {
1588                governance_id, ..
1589            }
1590            | RequestManagerError::NoValidatorsAvailable {
1591                governance_id,
1592                ..
1593            }
1594            | RequestManagerError::GovernanceVersionChanged {
1595                governance_id,
1596                ..
1597            } => {
1598                if let Err(e) = self.send_reboot(ctx, governance_id).await {
1599                    emit_fail(ctx, e).await;
1600                }
1601            }
1602            RequestManagerError::CheckLimit
1603            | RequestManagerError::Governance(..)
1604            | RequestManagerError::NotIssuer
1605            | RequestManagerError::NotCreator => {
1606                if let Err(e) = self
1607                    .abort_request(
1608                        ctx,
1609                        error.to_string(),
1610                        None,
1611                        (*self.our_key).clone(),
1612                    )
1613                    .await
1614                {
1615                    emit_fail(
1616                        ctx,
1617                        ActorError::FunctionalCritical {
1618                            description: e.to_string(),
1619                        },
1620                    )
1621                    .await;
1622                }
1623            }
1624            _ => {
1625                emit_fail(
1626                    ctx,
1627                    ActorError::FunctionalCritical {
1628                        description: error.to_string(),
1629                    },
1630                )
1631                .await;
1632            }
1633        }
1634    }
1635
1636    async fn finish_request(
1637        &mut self,
1638        ctx: &mut ActorContext<Self>,
1639    ) -> Result<(), RequestManagerError> {
1640        self.finish_request_metrics("finished");
1641        info!("Ending {}", self.id);
1642        send_to_tracking(
1643            ctx,
1644            RequestTrackingMessage::UpdateState {
1645                request_id: self.id.clone(),
1646                state: RequestState::Finish,
1647            },
1648        )
1649        .await?;
1650
1651        self.on_event(RequestManagerEvent::Finish, ctx).await;
1652
1653        self.end_request(ctx).await?;
1654
1655        Ok(())
1656    }
1657
1658    async fn reboot(
1659        &mut self,
1660        ctx: &mut ActorContext<Self>,
1661        reboot_type: RebootType,
1662        governance_id: DigestIdentifier,
1663    ) -> Result<(), RequestManagerError> {
1664        let Some(config): Option<ConfigHelper> =
1665            ctx.system().get_helper("config").await
1666        else {
1667            return Err(ActorError::Helper {
1668                name: "config".to_owned(),
1669                reason: "Not found".to_owned(),
1670            }
1671            .into());
1672        };
1673        self.start_phase_metrics("reboot");
1674        self.on_event(
1675            RequestManagerEvent::UpdateState {
1676                state: Box::new(RequestManagerState::Reboot),
1677            },
1678            ctx,
1679        )
1680        .await;
1681
1682        let Ok(actor) = ctx.reference().await else {
1683            return Ok(());
1684        };
1685
1686        let request_id = self.id.clone();
1687
1688        match reboot_type {
1689            RebootType::Normal => {
1690                info!("Launching Normal reboot {}", self.id);
1691                send_to_tracking(
1692                    ctx,
1693                    RequestTrackingMessage::UpdateState {
1694                        request_id: self.id.clone(),
1695                        state: RequestState::Reboot,
1696                    },
1697                )
1698                .await?;
1699
1700                actor
1701                    .tell(RequestManagerMessage::RebootUpdate {
1702                        request_id,
1703                        governance_id,
1704                    })
1705                    .await?;
1706            }
1707            RebootType::Diff => {
1708                info!("Launching Diff reboot {}", self.id);
1709                self.retry_diff += 1;
1710
1711                let seconds = Self::retry_seconds_for_attempt(
1712                    &config.sync_reboot.diff_retry_schedule_secs,
1713                    self.retry_diff,
1714                );
1715
1716                info!(
1717                    "Launching Diff reboot {}, try: {}, seconds: {}",
1718                    self.id, self.retry_diff, seconds
1719                );
1720
1721                send_to_tracking(
1722                    ctx,
1723                    RequestTrackingMessage::UpdateState {
1724                        request_id: self.id.clone(),
1725                        state: RequestState::RebootDiff {
1726                            seconds,
1727                            count: self.retry_diff,
1728                        },
1729                    },
1730                )
1731                .await?;
1732
1733                tokio::spawn(async move {
1734                    tokio::time::sleep(Duration::from_secs(seconds)).await;
1735                    let _ = actor
1736                        .tell(RequestManagerMessage::RebootUpdate {
1737                            request_id,
1738                            governance_id,
1739                        })
1740                        .await;
1741                });
1742            }
1743            RebootType::TimeOut => {
1744                self.retry_timeout += 1;
1745
1746                let seconds = Self::retry_seconds_for_attempt(
1747                    &config.sync_reboot.timeout_retry_schedule_secs,
1748                    self.retry_timeout,
1749                );
1750
1751                info!(
1752                    "Launching TimeOut reboot {}, try: {}, seconds: {}",
1753                    self.id, self.retry_timeout, seconds
1754                );
1755                send_to_tracking(
1756                    ctx,
1757                    RequestTrackingMessage::UpdateState {
1758                        request_id: self.id.clone(),
1759                        state: RequestState::RebootTimeOut {
1760                            seconds,
1761                            count: self.retry_timeout,
1762                        },
1763                    },
1764                )
1765                .await?;
1766
1767                tokio::spawn(async move {
1768                    tokio::time::sleep(Duration::from_secs(seconds)).await;
1769                    let _ = actor
1770                        .tell(RequestManagerMessage::RebootUpdate {
1771                            request_id,
1772                            governance_id,
1773                        })
1774                        .await;
1775                });
1776            }
1777        }
1778
1779        Ok(())
1780    }
1781
1782    async fn match_command(
1783        &mut self,
1784        ctx: &mut ActorContext<Self>,
1785    ) -> Result<(), RequestManagerError> {
1786        match self.command {
1787            ReqManInitMessage::Evaluate => self.build_evaluation(ctx).await,
1788            ReqManInitMessage::Validate => {
1789                let (
1790                    request,
1791                    quorum,
1792                    signers,
1793                    init_state,
1794                    current_request_roles,
1795                ) = self.build_validation_req(ctx, None, None).await?;
1796
1797                self.run_validation(
1798                    ctx,
1799                    request,
1800                    quorum,
1801                    signers,
1802                    init_state,
1803                    current_request_roles,
1804                )
1805                .await
1806            }
1807        }
1808    }
1809
1810    async fn check_request_roles_after_reboot(
1811        &self,
1812        ctx: &mut ActorContext<Self>,
1813    ) -> Result<(), RequestManagerError> {
1814        let Some(request) = self.request.clone() else {
1815            return Err(RequestManagerError::RequestNotSet);
1816        };
1817
1818        let gov = self.get_governance_data(ctx).await?;
1819        let subject_data = match request.content() {
1820            EventRequest::Create(..) => None,
1821            _ => get_subject_data(ctx, &self.subject_id).await?,
1822        };
1823
1824        let creator_scope = match request.content() {
1825            EventRequest::Create(create) if !create.schema_id.is_gov() => {
1826                Some((create.schema_id.clone(), create.namespace.clone()))
1827            }
1828            _ => subject_data.as_ref().and_then(|subject_data| {
1829                match subject_data {
1830                    SubjectData::Tracker {
1831                        schema_id,
1832                        namespace,
1833                        ..
1834                    } => Some((
1835                        schema_id.clone(),
1836                        Namespace::from(namespace.clone()),
1837                    )),
1838                    _ => None,
1839                }
1840            }),
1841        };
1842
1843        if let Some((schema_id, namespace)) = creator_scope
1844            && !gov.has_this_role(HashThisRole::Schema {
1845                who: (*self.our_key).clone(),
1846                role: RoleTypes::Creator,
1847                schema_id,
1848                namespace,
1849            })
1850        {
1851            return Err(RequestManagerError::NotCreator);
1852        }
1853
1854        if let EventRequest::Fact { .. } = request.content() {
1855            let Some(subject_data) = subject_data else {
1856                return Err(RequestManagerError::SubjecData);
1857            };
1858
1859            match subject_data {
1860                SubjectData::Tracker {
1861                    schema_id,
1862                    namespace,
1863                    ..
1864                } => {
1865                    if !gov.has_this_role(HashThisRole::Schema {
1866                        who: request.signature().signer.clone(),
1867                        role: RoleTypes::Issuer,
1868                        schema_id,
1869                        namespace: Namespace::from(namespace),
1870                    }) {
1871                        return Err(RequestManagerError::NotIssuer);
1872                    }
1873                }
1874                SubjectData::Governance { .. } => {
1875                    if !gov.has_this_role(HashThisRole::Gov {
1876                        who: request.signature().signer.clone(),
1877                        role: RoleTypes::Issuer,
1878                    }) {
1879                        return Err(RequestManagerError::NotIssuer);
1880                    }
1881                }
1882            }
1883        }
1884
1885        Ok(())
1886    }
1887
1888    async fn stops_childs(
1889        &self,
1890        ctx: &mut ActorContext<Self>,
1891    ) -> Result<(), RequestManagerError> {
1892        match self.state {
1893            RequestManagerState::Reboot => {
1894                if let Ok(actor) = ctx.get_child::<Update>("update").await {
1895                    actor.ask_stop().await?;
1896                };
1897                if let Ok(actor) = ctx.get_child::<Reboot>("reboot").await {
1898                    actor.ask_stop().await?;
1899                };
1900            }
1901            RequestManagerState::Evaluation => {
1902                if let Ok(actor) =
1903                    ctx.get_child::<Evaluation>("evaluation").await
1904                {
1905                    actor.ask_stop().await?;
1906                };
1907            }
1908            RequestManagerState::Approval { .. } => {
1909                if let Ok(actor) = ctx.get_child::<Approval>("approval").await {
1910                    actor.ask_stop().await?;
1911                };
1912                let _ = make_obsolete(ctx, &self.subject_id).await;
1913            }
1914            RequestManagerState::Validation { .. } => {
1915                if let Ok(actor) =
1916                    ctx.get_child::<Validation>("validation").await
1917                {
1918                    actor.ask_stop().await?;
1919                };
1920            }
1921            RequestManagerState::Distribution { .. } => {
1922                if let Ok(actor) =
1923                    ctx.get_child::<Distribution>("distribution").await
1924                {
1925                    actor.ask_stop().await?;
1926                };
1927            }
1928            _ => {}
1929        }
1930
1931        Ok(())
1932    }
1933
1934    async fn abort_request(
1935        &mut self,
1936        ctx: &mut ActorContext<Self>,
1937        error: String,
1938        sn: Option<u64>,
1939        who: PublicKey,
1940    ) -> Result<(), RequestManagerError> {
1941        self.stops_childs(ctx).await?;
1942
1943        self.finish_request_metrics("aborted");
1944        info!("Aborting {}", self.id);
1945        send_to_tracking(
1946            ctx,
1947            RequestTrackingMessage::UpdateState {
1948                request_id: self.id.clone(),
1949                state: RequestState::Abort {
1950                    subject_id: self.subject_id.to_string(),
1951                    error,
1952                    sn,
1953                    who: who.to_string(),
1954                },
1955            },
1956        )
1957        .await?;
1958
1959        self.on_event(RequestManagerEvent::Finish, ctx).await;
1960
1961        self.end_request(ctx).await?;
1962
1963        Ok(())
1964    }
1965
1966    async fn end_request(
1967        &self,
1968        ctx: &ActorContext<Self>,
1969    ) -> Result<(), RequestManagerError> {
1970        let actor = ctx.get_parent::<RequestHandler>().await?;
1971        actor
1972            .tell(RequestHandlerMessage::EndHandling {
1973                subject_id: self.subject_id.clone(),
1974            })
1975            .await?;
1976
1977        Ok(())
1978    }
1979}
1980
/// Messages handled by the `RequestManager` actor.
///
/// Most variants carry a `request_id`. For `RebootUpdate`, `RebootWait`,
/// `Reboot`, `FinishReboot`, `Abort` and `EvaluationRes` the handler only acts
/// when that id equals the manager's current request id; `Run` and
/// `FirstRun` instead set the manager's current request id.
#[derive(Debug, Clone)]
pub enum RequestManagerMessage {
    /// Resumes processing from the manager's current state, adopting
    /// `request_id` as the current request id.
    Run {
        request_id: DigestIdentifier,
    },
    /// Starts a request for the first time: the handler adopts `request_id`,
    /// emits a `SafeState` event with `command` and `request`, and then runs
    /// the command.
    FirstRun {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
        request_id: DigestIdentifier,
    },
    /// Aborts the request; `who`, `reason` and `sn` are reported to request
    /// tracking as part of the `Abort` state.
    Abort {
        request_id: DigestIdentifier,
        who: PublicKey,
        reason: String,
        sn: u64,
    },
    /// User-triggered abort. Honoured only in the `Reboot`, `Starting`,
    /// `Evaluation`, `Approval` and `Validation` states; in any other state
    /// the handler just logs that the request cannot be aborted.
    ManualAbort,
    /// Purges the manager's storage (handled through `purge_storage`).
    PurgeStorage,
    /// Requests a reboot of the given type. Ignored when the manager is
    /// already in the `Reboot` state; otherwise the handler stops the current
    /// children and calls `reboot`.
    Reboot {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
        reboot_type: RebootType,
    },
    /// Starts the update step of a reboot (`init_update`) for
    /// `governance_id`.
    RebootUpdate {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Starts the wait step of a reboot (`init_wait`) for `governance_id`.
    RebootWait {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Signals that the reboot finished: the handler bumps the version,
    /// re-checks the request roles and re-runs the stored command.
    FinishReboot {
        request_id: DigestIdentifier,
    },
    /// Evaluation outcome: the (boxed) evaluation request together with the
    /// resulting evaluation data.
    EvaluationRes {
        request_id: DigestIdentifier,
        eval_req: Box<EvaluationReq>,
        eval_res: EvaluationData,
    },
    /// Approval outcome data for the request.
    ApprovalRes {
        request_id: DigestIdentifier,
        appro_res: ApprovalData,
    },
    /// Validation outcome: the (boxed) validation request together with the
    /// resulting validation data.
    ValidationRes {
        request_id: DigestIdentifier,
        val_req: Box<ValidationReq>,
        val_res: ValidationData,
    },
    /// Asks the manager to finish the request `request_id`.
    /// NOTE(review): exact handling to be confirmed against the handler.
    FinishRequest {
        request_id: DigestIdentifier,
    },
}

// Marker impl: lets the enum serve as the actor's `Message` type.
impl Message for RequestManagerMessage {}
2035
/// Events emitted by the `RequestManager` actor (its `Actor::Event` type).
///
/// NOTE(review): the enum derives Borsh (de)serialization, so the variant
/// order is presumably part of the encoded format — confirm before
/// reordering or inserting variants.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum RequestManagerEvent {
    /// The request is over; emitted, for example, when a request is aborted.
    Finish,
    /// Carries the new (boxed) manager state.
    UpdateState {
        state: Box<RequestManagerState>,
    },
    /// Carries the new version; emitted with the current version plus one
    /// when a reboot finishes.
    UpdateVersion {
        version: u64,
    },
    /// Carries the command and the signed request; emitted when a request is
    /// run for the first time.
    SafeState {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
    },
}

// Marker impl: lets the enum serve as the actor's `Event` type.
impl Event for RequestManagerEvent {}
2054
2055#[async_trait]
2056impl Actor for RequestManager {
2057    type Event = RequestManagerEvent;
2058    type Message = RequestManagerMessage;
2059    type Response = ();
2060
2061    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
2062        parent_span.map_or_else(
2063            || info_span!("RequestManager", id),
2064            |parent_span| info_span!(parent: parent_span, "RequestManager", id),
2065        )
2066    }
2067
2068    async fn pre_start(
2069        &mut self,
2070        ctx: &mut ActorContext<Self>,
2071    ) -> Result<(), ActorError> {
2072        if let Err(e) =
2073            self.init_store("request_manager", None, false, ctx).await
2074        {
2075            error!(
2076                error = %e,
2077                subject_id = %self.subject_id,
2078                "Failed to initialize store"
2079            );
2080            return Err(e);
2081        }
2082
2083        if self.governance_id.is_none()
2084            && let Some(request) = &self.request
2085            && let EventRequest::Create(create) = request.content()
2086            && !create.schema_id.is_gov()
2087        {
2088            self.governance_id = Some(create.governance_id.clone());
2089        }
2090
2091        Ok(())
2092    }
2093}
2094
2095#[async_trait]
2096impl Handler<Self> for RequestManager {
2097    // The async state machine inlines all sub-futures and exceeds the default
2098    // threshold; a proper fix would require boxing every large sub-future.
2099    #[allow(clippy::large_stack_frames)]
2100    async fn handle_message(
2101        &mut self,
2102        _sender: ActorPath,
2103        msg: RequestManagerMessage,
2104        ctx: &mut ave_actors::ActorContext<Self>,
2105    ) -> Result<(), ActorError> {
2106        match msg {
2107            RequestManagerMessage::RebootUpdate {
2108                governance_id,
2109                request_id,
2110            } => {
2111                if request_id == self.id {
2112                    info!("Init reboot update {}", self.id);
2113                    debug!(
2114                        msg_type = "RebootUpdate",
2115                        request_id = %self.id,
2116                        governance_id = %governance_id,
2117                        "Initializing reboot update"
2118                    );
2119
2120                    if let Err(e) = self.init_update(ctx, &governance_id).await
2121                    {
2122                        error!(
2123                            msg_type = "RebootUpdate",
2124                            request_id = %self.id,
2125                            governance_id = %governance_id,
2126                            error = %e,
2127                            "Failed to initialize reboot update"
2128                        );
2129                        self.match_error(ctx, e).await;
2130                        return Ok(());
2131                    }
2132                }
2133            }
2134            RequestManagerMessage::RebootWait {
2135                governance_id,
2136                request_id,
2137            } => {
2138                if request_id == self.id {
2139                    info!("Init reboot wait {}", self.id);
2140                    debug!(
2141                        msg_type = "RebootWait",
2142                        request_id = %self.id,
2143                        governance_id = %governance_id,
2144                        "Initializing reboot wait"
2145                    );
2146
2147                    if let Err(e) = self.init_wait(ctx, &governance_id).await {
2148                        error!(
2149                            msg_type = "RebootWait",
2150                            request_id = %self.id,
2151                            governance_id = %governance_id,
2152                            error = %e,
2153                            "Failed to initialize reboot wait"
2154                        );
2155                        self.match_error(ctx, e).await;
2156                        return Ok(());
2157                    }
2158                }
2159            }
2160            RequestManagerMessage::Reboot {
2161                governance_id,
2162                request_id,
2163                reboot_type,
2164            } => {
2165                if request_id == self.id {
2166                    if matches!(self.state, RequestManagerState::Reboot) {
2167                        debug!(
2168                            msg_type = "Reboot",
2169                            request_id = %self.id,
2170                            governance_id = %governance_id,
2171                            reboot_type = ?reboot_type,
2172                            "Already in reboot state, ignoring"
2173                        );
2174                    } else {
2175                        debug!(
2176                            msg_type = "Reboot",
2177                            request_id = %self.id,
2178                            governance_id = %governance_id,
2179                            reboot_type = ?reboot_type,
2180                            "Initiating reboot"
2181                        );
2182                        if let Err(e) = self.stops_childs(ctx).await {
2183                            error!(
2184                                msg_type = "Reboot",
2185                                request_id = %self.id,
2186                                governance_id = %governance_id,
2187                                error = %e,
2188                                "Failed to stop childs"
2189                            );
2190                            self.match_error(ctx, e).await;
2191                            return Ok(());
2192                        };
2193                        if let Err(e) = self
2194                            .reboot(ctx, reboot_type, governance_id.clone())
2195                            .await
2196                        {
2197                            error!(
2198                                msg_type = "Reboot",
2199                                request_id = %self.id,
2200                                governance_id = %governance_id,
2201                                error = %e,
2202                                "Failed to initiate reboot"
2203                            );
2204                            self.match_error(ctx, e).await;
2205                            return Ok(());
2206                        }
2207                    }
2208                }
2209            }
2210            RequestManagerMessage::FinishReboot { request_id } => {
2211                if request_id == self.id {
2212                    info!("Init reboot finish {}", self.id);
2213                    debug!(
2214                        msg_type = "FinishReboot",
2215                        request_id = %self.id,
2216                        version = self.version,
2217                        "Reboot completed, resuming request"
2218                    );
2219                    self.on_event(
2220                        RequestManagerEvent::UpdateVersion {
2221                            version: self.version + 1,
2222                        },
2223                        ctx,
2224                    )
2225                    .await;
2226
2227                    if let Err(e) = send_to_tracking(
2228                        ctx,
2229                        RequestTrackingMessage::UpdateVersion {
2230                            request_id: self.id.clone(),
2231                            version: self.version,
2232                        },
2233                    )
2234                    .await
2235                    {
2236                        error!(
2237                            msg_type = "FinishReboot",
2238                            request_id = %self.id,
2239                            version = self.version,
2240                            error = %e,
2241                            "Failed to send version update to tracking"
2242                        );
2243                        return Err(emit_fail(ctx, e).await);
2244                    }
2245
2246                    if let Err(e) =
2247                        self.check_request_roles_after_reboot(ctx).await
2248                    {
2249                        error!(
2250                            msg_type = "FinishReboot",
2251                            request_id = %self.id,
2252                            error = %e,
2253                            "Failed to check signatures after reboot"
2254                        );
2255                        self.match_error(ctx, e).await;
2256                        return Ok(());
2257                    }
2258
2259                    if let Err(e) = self.match_command(ctx).await {
2260                        error!(
2261                            msg_type = "FinishReboot",
2262                            request_id = %self.id,
2263                            error = %e,
2264                            "Failed to execute command after reboot"
2265                        );
2266                        self.match_error(ctx, e).await;
2267                        return Ok(());
2268                    }
2269                }
2270            }
2271            RequestManagerMessage::Abort {
2272                request_id,
2273                who,
2274                reason,
2275                sn,
2276            } => {
2277                if request_id == self.id {
2278                    warn!(
2279                        msg_type = "Abort",
2280                        state = %self.state,
2281                        request_id = %self.id,
2282                        who = %who,
2283                        reason = %reason,
2284                        sn = sn,
2285                        "Request abort received"
2286                    );
2287                    if let Err(e) =
2288                        self.abort_request(ctx, reason, Some(sn), who).await
2289                    {
2290                        error!(
2291                            msg_type = "Abort",
2292                            request_id = %self.id,
2293                            error = %e,
2294                            "Failed to abort request"
2295                        );
2296                        self.match_error(ctx, e).await;
2297                        return Ok(());
2298                    }
2299                }
2300            }
2301            RequestManagerMessage::ManualAbort => {
2302                match &self.state {
2303                    RequestManagerState::Reboot
2304                    | RequestManagerState::Starting
2305                    | RequestManagerState::Evaluation
2306                    | RequestManagerState::Approval { .. }
2307                    | RequestManagerState::Validation { .. } => {
2308                        if let Err(e) = self
2309                            .abort_request(
2310                                ctx,
2311                                "The user manually aborted the request"
2312                                    .to_owned(),
2313                                None,
2314                                (*self.our_key).clone(),
2315                            )
2316                            .await
2317                        {
2318                            error!(
2319                                msg_type = "Abort",
2320                                request_id = %self.id,
2321                                error = %e,
2322                                "Failed to abort request"
2323                            );
2324                            self.match_error(ctx, e).await;
2325                        }
2326                    }
2327                    _ => {
2328                        info!(
2329                            "The request is in a state that cannot be aborted {}, state: {}",
2330                            self.id, self.state
2331                        );
2332                    }
2333                }
2334
2335                return Ok(());
2336            }
2337            RequestManagerMessage::PurgeStorage => {
2338                purge_storage(ctx).await?;
2339
2340                debug!(
2341                    msg_type = "PurgeStorage",
2342                    subject_id = %self.subject_id,
2343                    "Purged request manager storage"
2344                );
2345
2346                return Ok(());
2347            }
2348            RequestManagerMessage::FirstRun {
2349                command,
2350                request,
2351                request_id,
2352            } => {
2353                self.id = request_id.clone();
2354                self.begin_request_metrics();
2355                debug!(
2356                    msg_type = "FirstRun",
2357                    request_id = %request_id,
2358                    command = ?command,
2359                    "First run of request manager"
2360                );
2361                self.on_event(
2362                    RequestManagerEvent::SafeState { command, request },
2363                    ctx,
2364                )
2365                .await;
2366
2367                if let Err(e) = self.match_command(ctx).await {
2368                    error!(
2369                        msg_type = "FirstRun",
2370                        request_id = %self.id,
2371                        error = %e,
2372                        "Failed to execute initial command"
2373                    );
2374                    self.match_error(ctx, e).await;
2375                    return Ok(());
2376                };
2377            }
2378            RequestManagerMessage::Run { request_id } => {
2379                self.id = request_id;
2380                self.ensure_request_metrics_started();
2381
2382                debug!(
2383                    msg_type = "Run",
2384                    request_id = %self.id,
2385                    state = ?self.state,
2386                    version = self.version,
2387                    "Running request manager"
2388                );
2389                match self.state.clone() {
2390                    RequestManagerState::Starting
2391                    | RequestManagerState::Reboot => {
2392                        if let Err(e) = self.match_command(ctx).await {
2393                            error!(
2394                                msg_type = "Run",
2395                                request_id = %self.id,
2396                                state = "Starting/Reboot",
2397                                error = %e,
2398                                "Failed to execute command"
2399                            );
2400                            self.match_error(ctx, e).await;
2401                            return Ok(());
2402                        };
2403                    }
2404                    RequestManagerState::Evaluation => {
2405                        if let Err(e) = self.build_evaluation(ctx).await {
2406                            error!(
2407                                msg_type = "Run",
2408                                request_id = %self.id,
2409                                state = "Evaluation",
2410                                error = %e,
2411                                "Failed to build evaluation"
2412                            );
2413                            self.match_error(ctx, e).await;
2414                            return Ok(());
2415                        }
2416                    }
2417
2418                    RequestManagerState::Approval { eval_req, eval_res } => {
2419                        let Some(evaluator_res) =
2420                            eval_res.evaluator_response_ok()
2421                        else {
2422                            error!(
2423                                msg_type = "Run",
2424                                request_id = %self.id,
2425                                state = "Approval",
2426                                "Approval state is missing a successful evaluator response"
2427                            );
2428                            self.match_error(
2429                                ctx,
2430                                RequestManagerError::InvalidRequestState {
2431                                    expected:
2432                                        "approval state with successful evaluator response",
2433                                    got:
2434                                        "approval state without successful evaluator response",
2435                                },
2436                            )
2437                            .await;
2438                            return Ok(());
2439                        };
2440
2441                        if let Err(e) = self
2442                            .build_approval(ctx, eval_req, evaluator_res)
2443                            .await
2444                        {
2445                            error!(
2446                                msg_type = "Run",
2447                                request_id = %self.id,
2448                                state = "Approval",
2449                                error = %e,
2450                                "Failed to build approval"
2451                            );
2452                            self.match_error(ctx, e).await;
2453                            return Ok(());
2454                        }
2455                    }
2456                    RequestManagerState::Validation {
2457                        request,
2458                        quorum,
2459                        init_state,
2460                        current_request_roles,
2461                        signers,
2462                        distribution_plan: _,
2463                    } => {
2464                        if let Err(e) = self
2465                            .run_validation(
2466                                ctx,
2467                                *request,
2468                                quorum,
2469                                signers,
2470                                init_state,
2471                                current_request_roles,
2472                            )
2473                            .await
2474                        {
2475                            error!(
2476                                msg_type = "Run",
2477                                request_id = %self.id,
2478                                state = "Validation",
2479                                error = %e,
2480                                "Failed to run validation"
2481                            );
2482                            self.match_error(ctx, e).await;
2483                            return Ok(());
2484                        };
2485                    }
2486                    RequestManagerState::UpdateSubject {
2487                        ledger,
2488                        distribution_plan,
2489                    } => {
2490                        if let Err(e) = self
2491                            .update_subject(
2492                                ctx,
2493                                ledger.clone(),
2494                                distribution_plan.clone(),
2495                            )
2496                            .await
2497                        {
2498                            error!(
2499                                msg_type = "Run",
2500                                request_id = %self.id,
2501                                state = "UpdateSubject",
2502                                error = %e,
2503                                "Failed to update subject"
2504                            );
2505                            self.match_error(ctx, e).await;
2506                            return Ok(());
2507                        };
2508
2509                        match self
2510                            .build_distribution(ctx, ledger, distribution_plan)
2511                            .await
2512                        {
2513                            Ok(in_distribution) => {
2514                                if !in_distribution
2515                                    && let Err(e) =
2516                                        self.finish_request(ctx).await
2517                                {
2518                                    error!(
2519                                        msg_type = "Run",
2520                                        request_id = %self.id,
2521                                        state = "UpdateSubject",
2522                                        error = %e,
2523                                        "Failed to finish request after build distribution"
2524                                    );
2525                                    self.match_error(ctx, e).await;
2526                                    return Ok(());
2527                                }
2528                            }
2529                            Err(e) => {
2530                                error!(
2531                                    msg_type = "Run",
2532                                    request_id = %self.id,
2533                                    state = "UpdateSubject",
2534                                    error = %e,
2535                                    "Failed to build distribution"
2536                                );
2537                                self.match_error(ctx, e).await;
2538                                return Ok(());
2539                            }
2540                        };
2541                    }
2542                    RequestManagerState::Distribution {
2543                        ledger,
2544                        distribution_plan,
2545                    } => {
2546                        match self
2547                            .build_distribution(ctx, ledger, distribution_plan)
2548                            .await
2549                        {
2550                            Ok(in_distribution) => {
2551                                if !in_distribution
2552                                    && let Err(e) =
2553                                        self.finish_request(ctx).await
2554                                {
2555                                    error!(
2556                                        msg_type = "Run",
2557                                        request_id = %self.id,
2558                                        state = "Distribution",
2559                                        error = %e,
2560                                        "Failed to finish request after build distribution"
2561                                    );
2562                                    self.match_error(ctx, e).await;
2563                                    return Ok(());
2564                                }
2565                            }
2566                            Err(e) => {
2567                                error!(
2568                                    msg_type = "Run",
2569                                    request_id = %self.id,
2570                                    state = "Distribution",
2571                                    error = %e,
2572                                    "Failed to build distribution"
2573                                );
2574                                self.match_error(ctx, e).await;
2575                                return Ok(());
2576                            }
2577                        };
2578                    }
2579                    RequestManagerState::End => {
2580                        if let Err(e) = self.end_request(ctx).await {
2581                            error!(
2582                                msg_type = "Run",
2583                                request_id = %self.id,
2584                                state = "End",
2585                                error = %e,
2586                                "Failed to end request"
2587                            );
2588                            self.match_error(ctx, e).await;
2589                            return Ok(());
2590                        }
2591                    }
2592                };
2593            }
2594            RequestManagerMessage::EvaluationRes {
2595                eval_req,
2596                eval_res,
2597                request_id,
2598            } => {
2599                if request_id == self.id {
2600                    debug!(
2601                        msg_type = "EvaluationRes",
2602                        request_id = %self.id,
2603                        version = self.version,
2604                        "Evaluation result received"
2605                    );
2606                    if let Err(e) = self.stops_childs(ctx).await {
2607                        error!(
2608                            msg_type = "EvaluationRes",
2609                            request_id = %self.id,
2610                            error = %e,
2611                            "Failed to stop childs"
2612                        );
2613                        self.match_error(ctx, e).await;
2614                        return Ok(());
2615                    };
2616
2617                    if let Some(evaluator_res) =
2618                        eval_res.evaluator_response_ok()
2619                        && evaluator_res.appr_required
2620                    {
2621                        debug!(
2622                            msg_type = "EvaluationRes",
2623                            request_id = %self.id,
2624                            "Approval required, proceeding to approval phase"
2625                        );
2626                        self.on_event(
2627                            RequestManagerEvent::UpdateState {
2628                                state: Box::new(
2629                                    RequestManagerState::Approval {
2630                                        eval_req: *eval_req.clone(),
2631                                        eval_res: eval_res.clone(),
2632                                    },
2633                                ),
2634                            },
2635                            ctx,
2636                        )
2637                        .await;
2638
2639                        if let Err(e) = self
2640                            .build_approval(ctx, *eval_req, evaluator_res)
2641                            .await
2642                        {
2643                            error!(
2644                                msg_type = "EvaluationRes",
2645                                request_id = %self.id,
2646                                error = %e,
2647                                "Failed to build approval"
2648                            );
2649                            self.match_error(ctx, e).await;
2650                            return Ok(());
2651                        }
2652                    } else {
2653                        debug!(
2654                            msg_type = "EvaluationRes",
2655                            request_id = %self.id,
2656                            "Approval not required, proceeding to validation phase"
2657                        );
2658                        let (
2659                            request,
2660                            quorum,
2661                            signers,
2662                            init_state,
2663                            current_request_roles,
2664                        ) = match self
2665                            .build_validation_req(
2666                                ctx,
2667                                Some((*eval_req, eval_res)),
2668                                None,
2669                            )
2670                            .await
2671                        {
2672                            Ok(data) => data,
2673                            Err(e) => {
2674                                error!(
2675                                    msg_type = "EvaluationRes",
2676                                    request_id = %self.id,
2677                                    error = %e,
2678                                    "Failed to build validation request"
2679                                );
2680                                self.match_error(ctx, e).await;
2681                                return Ok(());
2682                            }
2683                        };
2684
2685                        if let Err(e) = self
2686                            .run_validation(
2687                                ctx,
2688                                request,
2689                                quorum,
2690                                signers,
2691                                init_state,
2692                                current_request_roles,
2693                            )
2694                            .await
2695                        {
2696                            error!(
2697                                msg_type = "EvaluationRes",
2698                                request_id = %self.id,
2699                                error = %e,
2700                                "Failed to run validation"
2701                            );
2702                            self.match_error(ctx, e).await;
2703                            return Ok(());
2704                        };
2705                    }
2706                }
2707            }
2708            RequestManagerMessage::ApprovalRes {
2709                appro_res,
2710                request_id,
2711            } => {
2712                if request_id == self.id {
2713                    let _ = make_obsolete(ctx, &self.subject_id).await;
2714                    debug!(
2715                        msg_type = "ApprovalRes",
2716                        request_id = %self.id,
2717                        version = self.version,
2718                        "Approval result received"
2719                    );
2720                    if let Err(e) = self.stops_childs(ctx).await {
2721                        error!(
2722                            msg_type = "ApprovalRes",
2723                            request_id = %self.id,
2724                            error = %e,
2725                            "Failed to stop childs"
2726                        );
2727                        self.match_error(ctx, e).await;
2728                        return Ok(());
2729                    };
2730
2731                    let RequestManagerState::Approval { eval_req, eval_res } =
2732                        self.state.clone()
2733                    else {
2734                        error!(
2735                            msg_type = "ApprovalRes",
2736                            request_id = %self.id,
2737                            state = ?self.state,
2738                            "Invalid state for approval response"
2739                        );
2740                        let e = ActorError::FunctionalCritical {
2741                            description: "Invalid request state".to_owned(),
2742                        };
2743                        return Err(emit_fail(ctx, e).await);
2744                    };
2745                    let (
2746                        request,
2747                        quorum,
2748                        signers,
2749                        init_state,
2750                        current_request_roles,
2751                    ) = match self
2752                        .build_validation_req(
2753                            ctx,
2754                            Some((eval_req, eval_res)),
2755                            Some(appro_res),
2756                        )
2757                        .await
2758                    {
2759                        Ok(data) => data,
2760                        Err(e) => {
2761                            error!(
2762                                msg_type = "ApprovalRes",
2763                                request_id = %self.id,
2764                                error = %e,
2765                                "Failed to build validation request"
2766                            );
2767                            self.match_error(ctx, e).await;
2768                            return Ok(());
2769                        }
2770                    };
2771
2772                    if let Err(e) = self
2773                        .run_validation(
2774                            ctx,
2775                            request,
2776                            quorum,
2777                            signers,
2778                            init_state,
2779                            current_request_roles,
2780                        )
2781                        .await
2782                    {
2783                        error!(
2784                            msg_type = "ApprovalRes",
2785                            request_id = %self.id,
2786                            error = %e,
2787                            "Failed to run validation"
2788                        );
2789                        self.match_error(ctx, e).await;
2790                        return Ok(());
2791                    };
2792                }
2793            }
2794            RequestManagerMessage::ValidationRes {
2795                val_res,
2796                val_req,
2797                request_id,
2798            } => {
2799                if request_id == self.id {
2800                    debug!(
2801                        msg_type = "ValidationRes",
2802                        request_id = %self.id,
2803                        version = self.version,
2804                        "Validation result received"
2805                    );
2806                    if let Err(e) = self.stops_childs(ctx).await {
2807                        error!(
2808                                msg_type = "ValidationRes",
2809                                request_id = %self.id,
2810                                error = %e,
2811                                "Failed to stop childs"
2812                        );
2813                        self.match_error(ctx, e).await;
2814                        return Ok(());
2815                    };
2816
2817                    let distribution_plan = match &self.state {
2818                        RequestManagerState::Validation {
2819                            distribution_plan,
2820                            ..
2821                        } => distribution_plan.clone(),
2822                        _ => {
2823                            error!(
2824                                msg_type = "ValidationRes",
2825                                request_id = %self.id,
2826                                state = ?self.state,
2827                                "Invalid state for validation response"
2828                            );
2829                            self.match_error(
2830                                ctx,
2831                                RequestManagerError::InvalidRequestState {
2832                                    expected: "Validation",
2833                                    got: "Other",
2834                                },
2835                            )
2836                            .await;
2837                            return Ok(());
2838                        }
2839                    };
2840
2841                    let signed_ledger = match self
2842                        .build_ledger(
2843                            ctx,
2844                            *val_req,
2845                            val_res,
2846                            distribution_plan.clone(),
2847                        )
2848                        .await
2849                    {
2850                        Ok(signed_ledger) => signed_ledger,
2851                        Err(e) => {
2852                            error!(
2853                                msg_type = "ValidationRes",
2854                                request_id = %self.id,
2855                                error = %e,
2856                                "Failed to build ledger"
2857                            );
2858                            self.match_error(ctx, e).await;
2859                            return Ok(());
2860                        }
2861                    };
2862
2863                    if let Err(e) = self
2864                        .update_subject(
2865                            ctx,
2866                            signed_ledger.clone(),
2867                            distribution_plan.clone(),
2868                        )
2869                        .await
2870                    {
2871                        error!(
2872                            msg_type = "ValidationRes",
2873                            request_id = %self.id,
2874                            error = %e,
2875                            "Failed to update subject"
2876                        );
2877                        self.match_error(ctx, e).await;
2878                        return Ok(());
2879                    };
2880
2881                    match self
2882                        .build_distribution(
2883                            ctx,
2884                            signed_ledger,
2885                            distribution_plan,
2886                        )
2887                        .await
2888                    {
2889                        Ok(in_distribution) => {
2890                            if !in_distribution
2891                                && let Err(e) = self.finish_request(ctx).await
2892                            {
2893                                error!(
2894                                    msg_type = "ValidationRes",
2895                                    request_id = %self.id,
2896                                    error = %e,
2897                                    "Failed to finish request after build distribution"
2898                                );
2899                                self.match_error(ctx, e).await;
2900                                return Ok(());
2901                            }
2902                        }
2903                        Err(e) => {
2904                            error!(
2905                                msg_type = "ValidationRes",
2906                                request_id = %self.id,
2907                                error = %e,
2908                                "Failed to build distribution"
2909                            );
2910                            self.match_error(ctx, e).await;
2911                            return Ok(());
2912                        }
2913                    };
2914                }
2915            }
2916            RequestManagerMessage::FinishRequest { request_id } => {
2917                if request_id == self.id {
2918                    debug!(
2919                        msg_type = "FinishRequest",
2920                        request_id = %self.id,
2921                        version = self.version,
2922                        "Finishing request"
2923                    );
2924
2925                    if let Err(e) = self.stops_childs(ctx).await {
2926                        error!(
2927                            msg_type = "FinishRequest",
2928                            request_id = %self.id,
2929                            error = %e,
2930                            "Failed to stop childs"
2931                        );
2932                        self.match_error(ctx, e).await;
2933                        return Ok(());
2934                    };
2935
2936                    if let Err(e) = self.finish_request(ctx).await {
2937                        error!(
2938                            msg_type = "FinishRequest",
2939                            request_id = %self.id,
2940                            error = %e,
2941                            "Failed to finish request"
2942                        );
2943                        self.match_error(ctx, e).await;
2944                        return Ok(());
2945                    }
2946                }
2947            }
2948        }
2949
2950        Ok(())
2951    }
2952
2953    async fn on_event(
2954        &mut self,
2955        event: RequestManagerEvent,
2956        ctx: &mut ActorContext<Self>,
2957    ) {
2958        let event_type = match &event {
2959            RequestManagerEvent::Finish => "Finish",
2960            RequestManagerEvent::UpdateState { .. } => "UpdateState",
2961            RequestManagerEvent::UpdateVersion { .. } => "UpdateVersion",
2962            RequestManagerEvent::SafeState { .. } => "SafeState",
2963        };
2964
2965        if let Err(e) = self.persist(&event, ctx).await {
2966            error!(
2967                event_type = event_type,
2968                request_id = %self.id,
2969                error = %e,
2970                "Failed to persist event"
2971            );
2972            emit_fail(ctx, e).await;
2973        };
2974    }
2975
2976    async fn on_child_fault(
2977        &mut self,
2978        error: ActorError,
2979        ctx: &mut ActorContext<Self>,
2980    ) -> ChildAction {
2981        error!(
2982            request_id = %self.id,
2983            version = self.version,
2984            state = ?self.state,
2985            error = %error,
2986            "Child fault in request manager"
2987        );
2988        emit_fail(ctx, error).await;
2989        ChildAction::Stop
2990    }
2991}
2992
2993#[async_trait]
2994impl PersistentActor for RequestManager {
2995    type Persistence = LightPersistence;
2996    type InitParams = InitRequestManager;
2997
2998    fn update(&mut self, state: Self) {
2999        self.command = state.command;
3000        self.request = state.request;
3001        self.state = state.state;
3002        self.version = state.version;
3003    }
3004
3005    fn create_initial(params: Self::InitParams) -> Self {
3006        Self {
3007            retry_diff: 0,
3008            retry_timeout: 0,
3009            request_started_at: None,
3010            current_phase: None,
3011            current_phase_started_at: None,
3012            our_key: params.our_key,
3013            id: DigestIdentifier::default(),
3014            subject_id: params.subject_id,
3015            governance_id: params.governance_id,
3016            command: ReqManInitMessage::Evaluate,
3017            request: None,
3018            state: RequestManagerState::Starting,
3019            version: 0,
3020            helpers: Some(params.helpers),
3021        }
3022    }
3023
3024    /// Change node state.
3025    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
3026        match event {
3027            RequestManagerEvent::Finish => {
3028                debug!(
3029                    event_type = "Finish",
3030                    request_id = %self.id,
3031                    "Applying finish event"
3032                );
3033                self.state = RequestManagerState::End;
3034                self.request = None;
3035                self.id = DigestIdentifier::default();
3036            }
3037            RequestManagerEvent::UpdateState { state } => {
3038                debug!(
3039                    event_type = "UpdateState",
3040                    request_id = %self.id,
3041                    old_state = ?self.state,
3042                    new_state = ?state,
3043                    "Applying state update"
3044                );
3045                self.state = *state.clone()
3046            }
3047            RequestManagerEvent::UpdateVersion { version } => {
3048                debug!(
3049                    event_type = "UpdateVersion",
3050                    request_id = %self.id,
3051                    old_version = self.version,
3052                    new_version = version,
3053                    "Applying version update"
3054                );
3055                self.state = RequestManagerState::Starting;
3056                self.version = *version
3057            }
3058            RequestManagerEvent::SafeState { command, request } => {
3059                debug!(
3060                    event_type = "SafeState",
3061                    request_id = %self.id,
3062                    command = ?command,
3063                    "Applying safe state"
3064                );
3065                self.version = 0;
3066                self.retry_diff = 0;
3067                self.retry_timeout = 0;
3068                self.state = RequestManagerState::Starting;
3069                self.request = Some(request.clone());
3070                self.command = command.clone();
3071            }
3072        };
3073
3074        Ok(())
3075    }
3076}
3077
/// Implements `Storable` for [`RequestManager`] with an empty body: no trait
/// items are overridden here, so only what the trait itself provides applies.
#[async_trait]
impl Storable for RequestManager {}