1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::bridge::request::EventRequestType;
8use ave_common::identity::{
9 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
10};
11use ave_common::request::EventRequest;
12use ave_common::response::RequestState;
13use ave_common::{Namespace, SchemaType, ValueWrapper};
14use borsh::{BorshDeserialize, BorshSerialize};
15use network::ComunicateInfo;
16use serde::{Deserialize, Serialize};
17use std::collections::HashSet;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tracing::{Span, debug, error, info, info_span, warn};
21
22use crate::approval::request::ApprovalReq;
23use crate::distribution::{
24 Distribution, DistributionMessage, DistributionType,
25};
26use crate::evaluation::request::EvaluateData;
27use crate::evaluation::response::EvaluatorResponse;
28use crate::governance::data::GovernanceData;
29use crate::governance::model::{
30 HashThisRole, ProtocolTypes, Quorum, RoleTypes, WitnessesData,
31};
32use crate::governance::role_register::RoleDataRegister;
33use crate::helpers::network::service::NetworkSender;
34use crate::metrics::try_core_metrics;
35use crate::model::common::node::{SignTypesNode, get_sign, get_subject_data};
36use crate::model::common::subject::{
37 acquire_subject, create_subject, get_gov, get_gov_sn,
38 get_last_ledger_event, get_metadata, make_obsolete, update_ledger,
39};
40use crate::model::common::{purge_storage, send_to_tracking};
41use crate::model::event::{
42 ApprovalData, EvaluationData, Ledger, Protocols, ValidationData,
43};
44use crate::node::SubjectData;
45use crate::request::error::RequestManagerError;
46use crate::request::tracking::RequestTrackingMessage;
47use crate::request::{RequestHandler, RequestHandlerMessage};
48use crate::subject::{Metadata, SignedLedger};
49
50use crate::validation::request::{ActualProtocols, LastData, ValidationReq};
51use crate::validation::worker::CurrentRequestRoles;
52use crate::{
53 ActorMessage, NetworkMessage, Validation, ValidationMessage,
54 approval::{Approval, ApprovalMessage},
55 auth::{Auth, AuthMessage, AuthResponse},
56 db::Storable,
57 evaluation::{Evaluation, EvaluationMessage, request::EvaluationReq},
58 model::common::emit_fail,
59 update::{Update, UpdateMessage, UpdateNew, UpdateType},
60};
61
62use super::{
63 reboot::{Reboot, RebootMessage},
64 types::{ReqManInitMessage, RequestManagerState},
65};
66
/// Actor state that drives one event request through its protocol phases
/// (evaluation, approval, validation, ledger update and distribution).
///
/// Only `command`, `request`, `state` and `version` are written by the Borsh
/// implementation; every field marked `#[serde(skip)]` is runtime-only and is
/// reset to a default value when the manager is rebuilt from storage.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestManager {
    /// Hash algorithm and network sender handed to the child protocol actors.
    /// `None` after deserialization until it is set again.
    #[serde(skip)]
    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
    /// Public key recorded as `signer` in the requests built here and removed
    /// from witness sets before messages are sent.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Identifier of the request; used as `request_id` in outgoing messages.
    #[serde(skip)]
    id: DigestIdentifier,
    /// Subject the request operates on.
    #[serde(skip)]
    subject_id: DigestIdentifier,
    /// Governance of the subject. When `None`, `subject_id` is used as the
    /// governance identifier (presumably the subject is itself a governance —
    /// TODO confirm).
    #[serde(skip)]
    governance_id: Option<DigestIdentifier>,
    /// NOTE(review): not used in this part of the file; presumably a retry
    /// counter for timeout reboots — confirm.
    #[serde(skip)]
    retry_timeout: u64,
    /// NOTE(review): not used in this part of the file; presumably a retry
    /// counter for `Diff` reboots — confirm.
    #[serde(skip)]
    retry_diff: u64,
    /// Instant at which request timing started, for metrics.
    #[serde(skip)]
    request_started_at: Option<Instant>,
    /// Name of the phase currently being timed, for metrics.
    #[serde(skip)]
    current_phase: Option<&'static str>,
    /// Instant at which the current phase started, for metrics.
    #[serde(skip)]
    current_phase_started_at: Option<Instant>,
    /// Persisted initialization message (semantics defined in `super::types`).
    command: ReqManInitMessage,
    /// Signed event request being processed, once it has been set.
    request: Option<Signed<EventRequest>>,
    /// Persisted phase state of the request.
    state: RequestManagerState,
    /// Version number forwarded to child actors in their `Create` messages.
    version: u64,
}
94
/// Kind of reboot requested for a request manager.
///
/// In this part of the file only `TimeOut` is constructed (by `send_reboot`);
/// the handling of each variant lives elsewhere.
#[derive(Debug, Clone)]
pub enum RebootType {
    /// NOTE(review): presumably a plain reboot without retry accounting — confirm.
    Normal,
    /// NOTE(review): presumably tied to the `retry_diff` counter — confirm.
    Diff,
    /// Sent when a protocol phase finds no signers available.
    TimeOut,
}
101
/// Runtime values needed to initialize a [`RequestManager`]; they mirror the
/// manager's non-persisted fields of the same names.
pub struct InitRequestManager {
    /// Public key used as signer/owner identity by the manager.
    pub our_key: Arc<PublicKey>,
    /// Subject the request operates on.
    pub subject_id: DigestIdentifier,
    /// Governance of the subject, if it differs from the subject itself.
    pub governance_id: Option<DigestIdentifier>,
    /// Hash algorithm and network sender passed on to child actors.
    pub helpers: (HashAlgorithm, Arc<NetworkSender>),
}
108
109impl BorshSerialize for RequestManager {
110 fn serialize<W: std::io::Write>(
111 &self,
112 writer: &mut W,
113 ) -> std::io::Result<()> {
114 BorshSerialize::serialize(&self.command, writer)?;
115 BorshSerialize::serialize(&self.state, writer)?;
116 BorshSerialize::serialize(&self.version, writer)?;
117 BorshSerialize::serialize(&self.request, writer)?;
118
119 Ok(())
120 }
121}
122
123impl BorshDeserialize for RequestManager {
124 fn deserialize_reader<R: std::io::Read>(
125 reader: &mut R,
126 ) -> std::io::Result<Self> {
127 let command = ReqManInitMessage::deserialize_reader(reader)?;
129 let state = RequestManagerState::deserialize_reader(reader)?;
130 let version = u64::deserialize_reader(reader)?;
131 let request =
132 Option::<Signed<EventRequest>>::deserialize_reader(reader)?;
133
134 let our_key = Arc::new(PublicKey::default());
135 let subject_id = DigestIdentifier::default();
136 let id = DigestIdentifier::default();
137
138 Ok(Self {
139 retry_diff: 0,
140 retry_timeout: 0,
141 request_started_at: None,
142 current_phase: None,
143 current_phase_started_at: None,
144 helpers: None,
145 our_key,
146 id,
147 subject_id,
148 governance_id: None,
149 command,
150 request,
151 state,
152 version,
153 })
154 }
155}
156
157impl RequestManager {
158 fn begin_request_metrics(&mut self) {
159 self.request_started_at = Some(Instant::now());
160 self.current_phase = None;
161 self.current_phase_started_at = None;
162
163 if let Some(metrics) = try_core_metrics() {
164 metrics.observe_request_started();
165 }
166 }
167
168 fn ensure_request_metrics_started(&mut self) {
169 if self.request_started_at.is_none() {
170 self.request_started_at = Some(Instant::now());
171 }
172 }
173
174 fn start_phase_metrics(&mut self, phase: &'static str) {
175 self.ensure_request_metrics_started();
176 self.finish_phase_metrics();
177 self.current_phase = Some(phase);
178 self.current_phase_started_at = Some(Instant::now());
179 }
180
181 fn finish_phase_metrics(&mut self) {
182 let Some(phase) = self.current_phase.take() else {
183 self.current_phase_started_at = None;
184 return;
185 };
186 let Some(started_at) = self.current_phase_started_at.take() else {
187 return;
188 };
189
190 if let Some(metrics) = try_core_metrics() {
191 metrics.observe_request_phase(phase, started_at.elapsed());
192 }
193 }
194
195 fn finish_request_metrics(&mut self, result: &'static str) {
196 self.finish_phase_metrics();
197
198 if let Some(started_at) = self.request_started_at.take()
199 && let Some(metrics) = try_core_metrics()
200 {
201 metrics.observe_request_terminal(result, started_at.elapsed());
202 }
203 }
204
205 async fn build_evaluation(
209 &mut self,
210 ctx: &mut ActorContext<Self>,
211 ) -> Result<(), RequestManagerError> {
212 let Some(request) = self.request.clone() else {
213 return Err(RequestManagerError::RequestNotSet);
214 };
215
216 self.on_event(
217 RequestManagerEvent::UpdateState {
218 state: Box::new(RequestManagerState::Evaluation),
219 },
220 ctx,
221 )
222 .await;
223
224 let metadata = self.check_data_eval(ctx, &request).await?;
225
226 let (signed_evaluation_req, quorum, signers, init_state) =
227 self.build_request_eval(ctx, &metadata, &request).await?;
228
229 if signers.is_empty() {
230 warn!(
231 request_id = %self.id,
232 schema_id = %metadata.schema_id,
233 "No evaluators available for schema"
234 );
235
236 return Err(RequestManagerError::NoEvaluatorsAvailable {
237 schema_id: metadata.schema_id.to_string(),
238 governance_id: signed_evaluation_req
239 .content()
240 .governance_id
241 .clone(),
242 });
243 }
244
245 self.run_evaluation(
246 ctx,
247 signed_evaluation_req.clone(),
248 quorum,
249 init_state,
250 signers,
251 )
252 .await
253 }
254
255 const fn needs_subject_manager(&self) -> bool {
257 self.governance_id.is_some()
258 }
259
260 fn requester_id(&self) -> String {
261 self.id.to_string()
262 }
263
264 async fn check_data_eval(
265 &self,
266 ctx: &mut ActorContext<Self>,
267 request: &Signed<EventRequest>,
268 ) -> Result<Metadata, RequestManagerError> {
269 let (subject_id, confirm) = match request.content().clone() {
270 EventRequest::Fact(event) => (event.subject_id, false),
271 EventRequest::Transfer(event) => (event.subject_id, false),
272 EventRequest::Confirm(event) => (event.subject_id, true),
273 _ => {
274 return Err(
275 RequestManagerError::InvalidEventRequestForEvaluation,
276 );
277 }
278 };
279
280 let lease = acquire_subject(
281 ctx,
282 &self.subject_id,
283 self.requester_id(),
284 None,
285 self.needs_subject_manager(),
286 )
287 .await?;
288 let metadata = get_metadata(ctx, &subject_id).await;
289 lease.finish(ctx).await?;
290 let metadata = metadata?;
291
292 if confirm && !metadata.schema_id.is_gov() {
293 return Err(RequestManagerError::ConfirmNotEvaluableForTracker);
294 }
295
296 Ok(metadata)
297 }
298
299 async fn get_governance_data(
300 &self,
301 ctx: &mut ActorContext<Self>,
302 ) -> Result<GovernanceData, RequestManagerError> {
303 let governance_id =
304 self.governance_id.as_ref().unwrap_or(&self.subject_id);
305 Ok(get_gov(ctx, governance_id).await?)
306 }
307
308 async fn build_request_eval(
309 &self,
310 ctx: &mut ActorContext<Self>,
311 metadata: &Metadata,
312 request: &Signed<EventRequest>,
313 ) -> Result<
314 (
315 Signed<EvaluationReq>,
316 Quorum,
317 HashSet<PublicKey>,
318 Option<ValueWrapper>,
319 ),
320 RequestManagerError,
321 > {
322 let is_gov = metadata.schema_id.is_gov();
323
324 let request_type = EventRequestType::from(request.content());
325 let (evaluate_data, governance_data, init_state) = match (
326 is_gov,
327 request_type,
328 ) {
329 (true, EventRequestType::Fact) => {
330 let state =
331 GovernanceData::try_from(metadata.properties.clone())?;
332
333 (
334 EvaluateData::GovFact {
335 state: state.clone(),
336 },
337 state,
338 None,
339 )
340 }
341 (true, EventRequestType::Transfer) => {
342 let state =
343 GovernanceData::try_from(metadata.properties.clone())?;
344
345 (
346 EvaluateData::GovTransfer {
347 state: state.clone(),
348 },
349 state,
350 None,
351 )
352 }
353 (true, EventRequestType::Confirm) => {
354 let state =
355 GovernanceData::try_from(metadata.properties.clone())?;
356
357 (
358 EvaluateData::GovConfirm {
359 state: state.clone(),
360 },
361 state,
362 None,
363 )
364 }
365 (false, EventRequestType::Fact) => {
366 let governance_data =
367 get_gov(ctx, &metadata.governance_id).await?;
368
369 let init_state =
370 governance_data.get_init_state(&metadata.schema_id)?;
371
372 (
373 EvaluateData::TrackerSchemasFact {
374 contract: format!(
375 "{}_{}",
376 metadata.governance_id, metadata.schema_id
377 ),
378 state: metadata.properties.clone(),
379 },
380 governance_data,
381 Some(init_state),
382 )
383 }
384 (false, EventRequestType::Transfer) => {
385 let governance_data =
386 get_gov(ctx, &metadata.governance_id).await?;
387 (
388 EvaluateData::TrackerSchemasTransfer {
389 governance_data: governance_data.clone(),
390 namespace: metadata.namespace.clone(),
391 schema_id: metadata.schema_id.clone(),
392 state: metadata.properties.clone(),
393 },
394 governance_data,
395 None,
396 )
397 }
398 _ => unreachable!(
399 "It was previously verified that the matched cases are the only possible ones"
400 ),
401 };
402
403 let (signers, quorum) = governance_data.get_quorum_and_signers(
404 ProtocolTypes::Evaluation,
405 &metadata.schema_id,
406 metadata.namespace.clone(),
407 )?;
408
409 let eval_req = EvaluationReq {
410 event_request: request.clone(),
411 data: evaluate_data,
412 sn: metadata.sn + 1,
413 gov_version: governance_data.version,
414 namespace: metadata.namespace.clone(),
415 schema_id: metadata.schema_id.clone(),
416 signer: (*self.our_key).clone(),
417 signer_is_owner: *self.our_key == request.signature().signer,
418 governance_id: metadata.governance_id.clone(),
419 };
420
421 let signature =
422 get_sign(ctx, SignTypesNode::EvaluationReq(eval_req.clone()))
423 .await?;
424
425 let signed_evaluation_req: Signed<EvaluationReq> =
426 Signed::from_parts(eval_req, signature);
427 Ok((signed_evaluation_req, quorum, signers, init_state))
428 }
429
430 async fn run_evaluation(
431 &mut self,
432 ctx: &mut ActorContext<Self>,
433 request: Signed<EvaluationReq>,
434 quorum: Quorum,
435 init_state: Option<ValueWrapper>,
436 signers: HashSet<PublicKey>,
437 ) -> Result<(), RequestManagerError> {
438 let Some((hash, network)) = self.helpers.clone() else {
439 return Err(RequestManagerError::HelpersNotInitialized);
440 };
441
442 self.start_phase_metrics("evaluation");
443 info!("Init evaluation {}", self.id);
444 let child = ctx
445 .create_child(
446 "evaluation",
447 Evaluation::new(
448 self.our_key.clone(),
449 request,
450 quorum,
451 init_state,
452 hash,
453 network,
454 ),
455 )
456 .await?;
457
458 child
459 .tell(EvaluationMessage::Create {
460 request_id: self.id.clone(),
461 version: self.version,
462 signers,
463 })
464 .await?;
465
466 send_to_tracking(
467 ctx,
468 RequestTrackingMessage::UpdateState {
469 request_id: self.id.clone(),
470 state: RequestState::Evaluation,
471 },
472 )
473 .await?;
474
475 Ok(())
476 }
477 async fn build_request_appro(
480 &self,
481 ctx: &mut ActorContext<Self>,
482 eval_req: EvaluationReq,
483 evaluator_res: EvaluatorResponse,
484 ) -> Result<Signed<ApprovalReq>, RequestManagerError> {
485 let request = ApprovalReq {
486 subject_id: self.subject_id.clone(),
487 sn: eval_req.sn,
488 gov_version: eval_req.gov_version,
489 patch: evaluator_res.patch,
490 signer: eval_req.signer,
491 };
492
493 let signature =
494 get_sign(ctx, SignTypesNode::ApprovalReq(request.clone())).await?;
495
496 let signed_approval_req: Signed<ApprovalReq> =
497 Signed::from_parts(request, signature);
498
499 Ok(signed_approval_req)
500 }
501
502 async fn build_approval(
503 &mut self,
504 ctx: &mut ActorContext<Self>,
505 eval_req: EvaluationReq,
506 eval_res: EvaluatorResponse,
507 ) -> Result<(), RequestManagerError> {
508 let request = self.build_request_appro(ctx, eval_req, eval_res).await?;
509
510 let governance_data =
511 get_gov(ctx, &request.content().subject_id).await?;
512
513 let (signers, quorum) = governance_data.get_quorum_and_signers(
514 ProtocolTypes::Approval,
515 &SchemaType::Governance,
516 Namespace::new(),
517 )?;
518
519 if signers.is_empty() {
520 warn!(
521 request_id = %self.id,
522 schema_id = %SchemaType::Governance,
523 "No approvers available for schema"
524 );
525
526 return Err(RequestManagerError::NoApproversAvailable {
527 schema_id: SchemaType::Governance.to_string(),
528 governance_id: self
529 .governance_id
530 .clone()
531 .unwrap_or_else(|| self.subject_id.clone()),
532 });
533 }
534
535 self.run_approval(ctx, request, quorum, signers).await
536 }
537
538 async fn run_approval(
539 &mut self,
540 ctx: &mut ActorContext<Self>,
541 request: Signed<ApprovalReq>,
542 quorum: Quorum,
543 signers: HashSet<PublicKey>,
544 ) -> Result<(), RequestManagerError> {
545 let Some((hash, network)) = self.helpers.clone() else {
546 return Err(RequestManagerError::HelpersNotInitialized);
547 };
548
549 self.start_phase_metrics("approval");
550 info!("Init approval {}", self.id);
551 let child = ctx
552 .create_child(
553 "approval",
554 Approval::new(
555 self.our_key.clone(),
556 request,
557 quorum,
558 signers,
559 hash,
560 network,
561 ),
562 )
563 .await?;
564
565 child
566 .tell(ApprovalMessage::Create {
567 request_id: self.id.clone(),
568 version: self.version,
569 })
570 .await?;
571
572 send_to_tracking(
573 ctx,
574 RequestTrackingMessage::UpdateState {
575 request_id: self.id.clone(),
576 state: RequestState::Approval,
577 },
578 )
579 .await?;
580
581 Ok(())
582 }
583
584 async fn build_validation_req(
587 &mut self,
588 ctx: &mut ActorContext<Self>,
589 eval: Option<(EvaluationReq, EvaluationData)>,
590 appro_data: Option<ApprovalData>,
591 ) -> Result<
592 (
593 Signed<ValidationReq>,
594 Quorum,
595 HashSet<PublicKey>,
596 Option<ValueWrapper>,
597 CurrentRequestRoles,
598 ),
599 RequestManagerError,
600 > {
601 let (
602 vali_req,
603 quorum,
604 signers,
605 init_state,
606 current_request_roles,
607 schema_id,
608 ) = self.build_validation_data(ctx, eval, appro_data).await?;
609
610 if signers.is_empty() {
611 warn!(
612 request_id = %self.id,
613 schema_id = %schema_id,
614 "No validators available for schema"
615 );
616
617 return Err(RequestManagerError::NoValidatorsAvailable {
618 schema_id: schema_id.to_string(),
619 governance_id: vali_req.get_governance_id().expect("The build process verified that the event request is valid")
620 });
621 }
622
623 let signature = get_sign(
624 ctx,
625 SignTypesNode::ValidationReq(Box::new(vali_req.clone())),
626 )
627 .await?;
628
629 let signed_validation_req: Signed<ValidationReq> =
630 Signed::from_parts(vali_req, signature);
631
632 self.on_event(
633 RequestManagerEvent::UpdateState {
634 state: Box::new(RequestManagerState::Validation {
635 request: Box::new(signed_validation_req.clone()),
636 quorum: quorum.clone(),
637 init_state: init_state.clone(),
638 current_request_roles: current_request_roles.clone(),
639 signers: signers.clone(),
640 }),
641 },
642 ctx,
643 )
644 .await;
645
646 Ok((
647 signed_validation_req,
648 quorum,
649 signers,
650 init_state,
651 current_request_roles,
652 ))
653 }
654
    /// Assembles the unsigned validation request and the data needed to run
    /// validation for the stored request.
    ///
    /// Three cases are handled:
    /// - `Create` of a governance: roles come from a fresh
    ///   `GovernanceData::new` built with this node's key, governance version
    ///   is `0` and there is no initial state.
    /// - `Create` of any other schema: roles and the initial state come from
    ///   the governance named in the create request.
    /// - Any other request: a `ValidationReq::Event` is built from the
    ///   subject metadata, the last ledger event and the evaluation/approval
    ///   results passed in.
    ///
    /// Returns the validation request, validation quorum, validator set,
    /// optional initial state, the current evaluation/approval roles and the
    /// schema of the subject.
    ///
    /// # Errors
    /// `RequestNotSet`, `HelpersNotInitialized`, `LastLedgerEventNotFound`,
    /// `LedgerHashFailed`, or any error from governance lookup, role
    /// resolution or subject access.
    async fn build_validation_data(
        &self,
        ctx: &mut ActorContext<Self>,
        eval: Option<(EvaluationReq, EvaluationData)>,
        appro_data: Option<ApprovalData>,
    ) -> Result<
        (
            ValidationReq,
            Quorum,
            HashSet<PublicKey>,
            Option<ValueWrapper>,
            CurrentRequestRoles,
            SchemaType,
        ),
        RequestManagerError,
    > {
        let Some(request) = self.request.clone() else {
            return Err(RequestManagerError::RequestNotSet);
        };

        if let EventRequest::Create(create) = request.content() {
            if create.schema_id.is_gov() {
                // A governance being created has no stored governance yet,
                // so roles are taken from a fresh one built with our key.
                let governance_data =
                    GovernanceData::new((*self.our_key).clone());
                let (signers, quorum) = governance_data
                    .get_quorum_and_signers(
                        ProtocolTypes::Validation,
                        &SchemaType::Governance,
                        Namespace::new(),
                    )?;

                Ok((
                    ValidationReq::Create {
                        event_request: request.clone(),
                        gov_version: 0,
                        subject_id: self.subject_id.clone(),
                    },
                    quorum,
                    signers,
                    None,
                    // Create events carry no evaluation or approval roles.
                    CurrentRequestRoles {
                        evaluation: RoleDataRegister {
                            workers: HashSet::new(),
                            quorum: Quorum::default(),
                        },
                        approval: RoleDataRegister {
                            workers: HashSet::new(),
                            quorum: Quorum::default(),
                        },
                    },
                    SchemaType::Governance,
                ))
            } else {
                let governance_data =
                    get_gov(ctx, &create.governance_id).await?;

                let (signers, quorum) = governance_data
                    .get_quorum_and_signers(
                        ProtocolTypes::Validation,
                        &create.schema_id,
                        create.namespace.clone(),
                    )?;

                let init_state =
                    governance_data.get_init_state(&create.schema_id)?;

                Ok((
                    ValidationReq::Create {
                        event_request: request.clone(),
                        gov_version: governance_data.version,
                        subject_id: self.subject_id.clone(),
                    },
                    quorum,
                    signers,
                    Some(init_state),
                    // Create events carry no evaluation or approval roles.
                    CurrentRequestRoles {
                        evaluation: RoleDataRegister {
                            workers: HashSet::new(),
                            quorum: Quorum::default(),
                        },
                        approval: RoleDataRegister {
                            workers: HashSet::new(),
                            quorum: Quorum::default(),
                        },
                    },
                    create.schema_id.clone(),
                ))
            }
        } else {
            let Some((hash, ..)) = self.helpers else {
                return Err(RequestManagerError::HelpersNotInitialized);
            };

            let governance_data = self.get_governance_data(ctx).await?;

            // When an evaluation ran, its governance version and sequence
            // number are reused; otherwise the current governance version is
            // used and the sequence number is derived from metadata below.
            let (actual_protocols, gov_version, sn) =
                if let Some((eval_req, eval_data)) = eval {
                    if let Some(approval_data) = appro_data.clone() {
                        (
                            ActualProtocols::EvalApprove {
                                eval_data,
                                approval_data,
                            },
                            eval_req.gov_version,
                            Some(eval_req.sn),
                        )
                    } else {
                        (
                            ActualProtocols::Eval { eval_data },
                            eval_req.gov_version,
                            Some(eval_req.sn),
                        )
                    }
                } else {
                    (ActualProtocols::None, governance_data.version, None)
                };

            let lease = acquire_subject(
                ctx,
                &self.subject_id,
                self.requester_id(),
                None,
                self.needs_subject_manager(),
            )
            .await?;
            let metadata = get_metadata(ctx, &self.subject_id).await;
            // The lease is finished on both paths before any read error is
            // propagated.
            let last_ledger_event = match metadata {
                Ok(metadata) => {
                    let last_ledger_event =
                        get_last_ledger_event(ctx, &self.subject_id).await;
                    lease.finish(ctx).await?;
                    (metadata, last_ledger_event?)
                }
                Err(error) => {
                    lease.finish(ctx).await?;
                    return Err(error.into());
                }
            };

            let (metadata, last_ledger_event) = last_ledger_event;
            // Without an evaluation, the new event follows the subject's
            // current sequence number.
            let sn = if let Some(sn) = sn {
                sn
            } else {
                metadata.sn + 1
            };

            let (signers, quorum) = governance_data.get_quorum_and_signers(
                ProtocolTypes::Validation,
                &metadata.schema_id,
                metadata.namespace.clone(),
            )?;

            let Some(last_ledger_event) = last_ledger_event else {
                return Err(RequestManagerError::LastLedgerEventNotFound);
            };

            // Hash of the previous ledger event, stored as `ledger_hash` in
            // the validation request.
            let ledger_hash = hash_borsh(&*hash.hasher(), &last_ledger_event.0)
                .map_err(|e| RequestManagerError::LedgerHashFailed {
                    details: e.to_string(),
                })?;

            let schema_id = metadata.schema_id.clone();

            // Roles are only reported when the request was processed under
            // the current governance version; otherwise they are left empty.
            let current_request_roles =
                if gov_version == governance_data.version {
                    let (evaluation_workers, evaluation_quorum) =
                        governance_data.get_quorum_and_signers(
                            ProtocolTypes::Evaluation,
                            &metadata.schema_id,
                            metadata.namespace.clone(),
                        )?;

                    let (approval_workers, approval_quorum) =
                        if appro_data.is_some() {
                            governance_data.get_quorum_and_signers(
                                ProtocolTypes::Approval,
                                &SchemaType::Governance,
                                Namespace::new(),
                            )?
                        } else {
                            (HashSet::new(), Quorum::default())
                        };

                    CurrentRequestRoles {
                        evaluation: RoleDataRegister {
                            workers: evaluation_workers,
                            quorum: evaluation_quorum,
                        },
                        approval: RoleDataRegister {
                            workers: approval_workers,
                            quorum: approval_quorum,
                        },
                    }
                } else {
                    CurrentRequestRoles {
                        evaluation: RoleDataRegister {
                            workers: HashSet::new(),
                            quorum: Quorum::default(),
                        },
                        approval: RoleDataRegister {
                            workers: HashSet::new(),
                            quorum: Quorum::default(),
                        },
                    }
                };

            Ok((
                ValidationReq::Event {
                    actual_protocols: Box::new(actual_protocols),
                    event_request: request.clone(),
                    metadata: Box::new(metadata),
                    last_data: Box::new(LastData {
                        vali_data: last_ledger_event
                            .content()
                            .protocols
                            .get_validation_data(),
                        gov_version: last_ledger_event.content().gov_version,
                    }),
                    gov_version,
                    ledger_hash,
                    sn,
                },
                quorum,
                signers,
                None,
                current_request_roles,
                schema_id,
            ))
        }
    }
885
886 async fn run_validation(
887 &mut self,
888 ctx: &mut ActorContext<Self>,
889 request: Signed<ValidationReq>,
890 quorum: Quorum,
891 signers: HashSet<PublicKey>,
892 init_state: Option<ValueWrapper>,
893 current_request_roles: CurrentRequestRoles,
894 ) -> Result<(), RequestManagerError> {
895 let Some((hash, network)) = self.helpers.clone() else {
896 return Err(RequestManagerError::HelpersNotInitialized);
897 };
898
899 self.start_phase_metrics("validation");
900 info!("Init validation {}", self.id);
901 let child = ctx
902 .create_child(
903 "validation",
904 Validation::new(
905 self.our_key.clone(),
906 request,
907 init_state,
908 current_request_roles,
909 quorum,
910 hash,
911 network,
912 ),
913 )
914 .await?;
915
916 child
917 .tell(ValidationMessage::Create {
918 request_id: self.id.clone(),
919 version: self.version,
920 signers,
921 })
922 .await?;
923
924 send_to_tracking(
925 ctx,
926 RequestTrackingMessage::UpdateState {
927 request_id: self.id.clone(),
928 state: RequestState::Validation,
929 },
930 )
931 .await?;
932
933 Ok(())
934 }
935 async fn build_ledger(
938 &mut self,
939 ctx: &mut ActorContext<Self>,
940 val_req: ValidationReq,
941 val_res: ValidationData,
942 ) -> Result<SignedLedger, RequestManagerError> {
943 let ledger = match val_req {
944 ValidationReq::Create {
945 event_request,
946 gov_version,
947 ..
948 } => Ledger {
949 event_request,
950 gov_version,
951 sn: 0,
952 prev_ledger_event_hash: DigestIdentifier::default(),
953 protocols: Protocols::Create {
954 validation: val_res,
955 },
956 },
957 ValidationReq::Event {
958 actual_protocols,
959 event_request,
960 metadata,
961 gov_version,
962 sn,
963 ledger_hash,
964 ..
965 } => Ledger {
966 gov_version,
967 sn,
968 prev_ledger_event_hash: ledger_hash,
969 protocols: Protocols::build(
970 metadata.schema_id.is_gov(),
971 EventRequestType::from(event_request.content()),
972 *actual_protocols,
973 val_res,
974 )?,
975 event_request,
976 },
977 };
978
979 let signature =
980 get_sign(ctx, SignTypesNode::Ledger(ledger.clone())).await?;
981
982 let ledger = SignedLedger(Signed::from_parts(ledger, signature));
983
984 self.on_event(
985 RequestManagerEvent::UpdateState {
986 state: Box::new(RequestManagerState::UpdateSubject {
987 ledger: ledger.clone(),
988 }),
989 },
990 ctx,
991 )
992 .await;
993
994 Ok(ledger)
995 }
996
997 async fn update_subject(
998 &mut self,
999 ctx: &mut ActorContext<Self>,
1000 ledger: SignedLedger,
1001 ) -> Result<(), RequestManagerError> {
1002 if ledger.content().event_request.content().is_create_event() {
1003 if let Err(e) = create_subject(ctx, ledger.clone()).await {
1004 if let ActorError::Functional { .. } = e {
1005 return Err(RequestManagerError::CheckLimit);
1006 }
1007 return Err(e.into());
1008 }
1009 } else {
1010 let lease = acquire_subject(
1011 ctx,
1012 &self.subject_id,
1013 self.requester_id(),
1014 None,
1015 self.needs_subject_manager(),
1016 )
1017 .await?;
1018 let update_result =
1019 update_ledger(ctx, &self.subject_id, vec![ledger.clone()])
1020 .await;
1021 lease.finish(ctx).await?;
1022 update_result?;
1023 }
1024
1025 self.on_event(
1026 RequestManagerEvent::UpdateState {
1027 state: Box::new(RequestManagerState::Distribution { ledger }),
1028 },
1029 ctx,
1030 )
1031 .await;
1032
1033 Ok(())
1034 }
1035
1036 async fn build_distribution(
1037 &mut self,
1038 ctx: &mut ActorContext<Self>,
1039 ledger: SignedLedger,
1040 ) -> Result<bool, RequestManagerError> {
1041 let witnesses = self
1042 .build_distribution_data(ctx, ledger.signature().signer.clone())
1043 .await?;
1044
1045 let Some(mut witnesses) = witnesses else {
1046 return Ok(false);
1047 };
1048
1049 witnesses.remove(&self.our_key);
1050
1051 if witnesses.is_empty() {
1052 warn!(
1053 request_id = %self.id,
1054 "No witnesses available for distribution"
1055 );
1056 return Ok(false);
1057 }
1058
1059 self.run_distribution(ctx, witnesses, ledger).await?;
1060
1061 Ok(true)
1062 }
1063
1064 async fn build_distribution_data(
1065 &self,
1066 ctx: &mut ActorContext<Self>,
1067 creator: PublicKey,
1068 ) -> Result<Option<HashSet<PublicKey>>, RequestManagerError> {
1069 let Some(request) = self.request.clone() else {
1070 return Err(RequestManagerError::RequestNotSet);
1071 };
1072
1073 let witnesses = if let EventRequest::Create(create) = request.content()
1074 {
1075 if create.schema_id == SchemaType::Governance {
1076 None
1077 } else {
1078 let governance_data = self.get_governance_data(ctx).await?;
1079
1080 let witnesses =
1081 governance_data.get_witnesses(WitnessesData::Schema {
1082 creator,
1083 schema_id: create.schema_id.clone(),
1084 namespace: create.namespace.clone(),
1085 })?;
1086
1087 Some(witnesses)
1088 }
1089 } else {
1090 let data = get_subject_data(ctx, &self.subject_id).await?;
1091
1092 let Some(data) = data else {
1093 return Err(RequestManagerError::SubjectDataNotFound {
1094 subject_id: self.subject_id.to_string(),
1095 });
1096 };
1097
1098 let governance_data = self.get_governance_data(ctx).await?;
1099
1100 let witnesses = match data {
1101 SubjectData::Governance { .. } => {
1102 governance_data.get_witnesses(WitnessesData::Gov)?
1103 }
1104 SubjectData::Tracker {
1105 schema_id,
1106 namespace,
1107 ..
1108 } => governance_data.get_witnesses(WitnessesData::Schema {
1109 creator,
1110 schema_id,
1111 namespace: Namespace::from(namespace),
1112 })?,
1113 };
1114
1115 Some(witnesses)
1116 };
1117
1118 Ok(witnesses)
1119 }
1120
1121 async fn run_distribution(
1122 &mut self,
1123 ctx: &mut ActorContext<Self>,
1124 witnesses: HashSet<PublicKey>,
1125 ledger: SignedLedger,
1126 ) -> Result<(), RequestManagerError> {
1127 let Some((.., network)) = self.helpers.clone() else {
1128 return Err(RequestManagerError::HelpersNotInitialized);
1129 };
1130
1131 self.start_phase_metrics("distribution");
1132 info!("Init distribution {}", self.id);
1133 let child = ctx
1134 .create_child(
1135 "distribution",
1136 Distribution::new(
1137 network,
1138 DistributionType::Request,
1139 self.id.clone(),
1140 ),
1141 )
1142 .await?;
1143
1144 child
1145 .tell(DistributionMessage::Create {
1146 ledger: Box::new(ledger),
1147 witnesses,
1148 })
1149 .await?;
1150
1151 send_to_tracking(
1152 ctx,
1153 RequestTrackingMessage::UpdateState {
1154 request_id: self.id.clone(),
1155 state: RequestState::Distribution,
1156 },
1157 )
1158 .await?;
1159
1160 Ok(())
1161 }
1162
1163 async fn init_wait(
1166 &self,
1167 ctx: &mut ActorContext<Self>,
1168 governance_id: &DigestIdentifier,
1169 ) -> Result<(), RequestManagerError> {
1170 let actor = ctx
1171 .create_child(
1172 "reboot",
1173 Reboot::new(governance_id.clone(), self.id.clone()),
1174 )
1175 .await?;
1176
1177 actor.tell(RebootMessage::Init).await?;
1178
1179 Ok(())
1180 }
1181
1182 async fn init_update(
1183 &self,
1184 ctx: &mut ActorContext<Self>,
1185 governance_id: &DigestIdentifier,
1186 ) -> Result<(), RequestManagerError> {
1187 let Some((.., network)) = self.helpers.clone() else {
1188 return Err(RequestManagerError::HelpersNotInitialized);
1189 };
1190
1191 let gov_sn = get_gov_sn(ctx, governance_id).await?;
1192
1193 let governance_data = get_gov(ctx, governance_id).await?;
1194
1195 let mut witnesses = {
1196 let gov_witnesses =
1197 governance_data.get_witnesses(WitnessesData::Gov)?;
1198
1199 let auth_witnesses =
1200 Self::get_witnesses_auth(ctx, governance_id.clone())
1201 .await
1202 .unwrap_or_default();
1203
1204 gov_witnesses
1205 .union(&auth_witnesses)
1206 .cloned()
1207 .collect::<HashSet<PublicKey>>()
1208 };
1209
1210 witnesses.remove(&self.our_key);
1211
1212 if witnesses.is_empty() {
1213 if let Ok(actor) = ctx.reference().await {
1214 actor
1215 .tell(RequestManagerMessage::FinishReboot {
1216 request_id: self.id.clone(),
1217 })
1218 .await?;
1219 };
1220 } else if witnesses.len() == 1 {
1221 let objetive = witnesses.iter().next().expect("len is 1");
1222 let info = ComunicateInfo {
1223 receiver: objetive.clone(),
1224 request_id: String::default(),
1225 version: 0,
1226 receiver_actor: format!(
1227 "/user/node/distributor_{}",
1228 governance_id
1229 ),
1230 };
1231
1232 network
1233 .send_command(network::CommandHelper::SendMessage {
1234 message: NetworkMessage {
1235 info,
1236 message: ActorMessage::DistributionLedgerReq {
1237 actual_sn: Some(gov_sn),
1238 subject_id: governance_id.clone(),
1239 },
1240 },
1241 })
1242 .await?;
1243
1244 let Ok(actor) = ctx.reference().await else {
1245 return Ok(());
1246 };
1247
1248 actor
1249 .tell(RequestManagerMessage::RebootWait {
1250 request_id: self.id.clone(),
1251 governance_id: governance_id.clone(),
1252 })
1253 .await?;
1254 } else {
1255 let data = UpdateNew {
1256 network,
1257 subject_id: governance_id.clone(),
1258 our_sn: Some(gov_sn),
1259 witnesses,
1260 update_type: UpdateType::Request {
1261 subject_id: self.subject_id.clone(),
1262 id: self.id.clone(),
1263 },
1264 };
1265
1266 let updater = Update::new(data);
1267 let Ok(child) = ctx.create_child("update", updater).await else {
1268 let Ok(actor) = ctx.reference().await else {
1269 return Ok(());
1270 };
1271
1272 actor
1273 .tell(RequestManagerMessage::RebootWait {
1274 request_id: self.id.clone(),
1275 governance_id: governance_id.clone(),
1276 })
1277 .await?;
1278
1279 return Ok(());
1280 };
1281
1282 child.tell(UpdateMessage::Run).await?;
1283 }
1284
1285 Ok(())
1286 }
1287
1288 async fn get_witnesses_auth(
1289 ctx: &ActorContext<Self>,
1290 governance_id: DigestIdentifier,
1291 ) -> Result<HashSet<PublicKey>, RequestManagerError> {
1292 let path = ActorPath::from("/user/node/auth");
1293 let actor = ctx.system().get_actor::<Auth>(&path).await?;
1294
1295 let response = actor
1296 .ask(AuthMessage::GetAuth {
1297 subject_id: governance_id,
1298 })
1299 .await?;
1300
1301 match response {
1302 AuthResponse::Witnesses(witnesses) => Ok(witnesses),
1303 _ => Err(RequestManagerError::ActorError(
1304 ActorError::UnexpectedResponse {
1305 path,
1306 expected: "AuthResponse::Witnesses".to_owned(),
1307 },
1308 )),
1309 }
1310 }
1311
1312 async fn send_reboot(
1315 &self,
1316 ctx: &ActorContext<Self>,
1317 governance_id: DigestIdentifier,
1318 ) -> Result<(), ActorError> {
1319 let Ok(actor) = ctx.reference().await else {
1320 return Ok(());
1321 };
1322
1323 actor
1324 .tell(RequestManagerMessage::Reboot {
1325 request_id: self.id.clone(),
1326 governance_id,
1327 reboot_type: RebootType::TimeOut,
1328 })
1329 .await
1330 }
1331
1332 async fn match_error(
1333 &mut self,
1334 ctx: &mut ActorContext<Self>,
1335 error: RequestManagerError,
1336 ) {
1337 match error {
1338 RequestManagerError::NoEvaluatorsAvailable {
1339 governance_id,
1340 ..
1341 }
1342 | RequestManagerError::NoApproversAvailable {
1343 governance_id, ..
1344 }
1345 | RequestManagerError::NoValidatorsAvailable {
1346 governance_id,
1347 ..
1348 } => {
1349 if let Err(e) = self.send_reboot(ctx, governance_id).await {
1350 emit_fail(ctx, e).await;
1351 }
1352 }
1353 RequestManagerError::CheckLimit
1354 | RequestManagerError::Governance(..) => {
1355 if let Err(e) = self
1356 .abort_request(
1357 ctx,
1358 error.to_string(),
1359 None,
1360 (*self.our_key).clone(),
1361 )
1362 .await
1363 {
1364 emit_fail(
1365 ctx,
1366 ActorError::FunctionalCritical {
1367 description: e.to_string(),
1368 },
1369 )
1370 .await;
1371 }
1372 }
1373 _ => {
1374 emit_fail(
1375 ctx,
1376 ActorError::FunctionalCritical {
1377 description: error.to_string(),
1378 },
1379 )
1380 .await;
1381 }
1382 }
1383 }
1384
1385 async fn finish_request(
1386 &mut self,
1387 ctx: &mut ActorContext<Self>,
1388 ) -> Result<(), RequestManagerError> {
1389 self.finish_request_metrics("finished");
1390 info!("Ending {}", self.id);
1391 send_to_tracking(
1392 ctx,
1393 RequestTrackingMessage::UpdateState {
1394 request_id: self.id.clone(),
1395 state: RequestState::Finish,
1396 },
1397 )
1398 .await?;
1399
1400 self.on_event(RequestManagerEvent::Finish, ctx).await;
1401
1402 self.end_request(ctx).await?;
1403
1404 Ok(())
1405 }
1406
1407 async fn reboot(
1408 &mut self,
1409 ctx: &mut ActorContext<Self>,
1410 reboot_type: RebootType,
1411 governance_id: DigestIdentifier,
1412 ) -> Result<(), RequestManagerError> {
1413 self.start_phase_metrics("reboot");
1414 self.on_event(
1415 RequestManagerEvent::UpdateState {
1416 state: Box::new(RequestManagerState::Reboot),
1417 },
1418 ctx,
1419 )
1420 .await;
1421
1422 let Ok(actor) = ctx.reference().await else {
1423 return Ok(());
1424 };
1425
1426 let request_id = self.id.clone();
1427
1428 match reboot_type {
1429 RebootType::Normal => {
1430 info!("Launching Normal reboot {}", self.id);
1431 send_to_tracking(
1432 ctx,
1433 RequestTrackingMessage::UpdateState {
1434 request_id: self.id.clone(),
1435 state: RequestState::Reboot,
1436 },
1437 )
1438 .await?;
1439
1440 actor
1441 .tell(RequestManagerMessage::RebootUpdate {
1442 request_id,
1443 governance_id,
1444 })
1445 .await?;
1446 }
1447 RebootType::Diff => {
1448 info!("Launching Diff reboot {}", self.id);
1449 self.retry_diff += 1;
1450
1451 let seconds = match self.retry_diff {
1452 1 => 10,
1453 2 => 20,
1454 3 => 30,
1455 _ => 60,
1456 };
1457
1458 info!(
1459 "Launching Diff reboot {}, try: {}, seconds: {}",
1460 self.id, self.retry_diff, seconds
1461 );
1462
1463 send_to_tracking(
1464 ctx,
1465 RequestTrackingMessage::UpdateState {
1466 request_id: self.id.clone(),
1467 state: RequestState::RebootDiff {
1468 seconds,
1469 count: self.retry_diff,
1470 },
1471 },
1472 )
1473 .await?;
1474
1475 tokio::spawn(async move {
1476 tokio::time::sleep(Duration::from_secs(seconds)).await;
1477 let _ = actor
1478 .tell(RequestManagerMessage::RebootUpdate {
1479 request_id,
1480 governance_id,
1481 })
1482 .await;
1483 });
1484 }
1485 RebootType::TimeOut => {
1486 self.retry_timeout += 1;
1487
1488 #[cfg(not(any(test, feature = "test")))]
1489 let seconds = match self.retry_timeout {
1490 1 => 30,
1491 2 => 60,
1492 3 => 120,
1493 _ => 300,
1494 };
1495
1496 #[cfg(any(test, feature = "test"))]
1497 let seconds = match self.retry_timeout {
1498 1 => 5,
1499 2 => 5,
1500 3 => 5,
1501 _ => 5,
1502 };
1503
1504 info!(
1505 "Launching TimeOut reboot {}, try: {}, seconds: {}",
1506 self.id, self.retry_timeout, seconds
1507 );
1508 send_to_tracking(
1509 ctx,
1510 RequestTrackingMessage::UpdateState {
1511 request_id: self.id.clone(),
1512 state: RequestState::RebootTimeOut {
1513 seconds,
1514 count: self.retry_timeout,
1515 },
1516 },
1517 )
1518 .await?;
1519
1520 tokio::spawn(async move {
1521 tokio::time::sleep(Duration::from_secs(seconds)).await;
1522 let _ = actor
1523 .tell(RequestManagerMessage::RebootUpdate {
1524 request_id,
1525 governance_id,
1526 })
1527 .await;
1528 });
1529 }
1530 }
1531
1532 Ok(())
1533 }
1534
1535 async fn match_command(
1536 &mut self,
1537 ctx: &mut ActorContext<Self>,
1538 ) -> Result<(), RequestManagerError> {
1539 match self.command {
1540 ReqManInitMessage::Evaluate => self.build_evaluation(ctx).await,
1541 ReqManInitMessage::Validate => {
1542 let (
1543 request,
1544 quorum,
1545 signers,
1546 init_state,
1547 current_request_roles,
1548 ) = self.build_validation_req(ctx, None, None).await?;
1549
1550 self.run_validation(
1551 ctx,
1552 request,
1553 quorum,
1554 signers,
1555 init_state,
1556 current_request_roles,
1557 )
1558 .await
1559 }
1560 }
1561 }
1562
1563 async fn check_signature(
1564 &self,
1565 ctx: &mut ActorContext<Self>,
1566 ) -> Result<(), RequestManagerError> {
1567 let Some(request) = self.request.clone() else {
1568 return Err(RequestManagerError::RequestNotSet);
1569 };
1570
1571 if let EventRequest::Fact { .. } = request.content() {
1572 let subject_data = get_subject_data(ctx, &self.subject_id).await?;
1573 let Some(subject_data) = subject_data else {
1574 return Err(RequestManagerError::SubjecData);
1575 };
1576
1577 let gov = self.get_governance_data(ctx).await?;
1578 match subject_data {
1579 SubjectData::Tracker {
1580 schema_id,
1581 namespace,
1582 ..
1583 } => {
1584 if !gov.has_this_role(HashThisRole::Schema {
1585 who: request.signature().signer.clone(),
1586 role: RoleTypes::Issuer,
1587 schema_id,
1588 namespace: Namespace::from(namespace),
1589 }) {
1590 return Err(RequestManagerError::NotIssuer);
1591 }
1592 }
1593 SubjectData::Governance { .. } => {
1594 if !gov.has_this_role(HashThisRole::Gov {
1595 who: request.signature().signer.clone(),
1596 role: RoleTypes::Issuer,
1597 }) {
1598 return Err(RequestManagerError::NotIssuer);
1599 }
1600 }
1601 }
1602 }
1603
1604 Ok(())
1605 }
1606
1607 async fn stops_childs(
1608 &self,
1609 ctx: &mut ActorContext<Self>,
1610 ) -> Result<(), RequestManagerError> {
1611 match self.state {
1612 RequestManagerState::Reboot => {
1613 if let Ok(actor) = ctx.get_child::<Update>("update").await {
1614 actor.ask_stop().await?;
1615 };
1616 if let Ok(actor) = ctx.get_child::<Reboot>("reboot").await {
1617 actor.ask_stop().await?;
1618 };
1619 }
1620 RequestManagerState::Evaluation => {
1621 if let Ok(actor) =
1622 ctx.get_child::<Evaluation>("evaluation").await
1623 {
1624 actor.ask_stop().await?;
1625 };
1626 }
1627 RequestManagerState::Approval { .. } => {
1628 if let Ok(actor) = ctx.get_child::<Approval>("approval").await {
1629 actor.ask_stop().await?;
1630 };
1631 let _ = make_obsolete(ctx, &self.subject_id).await;
1632 }
1633 RequestManagerState::Validation { .. } => {
1634 if let Ok(actor) =
1635 ctx.get_child::<Validation>("validation").await
1636 {
1637 actor.ask_stop().await?;
1638 };
1639 }
1640 RequestManagerState::Distribution { .. } => {
1641 if let Ok(actor) =
1642 ctx.get_child::<Distribution>("distribution").await
1643 {
1644 actor.ask_stop().await?;
1645 };
1646 }
1647 _ => {}
1648 }
1649
1650 Ok(())
1651 }
1652
1653 async fn abort_request(
1654 &mut self,
1655 ctx: &mut ActorContext<Self>,
1656 error: String,
1657 sn: Option<u64>,
1658 who: PublicKey,
1659 ) -> Result<(), RequestManagerError> {
1660 self.stops_childs(ctx).await?;
1661
1662 self.finish_request_metrics("aborted");
1663 info!("Aborting {}", self.id);
1664 send_to_tracking(
1665 ctx,
1666 RequestTrackingMessage::UpdateState {
1667 request_id: self.id.clone(),
1668 state: RequestState::Abort {
1669 subject_id: self.subject_id.to_string(),
1670 error,
1671 sn,
1672 who: who.to_string(),
1673 },
1674 },
1675 )
1676 .await?;
1677
1678 self.on_event(RequestManagerEvent::Finish, ctx).await;
1679
1680 self.end_request(ctx).await?;
1681
1682 Ok(())
1683 }
1684
1685 async fn end_request(
1686 &self,
1687 ctx: &ActorContext<Self>,
1688 ) -> Result<(), RequestManagerError> {
1689 let actor = ctx.get_parent::<RequestHandler>().await?;
1690 actor
1691 .tell(RequestHandlerMessage::EndHandling {
1692 subject_id: self.subject_id.clone(),
1693 })
1694 .await?;
1695
1696 Ok(())
1697 }
1698}
1699
/// Messages understood by the `RequestManager` actor.
///
/// Except for `Run` and `FirstRun` (which set the manager's current request
/// id), variants carrying a `request_id` are only acted upon when that id
/// equals the manager's current request id.
#[derive(Debug, Clone)]
pub enum RequestManagerMessage {
    /// Resume processing from the currently stored state.
    Run {
        request_id: DigestIdentifier,
    },
    /// Start a new request: store the command and signed request, then run
    /// the command.
    FirstRun {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
        request_id: DigestIdentifier,
    },
    /// Abort the request, recording who aborted it, why, and at which `sn`.
    Abort {
        request_id: DigestIdentifier,
        who: PublicKey,
        reason: String,
        sn: u64,
    },
    /// Abort requested manually; only honoured while the request is in the
    /// `Reboot`, `Starting`, `Evaluation`, `Approval` or `Validation` state.
    ManualAbort,
    /// Purge the manager's storage.
    PurgeStorage,
    /// Stop the current children and enter the reboot phase, unless the
    /// manager is already in the `Reboot` state.
    Reboot {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
        reboot_type: RebootType,
    },
    /// Start the governance update step of a reboot.
    RebootUpdate {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Start the waiting step of a reboot.
    RebootWait {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// The reboot is done: bump the version, re-check the signature and run
    /// the stored command again.
    FinishReboot {
        request_id: DigestIdentifier,
    },
    /// Result of the evaluation phase, together with the request that was
    /// evaluated.
    EvaluationRes {
        request_id: DigestIdentifier,
        eval_req: Box<EvaluationReq>,
        eval_res: EvaluationData,
    },
    /// Result of the approval phase.
    ApprovalRes {
        request_id: DigestIdentifier,
        appro_res: ApprovalData,
    },
    /// Result of the validation phase, together with the request that was
    /// validated.
    ValidationRes {
        request_id: DigestIdentifier,
        val_req: Box<ValidationReq>,
        val_res: ValidationData,
    },
    /// Stop the current children and finish the request.
    FinishRequest {
        request_id: DigestIdentifier,
    },
}

impl Message for RequestManagerMessage {}
1754
/// Events persisted by the `RequestManager`; each one is replayed through
/// `PersistentActor::apply`.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum RequestManagerEvent {
    /// The request ended: the state becomes `End`, the stored request is
    /// cleared and the request id is reset to its default.
    Finish,
    /// Replace the current state with `state`.
    UpdateState {
        state: Box<RequestManagerState>,
    },
    /// Set the version and put the state back to `Starting`.
    UpdateVersion {
        version: u64,
    },
    /// Store a new command and request, resetting version, retry counters
    /// and state (`Starting`).
    SafeState {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
    },
}

impl Event for RequestManagerEvent {}
1773
1774#[async_trait]
1775impl Actor for RequestManager {
1776 type Event = RequestManagerEvent;
1777 type Message = RequestManagerMessage;
1778 type Response = ();
1779
1780 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
1781 parent_span.map_or_else(
1782 || info_span!("RequestManager", id),
1783 |parent_span| info_span!(parent: parent_span, "RequestManager", id),
1784 )
1785 }
1786
1787 async fn pre_start(
1788 &mut self,
1789 ctx: &mut ActorContext<Self>,
1790 ) -> Result<(), ActorError> {
1791 if let Err(e) =
1792 self.init_store("request_manager", None, false, ctx).await
1793 {
1794 error!(
1795 error = %e,
1796 subject_id = %self.subject_id,
1797 "Failed to initialize store"
1798 );
1799 return Err(e);
1800 }
1801
1802 if self.governance_id.is_none()
1803 && let Some(request) = &self.request
1804 && let EventRequest::Create(create) = request.content()
1805 && !create.schema_id.is_gov()
1806 {
1807 self.governance_id = Some(create.governance_id.clone());
1808 }
1809
1810 Ok(())
1811 }
1812}
1813
1814#[async_trait]
1815impl Handler<Self> for RequestManager {
1816 #[allow(clippy::large_stack_frames)]
1819 async fn handle_message(
1820 &mut self,
1821 _sender: ActorPath,
1822 msg: RequestManagerMessage,
1823 ctx: &mut ave_actors::ActorContext<Self>,
1824 ) -> Result<(), ActorError> {
1825 match msg {
1826 RequestManagerMessage::RebootUpdate {
1827 governance_id,
1828 request_id,
1829 } => {
1830 if request_id == self.id {
1831 info!("Init reboot update {}", self.id);
1832 debug!(
1833 msg_type = "RebootUpdate",
1834 request_id = %self.id,
1835 governance_id = %governance_id,
1836 "Initializing reboot update"
1837 );
1838
1839 if let Err(e) = self.init_update(ctx, &governance_id).await
1840 {
1841 error!(
1842 msg_type = "RebootUpdate",
1843 request_id = %self.id,
1844 governance_id = %governance_id,
1845 error = %e,
1846 "Failed to initialize reboot update"
1847 );
1848 self.match_error(ctx, e).await;
1849 return Ok(());
1850 }
1851 }
1852 }
1853 RequestManagerMessage::RebootWait {
1854 governance_id,
1855 request_id,
1856 } => {
1857 if request_id == self.id {
1858 info!("Init reboot wait {}", self.id);
1859 debug!(
1860 msg_type = "RebootWait",
1861 request_id = %self.id,
1862 governance_id = %governance_id,
1863 "Initializing reboot wait"
1864 );
1865
1866 if let Err(e) = self.init_wait(ctx, &governance_id).await {
1867 error!(
1868 msg_type = "RebootWait",
1869 request_id = %self.id,
1870 governance_id = %governance_id,
1871 error = %e,
1872 "Failed to initialize reboot wait"
1873 );
1874 self.match_error(ctx, e).await;
1875 return Ok(());
1876 }
1877 }
1878 }
1879 RequestManagerMessage::Reboot {
1880 governance_id,
1881 request_id,
1882 reboot_type,
1883 } => {
1884 if request_id == self.id {
1885 if matches!(self.state, RequestManagerState::Reboot) {
1886 debug!(
1887 msg_type = "Reboot",
1888 request_id = %self.id,
1889 governance_id = %governance_id,
1890 reboot_type = ?reboot_type,
1891 "Already in reboot state, ignoring"
1892 );
1893 } else {
1894 debug!(
1895 msg_type = "Reboot",
1896 request_id = %self.id,
1897 governance_id = %governance_id,
1898 reboot_type = ?reboot_type,
1899 "Initiating reboot"
1900 );
1901 if let Err(e) = self.stops_childs(ctx).await {
1902 error!(
1903 msg_type = "Reboot",
1904 request_id = %self.id,
1905 governance_id = %governance_id,
1906 error = %e,
1907 "Failed to stop childs"
1908 );
1909 self.match_error(ctx, e).await;
1910 return Ok(());
1911 };
1912 if let Err(e) = self
1913 .reboot(ctx, reboot_type, governance_id.clone())
1914 .await
1915 {
1916 error!(
1917 msg_type = "Reboot",
1918 request_id = %self.id,
1919 governance_id = %governance_id,
1920 error = %e,
1921 "Failed to initiate reboot"
1922 );
1923 self.match_error(ctx, e).await;
1924 return Ok(());
1925 }
1926 }
1927 }
1928 }
1929 RequestManagerMessage::FinishReboot { request_id } => {
1930 if request_id == self.id {
1931 info!("Init reboot finish {}", self.id);
1932 debug!(
1933 msg_type = "FinishReboot",
1934 request_id = %self.id,
1935 version = self.version,
1936 "Reboot completed, resuming request"
1937 );
1938 self.on_event(
1939 RequestManagerEvent::UpdateVersion {
1940 version: self.version + 1,
1941 },
1942 ctx,
1943 )
1944 .await;
1945
1946 if let Err(e) = send_to_tracking(
1947 ctx,
1948 RequestTrackingMessage::UpdateVersion {
1949 request_id: self.id.clone(),
1950 version: self.version,
1951 },
1952 )
1953 .await
1954 {
1955 error!(
1956 msg_type = "FinishReboot",
1957 request_id = %self.id,
1958 version = self.version,
1959 error = %e,
1960 "Failed to send version update to tracking"
1961 );
1962 return Err(emit_fail(ctx, e).await);
1963 }
1964
1965 if let Err(e) = self.check_signature(ctx).await {
1966 error!(
1967 msg_type = "FinishReboot",
1968 request_id = %self.id,
1969 error = %e,
1970 "Failed to check signatures after reboot"
1971 );
1972 self.match_error(ctx, e).await;
1973 return Ok(());
1974 }
1975
1976 if let Err(e) = self.match_command(ctx).await {
1977 error!(
1978 msg_type = "FinishReboot",
1979 request_id = %self.id,
1980 error = %e,
1981 "Failed to execute command after reboot"
1982 );
1983 self.match_error(ctx, e).await;
1984 return Ok(());
1985 }
1986 }
1987 }
1988 RequestManagerMessage::Abort {
1989 request_id,
1990 who,
1991 reason,
1992 sn,
1993 } => {
1994 if request_id == self.id {
1995 warn!(
1996 msg_type = "Abort",
1997 state = %self.state,
1998 request_id = %self.id,
1999 who = %who,
2000 reason = %reason,
2001 sn = sn,
2002 "Request abort received"
2003 );
2004 if let Err(e) =
2005 self.abort_request(ctx, reason, Some(sn), who).await
2006 {
2007 error!(
2008 msg_type = "Abort",
2009 request_id = %self.id,
2010 error = %e,
2011 "Failed to abort request"
2012 );
2013 self.match_error(ctx, e).await;
2014 return Ok(());
2015 }
2016 }
2017 }
2018 RequestManagerMessage::ManualAbort => {
2019 match &self.state {
2020 RequestManagerState::Reboot
2021 | RequestManagerState::Starting
2022 | RequestManagerState::Evaluation
2023 | RequestManagerState::Approval { .. }
2024 | RequestManagerState::Validation { .. } => {
2025 if let Err(e) = self
2026 .abort_request(
2027 ctx,
2028 "The user manually aborted the request"
2029 .to_owned(),
2030 None,
2031 (*self.our_key).clone(),
2032 )
2033 .await
2034 {
2035 error!(
2036 msg_type = "Abort",
2037 request_id = %self.id,
2038 error = %e,
2039 "Failed to abort request"
2040 );
2041 self.match_error(ctx, e).await;
2042 }
2043 }
2044 _ => {
2045 info!(
2046 "The request is in a state that cannot be aborted {}, state: {}",
2047 self.id, self.state
2048 );
2049 }
2050 }
2051
2052 return Ok(());
2053 }
2054 RequestManagerMessage::PurgeStorage => {
2055 purge_storage(ctx).await?;
2056
2057 debug!(
2058 msg_type = "PurgeStorage",
2059 subject_id = %self.subject_id,
2060 "Purged request manager storage"
2061 );
2062
2063 return Ok(());
2064 }
2065 RequestManagerMessage::FirstRun {
2066 command,
2067 request,
2068 request_id,
2069 } => {
2070 self.id = request_id.clone();
2071 self.begin_request_metrics();
2072 debug!(
2073 msg_type = "FirstRun",
2074 request_id = %request_id,
2075 command = ?command,
2076 "First run of request manager"
2077 );
2078 self.on_event(
2079 RequestManagerEvent::SafeState { command, request },
2080 ctx,
2081 )
2082 .await;
2083
2084 if let Err(e) = self.match_command(ctx).await {
2085 error!(
2086 msg_type = "FirstRun",
2087 request_id = %self.id,
2088 error = %e,
2089 "Failed to execute initial command"
2090 );
2091 self.match_error(ctx, e).await;
2092 return Ok(());
2093 };
2094 }
2095 RequestManagerMessage::Run { request_id } => {
2096 self.id = request_id;
2097 self.ensure_request_metrics_started();
2098
2099 debug!(
2100 msg_type = "Run",
2101 request_id = %self.id,
2102 state = ?self.state,
2103 version = self.version,
2104 "Running request manager"
2105 );
2106 match self.state.clone() {
2107 RequestManagerState::Starting
2108 | RequestManagerState::Reboot => {
2109 if let Err(e) = self.match_command(ctx).await {
2110 error!(
2111 msg_type = "Run",
2112 request_id = %self.id,
2113 state = "Starting/Reboot",
2114 error = %e,
2115 "Failed to execute command"
2116 );
2117 self.match_error(ctx, e).await;
2118 return Ok(())
2119 };
2120 }
2121 RequestManagerState::Evaluation => {
2122 if let Err(e) = self.build_evaluation(ctx).await {
2123 error!(
2124 msg_type = "Run",
2125 request_id = %self.id,
2126 state = "Evaluation",
2127 error = %e,
2128 "Failed to build evaluation"
2129 );
2130 self.match_error(ctx, e).await;
2131 return Ok(())
2132 }
2133 }
2134
2135 RequestManagerState::Approval {
2136 eval_req,
2137 eval_res,
2138 } => {
2139 if let Err(e) = self
2140 .build_approval(ctx, eval_req, eval_res.evaluator_res().expect("If the status is approval, it means that the evaluator's response is valid"))
2141 .await
2142 {
2143 error!(
2144 msg_type = "Run",
2145 request_id = %self.id,
2146 state = "Approval",
2147 error = %e,
2148 "Failed to build approval"
2149 );
2150 self.match_error(ctx, e).await;
2151 return Ok(())
2152 }
2153 }
2154 RequestManagerState::Validation {
2155 request,
2156 quorum,
2157 init_state,
2158 current_request_roles,
2159 signers,
2160 } => {
2161 if let Err(e) = self
2162 .run_validation(
2163 ctx,
2164 *request,
2165 quorum,
2166 signers,
2167 init_state,
2168 current_request_roles,
2169 )
2170 .await
2171 {
2172 error!(
2173 msg_type = "Run",
2174 request_id = %self.id,
2175 state = "Validation",
2176 error = %e,
2177 "Failed to run validation"
2178 );
2179 self.match_error(ctx, e).await;
2180 return Ok(())
2181 };
2182 }
2183 RequestManagerState::UpdateSubject { ledger } => {
2184 if let Err(e) =
2185 self.update_subject(ctx, ledger.clone()).await
2186 {
2187 error!(
2188 msg_type = "Run",
2189 request_id = %self.id,
2190 state = "UpdateSubject",
2191 error = %e,
2192 "Failed to update subject"
2193 );
2194 self.match_error(ctx, e).await;
2195 return Ok(())
2196 };
2197
2198 match self.build_distribution(ctx, ledger).await {
2199 Ok(in_distribution) => {
2200 if !in_distribution
2201 && let Err(e) =
2202 self.finish_request(ctx).await
2203 {
2204 error!(
2205 msg_type = "Run",
2206 request_id = %self.id,
2207 state = "UpdateSubject",
2208 error = %e,
2209 "Failed to finish request after build distribution"
2210 );
2211 self.match_error(ctx, e).await;
2212 return Ok(())
2213 }
2214 }
2215 Err(e) => {
2216 error!(
2217 msg_type = "Run",
2218 request_id = %self.id,
2219 state = "UpdateSubject",
2220 error = %e,
2221 "Failed to build distribution"
2222 );
2223 self.match_error(ctx, e).await;
2224 return Ok(())
2225 }
2226 };
2227 }
2228 RequestManagerState::Distribution { ledger } => {
2229 match self.build_distribution(ctx, ledger).await {
2230 Ok(in_distribution) => {
2231 if !in_distribution
2232 && let Err(e) =
2233 self.finish_request(ctx).await
2234 {
2235 error!(
2236 msg_type = "Run",
2237 request_id = %self.id,
2238 state = "Distribution",
2239 error = %e,
2240 "Failed to finish request after build distribution"
2241 );
2242 self.match_error(ctx, e).await;
2243 return Ok(())
2244 }
2245 }
2246 Err(e) => {
2247 error!(
2248 msg_type = "Run",
2249 request_id = %self.id,
2250 state = "Distribution",
2251 error = %e,
2252 "Failed to build distribution"
2253 );
2254 self.match_error(ctx, e).await;
2255 return Ok(())
2256 }
2257 };
2258 }
2259 RequestManagerState::End => {
2260 if let Err(e) = self.end_request(ctx).await {
2261 error!(
2262 msg_type = "Run",
2263 request_id = %self.id,
2264 state = "End",
2265 error = %e,
2266 "Failed to end request"
2267 );
2268 self.match_error(ctx, e).await;
2269 return Ok(())
2270 }
2271 }
2272 };
2273 }
2274 RequestManagerMessage::EvaluationRes {
2275 eval_req,
2276 eval_res,
2277 request_id,
2278 } => {
2279 if request_id == self.id {
2280 debug!(
2281 msg_type = "EvaluationRes",
2282 request_id = %self.id,
2283 version = self.version,
2284 "Evaluation result received"
2285 );
2286 if let Err(e) = self.stops_childs(ctx).await {
2287 error!(
2288 msg_type = "EvaluationRes",
2289 request_id = %self.id,
2290 error = %e,
2291 "Failed to stop childs"
2292 );
2293 self.match_error(ctx, e).await;
2294 return Ok(());
2295 };
2296
2297 if let Some(evaluator_res) = eval_res.evaluator_res()
2298 && evaluator_res.appr_required
2299 {
2300 debug!(
2301 msg_type = "EvaluationRes",
2302 request_id = %self.id,
2303 "Approval required, proceeding to approval phase"
2304 );
2305 self.on_event(
2306 RequestManagerEvent::UpdateState {
2307 state: Box::new(
2308 RequestManagerState::Approval {
2309 eval_req: *eval_req.clone(),
2310 eval_res: eval_res.clone(),
2311 },
2312 ),
2313 },
2314 ctx,
2315 )
2316 .await;
2317
2318 if let Err(e) = self
2319 .build_approval(ctx, *eval_req, evaluator_res)
2320 .await
2321 {
2322 error!(
2323 msg_type = "EvaluationRes",
2324 request_id = %self.id,
2325 error = %e,
2326 "Failed to build approval"
2327 );
2328 self.match_error(ctx, e).await;
2329 return Ok(());
2330 }
2331 } else {
2332 debug!(
2333 msg_type = "EvaluationRes",
2334 request_id = %self.id,
2335 "Approval not required, proceeding to validation phase"
2336 );
2337 let (
2338 request,
2339 quorum,
2340 signers,
2341 init_state,
2342 current_request_roles,
2343 ) = match self
2344 .build_validation_req(
2345 ctx,
2346 Some((*eval_req, eval_res)),
2347 None,
2348 )
2349 .await
2350 {
2351 Ok(data) => data,
2352 Err(e) => {
2353 error!(
2354 msg_type = "EvaluationRes",
2355 request_id = %self.id,
2356 error = %e,
2357 "Failed to build validation request"
2358 );
2359 self.match_error(ctx, e).await;
2360 return Ok(());
2361 }
2362 };
2363
2364 if let Err(e) = self
2365 .run_validation(
2366 ctx,
2367 request,
2368 quorum,
2369 signers,
2370 init_state,
2371 current_request_roles,
2372 )
2373 .await
2374 {
2375 error!(
2376 msg_type = "EvaluationRes",
2377 request_id = %self.id,
2378 error = %e,
2379 "Failed to run validation"
2380 );
2381 self.match_error(ctx, e).await;
2382 return Ok(());
2383 };
2384 }
2385 }
2386 }
2387 RequestManagerMessage::ApprovalRes {
2388 appro_res,
2389 request_id,
2390 } => {
2391 if request_id == self.id {
2392 let _ = make_obsolete(ctx, &self.subject_id).await;
2393 debug!(
2394 msg_type = "ApprovalRes",
2395 request_id = %self.id,
2396 version = self.version,
2397 "Approval result received"
2398 );
2399 if let Err(e) = self.stops_childs(ctx).await {
2400 error!(
2401 msg_type = "ApprovalRes",
2402 request_id = %self.id,
2403 error = %e,
2404 "Failed to stop childs"
2405 );
2406 self.match_error(ctx, e).await;
2407 return Ok(());
2408 };
2409
2410 let RequestManagerState::Approval { eval_req, eval_res } =
2411 self.state.clone()
2412 else {
2413 error!(
2414 msg_type = "ApprovalRes",
2415 request_id = %self.id,
2416 state = ?self.state,
2417 "Invalid state for approval response"
2418 );
2419 let e = ActorError::FunctionalCritical {
2420 description: "Invalid request state".to_owned(),
2421 };
2422 return Err(emit_fail(ctx, e).await);
2423 };
2424 let (
2425 request,
2426 quorum,
2427 signers,
2428 init_state,
2429 current_request_roles,
2430 ) = match self
2431 .build_validation_req(
2432 ctx,
2433 Some((eval_req, eval_res)),
2434 Some(appro_res),
2435 )
2436 .await
2437 {
2438 Ok(data) => data,
2439 Err(e) => {
2440 error!(
2441 msg_type = "ApprovalRes",
2442 request_id = %self.id,
2443 error = %e,
2444 "Failed to build validation request"
2445 );
2446 self.match_error(ctx, e).await;
2447 return Ok(());
2448 }
2449 };
2450
2451 if let Err(e) = self
2452 .run_validation(
2453 ctx,
2454 request,
2455 quorum,
2456 signers,
2457 init_state,
2458 current_request_roles,
2459 )
2460 .await
2461 {
2462 error!(
2463 msg_type = "ApprovalRes",
2464 request_id = %self.id,
2465 error = %e,
2466 "Failed to run validation"
2467 );
2468 self.match_error(ctx, e).await;
2469 return Ok(());
2470 };
2471 }
2472 }
2473 RequestManagerMessage::ValidationRes {
2474 val_res,
2475 val_req,
2476 request_id,
2477 } => {
2478 if request_id == self.id {
2479 debug!(
2480 msg_type = "ValidationRes",
2481 request_id = %self.id,
2482 version = self.version,
2483 "Validation result received"
2484 );
2485 if let Err(e) = self.stops_childs(ctx).await {
2486 error!(
2487 msg_type = "ValidationRes",
2488 request_id = %self.id,
2489 error = %e,
2490 "Failed to stop childs"
2491 );
2492 self.match_error(ctx, e).await;
2493 return Ok(());
2494 };
2495
2496 let signed_ledger =
2497 match self.build_ledger(ctx, *val_req, val_res).await {
2498 Ok(signed_ledger) => signed_ledger,
2499 Err(e) => {
2500 error!(
2501 msg_type = "ValidationRes",
2502 request_id = %self.id,
2503 error = %e,
2504 "Failed to build ledger"
2505 );
2506 self.match_error(ctx, e).await;
2507 return Ok(());
2508 }
2509 };
2510
2511 if let Err(e) =
2512 self.update_subject(ctx, signed_ledger.clone()).await
2513 {
2514 error!(
2515 msg_type = "ValidationRes",
2516 request_id = %self.id,
2517 error = %e,
2518 "Failed to update subject"
2519 );
2520 self.match_error(ctx, e).await;
2521 return Ok(());
2522 };
2523
2524 match self.build_distribution(ctx, signed_ledger).await {
2525 Ok(in_distribution) => {
2526 if !in_distribution
2527 && let Err(e) = self.finish_request(ctx).await
2528 {
2529 error!(
2530 msg_type = "ValidationRes",
2531 request_id = %self.id,
2532 error = %e,
2533 "Failed to finish request after build distribution"
2534 );
2535 self.match_error(ctx, e).await;
2536 return Ok(());
2537 }
2538 }
2539 Err(e) => {
2540 error!(
2541 msg_type = "ValidationRes",
2542 request_id = %self.id,
2543 error = %e,
2544 "Failed to build distribution"
2545 );
2546 self.match_error(ctx, e).await;
2547 return Ok(());
2548 }
2549 };
2550 }
2551 }
2552 RequestManagerMessage::FinishRequest { request_id } => {
2553 if request_id == self.id {
2554 debug!(
2555 msg_type = "FinishRequest",
2556 request_id = %self.id,
2557 version = self.version,
2558 "Finishing request"
2559 );
2560
2561 if let Err(e) = self.stops_childs(ctx).await {
2562 error!(
2563 msg_type = "FinishRequest",
2564 request_id = %self.id,
2565 error = %e,
2566 "Failed to stop childs"
2567 );
2568 self.match_error(ctx, e).await;
2569 return Ok(());
2570 };
2571
2572 if let Err(e) = self.finish_request(ctx).await {
2573 error!(
2574 msg_type = "FinishRequest",
2575 request_id = %self.id,
2576 error = %e,
2577 "Failed to finish request"
2578 );
2579 self.match_error(ctx, e).await;
2580 return Ok(());
2581 }
2582 }
2583 }
2584 }
2585
2586 Ok(())
2587 }
2588
2589 async fn on_event(
2590 &mut self,
2591 event: RequestManagerEvent,
2592 ctx: &mut ActorContext<Self>,
2593 ) {
2594 let event_type = match &event {
2595 RequestManagerEvent::Finish => "Finish",
2596 RequestManagerEvent::UpdateState { .. } => "UpdateState",
2597 RequestManagerEvent::UpdateVersion { .. } => "UpdateVersion",
2598 RequestManagerEvent::SafeState { .. } => "SafeState",
2599 };
2600
2601 if let Err(e) = self.persist(&event, ctx).await {
2602 error!(
2603 event_type = event_type,
2604 request_id = %self.id,
2605 error = %e,
2606 "Failed to persist event"
2607 );
2608 emit_fail(ctx, e).await;
2609 };
2610 }
2611
2612 async fn on_child_fault(
2613 &mut self,
2614 error: ActorError,
2615 ctx: &mut ActorContext<Self>,
2616 ) -> ChildAction {
2617 error!(
2618 request_id = %self.id,
2619 version = self.version,
2620 state = ?self.state,
2621 error = %error,
2622 "Child fault in request manager"
2623 );
2624 emit_fail(ctx, error).await;
2625 ChildAction::Stop
2626 }
2627}
2628
2629#[async_trait]
2630impl PersistentActor for RequestManager {
2631 type Persistence = LightPersistence;
2632 type InitParams = InitRequestManager;
2633
2634 fn update(&mut self, state: Self) {
2635 self.command = state.command;
2636 self.request = state.request;
2637 self.state = state.state;
2638 self.version = state.version;
2639 }
2640
2641 fn create_initial(params: Self::InitParams) -> Self {
2642 Self {
2643 retry_diff: 0,
2644 retry_timeout: 0,
2645 request_started_at: None,
2646 current_phase: None,
2647 current_phase_started_at: None,
2648 our_key: params.our_key,
2649 id: DigestIdentifier::default(),
2650 subject_id: params.subject_id,
2651 governance_id: params.governance_id,
2652 command: ReqManInitMessage::Evaluate,
2653 request: None,
2654 state: RequestManagerState::Starting,
2655 version: 0,
2656 helpers: Some(params.helpers),
2657 }
2658 }
2659
2660 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
2662 match event {
2663 RequestManagerEvent::Finish => {
2664 debug!(
2665 event_type = "Finish",
2666 request_id = %self.id,
2667 "Applying finish event"
2668 );
2669 self.state = RequestManagerState::End;
2670 self.request = None;
2671 self.id = DigestIdentifier::default();
2672 }
2673 RequestManagerEvent::UpdateState { state } => {
2674 debug!(
2675 event_type = "UpdateState",
2676 request_id = %self.id,
2677 old_state = ?self.state,
2678 new_state = ?state,
2679 "Applying state update"
2680 );
2681 self.state = *state.clone()
2682 }
2683 RequestManagerEvent::UpdateVersion { version } => {
2684 debug!(
2685 event_type = "UpdateVersion",
2686 request_id = %self.id,
2687 old_version = self.version,
2688 new_version = version,
2689 "Applying version update"
2690 );
2691 self.state = RequestManagerState::Starting;
2692 self.version = *version
2693 }
2694 RequestManagerEvent::SafeState { command, request } => {
2695 debug!(
2696 event_type = "SafeState",
2697 request_id = %self.id,
2698 command = ?command,
2699 "Applying safe state"
2700 );
2701 self.version = 0;
2702 self.retry_diff = 0;
2703 self.retry_timeout = 0;
2704 self.state = RequestManagerState::Starting;
2705 self.request = Some(request.clone());
2706 self.command = command.clone();
2707 }
2708 };
2709
2710 Ok(())
2711 }
2712}
2713
// Empty implementation: nothing from `Storable` is overridden here.
#[async_trait]
impl Storable for RequestManager {}