1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::bridge::request::EventRequestType;
8use ave_common::identity::{
9 DigestIdentifier, HashAlgorithm, PublicKey, Signed,
10};
11use ave_common::request::EventRequest;
12use ave_common::response::RequestState;
13use ave_common::{Namespace, SchemaType, ValueWrapper};
14use ave_network::ComunicateInfo;
15use borsh::{BorshDeserialize, BorshSerialize};
16use serde::{Deserialize, Serialize};
17use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tracing::{Span, debug, error, info, info_span, warn};
21
22use crate::approval::request::ApprovalReq;
23use crate::distribution::{
24 Distribution, DistributionMessage, DistributionType,
25};
26use crate::evaluation::request::{EvalWorkerContext, EvaluateData};
27use crate::evaluation::response::EvaluatorResponse;
28use crate::governance::data::GovernanceData;
29use crate::governance::model::{
30 HashThisRole, ProtocolTypes, Quorum, RoleTypes, WitnessesData,
31};
32use crate::governance::role_register::RoleDataRegister;
33use crate::helpers::network::service::NetworkSender;
34use crate::metrics::try_core_metrics;
35use crate::model::common::distribution_plan::build_tracker_event_distribution_plan;
36use crate::model::common::node::{SignTypesNode, get_sign, get_subject_data};
37use crate::model::common::subject::{
38 acquire_subject, create_subject, get_gov, get_gov_sn,
39 get_last_ledger_event, get_metadata, make_obsolete, update_ledger,
40};
41use crate::model::common::{purge_storage, send_to_tracking};
42use crate::model::event::{
43 ApprovalData, EvaluationData, Ledger, LedgerSeal, Protocols, ValidationData,
44};
45use crate::node::SubjectData;
46use crate::request::error::RequestManagerError;
47use crate::request::tracking::RequestTrackingMessage;
48use crate::request::{RequestHandler, RequestHandlerMessage};
49use crate::subject::Metadata;
50use crate::system::ConfigHelper;
51
52use crate::validation::request::{ActualProtocols, LastData, ValidationReq};
53use crate::validation::worker::CurrentRequestRoles;
54use crate::{
55 ActorMessage, NetworkMessage, Validation, ValidationMessage,
56 approval::{Approval, ApprovalMessage},
57 auth::{Auth, AuthMessage, AuthResponse},
58 db::Storable,
59 evaluation::{Evaluation, EvaluationMessage, request::EvaluationReq},
60 model::common::emit_fail,
61 update::{Update, UpdateMessage, UpdateNew, UpdateType},
62};
63
64use super::{
65 reboot::{Reboot, RebootMessage},
66 types::{
67 DistributionPlanEntry, DistributionPlanMode, ReqManInitMessage,
68 RequestManagerState,
69 },
70};
71
/// State held by the request manager while it drives one event request
/// through evaluation, approval, validation, subject update and distribution.
///
/// Fields marked `#[serde(skip)]` are left out of the serde representation.
/// The manual Borsh implementations for this type likewise write and read
/// only `command`, `state`, `version` and `request`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestManager {
    /// Hash algorithm and network sender. While this is `None` the protocol
    /// steps fail with `RequestManagerError::HelpersNotInitialized`.
    #[serde(skip)]
    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
    /// Public key used as `signer` in the requests this manager builds and
    /// removed from the distribution plan before distributing.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Identifier sent as `request_id` to child actors and to tracking.
    #[serde(skip)]
    id: DigestIdentifier,
    /// Subject this manager acquires, reads and updates.
    #[serde(skip)]
    subject_id: DigestIdentifier,
    /// Governance identifier. When `None`, `subject_id` is used as the
    /// governance identifier instead.
    #[serde(skip)]
    governance_id: Option<DigestIdentifier>,
    /// NOTE(review): presumably counts timeout-triggered retries; its use is
    /// outside this part of the file — confirm.
    #[serde(skip)]
    retry_timeout: u64,
    /// NOTE(review): presumably counts "diff" retries (see `RebootType`); its
    /// use is outside this part of the file — confirm.
    #[serde(skip)]
    retry_diff: u64,
    /// Instant at which request metrics started, if they have been started.
    #[serde(skip)]
    request_started_at: Option<Instant>,
    /// Label of the phase currently being timed for metrics.
    #[serde(skip)]
    current_phase: Option<&'static str>,
    /// Instant at which the current metrics phase started.
    #[serde(skip)]
    current_phase_started_at: Option<Instant>,
    /// Persisted initialisation message (serialised first by Borsh).
    command: ReqManInitMessage,
    /// Signed event request being processed. The build steps fail with
    /// `RequestManagerError::RequestNotSet` while this is `None`.
    request: Option<Signed<EventRequest>>,
    /// Current step of the request, replaced through `UpdateState` events.
    state: RequestManagerState,
    /// Version number forwarded to the child actors in their `Create` messages.
    version: u64,
}
99
/// Kind of reboot requested for a request manager.
///
/// NOTE(review): the code that consumes these variants is outside this part
/// of the file; the per-variant notes below are inferred from the names and
/// from the `retry_diff` / `retry_timeout` fields — confirm.
#[derive(Debug, Clone)]
pub enum RebootType {
    /// Plain reboot.
    Normal,
    /// Presumably paired with the `retry_diff` counter — confirm.
    Diff,
    /// Presumably paired with the `retry_timeout` counter — confirm.
    TimeOut,
}
106
/// Runtime values whose names and types mirror the non-persisted fields of
/// `RequestManager`.
///
/// NOTE(review): presumably handed to the actor on start-up to fill in the
/// fields that Borsh deserialisation resets to defaults — confirm at the
/// construction site.
pub struct InitRequestManager {
    /// Public key of the signer (same type as `RequestManager::our_key`).
    pub our_key: Arc<PublicKey>,
    /// Identifier of the subject the request targets.
    pub subject_id: DigestIdentifier,
    /// Governance identifier, if any.
    pub governance_id: Option<DigestIdentifier>,
    /// Hash algorithm and network sender.
    pub helpers: (HashAlgorithm, Arc<NetworkSender>),
}
113
114impl BorshSerialize for RequestManager {
115 fn serialize<W: std::io::Write>(
116 &self,
117 writer: &mut W,
118 ) -> std::io::Result<()> {
119 BorshSerialize::serialize(&self.command, writer)?;
120 BorshSerialize::serialize(&self.state, writer)?;
121 BorshSerialize::serialize(&self.version, writer)?;
122 BorshSerialize::serialize(&self.request, writer)?;
123
124 Ok(())
125 }
126}
127
128impl BorshDeserialize for RequestManager {
129 fn deserialize_reader<R: std::io::Read>(
130 reader: &mut R,
131 ) -> std::io::Result<Self> {
132 let command = ReqManInitMessage::deserialize_reader(reader)?;
134 let state = RequestManagerState::deserialize_reader(reader)?;
135 let version = u64::deserialize_reader(reader)?;
136 let request =
137 Option::<Signed<EventRequest>>::deserialize_reader(reader)?;
138
139 let our_key = Arc::new(PublicKey::default());
140 let subject_id = DigestIdentifier::default();
141 let id = DigestIdentifier::default();
142
143 Ok(Self {
144 retry_diff: 0,
145 retry_timeout: 0,
146 request_started_at: None,
147 current_phase: None,
148 current_phase_started_at: None,
149 helpers: None,
150 our_key,
151 id,
152 subject_id,
153 governance_id: None,
154 command,
155 request,
156 state,
157 version,
158 })
159 }
160}
161
162impl RequestManager {
163 fn retry_seconds_for_attempt(schedule: &[u64], attempt: u64) -> u64 {
164 let default = schedule.last().copied().unwrap_or(1).max(1);
165 let idx = attempt.saturating_sub(1) as usize;
166 schedule.get(idx).copied().unwrap_or(default).max(1)
167 }
168
169 fn begin_request_metrics(&mut self) {
170 self.request_started_at = Some(Instant::now());
171 self.current_phase = None;
172 self.current_phase_started_at = None;
173
174 if let Some(metrics) = try_core_metrics() {
175 metrics.observe_request_started();
176 }
177 }
178
179 fn ensure_request_metrics_started(&mut self) {
180 if self.request_started_at.is_none() {
181 self.request_started_at = Some(Instant::now());
182 }
183 }
184
185 fn start_phase_metrics(&mut self, phase: &'static str) {
186 self.ensure_request_metrics_started();
187 self.finish_phase_metrics();
188 self.current_phase = Some(phase);
189 self.current_phase_started_at = Some(Instant::now());
190 }
191
192 fn finish_phase_metrics(&mut self) {
193 let Some(phase) = self.current_phase.take() else {
194 self.current_phase_started_at = None;
195 return;
196 };
197 let Some(started_at) = self.current_phase_started_at.take() else {
198 return;
199 };
200
201 if let Some(metrics) = try_core_metrics() {
202 metrics.observe_request_phase(phase, started_at.elapsed());
203 }
204 }
205
206 fn finish_request_metrics(&mut self, result: &'static str) {
207 self.finish_phase_metrics();
208
209 if let Some(started_at) = self.request_started_at.take()
210 && let Some(metrics) = try_core_metrics()
211 {
212 metrics.observe_request_terminal(result, started_at.elapsed());
213 }
214 }
215
216 fn tracker_evaluation_context(
220 governance_data: &GovernanceData,
221 schema_id: &SchemaType,
222 namespace: Namespace,
223 request_type: &EventRequestType,
224 ) -> Result<EvalWorkerContext, RequestManagerError> {
225 match request_type {
226 EventRequestType::Fact => {
227 let (issuers, issuer_any) = governance_data.get_signers(
228 RoleTypes::Issuer,
229 schema_id,
230 namespace,
231 );
232 let schema_viewpoints = governance_data
233 .schemas
234 .get(schema_id)
235 .ok_or_else(|| {
236 crate::governance::error::GovernanceError::SchemaDoesNotExist {
237 schema_id: schema_id.to_string(),
238 }
239 })?
240 .viewpoints
241 .clone();
242
243 Ok(EvalWorkerContext::TrackerFact {
244 issuers: issuers.into_iter().collect(),
245 issuer_any,
246 schema_viewpoints,
247 })
248 }
249 EventRequestType::Transfer => {
250 let members = governance_data
251 .members
252 .values()
253 .cloned()
254 .collect::<BTreeSet<PublicKey>>();
255 let (creator_signers, _) = governance_data.get_signers(
256 RoleTypes::Creator,
257 schema_id,
258 namespace.clone(),
259 );
260 let creators = creator_signers
261 .into_iter()
262 .map(|creator| {
263 (creator, BTreeSet::from([namespace.clone()]))
264 })
265 .collect::<BTreeMap<PublicKey, BTreeSet<Namespace>>>();
266
267 Ok(EvalWorkerContext::TrackerTransfer { members, creators })
268 }
269 _ => Ok(EvalWorkerContext::Empty),
270 }
271 }
272
273 async fn build_evaluation(
274 &mut self,
275 ctx: &mut ActorContext<Self>,
276 ) -> Result<(), RequestManagerError> {
277 let Some(request) = self.request.clone() else {
278 return Err(RequestManagerError::RequestNotSet);
279 };
280
281 self.on_event(
282 RequestManagerEvent::UpdateState {
283 state: Box::new(RequestManagerState::Evaluation),
284 },
285 ctx,
286 )
287 .await;
288
289 let metadata = self.check_data_eval(ctx, &request).await?;
290
291 let (
292 signed_evaluation_req,
293 quorum,
294 signers,
295 init_state,
296 tracker_context,
297 ) = self.build_request_eval(ctx, &metadata, &request).await?;
298
299 if signers.is_empty() {
300 warn!(
301 request_id = %self.id,
302 schema_id = %metadata.schema_id,
303 "No evaluators available for schema"
304 );
305
306 return Err(RequestManagerError::NoEvaluatorsAvailable {
307 schema_id: metadata.schema_id.to_string(),
308 governance_id: signed_evaluation_req
309 .content()
310 .governance_id
311 .clone(),
312 });
313 }
314
315 self.run_evaluation(
316 ctx,
317 signed_evaluation_req.clone(),
318 quorum,
319 init_state,
320 tracker_context,
321 signers,
322 )
323 .await
324 }
325
326 const fn needs_subject_manager(&self) -> bool {
328 self.governance_id.is_some()
329 }
330
331 fn requester_id(&self) -> String {
332 self.id.to_string()
333 }
334
335 fn build_distribution_plan(
336 &self,
337 validation_req: &ValidationReq,
338 governance_data: Option<&GovernanceData>,
339 ) -> Result<Vec<DistributionPlanEntry>, RequestManagerError> {
340 let mut plan: HashMap<PublicKey, DistributionPlanMode> = HashMap::new();
341
342 match validation_req {
343 ValidationReq::Create { event_request, .. } => {
344 let EventRequest::Create(create) = event_request.content()
345 else {
346 return Err(
347 RequestManagerError::InvalidEventRequestForEvaluation,
348 );
349 };
350
351 if create.schema_id.is_gov() {
352 return Ok(Vec::new());
353 }
354
355 let Some(governance_data) = governance_data else {
356 return Err(RequestManagerError::ActorError(
357 ActorError::FunctionalCritical {
358 description:
359 "Missing governance data for distribution plan"
360 .to_owned(),
361 },
362 ));
363 };
364 let witnesses =
365 governance_data.get_witnesses(WitnessesData::Schema {
366 creator: event_request.signature().signer.clone(),
367 schema_id: create.schema_id.clone(),
368 namespace: create.namespace.clone(),
369 })?;
370
371 for witness in witnesses {
372 plan.insert(witness, DistributionPlanMode::Clear);
373 }
374 }
375 ValidationReq::Event {
376 actual_protocols,
377 event_request,
378 metadata,
379 ..
380 } => {
381 if metadata.schema_id.is_gov() {
382 return Ok(Vec::new());
383 }
384
385 let Some(governance_data) = governance_data else {
386 return Err(RequestManagerError::ActorError(
387 ActorError::FunctionalCritical {
388 description:
389 "Missing governance data for distribution plan"
390 .to_owned(),
391 },
392 ));
393 };
394 let protocols_success = actual_protocols.is_success();
395
396 return build_tracker_event_distribution_plan(
397 governance_data,
398 event_request.content(),
399 metadata,
400 protocols_success,
401 )
402 .map_err(|description| {
403 RequestManagerError::ActorError(
404 ActorError::FunctionalCritical { description },
405 )
406 });
407 }
408 }
409
410 Ok(plan
411 .into_iter()
412 .map(|(node, mode)| DistributionPlanEntry { node, mode })
413 .collect())
414 }
415
416 async fn check_data_eval(
417 &self,
418 ctx: &mut ActorContext<Self>,
419 request: &Signed<EventRequest>,
420 ) -> Result<Metadata, RequestManagerError> {
421 let (subject_id, confirm) = match request.content().clone() {
422 EventRequest::Fact(event) => (event.subject_id, false),
423 EventRequest::Transfer(event) => (event.subject_id, false),
424 EventRequest::Confirm(event) => (event.subject_id, true),
425 _ => {
426 return Err(
427 RequestManagerError::InvalidEventRequestForEvaluation,
428 );
429 }
430 };
431
432 let lease = acquire_subject(
433 ctx,
434 &self.subject_id,
435 self.requester_id(),
436 None,
437 self.needs_subject_manager(),
438 )
439 .await?;
440 let metadata = get_metadata(ctx, &subject_id).await;
441 lease.finish(ctx).await?;
442 let metadata = metadata?;
443
444 if confirm && !metadata.schema_id.is_gov() {
445 return Err(RequestManagerError::ConfirmNotEvaluableForTracker);
446 }
447
448 Ok(metadata)
449 }
450
451 async fn get_governance_data(
452 &self,
453 ctx: &mut ActorContext<Self>,
454 ) -> Result<GovernanceData, RequestManagerError> {
455 let governance_id =
456 self.governance_id.as_ref().unwrap_or(&self.subject_id);
457 Ok(get_gov(ctx, governance_id).await?)
458 }
459
460 async fn build_request_eval(
461 &self,
462 ctx: &mut ActorContext<Self>,
463 metadata: &Metadata,
464 request: &Signed<EventRequest>,
465 ) -> Result<
466 (
467 Signed<EvaluationReq>,
468 Quorum,
469 HashSet<PublicKey>,
470 Option<ValueWrapper>,
471 EvalWorkerContext,
472 ),
473 RequestManagerError,
474 > {
475 let is_gov = metadata.schema_id.is_gov();
476
477 let request_type = EventRequestType::from(request.content());
478 let (evaluate_data, governance_data, init_state, tracker_context) =
479 match (is_gov, request_type.clone()) {
480 (true, EventRequestType::Fact) => {
481 let state =
482 GovernanceData::try_from(metadata.properties.clone())?;
483
484 (
485 EvaluateData::GovFact {
486 state: state.clone(),
487 },
488 state,
489 None,
490 EvalWorkerContext::default(),
491 )
492 }
493 (true, EventRequestType::Transfer) => {
494 let state =
495 GovernanceData::try_from(metadata.properties.clone())?;
496
497 (
498 EvaluateData::GovTransfer {
499 state: state.clone(),
500 },
501 state,
502 None,
503 EvalWorkerContext::default(),
504 )
505 }
506 (true, EventRequestType::Confirm) => {
507 let state =
508 GovernanceData::try_from(metadata.properties.clone())?;
509
510 (
511 EvaluateData::GovConfirm {
512 state: state.clone(),
513 },
514 state,
515 None,
516 EvalWorkerContext::default(),
517 )
518 }
519 (false, EventRequestType::Fact) => {
520 let governance_data =
521 get_gov(ctx, &metadata.governance_id).await?;
522
523 let init_state =
524 governance_data.get_init_state(&metadata.schema_id)?;
525 let tracker_context = Self::tracker_evaluation_context(
526 &governance_data,
527 &metadata.schema_id,
528 metadata.namespace.clone(),
529 &request_type,
530 )?;
531
532 (
533 EvaluateData::TrackerSchemasFact {
534 state: metadata.properties.clone(),
535 },
536 governance_data,
537 Some(init_state),
538 tracker_context,
539 )
540 }
541 (false, EventRequestType::Transfer) => {
542 let governance_data =
543 get_gov(ctx, &metadata.governance_id).await?;
544 let tracker_context = Self::tracker_evaluation_context(
545 &governance_data,
546 &metadata.schema_id,
547 metadata.namespace.clone(),
548 &request_type,
549 )?;
550 (
551 EvaluateData::TrackerSchemasTransfer {
552 state: metadata.properties.clone(),
553 },
554 governance_data,
555 None,
556 tracker_context,
557 )
558 }
559 _ => {
560 error!(
561 request_id = %self.id,
562 is_gov = is_gov,
563 request_type = ?request_type,
564 "Invalid event request type for evaluation state"
565 );
566 return Err(
567 RequestManagerError::InvalidEventRequestForEvaluation,
568 );
569 }
570 };
571
572 let (signers, quorum) = governance_data.get_quorum_and_signers(
573 ProtocolTypes::Evaluation,
574 &metadata.schema_id,
575 metadata.namespace.clone(),
576 )?;
577
578 let eval_req = EvaluationReq {
579 event_request: request.clone(),
580 data: evaluate_data,
581 sn: metadata.sn + 1,
582 gov_version: governance_data.version,
583 namespace: metadata.namespace.clone(),
584 schema_id: metadata.schema_id.clone(),
585 signer: (*self.our_key).clone(),
586 signer_is_owner: *self.our_key == request.signature().signer,
587 governance_id: metadata.governance_id.clone(),
588 };
589
590 let signature = get_sign(
591 ctx,
592 SignTypesNode::EvaluationReq(Box::new(eval_req.clone())),
593 )
594 .await?;
595
596 let signed_evaluation_req: Signed<EvaluationReq> =
597 Signed::from_parts(eval_req, signature);
598 Ok((
599 signed_evaluation_req,
600 quorum,
601 signers,
602 init_state,
603 tracker_context,
604 ))
605 }
606
607 async fn run_evaluation(
608 &mut self,
609 ctx: &mut ActorContext<Self>,
610 request: Signed<EvaluationReq>,
611 quorum: Quorum,
612 init_state: Option<ValueWrapper>,
613 context: EvalWorkerContext,
614 signers: HashSet<PublicKey>,
615 ) -> Result<(), RequestManagerError> {
616 let Some((hash, network)) = self.helpers.clone() else {
617 return Err(RequestManagerError::HelpersNotInitialized);
618 };
619
620 self.start_phase_metrics("evaluation");
621 info!("Init evaluation {}", self.id);
622 let child = ctx
623 .create_child(
624 "evaluation",
625 Evaluation::new(
626 self.our_key.clone(),
627 request,
628 quorum,
629 init_state,
630 context,
631 hash,
632 network,
633 ),
634 )
635 .await?;
636
637 child
638 .tell(EvaluationMessage::Create {
639 request_id: self.id.clone(),
640 version: self.version,
641 signers,
642 })
643 .await?;
644
645 send_to_tracking(
646 ctx,
647 RequestTrackingMessage::UpdateState {
648 request_id: self.id.clone(),
649 state: RequestState::Evaluation,
650 },
651 )
652 .await?;
653
654 Ok(())
655 }
656 async fn build_request_appro(
659 &self,
660 ctx: &mut ActorContext<Self>,
661 eval_req: EvaluationReq,
662 evaluator_res: EvaluatorResponse,
663 ) -> Result<Signed<ApprovalReq>, RequestManagerError> {
664 let request = ApprovalReq {
665 subject_id: self.subject_id.clone(),
666 sn: eval_req.sn,
667 gov_version: eval_req.gov_version,
668 patch: evaluator_res.patch,
669 signer: eval_req.signer,
670 };
671
672 let signature =
673 get_sign(ctx, SignTypesNode::ApprovalReq(request.clone())).await?;
674
675 let signed_approval_req: Signed<ApprovalReq> =
676 Signed::from_parts(request, signature);
677
678 Ok(signed_approval_req)
679 }
680
681 async fn build_approval(
682 &mut self,
683 ctx: &mut ActorContext<Self>,
684 eval_req: EvaluationReq,
685 eval_res: EvaluatorResponse,
686 ) -> Result<(), RequestManagerError> {
687 let request = self.build_request_appro(ctx, eval_req, eval_res).await?;
688
689 let governance_data =
690 get_gov(ctx, &request.content().subject_id).await?;
691
692 let (signers, quorum) = governance_data.get_quorum_and_signers(
693 ProtocolTypes::Approval,
694 &SchemaType::Governance,
695 Namespace::new(),
696 )?;
697
698 if signers.is_empty() {
699 warn!(
700 request_id = %self.id,
701 schema_id = %SchemaType::Governance,
702 "No approvers available for schema"
703 );
704
705 return Err(RequestManagerError::NoApproversAvailable {
706 schema_id: SchemaType::Governance.to_string(),
707 governance_id: self
708 .governance_id
709 .clone()
710 .unwrap_or_else(|| self.subject_id.clone()),
711 });
712 }
713
714 self.run_approval(ctx, request, quorum, signers).await
715 }
716
717 async fn run_approval(
718 &mut self,
719 ctx: &mut ActorContext<Self>,
720 request: Signed<ApprovalReq>,
721 quorum: Quorum,
722 signers: HashSet<PublicKey>,
723 ) -> Result<(), RequestManagerError> {
724 let Some((hash, network)) = self.helpers.clone() else {
725 return Err(RequestManagerError::HelpersNotInitialized);
726 };
727
728 self.start_phase_metrics("approval");
729 info!("Init approval {}", self.id);
730 let child = ctx
731 .create_child(
732 "approval",
733 Approval::new(
734 self.our_key.clone(),
735 request,
736 quorum,
737 signers,
738 hash,
739 network,
740 ),
741 )
742 .await?;
743
744 child
745 .tell(ApprovalMessage::Create {
746 request_id: self.id.clone(),
747 version: self.version,
748 })
749 .await?;
750
751 send_to_tracking(
752 ctx,
753 RequestTrackingMessage::UpdateState {
754 request_id: self.id.clone(),
755 state: RequestState::Approval,
756 },
757 )
758 .await?;
759
760 Ok(())
761 }
762
763 async fn build_validation_req(
766 &mut self,
767 ctx: &mut ActorContext<Self>,
768 eval: Option<(EvaluationReq, EvaluationData)>,
769 appro_data: Option<ApprovalData>,
770 ) -> Result<
771 (
772 Signed<ValidationReq>,
773 Quorum,
774 HashSet<PublicKey>,
775 Option<ValueWrapper>,
776 CurrentRequestRoles,
777 ),
778 RequestManagerError,
779 > {
780 let (
781 vali_req,
782 quorum,
783 signers,
784 init_state,
785 current_request_roles,
786 schema_id,
787 governance_data,
788 ) = self.build_validation_data(ctx, eval, appro_data).await?;
789
790 if signers.is_empty() {
791 let governance_id = vali_req.get_governance_id().map_err(|error| {
792 error!(
793 request_id = %self.id,
794 schema_id = %schema_id,
795 error = %error,
796 "Validation request has invalid governance_id"
797 );
798 RequestManagerError::ActorError(
799 ActorError::FunctionalCritical {
800 description: format!(
801 "Validation request has invalid governance_id: {}",
802 error
803 ),
804 },
805 )
806 })?;
807
808 warn!(
809 request_id = %self.id,
810 schema_id = %schema_id,
811 "No validators available for schema"
812 );
813
814 return Err(RequestManagerError::NoValidatorsAvailable {
815 schema_id: schema_id.to_string(),
816 governance_id,
817 });
818 }
819
820 let signature = get_sign(
821 ctx,
822 SignTypesNode::ValidationReq(Box::new(vali_req.clone())),
823 )
824 .await?;
825
826 let distribution_plan =
827 self.build_distribution_plan(&vali_req, governance_data.as_ref())?;
828
829 let signed_validation_req: Signed<ValidationReq> =
830 Signed::from_parts(vali_req, signature);
831
832 self.on_event(
833 RequestManagerEvent::UpdateState {
834 state: Box::new(RequestManagerState::Validation {
835 request: Box::new(signed_validation_req.clone()),
836 quorum: quorum.clone(),
837 init_state: init_state.clone(),
838 current_request_roles: current_request_roles.clone(),
839 signers: signers.clone(),
840 distribution_plan: distribution_plan.clone(),
841 }),
842 },
843 ctx,
844 )
845 .await;
846
847 Ok((
848 signed_validation_req,
849 quorum,
850 signers,
851 init_state,
852 current_request_roles,
853 ))
854 }
855
856 async fn build_validation_data(
857 &self,
858 ctx: &mut ActorContext<Self>,
859 eval: Option<(EvaluationReq, EvaluationData)>,
860 appro_data: Option<ApprovalData>,
861 ) -> Result<
862 (
863 ValidationReq,
864 Quorum,
865 HashSet<PublicKey>,
866 Option<ValueWrapper>,
867 CurrentRequestRoles,
868 SchemaType,
869 Option<GovernanceData>,
870 ),
871 RequestManagerError,
872 > {
873 let Some(request) = self.request.clone() else {
874 return Err(RequestManagerError::RequestNotSet);
875 };
876
877 if let EventRequest::Create(create) = request.content() {
878 if create.schema_id.is_gov() {
879 let governance_data =
880 GovernanceData::new((*self.our_key).clone());
881 let (signers, quorum) = governance_data
882 .get_quorum_and_signers(
883 ProtocolTypes::Validation,
884 &SchemaType::Governance,
885 Namespace::new(),
886 )?;
887
888 Ok((
889 ValidationReq::Create {
890 event_request: request.clone(),
891 gov_version: 0,
892 subject_id: self.subject_id.clone(),
893 },
894 quorum,
895 signers,
896 None,
897 CurrentRequestRoles {
898 evaluation: RoleDataRegister {
899 workers: HashSet::new(),
900 quorum: Quorum::default(),
901 },
902 approval: RoleDataRegister {
903 workers: HashSet::new(),
904 quorum: Quorum::default(),
905 },
906 },
907 SchemaType::Governance,
908 None,
909 ))
910 } else {
911 let governance_data =
912 get_gov(ctx, &create.governance_id).await?;
913
914 let (signers, quorum) = governance_data
915 .get_quorum_and_signers(
916 ProtocolTypes::Validation,
917 &create.schema_id,
918 create.namespace.clone(),
919 )?;
920
921 let init_state =
922 governance_data.get_init_state(&create.schema_id)?;
923
924 Ok((
925 ValidationReq::Create {
926 event_request: request.clone(),
927 gov_version: governance_data.version,
928 subject_id: self.subject_id.clone(),
929 },
930 quorum,
931 signers,
932 Some(init_state),
933 CurrentRequestRoles {
934 evaluation: RoleDataRegister {
935 workers: HashSet::new(),
936 quorum: Quorum::default(),
937 },
938 approval: RoleDataRegister {
939 workers: HashSet::new(),
940 quorum: Quorum::default(),
941 },
942 },
943 create.schema_id.clone(),
944 Some(governance_data),
945 ))
946 }
947 } else {
948 let Some((hash, ..)) = self.helpers else {
949 return Err(RequestManagerError::HelpersNotInitialized);
950 };
951
952 let governance_data = self.get_governance_data(ctx).await?;
953
954 let (actual_protocols, gov_version, sn) =
955 if let Some((eval_req, eval_data)) = eval {
956 if let Some(approval_data) = appro_data.clone() {
957 (
958 ActualProtocols::EvalApprove {
959 eval_data,
960 approval_data,
961 },
962 eval_req.gov_version,
963 Some(eval_req.sn),
964 )
965 } else {
966 (
967 ActualProtocols::Eval { eval_data },
968 eval_req.gov_version,
969 Some(eval_req.sn),
970 )
971 }
972 } else {
973 (ActualProtocols::None, governance_data.version, None)
974 };
975
976 let lease = acquire_subject(
977 ctx,
978 &self.subject_id,
979 self.requester_id(),
980 None,
981 self.needs_subject_manager(),
982 )
983 .await?;
984 let metadata = get_metadata(ctx, &self.subject_id).await;
985 let last_ledger_event = match metadata {
986 Ok(metadata) => {
987 let last_ledger_event =
988 get_last_ledger_event(ctx, &self.subject_id).await;
989 lease.finish(ctx).await?;
990 (metadata, last_ledger_event?)
991 }
992 Err(error) => {
993 lease.finish(ctx).await?;
994 return Err(error.into());
995 }
996 };
997
998 let (metadata, last_ledger_event) = last_ledger_event;
999
1000 if gov_version != governance_data.version {
1001 return Err(RequestManagerError::GovernanceVersionChanged {
1002 governance_id: metadata.governance_id,
1003 expected: gov_version,
1004 current: governance_data.version,
1005 });
1006 }
1007
1008 let sn = if let Some(sn) = sn {
1009 sn
1010 } else {
1011 metadata.sn + 1
1012 };
1013
1014 let (signers, quorum) = governance_data.get_quorum_and_signers(
1015 ProtocolTypes::Validation,
1016 &metadata.schema_id,
1017 metadata.namespace.clone(),
1018 )?;
1019
1020 let Some(last_ledger_event) = last_ledger_event else {
1021 return Err(RequestManagerError::LastLedgerEventNotFound);
1022 };
1023
1024 let ledger_hash = last_ledger_event.ledger_hash(hash)?;
1025 let schema_id = metadata.schema_id.clone();
1026
1027 let current_request_roles =
1028 if gov_version == governance_data.version {
1029 let (evaluation_workers, evaluation_quorum) =
1030 governance_data.get_quorum_and_signers(
1031 ProtocolTypes::Evaluation,
1032 &metadata.schema_id,
1033 metadata.namespace.clone(),
1034 )?;
1035
1036 let (approval_workers, approval_quorum) =
1037 if appro_data.is_some() {
1038 governance_data.get_quorum_and_signers(
1039 ProtocolTypes::Approval,
1040 &SchemaType::Governance,
1041 Namespace::new(),
1042 )?
1043 } else {
1044 (HashSet::new(), Quorum::default())
1045 };
1046
1047 CurrentRequestRoles {
1048 evaluation: RoleDataRegister {
1049 workers: evaluation_workers,
1050 quorum: evaluation_quorum,
1051 },
1052 approval: RoleDataRegister {
1053 workers: approval_workers,
1054 quorum: approval_quorum,
1055 },
1056 }
1057 } else {
1058 CurrentRequestRoles {
1059 evaluation: RoleDataRegister {
1060 workers: HashSet::new(),
1061 quorum: Quorum::default(),
1062 },
1063 approval: RoleDataRegister {
1064 workers: HashSet::new(),
1065 quorum: Quorum::default(),
1066 },
1067 }
1068 };
1069
1070 Ok((
1071 ValidationReq::Event {
1072 actual_protocols: Box::new(actual_protocols),
1073 event_request: request.clone(),
1074 metadata: Box::new(metadata),
1075 last_data: Box::new(LastData {
1076 vali_data: last_ledger_event
1077 .protocols
1078 .get_validation_data(),
1079 gov_version: last_ledger_event.gov_version,
1080 }),
1081 gov_version,
1082 ledger_hash,
1083 sn,
1084 },
1085 quorum,
1086 signers,
1087 None,
1088 current_request_roles,
1089 schema_id,
1090 Some(governance_data),
1091 ))
1092 }
1093 }
1094
1095 async fn run_validation(
1096 &mut self,
1097 ctx: &mut ActorContext<Self>,
1098 request: Signed<ValidationReq>,
1099 quorum: Quorum,
1100 signers: HashSet<PublicKey>,
1101 init_state: Option<ValueWrapper>,
1102 current_request_roles: CurrentRequestRoles,
1103 ) -> Result<(), RequestManagerError> {
1104 let Some((hash, network)) = self.helpers.clone() else {
1105 return Err(RequestManagerError::HelpersNotInitialized);
1106 };
1107
1108 self.start_phase_metrics("validation");
1109 info!("Init validation {}", self.id);
1110 let child = ctx
1111 .create_child(
1112 "validation",
1113 Validation::new(
1114 self.our_key.clone(),
1115 request,
1116 init_state,
1117 current_request_roles,
1118 quorum,
1119 hash,
1120 network,
1121 ),
1122 )
1123 .await?;
1124
1125 child
1126 .tell(ValidationMessage::Create {
1127 request_id: self.id.clone(),
1128 version: self.version,
1129 signers,
1130 })
1131 .await?;
1132
1133 send_to_tracking(
1134 ctx,
1135 RequestTrackingMessage::UpdateState {
1136 request_id: self.id.clone(),
1137 state: RequestState::Validation,
1138 },
1139 )
1140 .await?;
1141
1142 Ok(())
1143 }
1144 async fn build_ledger(
1147 &mut self,
1148 ctx: &mut ActorContext<Self>,
1149 val_req: ValidationReq,
1150 val_res: ValidationData,
1151 distribution_plan: Vec<DistributionPlanEntry>,
1152 ) -> Result<Ledger, RequestManagerError> {
1153 let Some((hash, ..)) = self.helpers else {
1154 return Err(RequestManagerError::HelpersNotInitialized);
1155 };
1156
1157 let (protocols, ledger_seal) = match val_req {
1158 ValidationReq::Create {
1159 event_request,
1160 gov_version,
1161 ..
1162 } => {
1163 let protocols = Protocols::Create {
1164 event_request,
1165 validation: val_res,
1166 };
1167
1168 let protocols_hash = protocols.hash_for_ledger(&hash)?;
1169
1170 let ledger_seal = LedgerSeal {
1171 gov_version,
1172 sn: 0,
1173 prev_ledger_event_hash: DigestIdentifier::default(),
1174 protocols_hash,
1175 };
1176
1177 (protocols, ledger_seal)
1178 }
1179 ValidationReq::Event {
1180 actual_protocols,
1181 event_request,
1182 ledger_hash,
1183 metadata,
1184 gov_version,
1185 sn,
1186 ..
1187 } => {
1188 let protocols = Protocols::build(
1189 metadata.schema_id.is_gov(),
1190 event_request,
1191 *actual_protocols,
1192 val_res,
1193 )?;
1194
1195 let protocols_hash = protocols.hash_for_ledger(&hash)?;
1196
1197 let ledger_seal = LedgerSeal {
1198 gov_version,
1199 sn,
1200 prev_ledger_event_hash: ledger_hash,
1201 protocols_hash,
1202 };
1203
1204 (protocols, ledger_seal)
1205 }
1206 };
1207
1208 let signature =
1209 get_sign(ctx, SignTypesNode::LedgerSeal(ledger_seal.clone()))
1210 .await?;
1211
1212 let ledger = Ledger {
1213 gov_version: ledger_seal.gov_version,
1214 sn: ledger_seal.sn,
1215 prev_ledger_event_hash: ledger_seal.prev_ledger_event_hash,
1216 ledger_seal_signature: signature,
1217 protocols,
1218 };
1219
1220 self.on_event(
1221 RequestManagerEvent::UpdateState {
1222 state: Box::new(RequestManagerState::UpdateSubject {
1223 ledger: ledger.clone(),
1224 distribution_plan: distribution_plan.clone(),
1225 }),
1226 },
1227 ctx,
1228 )
1229 .await;
1230
1231 Ok(ledger)
1232 }
1233
1234 async fn update_subject(
1235 &mut self,
1236 ctx: &mut ActorContext<Self>,
1237 ledger: Ledger,
1238 distribution_plan: Vec<DistributionPlanEntry>,
1239 ) -> Result<(), RequestManagerError> {
1240 if ledger.get_event_request_type().is_create_event() {
1241 if let Err(e) = create_subject(ctx, ledger.clone()).await {
1242 if let ActorError::Functional { .. } = e {
1243 return Err(RequestManagerError::CheckLimit);
1244 }
1245 return Err(e.into());
1246 }
1247 } else {
1248 let lease = acquire_subject(
1249 ctx,
1250 &self.subject_id,
1251 self.requester_id(),
1252 None,
1253 self.needs_subject_manager(),
1254 )
1255 .await?;
1256 let update_result =
1257 update_ledger(ctx, &self.subject_id, vec![ledger.clone()])
1258 .await;
1259 lease.finish(ctx).await?;
1260 update_result?;
1261 }
1262
1263 self.on_event(
1264 RequestManagerEvent::UpdateState {
1265 state: Box::new(RequestManagerState::Distribution {
1266 ledger,
1267 distribution_plan,
1268 }),
1269 },
1270 ctx,
1271 )
1272 .await;
1273
1274 Ok(())
1275 }
1276
1277 async fn build_distribution(
1278 &mut self,
1279 ctx: &mut ActorContext<Self>,
1280 ledger: Ledger,
1281 mut distribution_plan: Vec<DistributionPlanEntry>,
1282 ) -> Result<bool, RequestManagerError> {
1283 let is_governance = match ledger.get_event_request() {
1284 Some(EventRequest::Create(create)) => create.schema_id.is_gov(),
1285 Some(_) => self.governance_id.is_none(),
1286 None => false,
1287 };
1288
1289 if is_governance {
1290 let governance_id = ledger.get_subject_id();
1291 let governance_data = get_gov(ctx, &governance_id).await?;
1292 distribution_plan = governance_data
1293 .get_witnesses(WitnessesData::Gov)?
1294 .into_iter()
1295 .map(|node| DistributionPlanEntry {
1296 node,
1297 mode: DistributionPlanMode::Clear,
1298 })
1299 .collect();
1300 }
1301
1302 if distribution_plan.is_empty() {
1303 return Ok(false);
1304 }
1305
1306 distribution_plan.retain(|entry| entry.node != *self.our_key);
1307
1308 if distribution_plan.is_empty() {
1309 warn!(
1310 request_id = %self.id,
1311 "No witnesses available for distribution"
1312 );
1313 return Ok(false);
1314 }
1315
1316 self.run_distribution(ctx, distribution_plan, ledger)
1317 .await?;
1318
1319 Ok(true)
1320 }
1321
1322 async fn run_distribution(
1323 &mut self,
1324 ctx: &mut ActorContext<Self>,
1325 distribution_plan: Vec<DistributionPlanEntry>,
1326 ledger: Ledger,
1327 ) -> Result<(), RequestManagerError> {
1328 let Some((.., network)) = self.helpers.clone() else {
1329 return Err(RequestManagerError::HelpersNotInitialized);
1330 };
1331
1332 self.start_phase_metrics("distribution");
1333 info!("Init distribution {}", self.id);
1334 let child = ctx
1335 .create_child(
1336 "distribution",
1337 Distribution::new(
1338 network,
1339 DistributionType::Request,
1340 self.id.clone(),
1341 ),
1342 )
1343 .await?;
1344
1345 child
1346 .tell(DistributionMessage::Create {
1347 ledger: Box::new(ledger),
1348 distribution_plan,
1349 })
1350 .await?;
1351
1352 send_to_tracking(
1353 ctx,
1354 RequestTrackingMessage::UpdateState {
1355 request_id: self.id.clone(),
1356 state: RequestState::Distribution,
1357 },
1358 )
1359 .await?;
1360
1361 Ok(())
1362 }
1363
1364 async fn init_wait(
1367 &self,
1368 ctx: &mut ActorContext<Self>,
1369 governance_id: &DigestIdentifier,
1370 ) -> Result<(), RequestManagerError> {
1371 let Some(config): Option<ConfigHelper> =
1372 ctx.system().get_helper("config").await
1373 else {
1374 return Err(RequestManagerError::ActorError(ActorError::Helper {
1375 name: "config".to_owned(),
1376 reason: "Not found".to_owned(),
1377 }));
1378 };
1379 let actor = ctx
1380 .create_child(
1381 "reboot",
1382 Reboot::new(
1383 governance_id.clone(),
1384 self.id.clone(),
1385 config.sync_reboot.stability_check_interval_secs,
1386 config.sync_reboot.stability_check_max_retries,
1387 ),
1388 )
1389 .await?;
1390
1391 actor.tell(RebootMessage::Init).await?;
1392
1393 Ok(())
1394 }
1395
1396 async fn init_update(
1397 &self,
1398 ctx: &mut ActorContext<Self>,
1399 governance_id: &DigestIdentifier,
1400 ) -> Result<(), RequestManagerError> {
1401 let Some((.., network)) = self.helpers.clone() else {
1402 return Err(RequestManagerError::HelpersNotInitialized);
1403 };
1404
1405 let gov_sn = get_gov_sn(ctx, governance_id).await?;
1406
1407 let governance_data = get_gov(ctx, governance_id).await?;
1408
1409 let mut witnesses = {
1410 let gov_witnesses =
1411 governance_data.get_witnesses(WitnessesData::Gov)?;
1412
1413 let auth_witnesses =
1414 Self::get_witnesses_auth(ctx, governance_id.clone())
1415 .await
1416 .unwrap_or_default();
1417
1418 gov_witnesses
1419 .union(&auth_witnesses)
1420 .cloned()
1421 .collect::<HashSet<PublicKey>>()
1422 };
1423
1424 witnesses.remove(&self.our_key);
1425
1426 if witnesses.is_empty() {
1427 if let Ok(actor) = ctx.reference().await {
1428 actor
1429 .tell(RequestManagerMessage::FinishReboot {
1430 request_id: self.id.clone(),
1431 })
1432 .await?;
1433 };
1434 } else if witnesses.len() == 1 {
1435 let Some(objetive) = witnesses.iter().next() else {
1436 error!(
1437 request_id = %self.id,
1438 governance_id = %governance_id,
1439 "Witness set became empty while selecting single reboot target"
1440 );
1441 return Err(RequestManagerError::ActorError(
1442 ActorError::FunctionalCritical {
1443 description:
1444 "Witness set became empty while selecting single reboot target"
1445 .to_owned(),
1446 },
1447 ));
1448 };
1449 let info = ComunicateInfo {
1450 receiver: objetive.clone(),
1451 request_id: String::default(),
1452 version: 0,
1453 receiver_actor: format!(
1454 "/user/node/distributor_{}",
1455 governance_id
1456 ),
1457 };
1458
1459 network
1460 .send_command(ave_network::CommandHelper::SendMessage {
1461 message: NetworkMessage {
1462 info,
1463 message: ActorMessage::DistributionLedgerReq {
1464 actual_sn: Some(gov_sn),
1465 target_sn: None,
1466 subject_id: governance_id.clone(),
1467 },
1468 },
1469 })
1470 .await?;
1471
1472 let Ok(actor) = ctx.reference().await else {
1473 return Ok(());
1474 };
1475
1476 actor
1477 .tell(RequestManagerMessage::RebootWait {
1478 request_id: self.id.clone(),
1479 governance_id: governance_id.clone(),
1480 })
1481 .await?;
1482 } else {
1483 let Some(config): Option<ConfigHelper> =
1484 ctx.system().get_helper("config").await
1485 else {
1486 return Ok(());
1487 };
1488 let data = UpdateNew {
1489 network,
1490 subject_id: governance_id.clone(),
1491 our_sn: Some(gov_sn),
1492 witnesses,
1493 update_type: UpdateType::Request {
1494 subject_id: self.subject_id.clone(),
1495 id: self.id.clone(),
1496 },
1497 subject_kind_hint: Some(
1498 crate::update::UpdateSubjectKind::Governance,
1499 ),
1500 round_retry_interval_secs: config
1501 .sync_update
1502 .round_retry_interval_secs,
1503 max_round_retries: config.sync_update.max_round_retries,
1504 witness_retry_count: config.sync_update.witness_retry_count,
1505 witness_retry_interval_secs: config
1506 .sync_update
1507 .witness_retry_interval_secs,
1508 };
1509
1510 let updater = Update::new(data);
1511 let Ok(child) = ctx.create_child("update", updater).await else {
1512 let Ok(actor) = ctx.reference().await else {
1513 return Ok(());
1514 };
1515
1516 actor
1517 .tell(RequestManagerMessage::RebootWait {
1518 request_id: self.id.clone(),
1519 governance_id: governance_id.clone(),
1520 })
1521 .await?;
1522
1523 return Ok(());
1524 };
1525
1526 child.tell(UpdateMessage::Run).await?;
1527 }
1528
1529 Ok(())
1530 }
1531
1532 async fn get_witnesses_auth(
1533 ctx: &ActorContext<Self>,
1534 governance_id: DigestIdentifier,
1535 ) -> Result<HashSet<PublicKey>, RequestManagerError> {
1536 let path = ActorPath::from("/user/node/auth");
1537 let actor = ctx.system().get_actor::<Auth>(&path).await?;
1538
1539 let response = actor
1540 .ask(AuthMessage::GetAuth {
1541 subject_id: governance_id,
1542 })
1543 .await?;
1544
1545 match response {
1546 AuthResponse::Witnesses(witnesses) => Ok(witnesses),
1547 _ => Err(RequestManagerError::ActorError(
1548 ActorError::UnexpectedResponse {
1549 path,
1550 expected: "AuthResponse::Witnesses".to_owned(),
1551 },
1552 )),
1553 }
1554 }
1555
1556 async fn send_reboot(
1559 &self,
1560 ctx: &ActorContext<Self>,
1561 governance_id: DigestIdentifier,
1562 ) -> Result<(), ActorError> {
1563 let Ok(actor) = ctx.reference().await else {
1564 return Ok(());
1565 };
1566
1567 actor
1568 .tell(RequestManagerMessage::Reboot {
1569 request_id: self.id.clone(),
1570 governance_id,
1571 reboot_type: RebootType::TimeOut,
1572 })
1573 .await
1574 }
1575
1576 async fn match_error(
1577 &mut self,
1578 ctx: &mut ActorContext<Self>,
1579 error: RequestManagerError,
1580 ) {
1581 match error {
1582 RequestManagerError::NoEvaluatorsAvailable {
1583 governance_id,
1584 ..
1585 }
1586 | RequestManagerError::NoApproversAvailable {
1587 governance_id, ..
1588 }
1589 | RequestManagerError::NoValidatorsAvailable {
1590 governance_id,
1591 ..
1592 }
1593 | RequestManagerError::GovernanceVersionChanged {
1594 governance_id,
1595 ..
1596 } => {
1597 if let Err(e) = self.send_reboot(ctx, governance_id).await {
1598 emit_fail(ctx, e).await;
1599 }
1600 }
1601 RequestManagerError::CheckLimit
1602 | RequestManagerError::Governance(..)
1603 | RequestManagerError::NotIssuer
1604 | RequestManagerError::NotCreator => {
1605 if let Err(e) = self
1606 .abort_request(
1607 ctx,
1608 error.to_string(),
1609 None,
1610 (*self.our_key).clone(),
1611 )
1612 .await
1613 {
1614 emit_fail(
1615 ctx,
1616 ActorError::FunctionalCritical {
1617 description: e.to_string(),
1618 },
1619 )
1620 .await;
1621 }
1622 }
1623 _ => {
1624 emit_fail(
1625 ctx,
1626 ActorError::FunctionalCritical {
1627 description: error.to_string(),
1628 },
1629 )
1630 .await;
1631 }
1632 }
1633 }
1634
1635 async fn finish_request(
1636 &mut self,
1637 ctx: &mut ActorContext<Self>,
1638 ) -> Result<(), RequestManagerError> {
1639 self.finish_request_metrics("finished");
1640 info!("Ending {}", self.id);
1641 send_to_tracking(
1642 ctx,
1643 RequestTrackingMessage::UpdateState {
1644 request_id: self.id.clone(),
1645 state: RequestState::Finish,
1646 },
1647 )
1648 .await?;
1649
1650 self.on_event(RequestManagerEvent::Finish, ctx).await;
1651
1652 self.end_request(ctx).await?;
1653
1654 Ok(())
1655 }
1656
1657 async fn reboot(
1658 &mut self,
1659 ctx: &mut ActorContext<Self>,
1660 reboot_type: RebootType,
1661 governance_id: DigestIdentifier,
1662 ) -> Result<(), RequestManagerError> {
1663 let Some(config): Option<ConfigHelper> =
1664 ctx.system().get_helper("config").await
1665 else {
1666 return Err(ActorError::Helper {
1667 name: "config".to_owned(),
1668 reason: "Not found".to_owned(),
1669 }
1670 .into());
1671 };
1672 self.start_phase_metrics("reboot");
1673 self.on_event(
1674 RequestManagerEvent::UpdateState {
1675 state: Box::new(RequestManagerState::Reboot),
1676 },
1677 ctx,
1678 )
1679 .await;
1680
1681 let Ok(actor) = ctx.reference().await else {
1682 return Ok(());
1683 };
1684
1685 let request_id = self.id.clone();
1686
1687 match reboot_type {
1688 RebootType::Normal => {
1689 info!("Launching Normal reboot {}", self.id);
1690 send_to_tracking(
1691 ctx,
1692 RequestTrackingMessage::UpdateState {
1693 request_id: self.id.clone(),
1694 state: RequestState::Reboot,
1695 },
1696 )
1697 .await?;
1698
1699 actor
1700 .tell(RequestManagerMessage::RebootUpdate {
1701 request_id,
1702 governance_id,
1703 })
1704 .await?;
1705 }
1706 RebootType::Diff => {
1707 info!("Launching Diff reboot {}", self.id);
1708 self.retry_diff += 1;
1709
1710 let seconds = Self::retry_seconds_for_attempt(
1711 &config.sync_reboot.diff_retry_schedule_secs,
1712 self.retry_diff,
1713 );
1714
1715 info!(
1716 "Launching Diff reboot {}, try: {}, seconds: {}",
1717 self.id, self.retry_diff, seconds
1718 );
1719
1720 send_to_tracking(
1721 ctx,
1722 RequestTrackingMessage::UpdateState {
1723 request_id: self.id.clone(),
1724 state: RequestState::RebootDiff {
1725 seconds,
1726 count: self.retry_diff,
1727 },
1728 },
1729 )
1730 .await?;
1731
1732 tokio::spawn(async move {
1733 tokio::time::sleep(Duration::from_secs(seconds)).await;
1734 let _ = actor
1735 .tell(RequestManagerMessage::RebootUpdate {
1736 request_id,
1737 governance_id,
1738 })
1739 .await;
1740 });
1741 }
1742 RebootType::TimeOut => {
1743 self.retry_timeout += 1;
1744
1745 let seconds = Self::retry_seconds_for_attempt(
1746 &config.sync_reboot.timeout_retry_schedule_secs,
1747 self.retry_timeout,
1748 );
1749
1750 info!(
1751 "Launching TimeOut reboot {}, try: {}, seconds: {}",
1752 self.id, self.retry_timeout, seconds
1753 );
1754 send_to_tracking(
1755 ctx,
1756 RequestTrackingMessage::UpdateState {
1757 request_id: self.id.clone(),
1758 state: RequestState::RebootTimeOut {
1759 seconds,
1760 count: self.retry_timeout,
1761 },
1762 },
1763 )
1764 .await?;
1765
1766 tokio::spawn(async move {
1767 tokio::time::sleep(Duration::from_secs(seconds)).await;
1768 let _ = actor
1769 .tell(RequestManagerMessage::RebootUpdate {
1770 request_id,
1771 governance_id,
1772 })
1773 .await;
1774 });
1775 }
1776 }
1777
1778 Ok(())
1779 }
1780
1781 async fn match_command(
1782 &mut self,
1783 ctx: &mut ActorContext<Self>,
1784 ) -> Result<(), RequestManagerError> {
1785 match self.command {
1786 ReqManInitMessage::Evaluate => self.build_evaluation(ctx).await,
1787 ReqManInitMessage::Validate => {
1788 let (
1789 request,
1790 quorum,
1791 signers,
1792 init_state,
1793 current_request_roles,
1794 ) = self.build_validation_req(ctx, None, None).await?;
1795
1796 self.run_validation(
1797 ctx,
1798 request,
1799 quorum,
1800 signers,
1801 init_state,
1802 current_request_roles,
1803 )
1804 .await
1805 }
1806 }
1807 }
1808
1809 async fn check_request_roles_after_reboot(
1810 &self,
1811 ctx: &mut ActorContext<Self>,
1812 ) -> Result<(), RequestManagerError> {
1813 let Some(request) = self.request.clone() else {
1814 return Err(RequestManagerError::RequestNotSet);
1815 };
1816
1817 let gov = self.get_governance_data(ctx).await?;
1818 let subject_data = match request.content() {
1819 EventRequest::Create(..) => None,
1820 _ => get_subject_data(ctx, &self.subject_id).await?,
1821 };
1822
1823 let creator_scope =
1824 match request.content() {
1825 EventRequest::Create(create) if !create.schema_id.is_gov() => {
1826 Some((create.schema_id.clone(), create.namespace.clone()))
1827 }
1828 _ => subject_data.as_ref().and_then(|subject_data| {
1829 match subject_data {
1830 SubjectData::Tracker {
1831 schema_id,
1832 namespace,
1833 ..
1834 } => Some((
1835 schema_id.clone(),
1836 Namespace::from(namespace.clone()),
1837 )),
1838 _ => None,
1839 }
1840 }),
1841 };
1842
1843 if let Some((schema_id, namespace)) = creator_scope
1844 && !gov.has_this_role(HashThisRole::Schema {
1845 who: (*self.our_key).clone(),
1846 role: RoleTypes::Creator,
1847 schema_id,
1848 namespace,
1849 })
1850 {
1851 return Err(RequestManagerError::NotCreator);
1852 }
1853
1854 if let EventRequest::Fact { .. } = request.content() {
1855 let Some(subject_data) = subject_data else {
1856 return Err(RequestManagerError::SubjecData);
1857 };
1858
1859 match subject_data {
1860 SubjectData::Tracker {
1861 schema_id,
1862 namespace,
1863 ..
1864 } => {
1865 if !gov.has_this_role(HashThisRole::Schema {
1866 who: request.signature().signer.clone(),
1867 role: RoleTypes::Issuer,
1868 schema_id,
1869 namespace: Namespace::from(namespace),
1870 }) {
1871 return Err(RequestManagerError::NotIssuer);
1872 }
1873 }
1874 SubjectData::Governance { .. } => {
1875 if !gov.has_this_role(HashThisRole::Gov {
1876 who: request.signature().signer.clone(),
1877 role: RoleTypes::Issuer,
1878 }) {
1879 return Err(RequestManagerError::NotIssuer);
1880 }
1881 }
1882 }
1883 }
1884
1885 Ok(())
1886 }
1887
1888 async fn stops_childs(
1889 &self,
1890 ctx: &mut ActorContext<Self>,
1891 ) -> Result<(), RequestManagerError> {
1892 match self.state {
1893 RequestManagerState::Reboot => {
1894 if let Ok(actor) = ctx.get_child::<Update>("update").await {
1895 actor.ask_stop().await?;
1896 };
1897 if let Ok(actor) = ctx.get_child::<Reboot>("reboot").await {
1898 actor.ask_stop().await?;
1899 };
1900 }
1901 RequestManagerState::Evaluation => {
1902 if let Ok(actor) =
1903 ctx.get_child::<Evaluation>("evaluation").await
1904 {
1905 actor.ask_stop().await?;
1906 };
1907 }
1908 RequestManagerState::Approval { .. } => {
1909 if let Ok(actor) = ctx.get_child::<Approval>("approval").await {
1910 actor.ask_stop().await?;
1911 };
1912 let _ = make_obsolete(ctx, &self.subject_id).await;
1913 }
1914 RequestManagerState::Validation { .. } => {
1915 if let Ok(actor) =
1916 ctx.get_child::<Validation>("validation").await
1917 {
1918 actor.ask_stop().await?;
1919 };
1920 }
1921 RequestManagerState::Distribution { .. } => {
1922 if let Ok(actor) =
1923 ctx.get_child::<Distribution>("distribution").await
1924 {
1925 actor.ask_stop().await?;
1926 };
1927 }
1928 _ => {}
1929 }
1930
1931 Ok(())
1932 }
1933
1934 async fn abort_request(
1935 &mut self,
1936 ctx: &mut ActorContext<Self>,
1937 error: String,
1938 sn: Option<u64>,
1939 who: PublicKey,
1940 ) -> Result<(), RequestManagerError> {
1941 self.stops_childs(ctx).await?;
1942
1943 self.finish_request_metrics("aborted");
1944 info!("Aborting {}", self.id);
1945 send_to_tracking(
1946 ctx,
1947 RequestTrackingMessage::UpdateState {
1948 request_id: self.id.clone(),
1949 state: RequestState::Abort {
1950 subject_id: self.subject_id.to_string(),
1951 error,
1952 sn,
1953 who: who.to_string(),
1954 },
1955 },
1956 )
1957 .await?;
1958
1959 self.on_event(RequestManagerEvent::Finish, ctx).await;
1960
1961 self.end_request(ctx).await?;
1962
1963 Ok(())
1964 }
1965
1966 async fn end_request(
1967 &self,
1968 ctx: &ActorContext<Self>,
1969 ) -> Result<(), RequestManagerError> {
1970 let actor = ctx.get_parent::<RequestHandler>().await?;
1971 actor
1972 .tell(RequestHandlerMessage::EndHandling {
1973 subject_id: self.subject_id.clone(),
1974 })
1975 .await?;
1976
1977 Ok(())
1978 }
1979}
1980
/// Messages handled by the `RequestManager` actor.
///
/// Most variants carry a `request_id`; the handler acts on them only when it
/// equals the manager's current id (`Run` and `FirstRun` instead set it).
#[derive(Debug, Clone)]
pub enum RequestManagerMessage {
    /// Adopts `request_id` and resumes the request from its current state.
    Run {
        request_id: DigestIdentifier,
    },
    /// Adopts `request_id`, records `command` and `request` through a
    /// `SafeState` event and runs the command for the first time.
    FirstRun {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
        request_id: DigestIdentifier,
    },
    /// Aborts the request with the given `reason`, attributed to `who` at
    /// sequence number `sn`.
    Abort {
        request_id: DigestIdentifier,
        who: PublicKey,
        reason: String,
        sn: u64,
    },
    /// User-initiated abort; honoured only in the `Reboot`, `Starting`,
    /// `Evaluation`, `Approval` and `Validation` states.
    ManualAbort,
    /// Purges the storage of this actor.
    PurgeStorage,
    /// Stops the current children and starts a reboot of kind `reboot_type`,
    /// unless the manager is already in the `Reboot` state.
    Reboot {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
        reboot_type: RebootType,
    },
    /// Starts the governance update step of a reboot.
    RebootUpdate {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Starts the wait step of a reboot (creates the `reboot` child).
    RebootWait {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Ends the reboot: bumps the version, re-checks the request roles and
    /// re-runs the stored command.
    FinishReboot {
        request_id: DigestIdentifier,
    },
    /// Result of the evaluation phase; leads to approval when the evaluator
    /// response requires it, otherwise to validation.
    EvaluationRes {
        request_id: DigestIdentifier,
        eval_req: Box<EvaluationReq>,
        eval_res: EvaluationData,
    },
    /// Result of the approval phase.
    ApprovalRes {
        request_id: DigestIdentifier,
        appro_res: ApprovalData,
    },
    /// Result of the validation phase.
    // NOTE(review): the handler for this variant is outside the reviewed
    // range — confirm the exact follow-up.
    ValidationRes {
        request_id: DigestIdentifier,
        val_req: Box<ValidationReq>,
        val_res: ValidationData,
    },
    /// Presumably asks the manager to finish the request.
    // NOTE(review): the handler for this variant is outside the reviewed
    // range — confirm.
    FinishRequest {
        request_id: DigestIdentifier,
    },
}

// Marker impl: lets `RequestManagerMessage` be used as an actor message.
impl Message for RequestManagerMessage {}
2035
/// Events emitted by the `RequestManager` through `on_event`.
///
/// The type derives serde and Borsh serialization, so the variant order and
/// field layout are part of the encoded format.
// NOTE(review): how each event is applied to the actor state is defined
// outside the reviewed range — confirm against the apply implementation.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum RequestManagerEvent {
    /// Emitted when the request is finished or aborted.
    Finish,
    /// Emitted whenever the request moves to a new `RequestManagerState`.
    UpdateState {
        state: Box<RequestManagerState>,
    },
    /// Emitted when a reboot finishes, carrying the incremented version.
    UpdateVersion {
        version: u64,
    },
    /// Emitted on the first run with the initial command and signed request.
    SafeState {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
    },
}

// Marker impl: lets `RequestManagerEvent` be used as an actor event.
impl Event for RequestManagerEvent {}
2054
2055#[async_trait]
2056impl Actor for RequestManager {
2057 type Event = RequestManagerEvent;
2058 type Message = RequestManagerMessage;
2059 type Response = ();
2060
2061 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
2062 parent_span.map_or_else(
2063 || info_span!("RequestManager", id),
2064 |parent_span| info_span!(parent: parent_span, "RequestManager", id),
2065 )
2066 }
2067
2068 async fn pre_start(
2069 &mut self,
2070 ctx: &mut ActorContext<Self>,
2071 ) -> Result<(), ActorError> {
2072 if let Err(e) =
2073 self.init_store("request_manager", None, false, ctx).await
2074 {
2075 error!(
2076 error = %e,
2077 subject_id = %self.subject_id,
2078 "Failed to initialize store"
2079 );
2080 return Err(e);
2081 }
2082
2083 if self.governance_id.is_none()
2084 && let Some(request) = &self.request
2085 && let EventRequest::Create(create) = request.content()
2086 && !create.schema_id.is_gov()
2087 {
2088 self.governance_id = Some(create.governance_id.clone());
2089 }
2090
2091 Ok(())
2092 }
2093}
2094
2095#[async_trait]
2096impl Handler<Self> for RequestManager {
2097 #[allow(clippy::large_stack_frames)]
2100 async fn handle_message(
2101 &mut self,
2102 _sender: ActorPath,
2103 msg: RequestManagerMessage,
2104 ctx: &mut ave_actors::ActorContext<Self>,
2105 ) -> Result<(), ActorError> {
2106 match msg {
2107 RequestManagerMessage::RebootUpdate {
2108 governance_id,
2109 request_id,
2110 } => {
2111 if request_id == self.id {
2112 info!("Init reboot update {}", self.id);
2113 debug!(
2114 msg_type = "RebootUpdate",
2115 request_id = %self.id,
2116 governance_id = %governance_id,
2117 "Initializing reboot update"
2118 );
2119
2120 if let Err(e) = self.init_update(ctx, &governance_id).await
2121 {
2122 error!(
2123 msg_type = "RebootUpdate",
2124 request_id = %self.id,
2125 governance_id = %governance_id,
2126 error = %e,
2127 "Failed to initialize reboot update"
2128 );
2129 self.match_error(ctx, e).await;
2130 return Ok(());
2131 }
2132 }
2133 }
2134 RequestManagerMessage::RebootWait {
2135 governance_id,
2136 request_id,
2137 } => {
2138 if request_id == self.id {
2139 info!("Init reboot wait {}", self.id);
2140 debug!(
2141 msg_type = "RebootWait",
2142 request_id = %self.id,
2143 governance_id = %governance_id,
2144 "Initializing reboot wait"
2145 );
2146
2147 if let Err(e) = self.init_wait(ctx, &governance_id).await {
2148 error!(
2149 msg_type = "RebootWait",
2150 request_id = %self.id,
2151 governance_id = %governance_id,
2152 error = %e,
2153 "Failed to initialize reboot wait"
2154 );
2155 self.match_error(ctx, e).await;
2156 return Ok(());
2157 }
2158 }
2159 }
2160 RequestManagerMessage::Reboot {
2161 governance_id,
2162 request_id,
2163 reboot_type,
2164 } => {
2165 if request_id == self.id {
2166 if matches!(self.state, RequestManagerState::Reboot) {
2167 debug!(
2168 msg_type = "Reboot",
2169 request_id = %self.id,
2170 governance_id = %governance_id,
2171 reboot_type = ?reboot_type,
2172 "Already in reboot state, ignoring"
2173 );
2174 } else {
2175 debug!(
2176 msg_type = "Reboot",
2177 request_id = %self.id,
2178 governance_id = %governance_id,
2179 reboot_type = ?reboot_type,
2180 "Initiating reboot"
2181 );
2182 if let Err(e) = self.stops_childs(ctx).await {
2183 error!(
2184 msg_type = "Reboot",
2185 request_id = %self.id,
2186 governance_id = %governance_id,
2187 error = %e,
2188 "Failed to stop childs"
2189 );
2190 self.match_error(ctx, e).await;
2191 return Ok(());
2192 };
2193 if let Err(e) = self
2194 .reboot(ctx, reboot_type, governance_id.clone())
2195 .await
2196 {
2197 error!(
2198 msg_type = "Reboot",
2199 request_id = %self.id,
2200 governance_id = %governance_id,
2201 error = %e,
2202 "Failed to initiate reboot"
2203 );
2204 self.match_error(ctx, e).await;
2205 return Ok(());
2206 }
2207 }
2208 }
2209 }
2210 RequestManagerMessage::FinishReboot { request_id } => {
2211 if request_id == self.id {
2212 info!("Init reboot finish {}", self.id);
2213 debug!(
2214 msg_type = "FinishReboot",
2215 request_id = %self.id,
2216 version = self.version,
2217 "Reboot completed, resuming request"
2218 );
2219 self.on_event(
2220 RequestManagerEvent::UpdateVersion {
2221 version: self.version + 1,
2222 },
2223 ctx,
2224 )
2225 .await;
2226
2227 if let Err(e) = send_to_tracking(
2228 ctx,
2229 RequestTrackingMessage::UpdateVersion {
2230 request_id: self.id.clone(),
2231 version: self.version,
2232 },
2233 )
2234 .await
2235 {
2236 error!(
2237 msg_type = "FinishReboot",
2238 request_id = %self.id,
2239 version = self.version,
2240 error = %e,
2241 "Failed to send version update to tracking"
2242 );
2243 return Err(emit_fail(ctx, e).await);
2244 }
2245
2246 if let Err(e) =
2247 self.check_request_roles_after_reboot(ctx).await
2248 {
2249 error!(
2250 msg_type = "FinishReboot",
2251 request_id = %self.id,
2252 error = %e,
2253 "Failed to check signatures after reboot"
2254 );
2255 self.match_error(ctx, e).await;
2256 return Ok(());
2257 }
2258
2259 if let Err(e) = self.match_command(ctx).await {
2260 error!(
2261 msg_type = "FinishReboot",
2262 request_id = %self.id,
2263 error = %e,
2264 "Failed to execute command after reboot"
2265 );
2266 self.match_error(ctx, e).await;
2267 return Ok(());
2268 }
2269 }
2270 }
2271 RequestManagerMessage::Abort {
2272 request_id,
2273 who,
2274 reason,
2275 sn,
2276 } => {
2277 if request_id == self.id {
2278 warn!(
2279 msg_type = "Abort",
2280 state = %self.state,
2281 request_id = %self.id,
2282 who = %who,
2283 reason = %reason,
2284 sn = sn,
2285 "Request abort received"
2286 );
2287 if let Err(e) =
2288 self.abort_request(ctx, reason, Some(sn), who).await
2289 {
2290 error!(
2291 msg_type = "Abort",
2292 request_id = %self.id,
2293 error = %e,
2294 "Failed to abort request"
2295 );
2296 self.match_error(ctx, e).await;
2297 return Ok(());
2298 }
2299 }
2300 }
2301 RequestManagerMessage::ManualAbort => {
2302 match &self.state {
2303 RequestManagerState::Reboot
2304 | RequestManagerState::Starting
2305 | RequestManagerState::Evaluation
2306 | RequestManagerState::Approval { .. }
2307 | RequestManagerState::Validation { .. } => {
2308 if let Err(e) = self
2309 .abort_request(
2310 ctx,
2311 "The user manually aborted the request"
2312 .to_owned(),
2313 None,
2314 (*self.our_key).clone(),
2315 )
2316 .await
2317 {
2318 error!(
2319 msg_type = "Abort",
2320 request_id = %self.id,
2321 error = %e,
2322 "Failed to abort request"
2323 );
2324 self.match_error(ctx, e).await;
2325 }
2326 }
2327 _ => {
2328 info!(
2329 "The request is in a state that cannot be aborted {}, state: {}",
2330 self.id, self.state
2331 );
2332 }
2333 }
2334
2335 return Ok(());
2336 }
2337 RequestManagerMessage::PurgeStorage => {
2338 purge_storage(ctx).await?;
2339
2340 debug!(
2341 msg_type = "PurgeStorage",
2342 subject_id = %self.subject_id,
2343 "Purged request manager storage"
2344 );
2345
2346 return Ok(());
2347 }
2348 RequestManagerMessage::FirstRun {
2349 command,
2350 request,
2351 request_id,
2352 } => {
2353 self.id = request_id.clone();
2354 self.begin_request_metrics();
2355 debug!(
2356 msg_type = "FirstRun",
2357 request_id = %request_id,
2358 command = ?command,
2359 "First run of request manager"
2360 );
2361 self.on_event(
2362 RequestManagerEvent::SafeState { command, request },
2363 ctx,
2364 )
2365 .await;
2366
2367 if let Err(e) = self.match_command(ctx).await {
2368 error!(
2369 msg_type = "FirstRun",
2370 request_id = %self.id,
2371 error = %e,
2372 "Failed to execute initial command"
2373 );
2374 self.match_error(ctx, e).await;
2375 return Ok(());
2376 };
2377 }
2378 RequestManagerMessage::Run { request_id } => {
2379 self.id = request_id;
2380 self.ensure_request_metrics_started();
2381
2382 debug!(
2383 msg_type = "Run",
2384 request_id = %self.id,
2385 state = ?self.state,
2386 version = self.version,
2387 "Running request manager"
2388 );
2389 match self.state.clone() {
2390 RequestManagerState::Starting
2391 | RequestManagerState::Reboot => {
2392 if let Err(e) = self.match_command(ctx).await {
2393 error!(
2394 msg_type = "Run",
2395 request_id = %self.id,
2396 state = "Starting/Reboot",
2397 error = %e,
2398 "Failed to execute command"
2399 );
2400 self.match_error(ctx, e).await;
2401 return Ok(());
2402 };
2403 }
2404 RequestManagerState::Evaluation => {
2405 if let Err(e) = self.build_evaluation(ctx).await {
2406 error!(
2407 msg_type = "Run",
2408 request_id = %self.id,
2409 state = "Evaluation",
2410 error = %e,
2411 "Failed to build evaluation"
2412 );
2413 self.match_error(ctx, e).await;
2414 return Ok(());
2415 }
2416 }
2417
2418 RequestManagerState::Approval { eval_req, eval_res } => {
2419 let Some(evaluator_res) =
2420 eval_res.evaluator_response_ok()
2421 else {
2422 error!(
2423 msg_type = "Run",
2424 request_id = %self.id,
2425 state = "Approval",
2426 "Approval state is missing a successful evaluator response"
2427 );
2428 self.match_error(
2429 ctx,
2430 RequestManagerError::InvalidRequestState {
2431 expected:
2432 "approval state with successful evaluator response",
2433 got:
2434 "approval state without successful evaluator response",
2435 },
2436 )
2437 .await;
2438 return Ok(());
2439 };
2440
2441 if let Err(e) = self
2442 .build_approval(ctx, eval_req, evaluator_res)
2443 .await
2444 {
2445 error!(
2446 msg_type = "Run",
2447 request_id = %self.id,
2448 state = "Approval",
2449 error = %e,
2450 "Failed to build approval"
2451 );
2452 self.match_error(ctx, e).await;
2453 return Ok(());
2454 }
2455 }
2456 RequestManagerState::Validation {
2457 request,
2458 quorum,
2459 init_state,
2460 current_request_roles,
2461 signers,
2462 distribution_plan: _,
2463 } => {
2464 if let Err(e) = self
2465 .run_validation(
2466 ctx,
2467 *request,
2468 quorum,
2469 signers,
2470 init_state,
2471 current_request_roles,
2472 )
2473 .await
2474 {
2475 error!(
2476 msg_type = "Run",
2477 request_id = %self.id,
2478 state = "Validation",
2479 error = %e,
2480 "Failed to run validation"
2481 );
2482 self.match_error(ctx, e).await;
2483 return Ok(());
2484 };
2485 }
2486 RequestManagerState::UpdateSubject {
2487 ledger,
2488 distribution_plan,
2489 } => {
2490 if let Err(e) = self
2491 .update_subject(
2492 ctx,
2493 ledger.clone(),
2494 distribution_plan.clone(),
2495 )
2496 .await
2497 {
2498 error!(
2499 msg_type = "Run",
2500 request_id = %self.id,
2501 state = "UpdateSubject",
2502 error = %e,
2503 "Failed to update subject"
2504 );
2505 self.match_error(ctx, e).await;
2506 return Ok(());
2507 };
2508
2509 match self
2510 .build_distribution(ctx, ledger, distribution_plan)
2511 .await
2512 {
2513 Ok(in_distribution) => {
2514 if !in_distribution
2515 && let Err(e) =
2516 self.finish_request(ctx).await
2517 {
2518 error!(
2519 msg_type = "Run",
2520 request_id = %self.id,
2521 state = "UpdateSubject",
2522 error = %e,
2523 "Failed to finish request after build distribution"
2524 );
2525 self.match_error(ctx, e).await;
2526 return Ok(());
2527 }
2528 }
2529 Err(e) => {
2530 error!(
2531 msg_type = "Run",
2532 request_id = %self.id,
2533 state = "UpdateSubject",
2534 error = %e,
2535 "Failed to build distribution"
2536 );
2537 self.match_error(ctx, e).await;
2538 return Ok(());
2539 }
2540 };
2541 }
2542 RequestManagerState::Distribution {
2543 ledger,
2544 distribution_plan,
2545 } => {
2546 match self
2547 .build_distribution(ctx, ledger, distribution_plan)
2548 .await
2549 {
2550 Ok(in_distribution) => {
2551 if !in_distribution
2552 && let Err(e) =
2553 self.finish_request(ctx).await
2554 {
2555 error!(
2556 msg_type = "Run",
2557 request_id = %self.id,
2558 state = "Distribution",
2559 error = %e,
2560 "Failed to finish request after build distribution"
2561 );
2562 self.match_error(ctx, e).await;
2563 return Ok(());
2564 }
2565 }
2566 Err(e) => {
2567 error!(
2568 msg_type = "Run",
2569 request_id = %self.id,
2570 state = "Distribution",
2571 error = %e,
2572 "Failed to build distribution"
2573 );
2574 self.match_error(ctx, e).await;
2575 return Ok(());
2576 }
2577 };
2578 }
2579 RequestManagerState::End => {
2580 if let Err(e) = self.end_request(ctx).await {
2581 error!(
2582 msg_type = "Run",
2583 request_id = %self.id,
2584 state = "End",
2585 error = %e,
2586 "Failed to end request"
2587 );
2588 self.match_error(ctx, e).await;
2589 return Ok(());
2590 }
2591 }
2592 };
2593 }
2594 RequestManagerMessage::EvaluationRes {
2595 eval_req,
2596 eval_res,
2597 request_id,
2598 } => {
2599 if request_id == self.id {
2600 debug!(
2601 msg_type = "EvaluationRes",
2602 request_id = %self.id,
2603 version = self.version,
2604 "Evaluation result received"
2605 );
2606 if let Err(e) = self.stops_childs(ctx).await {
2607 error!(
2608 msg_type = "EvaluationRes",
2609 request_id = %self.id,
2610 error = %e,
2611 "Failed to stop childs"
2612 );
2613 self.match_error(ctx, e).await;
2614 return Ok(());
2615 };
2616
2617 if let Some(evaluator_res) =
2618 eval_res.evaluator_response_ok()
2619 && evaluator_res.appr_required
2620 {
2621 debug!(
2622 msg_type = "EvaluationRes",
2623 request_id = %self.id,
2624 "Approval required, proceeding to approval phase"
2625 );
2626 self.on_event(
2627 RequestManagerEvent::UpdateState {
2628 state: Box::new(
2629 RequestManagerState::Approval {
2630 eval_req: *eval_req.clone(),
2631 eval_res: eval_res.clone(),
2632 },
2633 ),
2634 },
2635 ctx,
2636 )
2637 .await;
2638
2639 if let Err(e) = self
2640 .build_approval(ctx, *eval_req, evaluator_res)
2641 .await
2642 {
2643 error!(
2644 msg_type = "EvaluationRes",
2645 request_id = %self.id,
2646 error = %e,
2647 "Failed to build approval"
2648 );
2649 self.match_error(ctx, e).await;
2650 return Ok(());
2651 }
2652 } else {
2653 debug!(
2654 msg_type = "EvaluationRes",
2655 request_id = %self.id,
2656 "Approval not required, proceeding to validation phase"
2657 );
2658 let (
2659 request,
2660 quorum,
2661 signers,
2662 init_state,
2663 current_request_roles,
2664 ) = match self
2665 .build_validation_req(
2666 ctx,
2667 Some((*eval_req, eval_res)),
2668 None,
2669 )
2670 .await
2671 {
2672 Ok(data) => data,
2673 Err(e) => {
2674 error!(
2675 msg_type = "EvaluationRes",
2676 request_id = %self.id,
2677 error = %e,
2678 "Failed to build validation request"
2679 );
2680 self.match_error(ctx, e).await;
2681 return Ok(());
2682 }
2683 };
2684
2685 if let Err(e) = self
2686 .run_validation(
2687 ctx,
2688 request,
2689 quorum,
2690 signers,
2691 init_state,
2692 current_request_roles,
2693 )
2694 .await
2695 {
2696 error!(
2697 msg_type = "EvaluationRes",
2698 request_id = %self.id,
2699 error = %e,
2700 "Failed to run validation"
2701 );
2702 self.match_error(ctx, e).await;
2703 return Ok(());
2704 };
2705 }
2706 }
2707 }
2708 RequestManagerMessage::ApprovalRes {
2709 appro_res,
2710 request_id,
2711 } => {
2712 if request_id == self.id {
2713 let _ = make_obsolete(ctx, &self.subject_id).await;
2714 debug!(
2715 msg_type = "ApprovalRes",
2716 request_id = %self.id,
2717 version = self.version,
2718 "Approval result received"
2719 );
2720 if let Err(e) = self.stops_childs(ctx).await {
2721 error!(
2722 msg_type = "ApprovalRes",
2723 request_id = %self.id,
2724 error = %e,
2725 "Failed to stop childs"
2726 );
2727 self.match_error(ctx, e).await;
2728 return Ok(());
2729 };
2730
2731 let RequestManagerState::Approval { eval_req, eval_res } =
2732 self.state.clone()
2733 else {
2734 error!(
2735 msg_type = "ApprovalRes",
2736 request_id = %self.id,
2737 state = ?self.state,
2738 "Invalid state for approval response"
2739 );
2740 let e = ActorError::FunctionalCritical {
2741 description: "Invalid request state".to_owned(),
2742 };
2743 return Err(emit_fail(ctx, e).await);
2744 };
2745 let (
2746 request,
2747 quorum,
2748 signers,
2749 init_state,
2750 current_request_roles,
2751 ) = match self
2752 .build_validation_req(
2753 ctx,
2754 Some((eval_req, eval_res)),
2755 Some(appro_res),
2756 )
2757 .await
2758 {
2759 Ok(data) => data,
2760 Err(e) => {
2761 error!(
2762 msg_type = "ApprovalRes",
2763 request_id = %self.id,
2764 error = %e,
2765 "Failed to build validation request"
2766 );
2767 self.match_error(ctx, e).await;
2768 return Ok(());
2769 }
2770 };
2771
2772 if let Err(e) = self
2773 .run_validation(
2774 ctx,
2775 request,
2776 quorum,
2777 signers,
2778 init_state,
2779 current_request_roles,
2780 )
2781 .await
2782 {
2783 error!(
2784 msg_type = "ApprovalRes",
2785 request_id = %self.id,
2786 error = %e,
2787 "Failed to run validation"
2788 );
2789 self.match_error(ctx, e).await;
2790 return Ok(());
2791 };
2792 }
2793 }
2794 RequestManagerMessage::ValidationRes {
2795 val_res,
2796 val_req,
2797 request_id,
2798 } => {
2799 if request_id == self.id {
2800 debug!(
2801 msg_type = "ValidationRes",
2802 request_id = %self.id,
2803 version = self.version,
2804 "Validation result received"
2805 );
2806 if let Err(e) = self.stops_childs(ctx).await {
2807 error!(
2808 msg_type = "ValidationRes",
2809 request_id = %self.id,
2810 error = %e,
2811 "Failed to stop childs"
2812 );
2813 self.match_error(ctx, e).await;
2814 return Ok(());
2815 };
2816
2817 let distribution_plan = match &self.state {
2818 RequestManagerState::Validation {
2819 distribution_plan,
2820 ..
2821 } => distribution_plan.clone(),
2822 _ => {
2823 error!(
2824 msg_type = "ValidationRes",
2825 request_id = %self.id,
2826 state = ?self.state,
2827 "Invalid state for validation response"
2828 );
2829 self.match_error(
2830 ctx,
2831 RequestManagerError::InvalidRequestState {
2832 expected: "Validation",
2833 got: "Other",
2834 },
2835 )
2836 .await;
2837 return Ok(());
2838 }
2839 };
2840
2841 let signed_ledger = match self
2842 .build_ledger(
2843 ctx,
2844 *val_req,
2845 val_res,
2846 distribution_plan.clone(),
2847 )
2848 .await
2849 {
2850 Ok(signed_ledger) => signed_ledger,
2851 Err(e) => {
2852 error!(
2853 msg_type = "ValidationRes",
2854 request_id = %self.id,
2855 error = %e,
2856 "Failed to build ledger"
2857 );
2858 self.match_error(ctx, e).await;
2859 return Ok(());
2860 }
2861 };
2862
2863 if let Err(e) = self
2864 .update_subject(
2865 ctx,
2866 signed_ledger.clone(),
2867 distribution_plan.clone(),
2868 )
2869 .await
2870 {
2871 error!(
2872 msg_type = "ValidationRes",
2873 request_id = %self.id,
2874 error = %e,
2875 "Failed to update subject"
2876 );
2877 self.match_error(ctx, e).await;
2878 return Ok(());
2879 };
2880
2881 match self
2882 .build_distribution(
2883 ctx,
2884 signed_ledger,
2885 distribution_plan,
2886 )
2887 .await
2888 {
2889 Ok(in_distribution) => {
2890 if !in_distribution
2891 && let Err(e) = self.finish_request(ctx).await
2892 {
2893 error!(
2894 msg_type = "ValidationRes",
2895 request_id = %self.id,
2896 error = %e,
2897 "Failed to finish request after build distribution"
2898 );
2899 self.match_error(ctx, e).await;
2900 return Ok(());
2901 }
2902 }
2903 Err(e) => {
2904 error!(
2905 msg_type = "ValidationRes",
2906 request_id = %self.id,
2907 error = %e,
2908 "Failed to build distribution"
2909 );
2910 self.match_error(ctx, e).await;
2911 return Ok(());
2912 }
2913 };
2914 }
2915 }
2916 RequestManagerMessage::FinishRequest { request_id } => {
2917 if request_id == self.id {
2918 debug!(
2919 msg_type = "FinishRequest",
2920 request_id = %self.id,
2921 version = self.version,
2922 "Finishing request"
2923 );
2924
2925 if let Err(e) = self.stops_childs(ctx).await {
2926 error!(
2927 msg_type = "FinishRequest",
2928 request_id = %self.id,
2929 error = %e,
2930 "Failed to stop childs"
2931 );
2932 self.match_error(ctx, e).await;
2933 return Ok(());
2934 };
2935
2936 if let Err(e) = self.finish_request(ctx).await {
2937 error!(
2938 msg_type = "FinishRequest",
2939 request_id = %self.id,
2940 error = %e,
2941 "Failed to finish request"
2942 );
2943 self.match_error(ctx, e).await;
2944 return Ok(());
2945 }
2946 }
2947 }
2948 }
2949
2950 Ok(())
2951 }
2952
2953 async fn on_event(
2954 &mut self,
2955 event: RequestManagerEvent,
2956 ctx: &mut ActorContext<Self>,
2957 ) {
2958 let event_type = match &event {
2959 RequestManagerEvent::Finish => "Finish",
2960 RequestManagerEvent::UpdateState { .. } => "UpdateState",
2961 RequestManagerEvent::UpdateVersion { .. } => "UpdateVersion",
2962 RequestManagerEvent::SafeState { .. } => "SafeState",
2963 };
2964
2965 if let Err(e) = self.persist(&event, ctx).await {
2966 error!(
2967 event_type = event_type,
2968 request_id = %self.id,
2969 error = %e,
2970 "Failed to persist event"
2971 );
2972 emit_fail(ctx, e).await;
2973 };
2974 }
2975
2976 async fn on_child_fault(
2977 &mut self,
2978 error: ActorError,
2979 ctx: &mut ActorContext<Self>,
2980 ) -> ChildAction {
2981 error!(
2982 request_id = %self.id,
2983 version = self.version,
2984 state = ?self.state,
2985 error = %error,
2986 "Child fault in request manager"
2987 );
2988 emit_fail(ctx, error).await;
2989 ChildAction::Stop
2990 }
2991}
2992
2993#[async_trait]
2994impl PersistentActor for RequestManager {
2995 type Persistence = LightPersistence;
2996 type InitParams = InitRequestManager;
2997
2998 fn update(&mut self, state: Self) {
2999 self.command = state.command;
3000 self.request = state.request;
3001 self.state = state.state;
3002 self.version = state.version;
3003 }
3004
3005 fn create_initial(params: Self::InitParams) -> Self {
3006 Self {
3007 retry_diff: 0,
3008 retry_timeout: 0,
3009 request_started_at: None,
3010 current_phase: None,
3011 current_phase_started_at: None,
3012 our_key: params.our_key,
3013 id: DigestIdentifier::default(),
3014 subject_id: params.subject_id,
3015 governance_id: params.governance_id,
3016 command: ReqManInitMessage::Evaluate,
3017 request: None,
3018 state: RequestManagerState::Starting,
3019 version: 0,
3020 helpers: Some(params.helpers),
3021 }
3022 }
3023
3024 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
3026 match event {
3027 RequestManagerEvent::Finish => {
3028 debug!(
3029 event_type = "Finish",
3030 request_id = %self.id,
3031 "Applying finish event"
3032 );
3033 self.state = RequestManagerState::End;
3034 self.request = None;
3035 self.id = DigestIdentifier::default();
3036 }
3037 RequestManagerEvent::UpdateState { state } => {
3038 debug!(
3039 event_type = "UpdateState",
3040 request_id = %self.id,
3041 old_state = ?self.state,
3042 new_state = ?state,
3043 "Applying state update"
3044 );
3045 self.state = *state.clone()
3046 }
3047 RequestManagerEvent::UpdateVersion { version } => {
3048 debug!(
3049 event_type = "UpdateVersion",
3050 request_id = %self.id,
3051 old_version = self.version,
3052 new_version = version,
3053 "Applying version update"
3054 );
3055 self.state = RequestManagerState::Starting;
3056 self.version = *version
3057 }
3058 RequestManagerEvent::SafeState { command, request } => {
3059 debug!(
3060 event_type = "SafeState",
3061 request_id = %self.id,
3062 command = ?command,
3063 "Applying safe state"
3064 );
3065 self.version = 0;
3066 self.retry_diff = 0;
3067 self.retry_timeout = 0;
3068 self.state = RequestManagerState::Starting;
3069 self.request = Some(request.clone());
3070 self.command = command.clone();
3071 }
3072 };
3073
3074 Ok(())
3075 }
3076}
3077
3078#[async_trait]
3079impl Storable for RequestManager {}