1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::bridge::request::EventRequestType;
8use ave_common::identity::{
9 DigestIdentifier, HashAlgorithm, PublicKey, Signed,
10};
11use ave_common::request::EventRequest;
12use ave_common::response::RequestState;
13use ave_common::{Namespace, SchemaType, ValueWrapper};
14use borsh::{BorshDeserialize, BorshSerialize};
15use ave_network::ComunicateInfo;
16use serde::{Deserialize, Serialize};
17use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tracing::{Span, debug, error, info, info_span, warn};
21
22use crate::approval::request::ApprovalReq;
23use crate::distribution::{
24 Distribution, DistributionMessage, DistributionType,
25};
26use crate::evaluation::request::{EvalWorkerContext, EvaluateData};
27use crate::evaluation::response::EvaluatorResponse;
28use crate::governance::data::GovernanceData;
29use crate::governance::model::{
30 HashThisRole, ProtocolTypes, Quorum, RoleTypes, WitnessesData,
31};
32use crate::governance::role_register::RoleDataRegister;
33use crate::helpers::network::service::NetworkSender;
34use crate::metrics::try_core_metrics;
35use crate::model::common::distribution_plan::build_tracker_event_distribution_plan;
36use crate::model::common::node::{SignTypesNode, get_sign, get_subject_data};
37use crate::model::common::subject::{
38 acquire_subject, create_subject, get_gov, get_gov_sn,
39 get_last_ledger_event, get_metadata, make_obsolete, update_ledger,
40};
41use crate::model::common::{purge_storage, send_to_tracking};
42use crate::model::event::{
43 ApprovalData, EvaluationData, Ledger, LedgerSeal, Protocols, ValidationData,
44};
45use crate::node::SubjectData;
46use crate::request::error::RequestManagerError;
47use crate::request::tracking::RequestTrackingMessage;
48use crate::request::{RequestHandler, RequestHandlerMessage};
49use crate::subject::Metadata;
50use crate::system::ConfigHelper;
51
52use crate::validation::request::{ActualProtocols, LastData, ValidationReq};
53use crate::validation::worker::CurrentRequestRoles;
54use crate::{
55 ActorMessage, NetworkMessage, Validation, ValidationMessage,
56 approval::{Approval, ApprovalMessage},
57 auth::{Auth, AuthMessage, AuthResponse},
58 db::Storable,
59 evaluation::{Evaluation, EvaluationMessage, request::EvaluationReq},
60 model::common::emit_fail,
61 update::{Update, UpdateMessage, UpdateNew, UpdateType},
62};
63
64use super::{
65 reboot::{Reboot, RebootMessage},
66 types::{
67 DistributionPlanEntry, DistributionPlanMode, ReqManInitMessage,
68 RequestManagerState,
69 },
70};
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
73pub struct RequestManager {
74 #[serde(skip)]
75 helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
76 #[serde(skip)]
77 our_key: Arc<PublicKey>,
78 #[serde(skip)]
79 id: DigestIdentifier,
80 #[serde(skip)]
81 subject_id: DigestIdentifier,
82 #[serde(skip)]
83 governance_id: Option<DigestIdentifier>,
84 #[serde(skip)]
85 retry_timeout: u64,
86 #[serde(skip)]
87 retry_diff: u64,
88 #[serde(skip)]
89 request_started_at: Option<Instant>,
90 #[serde(skip)]
91 current_phase: Option<&'static str>,
92 #[serde(skip)]
93 current_phase_started_at: Option<Instant>,
94 command: ReqManInitMessage,
95 request: Option<Signed<EventRequest>>,
96 state: RequestManagerState,
97 version: u64,
98}
99
100#[derive(Debug, Clone)]
101pub enum RebootType {
102 Normal,
103 Diff,
104 TimeOut,
105}
106
107pub struct InitRequestManager {
108 pub our_key: Arc<PublicKey>,
109 pub subject_id: DigestIdentifier,
110 pub governance_id: Option<DigestIdentifier>,
111 pub helpers: (HashAlgorithm, Arc<NetworkSender>),
112}
113
114impl BorshSerialize for RequestManager {
115 fn serialize<W: std::io::Write>(
116 &self,
117 writer: &mut W,
118 ) -> std::io::Result<()> {
119 BorshSerialize::serialize(&self.command, writer)?;
120 BorshSerialize::serialize(&self.state, writer)?;
121 BorshSerialize::serialize(&self.version, writer)?;
122 BorshSerialize::serialize(&self.request, writer)?;
123
124 Ok(())
125 }
126}
127
128impl BorshDeserialize for RequestManager {
129 fn deserialize_reader<R: std::io::Read>(
130 reader: &mut R,
131 ) -> std::io::Result<Self> {
132 let command = ReqManInitMessage::deserialize_reader(reader)?;
134 let state = RequestManagerState::deserialize_reader(reader)?;
135 let version = u64::deserialize_reader(reader)?;
136 let request =
137 Option::<Signed<EventRequest>>::deserialize_reader(reader)?;
138
139 let our_key = Arc::new(PublicKey::default());
140 let subject_id = DigestIdentifier::default();
141 let id = DigestIdentifier::default();
142
143 Ok(Self {
144 retry_diff: 0,
145 retry_timeout: 0,
146 request_started_at: None,
147 current_phase: None,
148 current_phase_started_at: None,
149 helpers: None,
150 our_key,
151 id,
152 subject_id,
153 governance_id: None,
154 command,
155 request,
156 state,
157 version,
158 })
159 }
160}
161
162impl RequestManager {
163 fn retry_seconds_for_attempt(schedule: &[u64], attempt: u64) -> u64 {
164 let default = schedule.last().copied().unwrap_or(1).max(1);
165 let idx = attempt.saturating_sub(1) as usize;
166 schedule.get(idx).copied().unwrap_or(default).max(1)
167 }
168
169 fn begin_request_metrics(&mut self) {
170 self.request_started_at = Some(Instant::now());
171 self.current_phase = None;
172 self.current_phase_started_at = None;
173
174 if let Some(metrics) = try_core_metrics() {
175 metrics.observe_request_started();
176 }
177 }
178
179 fn ensure_request_metrics_started(&mut self) {
180 if self.request_started_at.is_none() {
181 self.request_started_at = Some(Instant::now());
182 }
183 }
184
185 fn start_phase_metrics(&mut self, phase: &'static str) {
186 self.ensure_request_metrics_started();
187 self.finish_phase_metrics();
188 self.current_phase = Some(phase);
189 self.current_phase_started_at = Some(Instant::now());
190 }
191
192 fn finish_phase_metrics(&mut self) {
193 let Some(phase) = self.current_phase.take() else {
194 self.current_phase_started_at = None;
195 return;
196 };
197 let Some(started_at) = self.current_phase_started_at.take() else {
198 return;
199 };
200
201 if let Some(metrics) = try_core_metrics() {
202 metrics.observe_request_phase(phase, started_at.elapsed());
203 }
204 }
205
206 fn finish_request_metrics(&mut self, result: &'static str) {
207 self.finish_phase_metrics();
208
209 if let Some(started_at) = self.request_started_at.take()
210 && let Some(metrics) = try_core_metrics()
211 {
212 metrics.observe_request_terminal(result, started_at.elapsed());
213 }
214 }
215
216 fn tracker_evaluation_context(
220 governance_data: &GovernanceData,
221 schema_id: &SchemaType,
222 namespace: Namespace,
223 request_type: &EventRequestType,
224 ) -> Result<EvalWorkerContext, RequestManagerError> {
225 match request_type {
226 EventRequestType::Fact => {
227 let (issuers, issuer_any) = governance_data.get_signers(
228 RoleTypes::Issuer,
229 schema_id,
230 namespace,
231 );
232 let schema_viewpoints = governance_data
233 .schemas
234 .get(schema_id)
235 .ok_or_else(|| {
236 crate::governance::error::GovernanceError::SchemaDoesNotExist {
237 schema_id: schema_id.to_string(),
238 }
239 })?
240 .viewpoints
241 .clone();
242
243 Ok(EvalWorkerContext::TrackerFact {
244 issuers: issuers.into_iter().collect(),
245 issuer_any,
246 schema_viewpoints,
247 })
248 }
249 EventRequestType::Transfer => {
250 let members = governance_data
251 .members
252 .values()
253 .cloned()
254 .collect::<BTreeSet<PublicKey>>();
255 let (creator_signers, _) = governance_data.get_signers(
256 RoleTypes::Creator,
257 schema_id,
258 namespace.clone(),
259 );
260 let creators = creator_signers
261 .into_iter()
262 .map(|creator| (creator, BTreeSet::from([namespace.clone()])))
263 .collect::<BTreeMap<PublicKey, BTreeSet<Namespace>>>();
264
265 Ok(EvalWorkerContext::TrackerTransfer { members, creators })
266 }
267 _ => Ok(EvalWorkerContext::Empty),
268 }
269 }
270
271 async fn build_evaluation(
272 &mut self,
273 ctx: &mut ActorContext<Self>,
274 ) -> Result<(), RequestManagerError> {
275 let Some(request) = self.request.clone() else {
276 return Err(RequestManagerError::RequestNotSet);
277 };
278
279 self.on_event(
280 RequestManagerEvent::UpdateState {
281 state: Box::new(RequestManagerState::Evaluation),
282 },
283 ctx,
284 )
285 .await;
286
287 let metadata = self.check_data_eval(ctx, &request).await?;
288
289 let (
290 signed_evaluation_req,
291 quorum,
292 signers,
293 init_state,
294 tracker_context,
295 ) =
296 self.build_request_eval(ctx, &metadata, &request).await?;
297
298 if signers.is_empty() {
299 warn!(
300 request_id = %self.id,
301 schema_id = %metadata.schema_id,
302 "No evaluators available for schema"
303 );
304
305 return Err(RequestManagerError::NoEvaluatorsAvailable {
306 schema_id: metadata.schema_id.to_string(),
307 governance_id: signed_evaluation_req
308 .content()
309 .governance_id
310 .clone(),
311 });
312 }
313
314 self.run_evaluation(
315 ctx,
316 signed_evaluation_req.clone(),
317 quorum,
318 init_state,
319 tracker_context,
320 signers,
321 )
322 .await
323 }
324
325 const fn needs_subject_manager(&self) -> bool {
327 self.governance_id.is_some()
328 }
329
330 fn requester_id(&self) -> String {
331 self.id.to_string()
332 }
333
334 fn build_distribution_plan(
335 &self,
336 validation_req: &ValidationReq,
337 governance_data: Option<&GovernanceData>,
338 ) -> Result<Vec<DistributionPlanEntry>, RequestManagerError> {
339 let mut plan: HashMap<PublicKey, DistributionPlanMode> = HashMap::new();
340
341 match validation_req {
342 ValidationReq::Create { event_request, .. } => {
343 let EventRequest::Create(create) = event_request.content()
344 else {
345 return Err(
346 RequestManagerError::InvalidEventRequestForEvaluation,
347 );
348 };
349
350 if create.schema_id.is_gov() {
351 return Ok(Vec::new());
352 }
353
354 let Some(governance_data) = governance_data else {
355 return Err(RequestManagerError::ActorError(
356 ActorError::FunctionalCritical {
357 description:
358 "Missing governance data for distribution plan"
359 .to_owned(),
360 },
361 ));
362 };
363 let witnesses =
364 governance_data.get_witnesses(WitnessesData::Schema {
365 creator: event_request.signature().signer.clone(),
366 schema_id: create.schema_id.clone(),
367 namespace: create.namespace.clone(),
368 })?;
369
370 for witness in witnesses {
371 plan.insert(witness, DistributionPlanMode::Clear);
372 }
373 }
374 ValidationReq::Event {
375 actual_protocols,
376 event_request,
377 metadata,
378 ..
379 } => {
380 if metadata.schema_id.is_gov() {
381 return Ok(Vec::new());
382 }
383
384 let Some(governance_data) = governance_data else {
385 return Err(RequestManagerError::ActorError(
386 ActorError::FunctionalCritical {
387 description:
388 "Missing governance data for distribution plan"
389 .to_owned(),
390 },
391 ));
392 };
393 let protocols_success = actual_protocols.is_success();
394
395 return build_tracker_event_distribution_plan(
396 governance_data,
397 event_request.content(),
398 metadata,
399 protocols_success,
400 )
401 .map_err(|description| {
402 RequestManagerError::ActorError(
403 ActorError::FunctionalCritical { description },
404 )
405 });
406 }
407 }
408
409 Ok(plan
410 .into_iter()
411 .map(|(node, mode)| DistributionPlanEntry { node, mode })
412 .collect())
413 }
414
415 async fn check_data_eval(
416 &self,
417 ctx: &mut ActorContext<Self>,
418 request: &Signed<EventRequest>,
419 ) -> Result<Metadata, RequestManagerError> {
420 let (subject_id, confirm) = match request.content().clone() {
421 EventRequest::Fact(event) => (event.subject_id, false),
422 EventRequest::Transfer(event) => (event.subject_id, false),
423 EventRequest::Confirm(event) => (event.subject_id, true),
424 _ => {
425 return Err(
426 RequestManagerError::InvalidEventRequestForEvaluation,
427 );
428 }
429 };
430
431 let lease = acquire_subject(
432 ctx,
433 &self.subject_id,
434 self.requester_id(),
435 None,
436 self.needs_subject_manager(),
437 )
438 .await?;
439 let metadata = get_metadata(ctx, &subject_id).await;
440 lease.finish(ctx).await?;
441 let metadata = metadata?;
442
443 if confirm && !metadata.schema_id.is_gov() {
444 return Err(RequestManagerError::ConfirmNotEvaluableForTracker);
445 }
446
447 Ok(metadata)
448 }
449
450 async fn get_governance_data(
451 &self,
452 ctx: &mut ActorContext<Self>,
453 ) -> Result<GovernanceData, RequestManagerError> {
454 let governance_id =
455 self.governance_id.as_ref().unwrap_or(&self.subject_id);
456 Ok(get_gov(ctx, governance_id).await?)
457 }
458
459 async fn build_request_eval(
460 &self,
461 ctx: &mut ActorContext<Self>,
462 metadata: &Metadata,
463 request: &Signed<EventRequest>,
464 ) -> Result<
465 (
466 Signed<EvaluationReq>,
467 Quorum,
468 HashSet<PublicKey>,
469 Option<ValueWrapper>,
470 EvalWorkerContext,
471 ),
472 RequestManagerError,
473 > {
474 let is_gov = metadata.schema_id.is_gov();
475
476 let request_type = EventRequestType::from(request.content());
477 let (evaluate_data, governance_data, init_state, tracker_context) = match (
478 is_gov,
479 request_type.clone(),
480 ) {
481 (true, EventRequestType::Fact) => {
482 let state =
483 GovernanceData::try_from(metadata.properties.clone())?;
484
485 (
486 EvaluateData::GovFact {
487 state: state.clone(),
488 },
489 state,
490 None,
491 EvalWorkerContext::default(),
492 )
493 }
494 (true, EventRequestType::Transfer) => {
495 let state =
496 GovernanceData::try_from(metadata.properties.clone())?;
497
498 (
499 EvaluateData::GovTransfer {
500 state: state.clone(),
501 },
502 state,
503 None,
504 EvalWorkerContext::default(),
505 )
506 }
507 (true, EventRequestType::Confirm) => {
508 let state =
509 GovernanceData::try_from(metadata.properties.clone())?;
510
511 (
512 EvaluateData::GovConfirm {
513 state: state.clone(),
514 },
515 state,
516 None,
517 EvalWorkerContext::default(),
518 )
519 }
520 (false, EventRequestType::Fact) => {
521 let governance_data =
522 get_gov(ctx, &metadata.governance_id).await?;
523
524 let init_state =
525 governance_data.get_init_state(&metadata.schema_id)?;
526 let tracker_context = Self::tracker_evaluation_context(
527 &governance_data,
528 &metadata.schema_id,
529 metadata.namespace.clone(),
530 &request_type,
531 )?;
532
533 (
534 EvaluateData::TrackerSchemasFact {
535 state: metadata.properties.clone(),
536 },
537 governance_data,
538 Some(init_state),
539 tracker_context,
540 )
541 }
542 (false, EventRequestType::Transfer) => {
543 let governance_data =
544 get_gov(ctx, &metadata.governance_id).await?;
545 let tracker_context = Self::tracker_evaluation_context(
546 &governance_data,
547 &metadata.schema_id,
548 metadata.namespace.clone(),
549 &request_type,
550 )?;
551 (
552 EvaluateData::TrackerSchemasTransfer {
553 state: metadata.properties.clone(),
554 },
555 governance_data,
556 None,
557 tracker_context,
558 )
559 }
560 _ => {
561 error!(
562 request_id = %self.id,
563 is_gov = is_gov,
564 request_type = ?request_type,
565 "Invalid event request type for evaluation state"
566 );
567 return Err(
568 RequestManagerError::InvalidEventRequestForEvaluation,
569 );
570 }
571 };
572
573 let (signers, quorum) = governance_data.get_quorum_and_signers(
574 ProtocolTypes::Evaluation,
575 &metadata.schema_id,
576 metadata.namespace.clone(),
577 )?;
578
579 let eval_req = EvaluationReq {
580 event_request: request.clone(),
581 data: evaluate_data,
582 sn: metadata.sn + 1,
583 gov_version: governance_data.version,
584 namespace: metadata.namespace.clone(),
585 schema_id: metadata.schema_id.clone(),
586 signer: (*self.our_key).clone(),
587 signer_is_owner: *self.our_key == request.signature().signer,
588 governance_id: metadata.governance_id.clone(),
589 };
590
591 let signature = get_sign(
592 ctx,
593 SignTypesNode::EvaluationReq(Box::new(eval_req.clone())),
594 )
595 .await?;
596
597 let signed_evaluation_req: Signed<EvaluationReq> =
598 Signed::from_parts(eval_req, signature);
599 Ok((
600 signed_evaluation_req,
601 quorum,
602 signers,
603 init_state,
604 tracker_context,
605 ))
606 }
607
608 async fn run_evaluation(
609 &mut self,
610 ctx: &mut ActorContext<Self>,
611 request: Signed<EvaluationReq>,
612 quorum: Quorum,
613 init_state: Option<ValueWrapper>,
614 context: EvalWorkerContext,
615 signers: HashSet<PublicKey>,
616 ) -> Result<(), RequestManagerError> {
617 let Some((hash, network)) = self.helpers.clone() else {
618 return Err(RequestManagerError::HelpersNotInitialized);
619 };
620
621 self.start_phase_metrics("evaluation");
622 info!("Init evaluation {}", self.id);
623 let child = ctx
624 .create_child(
625 "evaluation",
626 Evaluation::new(
627 self.our_key.clone(),
628 request,
629 quorum,
630 init_state,
631 context,
632 hash,
633 network,
634 ),
635 )
636 .await?;
637
638 child
639 .tell(EvaluationMessage::Create {
640 request_id: self.id.clone(),
641 version: self.version,
642 signers,
643 })
644 .await?;
645
646 send_to_tracking(
647 ctx,
648 RequestTrackingMessage::UpdateState {
649 request_id: self.id.clone(),
650 state: RequestState::Evaluation,
651 },
652 )
653 .await?;
654
655 Ok(())
656 }
657 async fn build_request_appro(
660 &self,
661 ctx: &mut ActorContext<Self>,
662 eval_req: EvaluationReq,
663 evaluator_res: EvaluatorResponse,
664 ) -> Result<Signed<ApprovalReq>, RequestManagerError> {
665 let request = ApprovalReq {
666 subject_id: self.subject_id.clone(),
667 sn: eval_req.sn,
668 gov_version: eval_req.gov_version,
669 patch: evaluator_res.patch,
670 signer: eval_req.signer,
671 };
672
673 let signature =
674 get_sign(ctx, SignTypesNode::ApprovalReq(request.clone())).await?;
675
676 let signed_approval_req: Signed<ApprovalReq> =
677 Signed::from_parts(request, signature);
678
679 Ok(signed_approval_req)
680 }
681
682 async fn build_approval(
683 &mut self,
684 ctx: &mut ActorContext<Self>,
685 eval_req: EvaluationReq,
686 eval_res: EvaluatorResponse,
687 ) -> Result<(), RequestManagerError> {
688 let request = self.build_request_appro(ctx, eval_req, eval_res).await?;
689
690 let governance_data =
691 get_gov(ctx, &request.content().subject_id).await?;
692
693 let (signers, quorum) = governance_data.get_quorum_and_signers(
694 ProtocolTypes::Approval,
695 &SchemaType::Governance,
696 Namespace::new(),
697 )?;
698
699 if signers.is_empty() {
700 warn!(
701 request_id = %self.id,
702 schema_id = %SchemaType::Governance,
703 "No approvers available for schema"
704 );
705
706 return Err(RequestManagerError::NoApproversAvailable {
707 schema_id: SchemaType::Governance.to_string(),
708 governance_id: self
709 .governance_id
710 .clone()
711 .unwrap_or_else(|| self.subject_id.clone()),
712 });
713 }
714
715 self.run_approval(ctx, request, quorum, signers).await
716 }
717
718 async fn run_approval(
719 &mut self,
720 ctx: &mut ActorContext<Self>,
721 request: Signed<ApprovalReq>,
722 quorum: Quorum,
723 signers: HashSet<PublicKey>,
724 ) -> Result<(), RequestManagerError> {
725 let Some((hash, network)) = self.helpers.clone() else {
726 return Err(RequestManagerError::HelpersNotInitialized);
727 };
728
729 self.start_phase_metrics("approval");
730 info!("Init approval {}", self.id);
731 let child = ctx
732 .create_child(
733 "approval",
734 Approval::new(
735 self.our_key.clone(),
736 request,
737 quorum,
738 signers,
739 hash,
740 network,
741 ),
742 )
743 .await?;
744
745 child
746 .tell(ApprovalMessage::Create {
747 request_id: self.id.clone(),
748 version: self.version,
749 })
750 .await?;
751
752 send_to_tracking(
753 ctx,
754 RequestTrackingMessage::UpdateState {
755 request_id: self.id.clone(),
756 state: RequestState::Approval,
757 },
758 )
759 .await?;
760
761 Ok(())
762 }
763
764 async fn build_validation_req(
767 &mut self,
768 ctx: &mut ActorContext<Self>,
769 eval: Option<(EvaluationReq, EvaluationData)>,
770 appro_data: Option<ApprovalData>,
771 ) -> Result<
772 (
773 Signed<ValidationReq>,
774 Quorum,
775 HashSet<PublicKey>,
776 Option<ValueWrapper>,
777 CurrentRequestRoles,
778 ),
779 RequestManagerError,
780 > {
781 let (
782 vali_req,
783 quorum,
784 signers,
785 init_state,
786 current_request_roles,
787 schema_id,
788 governance_data,
789 ) = self.build_validation_data(ctx, eval, appro_data).await?;
790
791 if signers.is_empty() {
792 let governance_id = vali_req.get_governance_id().map_err(|error| {
793 error!(
794 request_id = %self.id,
795 schema_id = %schema_id,
796 error = %error,
797 "Validation request has invalid governance_id"
798 );
799 RequestManagerError::ActorError(
800 ActorError::FunctionalCritical {
801 description: format!(
802 "Validation request has invalid governance_id: {}",
803 error
804 ),
805 },
806 )
807 })?;
808
809 warn!(
810 request_id = %self.id,
811 schema_id = %schema_id,
812 "No validators available for schema"
813 );
814
815 return Err(RequestManagerError::NoValidatorsAvailable {
816 schema_id: schema_id.to_string(),
817 governance_id,
818 });
819 }
820
821 let signature = get_sign(
822 ctx,
823 SignTypesNode::ValidationReq(Box::new(vali_req.clone())),
824 )
825 .await?;
826
827 let distribution_plan =
828 self.build_distribution_plan(&vali_req, governance_data.as_ref())?;
829
830 let signed_validation_req: Signed<ValidationReq> =
831 Signed::from_parts(vali_req, signature);
832
833 self.on_event(
834 RequestManagerEvent::UpdateState {
835 state: Box::new(RequestManagerState::Validation {
836 request: Box::new(signed_validation_req.clone()),
837 quorum: quorum.clone(),
838 init_state: init_state.clone(),
839 current_request_roles: current_request_roles.clone(),
840 signers: signers.clone(),
841 distribution_plan: distribution_plan.clone(),
842 }),
843 },
844 ctx,
845 )
846 .await;
847
848 Ok((
849 signed_validation_req,
850 quorum,
851 signers,
852 init_state,
853 current_request_roles,
854 ))
855 }
856
857 async fn build_validation_data(
858 &self,
859 ctx: &mut ActorContext<Self>,
860 eval: Option<(EvaluationReq, EvaluationData)>,
861 appro_data: Option<ApprovalData>,
862 ) -> Result<
863 (
864 ValidationReq,
865 Quorum,
866 HashSet<PublicKey>,
867 Option<ValueWrapper>,
868 CurrentRequestRoles,
869 SchemaType,
870 Option<GovernanceData>,
871 ),
872 RequestManagerError,
873 > {
874 let Some(request) = self.request.clone() else {
875 return Err(RequestManagerError::RequestNotSet);
876 };
877
878 if let EventRequest::Create(create) = request.content() {
879 if create.schema_id.is_gov() {
880 let governance_data =
881 GovernanceData::new((*self.our_key).clone());
882 let (signers, quorum) = governance_data
883 .get_quorum_and_signers(
884 ProtocolTypes::Validation,
885 &SchemaType::Governance,
886 Namespace::new(),
887 )?;
888
889 Ok((
890 ValidationReq::Create {
891 event_request: request.clone(),
892 gov_version: 0,
893 subject_id: self.subject_id.clone(),
894 },
895 quorum,
896 signers,
897 None,
898 CurrentRequestRoles {
899 evaluation: RoleDataRegister {
900 workers: HashSet::new(),
901 quorum: Quorum::default(),
902 },
903 approval: RoleDataRegister {
904 workers: HashSet::new(),
905 quorum: Quorum::default(),
906 },
907 },
908 SchemaType::Governance,
909 None,
910 ))
911 } else {
912 let governance_data =
913 get_gov(ctx, &create.governance_id).await?;
914
915 let (signers, quorum) = governance_data
916 .get_quorum_and_signers(
917 ProtocolTypes::Validation,
918 &create.schema_id,
919 create.namespace.clone(),
920 )?;
921
922 let init_state =
923 governance_data.get_init_state(&create.schema_id)?;
924
925 Ok((
926 ValidationReq::Create {
927 event_request: request.clone(),
928 gov_version: governance_data.version,
929 subject_id: self.subject_id.clone(),
930 },
931 quorum,
932 signers,
933 Some(init_state),
934 CurrentRequestRoles {
935 evaluation: RoleDataRegister {
936 workers: HashSet::new(),
937 quorum: Quorum::default(),
938 },
939 approval: RoleDataRegister {
940 workers: HashSet::new(),
941 quorum: Quorum::default(),
942 },
943 },
944 create.schema_id.clone(),
945 Some(governance_data),
946 ))
947 }
948 } else {
949 let Some((hash, ..)) = self.helpers else {
950 return Err(RequestManagerError::HelpersNotInitialized);
951 };
952
953 let governance_data = self.get_governance_data(ctx).await?;
954
955 let (actual_protocols, gov_version, sn) =
956 if let Some((eval_req, eval_data)) = eval {
957 if let Some(approval_data) = appro_data.clone() {
958 (
959 ActualProtocols::EvalApprove {
960 eval_data,
961 approval_data,
962 },
963 eval_req.gov_version,
964 Some(eval_req.sn),
965 )
966 } else {
967 (
968 ActualProtocols::Eval { eval_data },
969 eval_req.gov_version,
970 Some(eval_req.sn),
971 )
972 }
973 } else {
974 (ActualProtocols::None, governance_data.version, None)
975 };
976
977 let lease = acquire_subject(
978 ctx,
979 &self.subject_id,
980 self.requester_id(),
981 None,
982 self.needs_subject_manager(),
983 )
984 .await?;
985 let metadata = get_metadata(ctx, &self.subject_id).await;
986 let last_ledger_event = match metadata {
987 Ok(metadata) => {
988 let last_ledger_event =
989 get_last_ledger_event(ctx, &self.subject_id).await;
990 lease.finish(ctx).await?;
991 (metadata, last_ledger_event?)
992 }
993 Err(error) => {
994 lease.finish(ctx).await?;
995 return Err(error.into());
996 }
997 };
998
999 let (metadata, last_ledger_event) = last_ledger_event;
1000
1001 if gov_version != governance_data.version {
1002 return Err(RequestManagerError::GovernanceVersionChanged {
1003 governance_id: metadata.governance_id,
1004 expected: gov_version,
1005 current: governance_data.version,
1006 });
1007 }
1008
1009 let sn = if let Some(sn) = sn {
1010 sn
1011 } else {
1012 metadata.sn + 1
1013 };
1014
1015 let (signers, quorum) = governance_data.get_quorum_and_signers(
1016 ProtocolTypes::Validation,
1017 &metadata.schema_id,
1018 metadata.namespace.clone(),
1019 )?;
1020
1021 let Some(last_ledger_event) = last_ledger_event else {
1022 return Err(RequestManagerError::LastLedgerEventNotFound);
1023 };
1024
1025 let ledger_hash = last_ledger_event.ledger_hash(hash)?;
1026 let schema_id = metadata.schema_id.clone();
1027
1028 let current_request_roles =
1029 if gov_version == governance_data.version {
1030 let (evaluation_workers, evaluation_quorum) =
1031 governance_data.get_quorum_and_signers(
1032 ProtocolTypes::Evaluation,
1033 &metadata.schema_id,
1034 metadata.namespace.clone(),
1035 )?;
1036
1037 let (approval_workers, approval_quorum) =
1038 if appro_data.is_some() {
1039 governance_data.get_quorum_and_signers(
1040 ProtocolTypes::Approval,
1041 &SchemaType::Governance,
1042 Namespace::new(),
1043 )?
1044 } else {
1045 (HashSet::new(), Quorum::default())
1046 };
1047
1048 CurrentRequestRoles {
1049 evaluation: RoleDataRegister {
1050 workers: evaluation_workers,
1051 quorum: evaluation_quorum,
1052 },
1053 approval: RoleDataRegister {
1054 workers: approval_workers,
1055 quorum: approval_quorum,
1056 },
1057 }
1058 } else {
1059 CurrentRequestRoles {
1060 evaluation: RoleDataRegister {
1061 workers: HashSet::new(),
1062 quorum: Quorum::default(),
1063 },
1064 approval: RoleDataRegister {
1065 workers: HashSet::new(),
1066 quorum: Quorum::default(),
1067 },
1068 }
1069 };
1070
1071 Ok((
1072 ValidationReq::Event {
1073 actual_protocols: Box::new(actual_protocols),
1074 event_request: request.clone(),
1075 metadata: Box::new(metadata),
1076 last_data: Box::new(LastData {
1077 vali_data: last_ledger_event
1078 .protocols
1079 .get_validation_data(),
1080 gov_version: last_ledger_event.gov_version,
1081 }),
1082 gov_version,
1083 ledger_hash,
1084 sn,
1085 },
1086 quorum,
1087 signers,
1088 None,
1089 current_request_roles,
1090 schema_id,
1091 Some(governance_data),
1092 ))
1093 }
1094 }
1095
1096 async fn run_validation(
1097 &mut self,
1098 ctx: &mut ActorContext<Self>,
1099 request: Signed<ValidationReq>,
1100 quorum: Quorum,
1101 signers: HashSet<PublicKey>,
1102 init_state: Option<ValueWrapper>,
1103 current_request_roles: CurrentRequestRoles,
1104 ) -> Result<(), RequestManagerError> {
1105 let Some((hash, network)) = self.helpers.clone() else {
1106 return Err(RequestManagerError::HelpersNotInitialized);
1107 };
1108
1109 self.start_phase_metrics("validation");
1110 info!("Init validation {}", self.id);
1111 let child = ctx
1112 .create_child(
1113 "validation",
1114 Validation::new(
1115 self.our_key.clone(),
1116 request,
1117 init_state,
1118 current_request_roles,
1119 quorum,
1120 hash,
1121 network,
1122 ),
1123 )
1124 .await?;
1125
1126 child
1127 .tell(ValidationMessage::Create {
1128 request_id: self.id.clone(),
1129 version: self.version,
1130 signers,
1131 })
1132 .await?;
1133
1134 send_to_tracking(
1135 ctx,
1136 RequestTrackingMessage::UpdateState {
1137 request_id: self.id.clone(),
1138 state: RequestState::Validation,
1139 },
1140 )
1141 .await?;
1142
1143 Ok(())
1144 }
1145 async fn build_ledger(
1148 &mut self,
1149 ctx: &mut ActorContext<Self>,
1150 val_req: ValidationReq,
1151 val_res: ValidationData,
1152 distribution_plan: Vec<DistributionPlanEntry>,
1153 ) -> Result<Ledger, RequestManagerError> {
1154 let Some((hash, ..)) = self.helpers else {
1155 return Err(RequestManagerError::HelpersNotInitialized);
1156 };
1157
1158 let (protocols, ledger_seal) = match val_req {
1159 ValidationReq::Create {
1160 event_request,
1161 gov_version,
1162 ..
1163 } => {
1164 let protocols = Protocols::Create {
1165 event_request,
1166 validation: val_res,
1167 };
1168
1169 let protocols_hash = protocols.hash_for_ledger(&hash)?;
1170
1171 let ledger_seal = LedgerSeal {
1172 gov_version,
1173 sn: 0,
1174 prev_ledger_event_hash: DigestIdentifier::default(),
1175 protocols_hash,
1176 };
1177
1178 (protocols, ledger_seal)
1179 }
1180 ValidationReq::Event {
1181 actual_protocols,
1182 event_request,
1183 ledger_hash,
1184 metadata,
1185 gov_version,
1186 sn,
1187 ..
1188 } => {
1189 let protocols = Protocols::build(
1190 metadata.schema_id.is_gov(),
1191 event_request,
1192 *actual_protocols,
1193 val_res,
1194 )?;
1195
1196 let protocols_hash = protocols.hash_for_ledger(&hash)?;
1197
1198 let ledger_seal = LedgerSeal {
1199 gov_version,
1200 sn,
1201 prev_ledger_event_hash: ledger_hash,
1202 protocols_hash,
1203 };
1204
1205 (protocols, ledger_seal)
1206 }
1207 };
1208
1209 let signature =
1210 get_sign(ctx, SignTypesNode::LedgerSeal(ledger_seal.clone()))
1211 .await?;
1212
1213 let ledger = Ledger {
1214 gov_version: ledger_seal.gov_version,
1215 sn: ledger_seal.sn,
1216 prev_ledger_event_hash: ledger_seal.prev_ledger_event_hash,
1217 ledger_seal_signature: signature,
1218 protocols,
1219 };
1220
1221 self.on_event(
1222 RequestManagerEvent::UpdateState {
1223 state: Box::new(RequestManagerState::UpdateSubject {
1224 ledger: ledger.clone(),
1225 distribution_plan: distribution_plan.clone(),
1226 }),
1227 },
1228 ctx,
1229 )
1230 .await;
1231
1232 Ok(ledger)
1233 }
1234
1235 async fn update_subject(
1236 &mut self,
1237 ctx: &mut ActorContext<Self>,
1238 ledger: Ledger,
1239 distribution_plan: Vec<DistributionPlanEntry>,
1240 ) -> Result<(), RequestManagerError> {
1241 if ledger.get_event_request_type().is_create_event() {
1242 if let Err(e) = create_subject(ctx, ledger.clone()).await {
1243 if let ActorError::Functional { .. } = e {
1244 return Err(RequestManagerError::CheckLimit);
1245 }
1246 return Err(e.into());
1247 }
1248 } else {
1249 let lease = acquire_subject(
1250 ctx,
1251 &self.subject_id,
1252 self.requester_id(),
1253 None,
1254 self.needs_subject_manager(),
1255 )
1256 .await?;
1257 let update_result =
1258 update_ledger(ctx, &self.subject_id, vec![ledger.clone()])
1259 .await;
1260 lease.finish(ctx).await?;
1261 update_result?;
1262 }
1263
1264 self.on_event(
1265 RequestManagerEvent::UpdateState {
1266 state: Box::new(RequestManagerState::Distribution {
1267 ledger,
1268 distribution_plan,
1269 }),
1270 },
1271 ctx,
1272 )
1273 .await;
1274
1275 Ok(())
1276 }
1277
1278 async fn build_distribution(
1279 &mut self,
1280 ctx: &mut ActorContext<Self>,
1281 ledger: Ledger,
1282 mut distribution_plan: Vec<DistributionPlanEntry>,
1283 ) -> Result<bool, RequestManagerError> {
1284 let is_governance = match ledger.get_event_request() {
1285 Some(EventRequest::Create(create)) => create.schema_id.is_gov(),
1286 Some(_) => self.governance_id.is_none(),
1287 None => false,
1288 };
1289
1290 if is_governance {
1291 let governance_id = ledger.get_subject_id();
1292 let governance_data = get_gov(ctx, &governance_id).await?;
1293 distribution_plan = governance_data
1294 .get_witnesses(WitnessesData::Gov)?
1295 .into_iter()
1296 .map(|node| DistributionPlanEntry {
1297 node,
1298 mode: DistributionPlanMode::Clear,
1299 })
1300 .collect();
1301 }
1302
1303 if distribution_plan.is_empty() {
1304 return Ok(false);
1305 }
1306
1307 distribution_plan.retain(|entry| entry.node != *self.our_key);
1308
1309 if distribution_plan.is_empty() {
1310 warn!(
1311 request_id = %self.id,
1312 "No witnesses available for distribution"
1313 );
1314 return Ok(false);
1315 }
1316
1317 self.run_distribution(ctx, distribution_plan, ledger)
1318 .await?;
1319
1320 Ok(true)
1321 }
1322
1323 async fn run_distribution(
1324 &mut self,
1325 ctx: &mut ActorContext<Self>,
1326 distribution_plan: Vec<DistributionPlanEntry>,
1327 ledger: Ledger,
1328 ) -> Result<(), RequestManagerError> {
1329 let Some((.., network)) = self.helpers.clone() else {
1330 return Err(RequestManagerError::HelpersNotInitialized);
1331 };
1332
1333 self.start_phase_metrics("distribution");
1334 info!("Init distribution {}", self.id);
1335 let child = ctx
1336 .create_child(
1337 "distribution",
1338 Distribution::new(
1339 network,
1340 DistributionType::Request,
1341 self.id.clone(),
1342 ),
1343 )
1344 .await?;
1345
1346 child
1347 .tell(DistributionMessage::Create {
1348 ledger: Box::new(ledger),
1349 distribution_plan,
1350 })
1351 .await?;
1352
1353 send_to_tracking(
1354 ctx,
1355 RequestTrackingMessage::UpdateState {
1356 request_id: self.id.clone(),
1357 state: RequestState::Distribution,
1358 },
1359 )
1360 .await?;
1361
1362 Ok(())
1363 }
1364
1365 async fn init_wait(
1368 &self,
1369 ctx: &mut ActorContext<Self>,
1370 governance_id: &DigestIdentifier,
1371 ) -> Result<(), RequestManagerError> {
1372 let Some(config): Option<ConfigHelper> =
1373 ctx.system().get_helper("config").await
1374 else {
1375 return Err(RequestManagerError::ActorError(ActorError::Helper {
1376 name: "config".to_owned(),
1377 reason: "Not found".to_owned(),
1378 }));
1379 };
1380 let actor = ctx
1381 .create_child(
1382 "reboot",
1383 Reboot::new(
1384 governance_id.clone(),
1385 self.id.clone(),
1386 config.sync_reboot.stability_check_interval_secs,
1387 config.sync_reboot.stability_check_max_retries,
1388 ),
1389 )
1390 .await?;
1391
1392 actor.tell(RebootMessage::Init).await?;
1393
1394 Ok(())
1395 }
1396
1397 async fn init_update(
1398 &self,
1399 ctx: &mut ActorContext<Self>,
1400 governance_id: &DigestIdentifier,
1401 ) -> Result<(), RequestManagerError> {
1402 let Some((.., network)) = self.helpers.clone() else {
1403 return Err(RequestManagerError::HelpersNotInitialized);
1404 };
1405
1406 let gov_sn = get_gov_sn(ctx, governance_id).await?;
1407
1408 let governance_data = get_gov(ctx, governance_id).await?;
1409
1410 let mut witnesses = {
1411 let gov_witnesses =
1412 governance_data.get_witnesses(WitnessesData::Gov)?;
1413
1414 let auth_witnesses =
1415 Self::get_witnesses_auth(ctx, governance_id.clone())
1416 .await
1417 .unwrap_or_default();
1418
1419 gov_witnesses
1420 .union(&auth_witnesses)
1421 .cloned()
1422 .collect::<HashSet<PublicKey>>()
1423 };
1424
1425 witnesses.remove(&self.our_key);
1426
1427 if witnesses.is_empty() {
1428 if let Ok(actor) = ctx.reference().await {
1429 actor
1430 .tell(RequestManagerMessage::FinishReboot {
1431 request_id: self.id.clone(),
1432 })
1433 .await?;
1434 };
1435 } else if witnesses.len() == 1 {
1436 let Some(objetive) = witnesses.iter().next() else {
1437 error!(
1438 request_id = %self.id,
1439 governance_id = %governance_id,
1440 "Witness set became empty while selecting single reboot target"
1441 );
1442 return Err(RequestManagerError::ActorError(
1443 ActorError::FunctionalCritical {
1444 description:
1445 "Witness set became empty while selecting single reboot target"
1446 .to_owned(),
1447 },
1448 ));
1449 };
1450 let info = ComunicateInfo {
1451 receiver: objetive.clone(),
1452 request_id: String::default(),
1453 version: 0,
1454 receiver_actor: format!(
1455 "/user/node/distributor_{}",
1456 governance_id
1457 ),
1458 };
1459
1460 network
1461 .send_command(ave_network::CommandHelper::SendMessage {
1462 message: NetworkMessage {
1463 info,
1464 message: ActorMessage::DistributionLedgerReq {
1465 actual_sn: Some(gov_sn),
1466 target_sn: None,
1467 subject_id: governance_id.clone(),
1468 },
1469 },
1470 })
1471 .await?;
1472
1473 let Ok(actor) = ctx.reference().await else {
1474 return Ok(());
1475 };
1476
1477 actor
1478 .tell(RequestManagerMessage::RebootWait {
1479 request_id: self.id.clone(),
1480 governance_id: governance_id.clone(),
1481 })
1482 .await?;
1483 } else {
1484 let Some(config): Option<ConfigHelper> =
1485 ctx.system().get_helper("config").await
1486 else {
1487 return Ok(());
1488 };
1489 let data = UpdateNew {
1490 network,
1491 subject_id: governance_id.clone(),
1492 our_sn: Some(gov_sn),
1493 witnesses,
1494 update_type: UpdateType::Request {
1495 subject_id: self.subject_id.clone(),
1496 id: self.id.clone(),
1497 },
1498 subject_kind_hint: Some(
1499 crate::update::UpdateSubjectKind::Governance,
1500 ),
1501 round_retry_interval_secs: config
1502 .sync_update
1503 .round_retry_interval_secs,
1504 max_round_retries: config.sync_update.max_round_retries,
1505 witness_retry_count: config.sync_update.witness_retry_count,
1506 witness_retry_interval_secs: config
1507 .sync_update
1508 .witness_retry_interval_secs,
1509 };
1510
1511 let updater = Update::new(data);
1512 let Ok(child) = ctx.create_child("update", updater).await else {
1513 let Ok(actor) = ctx.reference().await else {
1514 return Ok(());
1515 };
1516
1517 actor
1518 .tell(RequestManagerMessage::RebootWait {
1519 request_id: self.id.clone(),
1520 governance_id: governance_id.clone(),
1521 })
1522 .await?;
1523
1524 return Ok(());
1525 };
1526
1527 child.tell(UpdateMessage::Run).await?;
1528 }
1529
1530 Ok(())
1531 }
1532
1533 async fn get_witnesses_auth(
1534 ctx: &ActorContext<Self>,
1535 governance_id: DigestIdentifier,
1536 ) -> Result<HashSet<PublicKey>, RequestManagerError> {
1537 let path = ActorPath::from("/user/node/auth");
1538 let actor = ctx.system().get_actor::<Auth>(&path).await?;
1539
1540 let response = actor
1541 .ask(AuthMessage::GetAuth {
1542 subject_id: governance_id,
1543 })
1544 .await?;
1545
1546 match response {
1547 AuthResponse::Witnesses(witnesses) => Ok(witnesses),
1548 _ => Err(RequestManagerError::ActorError(
1549 ActorError::UnexpectedResponse {
1550 path,
1551 expected: "AuthResponse::Witnesses".to_owned(),
1552 },
1553 )),
1554 }
1555 }
1556
1557 async fn send_reboot(
1560 &self,
1561 ctx: &ActorContext<Self>,
1562 governance_id: DigestIdentifier,
1563 ) -> Result<(), ActorError> {
1564 let Ok(actor) = ctx.reference().await else {
1565 return Ok(());
1566 };
1567
1568 actor
1569 .tell(RequestManagerMessage::Reboot {
1570 request_id: self.id.clone(),
1571 governance_id,
1572 reboot_type: RebootType::TimeOut,
1573 })
1574 .await
1575 }
1576
1577 async fn match_error(
1578 &mut self,
1579 ctx: &mut ActorContext<Self>,
1580 error: RequestManagerError,
1581 ) {
1582 match error {
1583 RequestManagerError::NoEvaluatorsAvailable {
1584 governance_id,
1585 ..
1586 }
1587 | RequestManagerError::NoApproversAvailable {
1588 governance_id, ..
1589 }
1590 | RequestManagerError::NoValidatorsAvailable {
1591 governance_id,
1592 ..
1593 }
1594 | RequestManagerError::GovernanceVersionChanged {
1595 governance_id,
1596 ..
1597 } => {
1598 if let Err(e) = self.send_reboot(ctx, governance_id).await {
1599 emit_fail(ctx, e).await;
1600 }
1601 }
1602 RequestManagerError::CheckLimit
1603 | RequestManagerError::Governance(..)
1604 | RequestManagerError::NotIssuer
1605 | RequestManagerError::NotCreator => {
1606 if let Err(e) = self
1607 .abort_request(
1608 ctx,
1609 error.to_string(),
1610 None,
1611 (*self.our_key).clone(),
1612 )
1613 .await
1614 {
1615 emit_fail(
1616 ctx,
1617 ActorError::FunctionalCritical {
1618 description: e.to_string(),
1619 },
1620 )
1621 .await;
1622 }
1623 }
1624 _ => {
1625 emit_fail(
1626 ctx,
1627 ActorError::FunctionalCritical {
1628 description: error.to_string(),
1629 },
1630 )
1631 .await;
1632 }
1633 }
1634 }
1635
1636 async fn finish_request(
1637 &mut self,
1638 ctx: &mut ActorContext<Self>,
1639 ) -> Result<(), RequestManagerError> {
1640 self.finish_request_metrics("finished");
1641 info!("Ending {}", self.id);
1642 send_to_tracking(
1643 ctx,
1644 RequestTrackingMessage::UpdateState {
1645 request_id: self.id.clone(),
1646 state: RequestState::Finish,
1647 },
1648 )
1649 .await?;
1650
1651 self.on_event(RequestManagerEvent::Finish, ctx).await;
1652
1653 self.end_request(ctx).await?;
1654
1655 Ok(())
1656 }
1657
1658 async fn reboot(
1659 &mut self,
1660 ctx: &mut ActorContext<Self>,
1661 reboot_type: RebootType,
1662 governance_id: DigestIdentifier,
1663 ) -> Result<(), RequestManagerError> {
1664 let Some(config): Option<ConfigHelper> =
1665 ctx.system().get_helper("config").await
1666 else {
1667 return Err(ActorError::Helper {
1668 name: "config".to_owned(),
1669 reason: "Not found".to_owned(),
1670 }
1671 .into());
1672 };
1673 self.start_phase_metrics("reboot");
1674 self.on_event(
1675 RequestManagerEvent::UpdateState {
1676 state: Box::new(RequestManagerState::Reboot),
1677 },
1678 ctx,
1679 )
1680 .await;
1681
1682 let Ok(actor) = ctx.reference().await else {
1683 return Ok(());
1684 };
1685
1686 let request_id = self.id.clone();
1687
1688 match reboot_type {
1689 RebootType::Normal => {
1690 info!("Launching Normal reboot {}", self.id);
1691 send_to_tracking(
1692 ctx,
1693 RequestTrackingMessage::UpdateState {
1694 request_id: self.id.clone(),
1695 state: RequestState::Reboot,
1696 },
1697 )
1698 .await?;
1699
1700 actor
1701 .tell(RequestManagerMessage::RebootUpdate {
1702 request_id,
1703 governance_id,
1704 })
1705 .await?;
1706 }
1707 RebootType::Diff => {
1708 info!("Launching Diff reboot {}", self.id);
1709 self.retry_diff += 1;
1710
1711 let seconds = Self::retry_seconds_for_attempt(
1712 &config.sync_reboot.diff_retry_schedule_secs,
1713 self.retry_diff,
1714 );
1715
1716 info!(
1717 "Launching Diff reboot {}, try: {}, seconds: {}",
1718 self.id, self.retry_diff, seconds
1719 );
1720
1721 send_to_tracking(
1722 ctx,
1723 RequestTrackingMessage::UpdateState {
1724 request_id: self.id.clone(),
1725 state: RequestState::RebootDiff {
1726 seconds,
1727 count: self.retry_diff,
1728 },
1729 },
1730 )
1731 .await?;
1732
1733 tokio::spawn(async move {
1734 tokio::time::sleep(Duration::from_secs(seconds)).await;
1735 let _ = actor
1736 .tell(RequestManagerMessage::RebootUpdate {
1737 request_id,
1738 governance_id,
1739 })
1740 .await;
1741 });
1742 }
1743 RebootType::TimeOut => {
1744 self.retry_timeout += 1;
1745
1746 let seconds = Self::retry_seconds_for_attempt(
1747 &config.sync_reboot.timeout_retry_schedule_secs,
1748 self.retry_timeout,
1749 );
1750
1751 info!(
1752 "Launching TimeOut reboot {}, try: {}, seconds: {}",
1753 self.id, self.retry_timeout, seconds
1754 );
1755 send_to_tracking(
1756 ctx,
1757 RequestTrackingMessage::UpdateState {
1758 request_id: self.id.clone(),
1759 state: RequestState::RebootTimeOut {
1760 seconds,
1761 count: self.retry_timeout,
1762 },
1763 },
1764 )
1765 .await?;
1766
1767 tokio::spawn(async move {
1768 tokio::time::sleep(Duration::from_secs(seconds)).await;
1769 let _ = actor
1770 .tell(RequestManagerMessage::RebootUpdate {
1771 request_id,
1772 governance_id,
1773 })
1774 .await;
1775 });
1776 }
1777 }
1778
1779 Ok(())
1780 }
1781
1782 async fn match_command(
1783 &mut self,
1784 ctx: &mut ActorContext<Self>,
1785 ) -> Result<(), RequestManagerError> {
1786 match self.command {
1787 ReqManInitMessage::Evaluate => self.build_evaluation(ctx).await,
1788 ReqManInitMessage::Validate => {
1789 let (
1790 request,
1791 quorum,
1792 signers,
1793 init_state,
1794 current_request_roles,
1795 ) = self.build_validation_req(ctx, None, None).await?;
1796
1797 self.run_validation(
1798 ctx,
1799 request,
1800 quorum,
1801 signers,
1802 init_state,
1803 current_request_roles,
1804 )
1805 .await
1806 }
1807 }
1808 }
1809
1810 async fn check_request_roles_after_reboot(
1811 &self,
1812 ctx: &mut ActorContext<Self>,
1813 ) -> Result<(), RequestManagerError> {
1814 let Some(request) = self.request.clone() else {
1815 return Err(RequestManagerError::RequestNotSet);
1816 };
1817
1818 let gov = self.get_governance_data(ctx).await?;
1819 let subject_data = match request.content() {
1820 EventRequest::Create(..) => None,
1821 _ => get_subject_data(ctx, &self.subject_id).await?,
1822 };
1823
1824 let creator_scope = match request.content() {
1825 EventRequest::Create(create) if !create.schema_id.is_gov() => {
1826 Some((create.schema_id.clone(), create.namespace.clone()))
1827 }
1828 _ => subject_data.as_ref().and_then(|subject_data| {
1829 match subject_data {
1830 SubjectData::Tracker {
1831 schema_id,
1832 namespace,
1833 ..
1834 } => Some((
1835 schema_id.clone(),
1836 Namespace::from(namespace.clone()),
1837 )),
1838 _ => None,
1839 }
1840 }),
1841 };
1842
1843 if let Some((schema_id, namespace)) = creator_scope
1844 && !gov.has_this_role(HashThisRole::Schema {
1845 who: (*self.our_key).clone(),
1846 role: RoleTypes::Creator,
1847 schema_id,
1848 namespace,
1849 })
1850 {
1851 return Err(RequestManagerError::NotCreator);
1852 }
1853
1854 if let EventRequest::Fact { .. } = request.content() {
1855 let Some(subject_data) = subject_data else {
1856 return Err(RequestManagerError::SubjecData);
1857 };
1858
1859 match subject_data {
1860 SubjectData::Tracker {
1861 schema_id,
1862 namespace,
1863 ..
1864 } => {
1865 if !gov.has_this_role(HashThisRole::Schema {
1866 who: request.signature().signer.clone(),
1867 role: RoleTypes::Issuer,
1868 schema_id,
1869 namespace: Namespace::from(namespace),
1870 }) {
1871 return Err(RequestManagerError::NotIssuer);
1872 }
1873 }
1874 SubjectData::Governance { .. } => {
1875 if !gov.has_this_role(HashThisRole::Gov {
1876 who: request.signature().signer.clone(),
1877 role: RoleTypes::Issuer,
1878 }) {
1879 return Err(RequestManagerError::NotIssuer);
1880 }
1881 }
1882 }
1883 }
1884
1885 Ok(())
1886 }
1887
1888 async fn stops_childs(
1889 &self,
1890 ctx: &mut ActorContext<Self>,
1891 ) -> Result<(), RequestManagerError> {
1892 match self.state {
1893 RequestManagerState::Reboot => {
1894 if let Ok(actor) = ctx.get_child::<Update>("update").await {
1895 actor.ask_stop().await?;
1896 };
1897 if let Ok(actor) = ctx.get_child::<Reboot>("reboot").await {
1898 actor.ask_stop().await?;
1899 };
1900 }
1901 RequestManagerState::Evaluation => {
1902 if let Ok(actor) =
1903 ctx.get_child::<Evaluation>("evaluation").await
1904 {
1905 actor.ask_stop().await?;
1906 };
1907 }
1908 RequestManagerState::Approval { .. } => {
1909 if let Ok(actor) = ctx.get_child::<Approval>("approval").await {
1910 actor.ask_stop().await?;
1911 };
1912 let _ = make_obsolete(ctx, &self.subject_id).await;
1913 }
1914 RequestManagerState::Validation { .. } => {
1915 if let Ok(actor) =
1916 ctx.get_child::<Validation>("validation").await
1917 {
1918 actor.ask_stop().await?;
1919 };
1920 }
1921 RequestManagerState::Distribution { .. } => {
1922 if let Ok(actor) =
1923 ctx.get_child::<Distribution>("distribution").await
1924 {
1925 actor.ask_stop().await?;
1926 };
1927 }
1928 _ => {}
1929 }
1930
1931 Ok(())
1932 }
1933
1934 async fn abort_request(
1935 &mut self,
1936 ctx: &mut ActorContext<Self>,
1937 error: String,
1938 sn: Option<u64>,
1939 who: PublicKey,
1940 ) -> Result<(), RequestManagerError> {
1941 self.stops_childs(ctx).await?;
1942
1943 self.finish_request_metrics("aborted");
1944 info!("Aborting {}", self.id);
1945 send_to_tracking(
1946 ctx,
1947 RequestTrackingMessage::UpdateState {
1948 request_id: self.id.clone(),
1949 state: RequestState::Abort {
1950 subject_id: self.subject_id.to_string(),
1951 error,
1952 sn,
1953 who: who.to_string(),
1954 },
1955 },
1956 )
1957 .await?;
1958
1959 self.on_event(RequestManagerEvent::Finish, ctx).await;
1960
1961 self.end_request(ctx).await?;
1962
1963 Ok(())
1964 }
1965
1966 async fn end_request(
1967 &self,
1968 ctx: &ActorContext<Self>,
1969 ) -> Result<(), RequestManagerError> {
1970 let actor = ctx.get_parent::<RequestHandler>().await?;
1971 actor
1972 .tell(RequestHandlerMessage::EndHandling {
1973 subject_id: self.subject_id.clone(),
1974 })
1975 .await?;
1976
1977 Ok(())
1978 }
1979}
1980
1981#[derive(Debug, Clone)]
1982pub enum RequestManagerMessage {
1983 Run {
1984 request_id: DigestIdentifier,
1985 },
1986 FirstRun {
1987 command: ReqManInitMessage,
1988 request: Signed<EventRequest>,
1989 request_id: DigestIdentifier,
1990 },
1991 Abort {
1992 request_id: DigestIdentifier,
1993 who: PublicKey,
1994 reason: String,
1995 sn: u64,
1996 },
1997 ManualAbort,
1998 PurgeStorage,
1999 Reboot {
2000 request_id: DigestIdentifier,
2001 governance_id: DigestIdentifier,
2002 reboot_type: RebootType,
2003 },
2004 RebootUpdate {
2005 request_id: DigestIdentifier,
2006 governance_id: DigestIdentifier,
2007 },
2008 RebootWait {
2009 request_id: DigestIdentifier,
2010 governance_id: DigestIdentifier,
2011 },
2012 FinishReboot {
2013 request_id: DigestIdentifier,
2014 },
2015 EvaluationRes {
2016 request_id: DigestIdentifier,
2017 eval_req: Box<EvaluationReq>,
2018 eval_res: EvaluationData,
2019 },
2020 ApprovalRes {
2021 request_id: DigestIdentifier,
2022 appro_res: ApprovalData,
2023 },
2024 ValidationRes {
2025 request_id: DigestIdentifier,
2026 val_req: Box<ValidationReq>,
2027 val_res: ValidationData,
2028 },
2029 FinishRequest {
2030 request_id: DigestIdentifier,
2031 },
2032}
2033
2034impl Message for RequestManagerMessage {}
2035
2036#[derive(
2037 Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
2038)]
2039pub enum RequestManagerEvent {
2040 Finish,
2041 UpdateState {
2042 state: Box<RequestManagerState>,
2043 },
2044 UpdateVersion {
2045 version: u64,
2046 },
2047 SafeState {
2048 command: ReqManInitMessage,
2049 request: Signed<EventRequest>,
2050 },
2051}
2052
2053impl Event for RequestManagerEvent {}
2054
2055#[async_trait]
2056impl Actor for RequestManager {
2057 type Event = RequestManagerEvent;
2058 type Message = RequestManagerMessage;
2059 type Response = ();
2060
2061 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
2062 parent_span.map_or_else(
2063 || info_span!("RequestManager", id),
2064 |parent_span| info_span!(parent: parent_span, "RequestManager", id),
2065 )
2066 }
2067
2068 async fn pre_start(
2069 &mut self,
2070 ctx: &mut ActorContext<Self>,
2071 ) -> Result<(), ActorError> {
2072 if let Err(e) =
2073 self.init_store("request_manager", None, false, ctx).await
2074 {
2075 error!(
2076 error = %e,
2077 subject_id = %self.subject_id,
2078 "Failed to initialize store"
2079 );
2080 return Err(e);
2081 }
2082
2083 if self.governance_id.is_none()
2084 && let Some(request) = &self.request
2085 && let EventRequest::Create(create) = request.content()
2086 && !create.schema_id.is_gov()
2087 {
2088 self.governance_id = Some(create.governance_id.clone());
2089 }
2090
2091 Ok(())
2092 }
2093}
2094
2095#[async_trait]
2096impl Handler<Self> for RequestManager {
2097 #[allow(clippy::large_stack_frames)]
2100 async fn handle_message(
2101 &mut self,
2102 _sender: ActorPath,
2103 msg: RequestManagerMessage,
2104 ctx: &mut ave_actors::ActorContext<Self>,
2105 ) -> Result<(), ActorError> {
2106 match msg {
2107 RequestManagerMessage::RebootUpdate {
2108 governance_id,
2109 request_id,
2110 } => {
2111 if request_id == self.id {
2112 info!("Init reboot update {}", self.id);
2113 debug!(
2114 msg_type = "RebootUpdate",
2115 request_id = %self.id,
2116 governance_id = %governance_id,
2117 "Initializing reboot update"
2118 );
2119
2120 if let Err(e) = self.init_update(ctx, &governance_id).await
2121 {
2122 error!(
2123 msg_type = "RebootUpdate",
2124 request_id = %self.id,
2125 governance_id = %governance_id,
2126 error = %e,
2127 "Failed to initialize reboot update"
2128 );
2129 self.match_error(ctx, e).await;
2130 return Ok(());
2131 }
2132 }
2133 }
2134 RequestManagerMessage::RebootWait {
2135 governance_id,
2136 request_id,
2137 } => {
2138 if request_id == self.id {
2139 info!("Init reboot wait {}", self.id);
2140 debug!(
2141 msg_type = "RebootWait",
2142 request_id = %self.id,
2143 governance_id = %governance_id,
2144 "Initializing reboot wait"
2145 );
2146
2147 if let Err(e) = self.init_wait(ctx, &governance_id).await {
2148 error!(
2149 msg_type = "RebootWait",
2150 request_id = %self.id,
2151 governance_id = %governance_id,
2152 error = %e,
2153 "Failed to initialize reboot wait"
2154 );
2155 self.match_error(ctx, e).await;
2156 return Ok(());
2157 }
2158 }
2159 }
2160 RequestManagerMessage::Reboot {
2161 governance_id,
2162 request_id,
2163 reboot_type,
2164 } => {
2165 if request_id == self.id {
2166 if matches!(self.state, RequestManagerState::Reboot) {
2167 debug!(
2168 msg_type = "Reboot",
2169 request_id = %self.id,
2170 governance_id = %governance_id,
2171 reboot_type = ?reboot_type,
2172 "Already in reboot state, ignoring"
2173 );
2174 } else {
2175 debug!(
2176 msg_type = "Reboot",
2177 request_id = %self.id,
2178 governance_id = %governance_id,
2179 reboot_type = ?reboot_type,
2180 "Initiating reboot"
2181 );
2182 if let Err(e) = self.stops_childs(ctx).await {
2183 error!(
2184 msg_type = "Reboot",
2185 request_id = %self.id,
2186 governance_id = %governance_id,
2187 error = %e,
2188 "Failed to stop childs"
2189 );
2190 self.match_error(ctx, e).await;
2191 return Ok(());
2192 };
2193 if let Err(e) = self
2194 .reboot(ctx, reboot_type, governance_id.clone())
2195 .await
2196 {
2197 error!(
2198 msg_type = "Reboot",
2199 request_id = %self.id,
2200 governance_id = %governance_id,
2201 error = %e,
2202 "Failed to initiate reboot"
2203 );
2204 self.match_error(ctx, e).await;
2205 return Ok(());
2206 }
2207 }
2208 }
2209 }
2210 RequestManagerMessage::FinishReboot { request_id } => {
2211 if request_id == self.id {
2212 info!("Init reboot finish {}", self.id);
2213 debug!(
2214 msg_type = "FinishReboot",
2215 request_id = %self.id,
2216 version = self.version,
2217 "Reboot completed, resuming request"
2218 );
2219 self.on_event(
2220 RequestManagerEvent::UpdateVersion {
2221 version: self.version + 1,
2222 },
2223 ctx,
2224 )
2225 .await;
2226
2227 if let Err(e) = send_to_tracking(
2228 ctx,
2229 RequestTrackingMessage::UpdateVersion {
2230 request_id: self.id.clone(),
2231 version: self.version,
2232 },
2233 )
2234 .await
2235 {
2236 error!(
2237 msg_type = "FinishReboot",
2238 request_id = %self.id,
2239 version = self.version,
2240 error = %e,
2241 "Failed to send version update to tracking"
2242 );
2243 return Err(emit_fail(ctx, e).await);
2244 }
2245
2246 if let Err(e) =
2247 self.check_request_roles_after_reboot(ctx).await
2248 {
2249 error!(
2250 msg_type = "FinishReboot",
2251 request_id = %self.id,
2252 error = %e,
2253 "Failed to check signatures after reboot"
2254 );
2255 self.match_error(ctx, e).await;
2256 return Ok(());
2257 }
2258
2259 if let Err(e) = self.match_command(ctx).await {
2260 error!(
2261 msg_type = "FinishReboot",
2262 request_id = %self.id,
2263 error = %e,
2264 "Failed to execute command after reboot"
2265 );
2266 self.match_error(ctx, e).await;
2267 return Ok(());
2268 }
2269 }
2270 }
2271 RequestManagerMessage::Abort {
2272 request_id,
2273 who,
2274 reason,
2275 sn,
2276 } => {
2277 if request_id == self.id {
2278 warn!(
2279 msg_type = "Abort",
2280 state = %self.state,
2281 request_id = %self.id,
2282 who = %who,
2283 reason = %reason,
2284 sn = sn,
2285 "Request abort received"
2286 );
2287 if let Err(e) =
2288 self.abort_request(ctx, reason, Some(sn), who).await
2289 {
2290 error!(
2291 msg_type = "Abort",
2292 request_id = %self.id,
2293 error = %e,
2294 "Failed to abort request"
2295 );
2296 self.match_error(ctx, e).await;
2297 return Ok(());
2298 }
2299 }
2300 }
2301 RequestManagerMessage::ManualAbort => {
2302 match &self.state {
2303 RequestManagerState::Reboot
2304 | RequestManagerState::Starting
2305 | RequestManagerState::Evaluation
2306 | RequestManagerState::Approval { .. }
2307 | RequestManagerState::Validation { .. } => {
2308 if let Err(e) = self
2309 .abort_request(
2310 ctx,
2311 "The user manually aborted the request"
2312 .to_owned(),
2313 None,
2314 (*self.our_key).clone(),
2315 )
2316 .await
2317 {
2318 error!(
2319 msg_type = "Abort",
2320 request_id = %self.id,
2321 error = %e,
2322 "Failed to abort request"
2323 );
2324 self.match_error(ctx, e).await;
2325 }
2326 }
2327 _ => {
2328 info!(
2329 "The request is in a state that cannot be aborted {}, state: {}",
2330 self.id, self.state
2331 );
2332 }
2333 }
2334
2335 return Ok(());
2336 }
2337 RequestManagerMessage::PurgeStorage => {
2338 purge_storage(ctx).await?;
2339
2340 debug!(
2341 msg_type = "PurgeStorage",
2342 subject_id = %self.subject_id,
2343 "Purged request manager storage"
2344 );
2345
2346 return Ok(());
2347 }
2348 RequestManagerMessage::FirstRun {
2349 command,
2350 request,
2351 request_id,
2352 } => {
2353 self.id = request_id.clone();
2354 self.begin_request_metrics();
2355 debug!(
2356 msg_type = "FirstRun",
2357 request_id = %request_id,
2358 command = ?command,
2359 "First run of request manager"
2360 );
2361 self.on_event(
2362 RequestManagerEvent::SafeState { command, request },
2363 ctx,
2364 )
2365 .await;
2366
2367 if let Err(e) = self.match_command(ctx).await {
2368 error!(
2369 msg_type = "FirstRun",
2370 request_id = %self.id,
2371 error = %e,
2372 "Failed to execute initial command"
2373 );
2374 self.match_error(ctx, e).await;
2375 return Ok(());
2376 };
2377 }
2378 RequestManagerMessage::Run { request_id } => {
2379 self.id = request_id;
2380 self.ensure_request_metrics_started();
2381
2382 debug!(
2383 msg_type = "Run",
2384 request_id = %self.id,
2385 state = ?self.state,
2386 version = self.version,
2387 "Running request manager"
2388 );
2389 match self.state.clone() {
2390 RequestManagerState::Starting
2391 | RequestManagerState::Reboot => {
2392 if let Err(e) = self.match_command(ctx).await {
2393 error!(
2394 msg_type = "Run",
2395 request_id = %self.id,
2396 state = "Starting/Reboot",
2397 error = %e,
2398 "Failed to execute command"
2399 );
2400 self.match_error(ctx, e).await;
2401 return Ok(());
2402 };
2403 }
2404 RequestManagerState::Evaluation => {
2405 if let Err(e) = self.build_evaluation(ctx).await {
2406 error!(
2407 msg_type = "Run",
2408 request_id = %self.id,
2409 state = "Evaluation",
2410 error = %e,
2411 "Failed to build evaluation"
2412 );
2413 self.match_error(ctx, e).await;
2414 return Ok(());
2415 }
2416 }
2417
2418 RequestManagerState::Approval { eval_req, eval_res } => {
2419 let Some(evaluator_res) =
2420 eval_res.evaluator_response_ok()
2421 else {
2422 error!(
2423 msg_type = "Run",
2424 request_id = %self.id,
2425 state = "Approval",
2426 "Approval state is missing a successful evaluator response"
2427 );
2428 self.match_error(
2429 ctx,
2430 RequestManagerError::InvalidRequestState {
2431 expected:
2432 "approval state with successful evaluator response",
2433 got:
2434 "approval state without successful evaluator response",
2435 },
2436 )
2437 .await;
2438 return Ok(());
2439 };
2440
2441 if let Err(e) = self
2442 .build_approval(ctx, eval_req, evaluator_res)
2443 .await
2444 {
2445 error!(
2446 msg_type = "Run",
2447 request_id = %self.id,
2448 state = "Approval",
2449 error = %e,
2450 "Failed to build approval"
2451 );
2452 self.match_error(ctx, e).await;
2453 return Ok(());
2454 }
2455 }
2456 RequestManagerState::Validation {
2457 request,
2458 quorum,
2459 init_state,
2460 current_request_roles,
2461 signers,
2462 distribution_plan: _,
2463 } => {
2464 if let Err(e) = self
2465 .run_validation(
2466 ctx,
2467 *request,
2468 quorum,
2469 signers,
2470 init_state,
2471 current_request_roles,
2472 )
2473 .await
2474 {
2475 error!(
2476 msg_type = "Run",
2477 request_id = %self.id,
2478 state = "Validation",
2479 error = %e,
2480 "Failed to run validation"
2481 );
2482 self.match_error(ctx, e).await;
2483 return Ok(());
2484 };
2485 }
2486 RequestManagerState::UpdateSubject {
2487 ledger,
2488 distribution_plan,
2489 } => {
2490 if let Err(e) = self
2491 .update_subject(
2492 ctx,
2493 ledger.clone(),
2494 distribution_plan.clone(),
2495 )
2496 .await
2497 {
2498 error!(
2499 msg_type = "Run",
2500 request_id = %self.id,
2501 state = "UpdateSubject",
2502 error = %e,
2503 "Failed to update subject"
2504 );
2505 self.match_error(ctx, e).await;
2506 return Ok(());
2507 };
2508
2509 match self
2510 .build_distribution(ctx, ledger, distribution_plan)
2511 .await
2512 {
2513 Ok(in_distribution) => {
2514 if !in_distribution
2515 && let Err(e) =
2516 self.finish_request(ctx).await
2517 {
2518 error!(
2519 msg_type = "Run",
2520 request_id = %self.id,
2521 state = "UpdateSubject",
2522 error = %e,
2523 "Failed to finish request after build distribution"
2524 );
2525 self.match_error(ctx, e).await;
2526 return Ok(());
2527 }
2528 }
2529 Err(e) => {
2530 error!(
2531 msg_type = "Run",
2532 request_id = %self.id,
2533 state = "UpdateSubject",
2534 error = %e,
2535 "Failed to build distribution"
2536 );
2537 self.match_error(ctx, e).await;
2538 return Ok(());
2539 }
2540 };
2541 }
2542 RequestManagerState::Distribution {
2543 ledger,
2544 distribution_plan,
2545 } => {
2546 match self
2547 .build_distribution(ctx, ledger, distribution_plan)
2548 .await
2549 {
2550 Ok(in_distribution) => {
2551 if !in_distribution
2552 && let Err(e) =
2553 self.finish_request(ctx).await
2554 {
2555 error!(
2556 msg_type = "Run",
2557 request_id = %self.id,
2558 state = "Distribution",
2559 error = %e,
2560 "Failed to finish request after build distribution"
2561 );
2562 self.match_error(ctx, e).await;
2563 return Ok(());
2564 }
2565 }
2566 Err(e) => {
2567 error!(
2568 msg_type = "Run",
2569 request_id = %self.id,
2570 state = "Distribution",
2571 error = %e,
2572 "Failed to build distribution"
2573 );
2574 self.match_error(ctx, e).await;
2575 return Ok(());
2576 }
2577 };
2578 }
2579 RequestManagerState::End => {
2580 if let Err(e) = self.end_request(ctx).await {
2581 error!(
2582 msg_type = "Run",
2583 request_id = %self.id,
2584 state = "End",
2585 error = %e,
2586 "Failed to end request"
2587 );
2588 self.match_error(ctx, e).await;
2589 return Ok(());
2590 }
2591 }
2592 };
2593 }
2594 RequestManagerMessage::EvaluationRes {
2595 eval_req,
2596 eval_res,
2597 request_id,
2598 } => {
2599 if request_id == self.id {
2600 debug!(
2601 msg_type = "EvaluationRes",
2602 request_id = %self.id,
2603 version = self.version,
2604 "Evaluation result received"
2605 );
2606 if let Err(e) = self.stops_childs(ctx).await {
2607 error!(
2608 msg_type = "EvaluationRes",
2609 request_id = %self.id,
2610 error = %e,
2611 "Failed to stop childs"
2612 );
2613 self.match_error(ctx, e).await;
2614 return Ok(());
2615 };
2616
2617 if let Some(evaluator_res) =
2618 eval_res.evaluator_response_ok()
2619 && evaluator_res.appr_required
2620 {
2621 debug!(
2622 msg_type = "EvaluationRes",
2623 request_id = %self.id,
2624 "Approval required, proceeding to approval phase"
2625 );
2626 self.on_event(
2627 RequestManagerEvent::UpdateState {
2628 state: Box::new(
2629 RequestManagerState::Approval {
2630 eval_req: *eval_req.clone(),
2631 eval_res: eval_res.clone(),
2632 },
2633 ),
2634 },
2635 ctx,
2636 )
2637 .await;
2638
2639 if let Err(e) = self
2640 .build_approval(ctx, *eval_req, evaluator_res)
2641 .await
2642 {
2643 error!(
2644 msg_type = "EvaluationRes",
2645 request_id = %self.id,
2646 error = %e,
2647 "Failed to build approval"
2648 );
2649 self.match_error(ctx, e).await;
2650 return Ok(());
2651 }
2652 } else {
2653 debug!(
2654 msg_type = "EvaluationRes",
2655 request_id = %self.id,
2656 "Approval not required, proceeding to validation phase"
2657 );
2658 let (
2659 request,
2660 quorum,
2661 signers,
2662 init_state,
2663 current_request_roles,
2664 ) = match self
2665 .build_validation_req(
2666 ctx,
2667 Some((*eval_req, eval_res)),
2668 None,
2669 )
2670 .await
2671 {
2672 Ok(data) => data,
2673 Err(e) => {
2674 error!(
2675 msg_type = "EvaluationRes",
2676 request_id = %self.id,
2677 error = %e,
2678 "Failed to build validation request"
2679 );
2680 self.match_error(ctx, e).await;
2681 return Ok(());
2682 }
2683 };
2684
2685 if let Err(e) = self
2686 .run_validation(
2687 ctx,
2688 request,
2689 quorum,
2690 signers,
2691 init_state,
2692 current_request_roles,
2693 )
2694 .await
2695 {
2696 error!(
2697 msg_type = "EvaluationRes",
2698 request_id = %self.id,
2699 error = %e,
2700 "Failed to run validation"
2701 );
2702 self.match_error(ctx, e).await;
2703 return Ok(());
2704 };
2705 }
2706 }
2707 }
2708 RequestManagerMessage::ApprovalRes {
2709 appro_res,
2710 request_id,
2711 } => {
2712 if request_id == self.id {
2713 let _ = make_obsolete(ctx, &self.subject_id).await;
2714 debug!(
2715 msg_type = "ApprovalRes",
2716 request_id = %self.id,
2717 version = self.version,
2718 "Approval result received"
2719 );
2720 if let Err(e) = self.stops_childs(ctx).await {
2721 error!(
2722 msg_type = "ApprovalRes",
2723 request_id = %self.id,
2724 error = %e,
2725 "Failed to stop childs"
2726 );
2727 self.match_error(ctx, e).await;
2728 return Ok(());
2729 };
2730
2731 let RequestManagerState::Approval { eval_req, eval_res } =
2732 self.state.clone()
2733 else {
2734 error!(
2735 msg_type = "ApprovalRes",
2736 request_id = %self.id,
2737 state = ?self.state,
2738 "Invalid state for approval response"
2739 );
2740 let e = ActorError::FunctionalCritical {
2741 description: "Invalid request state".to_owned(),
2742 };
2743 return Err(emit_fail(ctx, e).await);
2744 };
2745 let (
2746 request,
2747 quorum,
2748 signers,
2749 init_state,
2750 current_request_roles,
2751 ) = match self
2752 .build_validation_req(
2753 ctx,
2754 Some((eval_req, eval_res)),
2755 Some(appro_res),
2756 )
2757 .await
2758 {
2759 Ok(data) => data,
2760 Err(e) => {
2761 error!(
2762 msg_type = "ApprovalRes",
2763 request_id = %self.id,
2764 error = %e,
2765 "Failed to build validation request"
2766 );
2767 self.match_error(ctx, e).await;
2768 return Ok(());
2769 }
2770 };
2771
2772 if let Err(e) = self
2773 .run_validation(
2774 ctx,
2775 request,
2776 quorum,
2777 signers,
2778 init_state,
2779 current_request_roles,
2780 )
2781 .await
2782 {
2783 error!(
2784 msg_type = "ApprovalRes",
2785 request_id = %self.id,
2786 error = %e,
2787 "Failed to run validation"
2788 );
2789 self.match_error(ctx, e).await;
2790 return Ok(());
2791 };
2792 }
2793 }
2794 RequestManagerMessage::ValidationRes {
2795 val_res,
2796 val_req,
2797 request_id,
2798 } => {
2799 if request_id == self.id {
2800 debug!(
2801 msg_type = "ValidationRes",
2802 request_id = %self.id,
2803 version = self.version,
2804 "Validation result received"
2805 );
2806 if let Err(e) = self.stops_childs(ctx).await {
2807 error!(
2808 msg_type = "ValidationRes",
2809 request_id = %self.id,
2810 error = %e,
2811 "Failed to stop childs"
2812 );
2813 self.match_error(ctx, e).await;
2814 return Ok(());
2815 };
2816
2817 let distribution_plan = match &self.state {
2818 RequestManagerState::Validation {
2819 distribution_plan,
2820 ..
2821 } => distribution_plan.clone(),
2822 _ => {
2823 error!(
2824 msg_type = "ValidationRes",
2825 request_id = %self.id,
2826 state = ?self.state,
2827 "Invalid state for validation response"
2828 );
2829 self.match_error(
2830 ctx,
2831 RequestManagerError::InvalidRequestState {
2832 expected: "Validation",
2833 got: "Other",
2834 },
2835 )
2836 .await;
2837 return Ok(());
2838 }
2839 };
2840
2841 let signed_ledger = match self
2842 .build_ledger(
2843 ctx,
2844 *val_req,
2845 val_res,
2846 distribution_plan.clone(),
2847 )
2848 .await
2849 {
2850 Ok(signed_ledger) => signed_ledger,
2851 Err(e) => {
2852 error!(
2853 msg_type = "ValidationRes",
2854 request_id = %self.id,
2855 error = %e,
2856 "Failed to build ledger"
2857 );
2858 self.match_error(ctx, e).await;
2859 return Ok(());
2860 }
2861 };
2862
2863 if let Err(e) = self
2864 .update_subject(
2865 ctx,
2866 signed_ledger.clone(),
2867 distribution_plan.clone(),
2868 )
2869 .await
2870 {
2871 error!(
2872 msg_type = "ValidationRes",
2873 request_id = %self.id,
2874 error = %e,
2875 "Failed to update subject"
2876 );
2877 self.match_error(ctx, e).await;
2878 return Ok(());
2879 };
2880
2881 match self
2882 .build_distribution(
2883 ctx,
2884 signed_ledger,
2885 distribution_plan,
2886 )
2887 .await
2888 {
2889 Ok(in_distribution) => {
2890 if !in_distribution
2891 && let Err(e) = self.finish_request(ctx).await
2892 {
2893 error!(
2894 msg_type = "ValidationRes",
2895 request_id = %self.id,
2896 error = %e,
2897 "Failed to finish request after build distribution"
2898 );
2899 self.match_error(ctx, e).await;
2900 return Ok(());
2901 }
2902 }
2903 Err(e) => {
2904 error!(
2905 msg_type = "ValidationRes",
2906 request_id = %self.id,
2907 error = %e,
2908 "Failed to build distribution"
2909 );
2910 self.match_error(ctx, e).await;
2911 return Ok(());
2912 }
2913 };
2914 }
2915 }
2916 RequestManagerMessage::FinishRequest { request_id } => {
2917 if request_id == self.id {
2918 debug!(
2919 msg_type = "FinishRequest",
2920 request_id = %self.id,
2921 version = self.version,
2922 "Finishing request"
2923 );
2924
2925 if let Err(e) = self.stops_childs(ctx).await {
2926 error!(
2927 msg_type = "FinishRequest",
2928 request_id = %self.id,
2929 error = %e,
2930 "Failed to stop childs"
2931 );
2932 self.match_error(ctx, e).await;
2933 return Ok(());
2934 };
2935
2936 if let Err(e) = self.finish_request(ctx).await {
2937 error!(
2938 msg_type = "FinishRequest",
2939 request_id = %self.id,
2940 error = %e,
2941 "Failed to finish request"
2942 );
2943 self.match_error(ctx, e).await;
2944 return Ok(());
2945 }
2946 }
2947 }
2948 }
2949
2950 Ok(())
2951 }
2952
2953 async fn on_event(
2954 &mut self,
2955 event: RequestManagerEvent,
2956 ctx: &mut ActorContext<Self>,
2957 ) {
2958 let event_type = match &event {
2959 RequestManagerEvent::Finish => "Finish",
2960 RequestManagerEvent::UpdateState { .. } => "UpdateState",
2961 RequestManagerEvent::UpdateVersion { .. } => "UpdateVersion",
2962 RequestManagerEvent::SafeState { .. } => "SafeState",
2963 };
2964
2965 if let Err(e) = self.persist(&event, ctx).await {
2966 error!(
2967 event_type = event_type,
2968 request_id = %self.id,
2969 error = %e,
2970 "Failed to persist event"
2971 );
2972 emit_fail(ctx, e).await;
2973 };
2974 }
2975
2976 async fn on_child_fault(
2977 &mut self,
2978 error: ActorError,
2979 ctx: &mut ActorContext<Self>,
2980 ) -> ChildAction {
2981 error!(
2982 request_id = %self.id,
2983 version = self.version,
2984 state = ?self.state,
2985 error = %error,
2986 "Child fault in request manager"
2987 );
2988 emit_fail(ctx, error).await;
2989 ChildAction::Stop
2990 }
2991}
2992
2993#[async_trait]
2994impl PersistentActor for RequestManager {
2995 type Persistence = LightPersistence;
2996 type InitParams = InitRequestManager;
2997
2998 fn update(&mut self, state: Self) {
2999 self.command = state.command;
3000 self.request = state.request;
3001 self.state = state.state;
3002 self.version = state.version;
3003 }
3004
3005 fn create_initial(params: Self::InitParams) -> Self {
3006 Self {
3007 retry_diff: 0,
3008 retry_timeout: 0,
3009 request_started_at: None,
3010 current_phase: None,
3011 current_phase_started_at: None,
3012 our_key: params.our_key,
3013 id: DigestIdentifier::default(),
3014 subject_id: params.subject_id,
3015 governance_id: params.governance_id,
3016 command: ReqManInitMessage::Evaluate,
3017 request: None,
3018 state: RequestManagerState::Starting,
3019 version: 0,
3020 helpers: Some(params.helpers),
3021 }
3022 }
3023
3024 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
3026 match event {
3027 RequestManagerEvent::Finish => {
3028 debug!(
3029 event_type = "Finish",
3030 request_id = %self.id,
3031 "Applying finish event"
3032 );
3033 self.state = RequestManagerState::End;
3034 self.request = None;
3035 self.id = DigestIdentifier::default();
3036 }
3037 RequestManagerEvent::UpdateState { state } => {
3038 debug!(
3039 event_type = "UpdateState",
3040 request_id = %self.id,
3041 old_state = ?self.state,
3042 new_state = ?state,
3043 "Applying state update"
3044 );
3045 self.state = *state.clone()
3046 }
3047 RequestManagerEvent::UpdateVersion { version } => {
3048 debug!(
3049 event_type = "UpdateVersion",
3050 request_id = %self.id,
3051 old_version = self.version,
3052 new_version = version,
3053 "Applying version update"
3054 );
3055 self.state = RequestManagerState::Starting;
3056 self.version = *version
3057 }
3058 RequestManagerEvent::SafeState { command, request } => {
3059 debug!(
3060 event_type = "SafeState",
3061 request_id = %self.id,
3062 command = ?command,
3063 "Applying safe state"
3064 );
3065 self.version = 0;
3066 self.retry_diff = 0;
3067 self.retry_timeout = 0;
3068 self.state = RequestManagerState::Starting;
3069 self.request = Some(request.clone());
3070 self.command = command.clone();
3071 }
3072 };
3073
3074 Ok(())
3075 }
3076}
3077
3078#[async_trait]
3079impl Storable for RequestManager {}