1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::bridge::request::EventRequestType;
8use ave_common::identity::{
9 DigestIdentifier, HashAlgorithm, PublicKey, Signed, hash_borsh,
10};
11use ave_common::request::EventRequest;
12use ave_common::response::RequestState;
13use ave_common::{Namespace, SchemaType, ValueWrapper};
14use borsh::{BorshDeserialize, BorshSerialize};
15use network::ComunicateInfo;
16use serde::{Deserialize, Serialize};
17use std::collections::HashSet;
18use std::sync::Arc;
19use std::time::Duration;
20use tracing::{Span, debug, error, info, info_span, warn};
21
22use crate::approval::request::ApprovalReq;
23use crate::distribution::{
24 Distribution, DistributionMessage, DistributionType,
25};
26use crate::evaluation::request::EvaluateData;
27use crate::evaluation::response::EvaluatorResponse;
28use crate::governance::data::GovernanceData;
29use crate::governance::model::{
30 HashThisRole, ProtocolTypes, Quorum, RoleTypes, WitnessesData,
31};
32use crate::helpers::network::service::NetworkSender;
33use crate::model::common::node::{SignTypesNode, get_sign, get_subject_data};
34use crate::model::common::send_to_tracking;
35use crate::model::common::subject::{
36 create_subject, get_gov, get_gov_sn, get_last_ledger_event, get_metadata,
37 make_obsolete, update_ledger,
38};
39use crate::model::event::{
40 ApprovalData, EvaluationData, Ledger, Protocols, ValidationData,
41};
42use crate::node::SubjectData;
43use crate::request::error::RequestManagerError;
44use crate::request::tracking::RequestTrackingMessage;
45use crate::request::{RequestHandler, RequestHandlerMessage};
46use crate::subject::{Metadata, SignedLedger};
47
48use crate::validation::request::{ActualProtocols, LastData, ValidationReq};
49use crate::{
50 ActorMessage, NetworkMessage, Validation, ValidationMessage,
51 approval::{Approval, ApprovalMessage},
52 auth::{Auth, AuthMessage, AuthResponse},
53 db::Storable,
54 evaluation::{Evaluation, EvaluationMessage, request::EvaluationReq},
55 model::common::emit_fail,
56 update::{Update, UpdateMessage, UpdateNew, UpdateType},
57};
58
59use super::{
60 reboot::{Reboot, RebootMessage},
61 types::{ReqManInitMessage, RequestManagerState},
62};
63
/// State of the manager that drives one event request through its protocol
/// stages (evaluation, approval, validation, ledger update, distribution).
///
/// Fields marked `#[serde(skip)]` are excluded from the serde representation.
/// The hand-written Borsh implementations in this file likewise persist only
/// `command`, `state`, `version` and `request`; every other field is rebuilt
/// from a default value on deserialization.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestManager {
    /// Hash algorithm and network sender handed to the protocol children.
    /// `None` after Borsh deserialization, in which case the `run_*` methods
    /// fail with `HelpersNotInitialized`.
    #[serde(skip)]
    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
    /// Key used as `signer` of the requests built here and removed from
    /// witness sets before distribution/update.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Identifier sent as `request_id` in child and tracking messages.
    #[serde(skip)]
    id: DigestIdentifier,
    /// Subject the request operates on.
    #[serde(skip)]
    subject_id: DigestIdentifier,
    /// Number of `RebootType::TimeOut` reboots performed so far.
    #[serde(skip)]
    retry_timeout: u64,
    /// Number of `RebootType::Diff` reboots performed so far.
    #[serde(skip)]
    retry_diff: u64,
    /// Selects the first protocol to run (see `match_command`).
    command: ReqManInitMessage,
    /// Signed event request being processed, if one has been set.
    request: Option<Signed<EventRequest>>,
    /// Current stage of the request.
    state: RequestManagerState,
    /// Forwarded as `version` in the `Create` messages sent to the children.
    version: u64,
}
83
/// How `RequestManager::reboot` schedules the follow-up `RebootUpdate` message.
#[derive(Debug, Clone)]
pub enum RebootType {
    /// Send `RebootUpdate` immediately.
    Normal,
    /// Send `RebootUpdate` after a delay of 10, 20, 30 and then 60 seconds,
    /// chosen by the `retry_diff` counter.
    Diff,
    /// Send `RebootUpdate` after a delay of 30, 60, 120 and then 300 seconds,
    /// chosen by the `retry_timeout` counter.
    TimeOut,
}
90
/// Non-persisted values a `RequestManager` needs at runtime.
///
/// NOTE(review): presumably consumed by a constructor outside this section;
/// the field types mirror the `#[serde(skip)]` fields of `RequestManager`.
pub struct InitRequestManager {
    /// Key stored as `RequestManager::our_key`.
    pub our_key: Arc<PublicKey>,
    /// Subject the request operates on.
    pub subject_id: DigestIdentifier,
    /// Hash algorithm and network sender stored as `RequestManager::helpers`.
    pub helpers: (HashAlgorithm, Arc<NetworkSender>),
}
96
97impl BorshSerialize for RequestManager {
98 fn serialize<W: std::io::Write>(
99 &self,
100 writer: &mut W,
101 ) -> std::io::Result<()> {
102 BorshSerialize::serialize(&self.command, writer)?;
103 BorshSerialize::serialize(&self.state, writer)?;
104 BorshSerialize::serialize(&self.version, writer)?;
105 BorshSerialize::serialize(&self.request, writer)?;
106
107 Ok(())
108 }
109}
110
111impl BorshDeserialize for RequestManager {
112 fn deserialize_reader<R: std::io::Read>(
113 reader: &mut R,
114 ) -> std::io::Result<Self> {
115 let command = ReqManInitMessage::deserialize_reader(reader)?;
117 let state = RequestManagerState::deserialize_reader(reader)?;
118 let version = u64::deserialize_reader(reader)?;
119 let request =
120 Option::<Signed<EventRequest>>::deserialize_reader(reader)?;
121
122 let our_key = Arc::new(PublicKey::default());
123 let subject_id = DigestIdentifier::default();
124 let id = DigestIdentifier::default();
125
126 Ok(Self {
127 retry_diff: 0,
128 retry_timeout: 0,
129 helpers: None,
130 our_key,
131 id,
132 subject_id,
133 command,
134 request,
135 state,
136 version,
137 })
138 }
139}
140
141impl RequestManager {
142 async fn build_evaluation(
146 &mut self,
147 ctx: &mut ActorContext<Self>,
148 ) -> Result<(), RequestManagerError> {
149 let Some(request) = self.request.clone() else {
150 return Err(RequestManagerError::RequestNotSet);
151 };
152
153 self.on_event(
154 RequestManagerEvent::UpdateState {
155 state: Box::new(RequestManagerState::Evaluation),
156 },
157 ctx,
158 )
159 .await;
160
161 let metadata = Self::check_data_eval(ctx, &request).await?;
162
163 let (signed_evaluation_req, quorum, signers, init_state) =
164 self.build_request_eval(ctx, &metadata, &request).await?;
165
166 if signers.is_empty() {
167 warn!(
168 request_id = %self.id,
169 schema_id = %metadata.schema_id,
170 "No evaluators available for schema"
171 );
172
173 return Err(RequestManagerError::NoEvaluatorsAvailable {
174 schema_id: metadata.schema_id.to_string(),
175 governance_id: signed_evaluation_req
176 .content()
177 .governance_id
178 .clone(),
179 });
180 }
181
182 self.run_evaluation(
183 ctx,
184 signed_evaluation_req.clone(),
185 quorum,
186 init_state,
187 signers,
188 )
189 .await
190 }
191
192 async fn check_data_eval(
194 ctx: &mut ActorContext<Self>,
195 request: &Signed<EventRequest>,
196 ) -> Result<Metadata, RequestManagerError> {
197 let (subject_id, confirm) = match request.content().clone() {
198 EventRequest::Fact(event) => (event.subject_id, false),
199 EventRequest::Transfer(event) => (event.subject_id, false),
200 EventRequest::Confirm(event) => (event.subject_id, true),
201 _ => {
202 return Err(
203 RequestManagerError::InvalidEventRequestForEvaluation,
204 );
205 }
206 };
207
208 let metadata = get_metadata(ctx, &subject_id).await?;
209
210 if confirm && !metadata.schema_id.is_gov() {
211 return Err(RequestManagerError::ConfirmNotEvaluableForTracker);
212 }
213
214 Ok(metadata)
215 }
216
217 async fn build_request_eval(
218 &self,
219 ctx: &mut ActorContext<Self>,
220 metadata: &Metadata,
221 request: &Signed<EventRequest>,
222 ) -> Result<
223 (
224 Signed<EvaluationReq>,
225 Quorum,
226 HashSet<PublicKey>,
227 Option<ValueWrapper>,
228 ),
229 RequestManagerError,
230 > {
231 let is_gov = metadata.schema_id.is_gov();
232
233 let request_type = EventRequestType::from(request.content());
234 let (evaluate_data, governance_data, init_state) = match (
235 is_gov,
236 request_type,
237 ) {
238 (true, EventRequestType::Fact) => {
239 let state =
240 GovernanceData::try_from(metadata.properties.clone())?;
241
242 (
243 EvaluateData::GovFact {
244 state: state.clone(),
245 },
246 state,
247 None,
248 )
249 }
250 (true, EventRequestType::Transfer) => {
251 let state =
252 GovernanceData::try_from(metadata.properties.clone())?;
253
254 (
255 EvaluateData::GovTransfer {
256 state: state.clone(),
257 },
258 state,
259 None,
260 )
261 }
262 (true, EventRequestType::Confirm) => {
263 let state =
264 GovernanceData::try_from(metadata.properties.clone())?;
265
266 (
267 EvaluateData::GovConfirm {
268 state: state.clone(),
269 },
270 state,
271 None,
272 )
273 }
274 (false, EventRequestType::Fact) => {
275 let governance_data =
276 get_gov(ctx, &metadata.governance_id).await?;
277
278 let init_state =
279 governance_data.get_init_state(&metadata.schema_id)?;
280
281 (
282 EvaluateData::TrackerSchemasFact {
283 contract: format!(
284 "{}_{}",
285 metadata.governance_id, metadata.schema_id
286 ),
287 state: metadata.properties.clone(),
288 },
289 governance_data,
290 Some(init_state),
291 )
292 }
293 (false, EventRequestType::Transfer) => {
294 let governance_data =
295 get_gov(ctx, &metadata.governance_id).await?;
296 (
297 EvaluateData::TrackerSchemasTransfer {
298 governance_data: governance_data.clone(),
299 namespace: metadata.namespace.clone(),
300 schema_id: metadata.schema_id.clone(),
301 state: metadata.properties.clone(),
302 },
303 governance_data,
304 None,
305 )
306 }
307 _ => unreachable!(
308 "It was previously verified that the matched cases are the only possible ones"
309 ),
310 };
311
312 let (signers, quorum) = governance_data.get_quorum_and_signers(
313 ProtocolTypes::Evaluation,
314 &metadata.schema_id,
315 metadata.namespace.clone(),
316 )?;
317
318 let eval_req = EvaluationReq {
319 event_request: request.clone(),
320 data: evaluate_data,
321 sn: metadata.sn + 1,
322 gov_version: governance_data.version,
323 namespace: metadata.namespace.clone(),
324 schema_id: metadata.schema_id.clone(),
325 signer: (*self.our_key).clone(),
326 signer_is_owner: *self.our_key == request.signature().signer,
327 governance_id: metadata.governance_id.clone(),
328 };
329
330 let signature =
331 get_sign(ctx, SignTypesNode::EvaluationReq(eval_req.clone()))
332 .await?;
333
334 let signed_evaluation_req: Signed<EvaluationReq> =
335 Signed::from_parts(eval_req, signature);
336 Ok((signed_evaluation_req, quorum, signers, init_state))
337 }
338
339 async fn run_evaluation(
340 &self,
341 ctx: &mut ActorContext<Self>,
342 request: Signed<EvaluationReq>,
343 quorum: Quorum,
344 init_state: Option<ValueWrapper>,
345 signers: HashSet<PublicKey>,
346 ) -> Result<(), RequestManagerError> {
347 let Some((hash, network)) = self.helpers.clone() else {
348 return Err(RequestManagerError::HelpersNotInitialized);
349 };
350
351 info!("Init evaluation {}", self.id);
352 let child = ctx
353 .create_child(
354 "evaluation",
355 Evaluation::new(
356 self.our_key.clone(),
357 request,
358 quorum,
359 init_state,
360 hash,
361 network,
362 ),
363 )
364 .await?;
365
366 child
367 .tell(EvaluationMessage::Create {
368 request_id: self.id.clone(),
369 version: self.version,
370 signers,
371 })
372 .await?;
373
374 send_to_tracking(
375 ctx,
376 RequestTrackingMessage::UpdateState {
377 request_id: self.id.clone(),
378 state: RequestState::Evaluation,
379 },
380 )
381 .await?;
382
383 Ok(())
384 }
385 async fn build_request_appro(
388 &self,
389 ctx: &mut ActorContext<Self>,
390 eval_req: EvaluationReq,
391 evaluator_res: EvaluatorResponse,
392 ) -> Result<Signed<ApprovalReq>, RequestManagerError> {
393 let request = ApprovalReq {
394 subject_id: self.subject_id.clone(),
395 sn: eval_req.sn,
396 gov_version: eval_req.gov_version,
397 patch: evaluator_res.patch,
398 signer: eval_req.signer,
399 };
400
401 let signature =
402 get_sign(ctx, SignTypesNode::ApprovalReq(request.clone())).await?;
403
404 let signed_approval_req: Signed<ApprovalReq> =
405 Signed::from_parts(request, signature);
406
407 Ok(signed_approval_req)
408 }
409
410 async fn build_approval(
411 &self,
412 ctx: &mut ActorContext<Self>,
413 eval_req: EvaluationReq,
414 eval_res: EvaluatorResponse,
415 ) -> Result<(), RequestManagerError> {
416 let request = self.build_request_appro(ctx, eval_req, eval_res).await?;
417
418 let governance_data =
419 get_gov(ctx, &request.content().subject_id).await?;
420
421 let (signers, quorum) = governance_data.get_quorum_and_signers(
422 ProtocolTypes::Approval,
423 &SchemaType::Governance,
424 Namespace::new(),
425 )?;
426
427 if signers.is_empty() {
428 warn!(
429 request_id = %self.id,
430 schema_id = %SchemaType::Governance,
431 "No approvers available for schema"
432 );
433
434 return Err(RequestManagerError::NoApproversAvailable {
435 schema_id: SchemaType::Governance.to_string(),
436 governance_id: self.subject_id.clone(),
437 });
438 }
439
440 self.run_approval(ctx, request, quorum, signers).await
441 }
442
443 async fn run_approval(
444 &self,
445 ctx: &mut ActorContext<Self>,
446 request: Signed<ApprovalReq>,
447 quorum: Quorum,
448 signers: HashSet<PublicKey>,
449 ) -> Result<(), RequestManagerError> {
450 let Some((hash, network)) = self.helpers.clone() else {
451 return Err(RequestManagerError::HelpersNotInitialized);
452 };
453
454 info!("Init approval {}", self.id);
455 let child = ctx
456 .create_child(
457 "approval",
458 Approval::new(
459 self.our_key.clone(),
460 request,
461 quorum,
462 signers,
463 hash,
464 network,
465 ),
466 )
467 .await?;
468
469 child
470 .tell(ApprovalMessage::Create {
471 request_id: self.id.clone(),
472 version: self.version,
473 })
474 .await?;
475
476 send_to_tracking(
477 ctx,
478 RequestTrackingMessage::UpdateState {
479 request_id: self.id.clone(),
480 state: RequestState::Approval,
481 },
482 )
483 .await?;
484
485 Ok(())
486 }
487
488 async fn build_validation_req(
491 &mut self,
492 ctx: &mut ActorContext<Self>,
493 eval: Option<(EvaluationReq, EvaluationData)>,
494 appro_data: Option<ApprovalData>,
495 ) -> Result<
496 (
497 Signed<ValidationReq>,
498 Quorum,
499 HashSet<PublicKey>,
500 Option<ValueWrapper>,
501 ),
502 RequestManagerError,
503 > {
504 let (vali_req, quorum, signers, init_state, schema_id) =
505 self.build_validation_data(ctx, eval, appro_data).await?;
506
507 if signers.is_empty() {
508 warn!(
509 request_id = %self.id,
510 schema_id = %schema_id,
511 "No validators available for schema"
512 );
513
514 return Err(RequestManagerError::NoValidatorsAvailable {
515 schema_id: schema_id.to_string(),
516 governance_id: vali_req.get_governance_id().expect("The build process verified that the event request is valid")
517 });
518 }
519
520 let signature = get_sign(
521 ctx,
522 SignTypesNode::ValidationReq(Box::new(vali_req.clone())),
523 )
524 .await?;
525
526 let signed_validation_req: Signed<ValidationReq> =
527 Signed::from_parts(vali_req, signature);
528
529 self.on_event(
530 RequestManagerEvent::UpdateState {
531 state: Box::new(RequestManagerState::Validation {
532 request: Box::new(signed_validation_req.clone()),
533 quorum: quorum.clone(),
534 init_state: init_state.clone(),
535 signers: signers.clone(),
536 }),
537 },
538 ctx,
539 )
540 .await;
541
542 Ok((signed_validation_req, quorum, signers, init_state))
543 }
544
545 async fn build_validation_data(
546 &self,
547 ctx: &mut ActorContext<Self>,
548 eval: Option<(EvaluationReq, EvaluationData)>,
549 appro_data: Option<ApprovalData>,
550 ) -> Result<
551 (
552 ValidationReq,
553 Quorum,
554 HashSet<PublicKey>,
555 Option<ValueWrapper>,
556 SchemaType,
557 ),
558 RequestManagerError,
559 > {
560 let Some(request) = self.request.clone() else {
561 return Err(RequestManagerError::RequestNotSet);
562 };
563
564 if let EventRequest::Create(create) = request.content() {
565 if create.schema_id.is_gov() {
566 let governance_data =
567 GovernanceData::new((*self.our_key).clone());
568 let (signers, quorum) = governance_data
569 .get_quorum_and_signers(
570 ProtocolTypes::Validation,
571 &SchemaType::Governance,
572 Namespace::new(),
573 )?;
574
575 Ok((
576 ValidationReq::Create {
577 event_request: request.clone(),
578 gov_version: 0,
579 subject_id: self.subject_id.clone(),
580 },
581 quorum,
582 signers,
583 None,
584 SchemaType::Governance,
585 ))
586 } else {
587 let governance_data =
588 get_gov(ctx, &create.governance_id).await?;
589
590 let (signers, quorum) = governance_data
591 .get_quorum_and_signers(
592 ProtocolTypes::Validation,
593 &create.schema_id,
594 create.namespace.clone(),
595 )?;
596
597 let init_state =
598 governance_data.get_init_state(&create.schema_id)?;
599
600 Ok((
601 ValidationReq::Create {
602 event_request: request.clone(),
603 gov_version: governance_data.version,
604 subject_id: self.subject_id.clone(),
605 },
606 quorum,
607 signers,
608 Some(init_state),
609 create.schema_id.clone(),
610 ))
611 }
612 } else {
613 let Some((hash, ..)) = self.helpers else {
614 return Err(RequestManagerError::HelpersNotInitialized);
615 };
616
617 let governance_data = get_gov(ctx, &self.subject_id).await?;
618
619 let (actual_protocols, gov_version, sn) =
620 if let Some((eval_req, eval_data)) = eval {
621 if let Some(approval_data) = appro_data {
622 (
623 ActualProtocols::EvalApprove {
624 eval_data,
625 approval_data,
626 },
627 eval_req.gov_version,
628 Some(eval_req.sn),
629 )
630 } else {
631 (
632 ActualProtocols::Eval { eval_data },
633 eval_req.gov_version,
634 Some(eval_req.sn),
635 )
636 }
637 } else {
638 (ActualProtocols::None, governance_data.version, None)
639 };
640
641 let metadata = get_metadata(ctx, &self.subject_id).await?;
642 let sn = if let Some(sn) = sn {
643 sn
644 } else {
645 metadata.sn + 1
646 };
647
648 let (signers, quorum) = governance_data.get_quorum_and_signers(
649 ProtocolTypes::Validation,
650 &metadata.schema_id,
651 metadata.namespace.clone(),
652 )?;
653
654 let last_ledger_event =
655 get_last_ledger_event(ctx, &self.subject_id).await?;
656
657 let Some(last_ledger_event) = last_ledger_event else {
658 return Err(RequestManagerError::LastLedgerEventNotFound);
659 };
660
661 let ledger_hash = hash_borsh(&*hash.hasher(), &last_ledger_event.0)
662 .map_err(|e| RequestManagerError::LedgerHashFailed {
663 details: e.to_string(),
664 })?;
665
666 let schema_id = metadata.schema_id.clone();
667
668 Ok((
669 ValidationReq::Event {
670 actual_protocols: Box::new(actual_protocols),
671 event_request: request.clone(),
672 metadata: Box::new(metadata),
673 last_data: Box::new(LastData {
674 vali_data: last_ledger_event
675 .content()
676 .protocols
677 .get_validation_data(),
678 gov_version: last_ledger_event.content().gov_version,
679 }),
680 gov_version,
681 ledger_hash,
682 sn,
683 },
684 quorum,
685 signers,
686 None,
687 schema_id,
688 ))
689 }
690 }
691
692 async fn run_validation(
693 &self,
694 ctx: &mut ActorContext<Self>,
695 request: Signed<ValidationReq>,
696 quorum: Quorum,
697 signers: HashSet<PublicKey>,
698 init_state: Option<ValueWrapper>,
699 ) -> Result<(), RequestManagerError> {
700 let Some((hash, network)) = self.helpers.clone() else {
701 return Err(RequestManagerError::HelpersNotInitialized);
702 };
703
704 info!("Init validation {}", self.id);
705 let child = ctx
706 .create_child(
707 "validation",
708 Validation::new(
709 self.our_key.clone(),
710 request,
711 init_state,
712 quorum,
713 hash,
714 network,
715 ),
716 )
717 .await?;
718
719 child
720 .tell(ValidationMessage::Create {
721 request_id: self.id.clone(),
722 version: self.version,
723 signers,
724 })
725 .await?;
726
727 send_to_tracking(
728 ctx,
729 RequestTrackingMessage::UpdateState {
730 request_id: self.id.clone(),
731 state: RequestState::Validation,
732 },
733 )
734 .await?;
735
736 Ok(())
737 }
738 async fn build_ledger(
741 &mut self,
742 ctx: &mut ActorContext<Self>,
743 val_req: ValidationReq,
744 val_res: ValidationData,
745 ) -> Result<SignedLedger, RequestManagerError> {
746 let ledger = match val_req {
747 ValidationReq::Create {
748 event_request,
749 gov_version,
750 ..
751 } => Ledger {
752 event_request,
753 gov_version,
754 sn: 0,
755 prev_ledger_event_hash: DigestIdentifier::default(),
756 protocols: Protocols::Create {
757 validation: val_res,
758 },
759 },
760 ValidationReq::Event {
761 actual_protocols,
762 event_request,
763 metadata,
764 gov_version,
765 sn,
766 ledger_hash,
767 ..
768 } => Ledger {
769 gov_version,
770 sn,
771 prev_ledger_event_hash: ledger_hash,
772 protocols: Protocols::build(
773 metadata.schema_id.is_gov(),
774 EventRequestType::from(event_request.content()),
775 *actual_protocols,
776 val_res,
777 )?,
778 event_request,
779 },
780 };
781
782 let signature =
783 get_sign(ctx, SignTypesNode::Ledger(ledger.clone())).await?;
784
785 let ledger = SignedLedger(Signed::from_parts(ledger, signature));
786
787 self.on_event(
788 RequestManagerEvent::UpdateState {
789 state: Box::new(RequestManagerState::UpdateSubject {
790 ledger: ledger.clone(),
791 }),
792 },
793 ctx,
794 )
795 .await;
796
797 Ok(ledger)
798 }
799
800 async fn update_subject(
801 &mut self,
802 ctx: &mut ActorContext<Self>,
803 ledger: SignedLedger,
804 ) -> Result<(), RequestManagerError> {
805 if ledger.content().event_request.content().is_create_event() {
806 if let Err(e) = create_subject(ctx, ledger.clone()).await {
807 if let ActorError::Functional { .. } = e {
808 return Err(RequestManagerError::CheckLimit);
809 } else {
810 return Err(RequestManagerError::ActorError(e));
811 }
812 };
813 } else {
814 update_ledger(ctx, &self.subject_id, vec![ledger.clone()]).await?;
815 }
816
817 self.on_event(
818 RequestManagerEvent::UpdateState {
819 state: Box::new(RequestManagerState::Distribution { ledger }),
820 },
821 ctx,
822 )
823 .await;
824
825 Ok(())
826 }
827
828 async fn build_distribution(
829 &self,
830 ctx: &mut ActorContext<Self>,
831 ledger: SignedLedger,
832 ) -> Result<bool, RequestManagerError> {
833 let witnesses = self
834 .build_distribution_data(ctx, ledger.signature().signer.clone())
835 .await?;
836
837 let Some(mut witnesses) = witnesses else {
838 return Ok(false);
839 };
840
841 witnesses.remove(&self.our_key);
842
843 if witnesses.is_empty() {
844 warn!(
845 request_id = %self.id,
846 "No witnesses available for distribution"
847 );
848 return Ok(false);
849 }
850
851 self.run_distribution(ctx, witnesses, ledger).await?;
852
853 Ok(true)
854 }
855
856 async fn build_distribution_data(
857 &self,
858 ctx: &mut ActorContext<Self>,
859 creator: PublicKey,
860 ) -> Result<Option<HashSet<PublicKey>>, RequestManagerError> {
861 let Some(request) = self.request.clone() else {
862 return Err(RequestManagerError::RequestNotSet);
863 };
864
865 let witnesses = if let EventRequest::Create(create) = request.content()
866 {
867 if create.schema_id == SchemaType::Governance {
868 None
869 } else {
870 let governance_data = get_gov(ctx, &self.subject_id).await?;
871
872 let witnesses =
873 governance_data.get_witnesses(WitnessesData::Schema {
874 creator,
875 schema_id: create.schema_id.clone(),
876 namespace: create.namespace.clone(),
877 })?;
878
879 Some(witnesses)
880 }
881 } else {
882 let data = get_subject_data(ctx, &self.subject_id).await?;
883
884 let Some(data) = data else {
885 return Err(RequestManagerError::SubjectDataNotFound {
886 subject_id: self.subject_id.to_string(),
887 });
888 };
889
890 let governance_data = get_gov(ctx, &self.subject_id).await?;
891
892 let witnesses = match data {
893 SubjectData::Governance { .. } => {
894 governance_data.get_witnesses(WitnessesData::Gov)?
895 }
896 SubjectData::Tracker {
897 schema_id,
898 namespace,
899 ..
900 } => governance_data.get_witnesses(WitnessesData::Schema {
901 creator,
902 schema_id,
903 namespace: Namespace::from(namespace),
904 })?,
905 };
906
907 Some(witnesses)
908 };
909
910 Ok(witnesses)
911 }
912
913 async fn run_distribution(
914 &self,
915 ctx: &mut ActorContext<Self>,
916 witnesses: HashSet<PublicKey>,
917 ledger: SignedLedger,
918 ) -> Result<(), RequestManagerError> {
919 let Some((.., network)) = self.helpers.clone() else {
920 return Err(RequestManagerError::HelpersNotInitialized);
921 };
922
923 info!("Init distribution {}", self.id);
924 let child = ctx
925 .create_child(
926 "distribution",
927 Distribution::new(
928 network,
929 DistributionType::Request,
930 self.id.clone(),
931 ),
932 )
933 .await?;
934
935 child
936 .tell(DistributionMessage::Create {
937 ledger: Box::new(ledger),
938 witnesses,
939 })
940 .await?;
941
942 send_to_tracking(
943 ctx,
944 RequestTrackingMessage::UpdateState {
945 request_id: self.id.clone(),
946 state: RequestState::Distribution,
947 },
948 )
949 .await?;
950
951 Ok(())
952 }
953
954 async fn init_wait(
957 &self,
958 ctx: &mut ActorContext<Self>,
959 governance_id: &DigestIdentifier,
960 ) -> Result<(), RequestManagerError> {
961 let actor = ctx
962 .create_child(
963 "reboot",
964 Reboot::new(governance_id.clone(), self.id.clone()),
965 )
966 .await?;
967
968 actor.tell(RebootMessage::Init).await?;
969
970 Ok(())
971 }
972
973 async fn init_update(
974 &self,
975 ctx: &mut ActorContext<Self>,
976 governance_id: &DigestIdentifier,
977 ) -> Result<(), RequestManagerError> {
978 let Some((.., network)) = self.helpers.clone() else {
979 return Err(RequestManagerError::HelpersNotInitialized);
980 };
981
982 let gov_sn = get_gov_sn(ctx, governance_id).await?;
983
984 let governance_data = get_gov(ctx, governance_id).await?;
985
986 let mut witnesses = {
987 let gov_witnesses =
988 governance_data.get_witnesses(WitnessesData::Gov)?;
989
990 let auth_witnesses =
991 Self::get_witnesses_auth(ctx, governance_id.clone())
992 .await
993 .unwrap_or_default();
994
995 gov_witnesses
996 .union(&auth_witnesses)
997 .cloned()
998 .collect::<HashSet<PublicKey>>()
999 };
1000
1001 witnesses.remove(&self.our_key);
1002
1003 if witnesses.is_empty() {
1004 if let Ok(actor) = ctx.reference().await {
1005 actor
1006 .tell(RequestManagerMessage::FinishReboot {
1007 request_id: self.id.clone(),
1008 })
1009 .await?;
1010 };
1011 } else if witnesses.len() == 1 {
1012 let objetive = witnesses.iter().next().expect("len is 1");
1013 let info = ComunicateInfo {
1014 receiver: objetive.clone(),
1015 request_id: String::default(),
1016 version: 0,
1017 receiver_actor: format!(
1018 "/user/node/distributor_{}",
1019 governance_id
1020 ),
1021 };
1022
1023 network
1024 .send_command(network::CommandHelper::SendMessage {
1025 message: NetworkMessage {
1026 info,
1027 message: ActorMessage::DistributionLedgerReq {
1028 actual_sn: Some(gov_sn),
1029 subject_id: governance_id.clone(),
1030 },
1031 },
1032 })
1033 .await?;
1034
1035 let Ok(actor) = ctx.reference().await else {
1036 return Ok(());
1037 };
1038
1039 actor
1040 .tell(RequestManagerMessage::RebootWait {
1041 request_id: self.id.clone(),
1042 governance_id: governance_id.clone(),
1043 })
1044 .await?;
1045 } else {
1046 let data = UpdateNew {
1047 network,
1048 subject_id: governance_id.clone(),
1049 our_sn: Some(gov_sn),
1050 witnesses,
1051 update_type: UpdateType::Request {
1052 subject_id: self.subject_id.clone(),
1053 id: self.id.clone(),
1054 },
1055 };
1056
1057 let updater = Update::new(data);
1058 let Ok(child) = ctx.create_child("update", updater).await else {
1059 let Ok(actor) = ctx.reference().await else {
1060 return Ok(());
1061 };
1062
1063 actor
1064 .tell(RequestManagerMessage::RebootWait {
1065 request_id: self.id.clone(),
1066 governance_id: governance_id.clone(),
1067 })
1068 .await?;
1069
1070 return Ok(());
1071 };
1072
1073 child.tell(UpdateMessage::Run).await?;
1074 }
1075
1076 Ok(())
1077 }
1078
1079 async fn get_witnesses_auth(
1080 ctx: &ActorContext<Self>,
1081 governance_id: DigestIdentifier,
1082 ) -> Result<HashSet<PublicKey>, RequestManagerError> {
1083 let path = ActorPath::from("/user/node/auth");
1084 let actor = ctx.system().get_actor::<Auth>(&path).await?;
1085
1086 let response = actor
1087 .ask(AuthMessage::GetAuth {
1088 subject_id: governance_id,
1089 })
1090 .await?;
1091
1092 match response {
1093 AuthResponse::Witnesses(witnesses) => Ok(witnesses),
1094 _ => Err(RequestManagerError::ActorError(
1095 ActorError::UnexpectedResponse {
1096 path,
1097 expected: "AuthResponse::Witnesses".to_owned(),
1098 },
1099 )),
1100 }
1101 }
1102
1103 async fn send_reboot(
1106 &self,
1107 ctx: &ActorContext<Self>,
1108 governance_id: DigestIdentifier,
1109 ) -> Result<(), ActorError> {
1110 let Ok(actor) = ctx.reference().await else {
1111 return Ok(());
1112 };
1113
1114 actor
1115 .tell(RequestManagerMessage::Reboot {
1116 request_id: self.id.clone(),
1117 governance_id,
1118 reboot_type: RebootType::TimeOut,
1119 })
1120 .await
1121 }
1122
1123 async fn match_error(
1124 &mut self,
1125 ctx: &mut ActorContext<Self>,
1126 error: RequestManagerError,
1127 ) {
1128 match error {
1129 RequestManagerError::NoEvaluatorsAvailable {
1130 governance_id,
1131 ..
1132 }
1133 | RequestManagerError::NoApproversAvailable {
1134 governance_id, ..
1135 }
1136 | RequestManagerError::NoValidatorsAvailable {
1137 governance_id,
1138 ..
1139 } => {
1140 if let Err(e) = self.send_reboot(ctx, governance_id).await {
1141 emit_fail(ctx, e).await;
1142 }
1143 }
1144 RequestManagerError::CheckLimit
1145 | RequestManagerError::Governance(..) => {
1146 if let Err(e) = self
1147 .abort_request(
1148 ctx,
1149 error.to_string(),
1150 None,
1151 (*self.our_key).clone(),
1152 )
1153 .await
1154 {
1155 emit_fail(
1156 ctx,
1157 ActorError::FunctionalCritical {
1158 description: e.to_string(),
1159 },
1160 )
1161 .await;
1162 }
1163 }
1164 _ => {
1165 emit_fail(
1166 ctx,
1167 ActorError::FunctionalCritical {
1168 description: error.to_string(),
1169 },
1170 )
1171 .await;
1172 }
1173 }
1174 }
1175
1176 async fn finish_request(
1177 &mut self,
1178 ctx: &mut ActorContext<Self>,
1179 ) -> Result<(), RequestManagerError> {
1180 info!("Ending {}", self.id);
1181 send_to_tracking(
1182 ctx,
1183 RequestTrackingMessage::UpdateState {
1184 request_id: self.id.clone(),
1185 state: RequestState::Finish,
1186 },
1187 )
1188 .await?;
1189
1190 self.on_event(RequestManagerEvent::Finish, ctx).await;
1191
1192 self.end_request(ctx).await?;
1193
1194 Ok(())
1195 }
1196
1197 async fn reboot(
1198 &mut self,
1199 ctx: &mut ActorContext<Self>,
1200 reboot_type: RebootType,
1201 governance_id: DigestIdentifier,
1202 ) -> Result<(), RequestManagerError> {
1203 self.on_event(
1204 RequestManagerEvent::UpdateState {
1205 state: Box::new(RequestManagerState::Reboot),
1206 },
1207 ctx,
1208 )
1209 .await;
1210
1211 let Ok(actor) = ctx.reference().await else {
1212 return Ok(());
1213 };
1214
1215 let request_id = self.id.clone();
1216
1217 match reboot_type {
1218 RebootType::Normal => {
1219 info!("Launching Normal reboot {}", self.id);
1220 send_to_tracking(
1221 ctx,
1222 RequestTrackingMessage::UpdateState {
1223 request_id: self.id.clone(),
1224 state: RequestState::Reboot,
1225 },
1226 )
1227 .await?;
1228
1229 actor
1230 .tell(RequestManagerMessage::RebootUpdate {
1231 request_id,
1232 governance_id,
1233 })
1234 .await?;
1235 }
1236 RebootType::Diff => {
1237 info!("Launching Diff reboot {}", self.id);
1238 self.retry_diff += 1;
1239
1240 let seconds = match self.retry_diff {
1241 1 => 10,
1242 2 => 20,
1243 3 => 30,
1244 _ => 60,
1245 };
1246
1247 info!(
1248 "Launching Diff reboot {}, try: {}, seconds: {}",
1249 self.id, self.retry_diff, seconds
1250 );
1251
1252 send_to_tracking(
1253 ctx,
1254 RequestTrackingMessage::UpdateState {
1255 request_id: self.id.clone(),
1256 state: RequestState::RebootDiff {
1257 seconds,
1258 count: self.retry_diff,
1259 },
1260 },
1261 )
1262 .await?;
1263
1264 tokio::spawn(async move {
1265 tokio::time::sleep(Duration::from_secs(seconds)).await;
1266 let _ = actor
1267 .tell(RequestManagerMessage::RebootUpdate {
1268 request_id,
1269 governance_id,
1270 })
1271 .await;
1272 });
1273 }
1274 RebootType::TimeOut => {
1275 self.retry_timeout += 1;
1276
1277 let seconds = match self.retry_timeout {
1278 1 => 30,
1279 2 => 60,
1280 3 => 120,
1281 _ => 300,
1282 };
1283
1284 info!(
1285 "Launching TimeOut reboot {}, try: {}, seconds: {}",
1286 self.id, self.retry_timeout, seconds
1287 );
1288 send_to_tracking(
1289 ctx,
1290 RequestTrackingMessage::UpdateState {
1291 request_id: self.id.clone(),
1292 state: RequestState::RebootTimeOut {
1293 seconds,
1294 count: self.retry_timeout,
1295 },
1296 },
1297 )
1298 .await?;
1299
1300 tokio::spawn(async move {
1301 tokio::time::sleep(Duration::from_secs(seconds)).await;
1302 let _ = actor
1303 .tell(RequestManagerMessage::RebootUpdate {
1304 request_id,
1305 governance_id,
1306 })
1307 .await;
1308 });
1309 }
1310 }
1311
1312 Ok(())
1313 }
1314
1315 async fn match_command(
1316 &mut self,
1317 ctx: &mut ActorContext<Self>,
1318 ) -> Result<(), RequestManagerError> {
1319 match self.command {
1320 ReqManInitMessage::Evaluate => self.build_evaluation(ctx).await,
1321 ReqManInitMessage::Validate => {
1322 let (request, quorum, signers, init_state) =
1323 self.build_validation_req(ctx, None, None).await?;
1324
1325 self.run_validation(ctx, request, quorum, signers, init_state)
1326 .await
1327 }
1328 }
1329 }
1330
1331 async fn check_signature(
1332 &self,
1333 ctx: &mut ActorContext<Self>,
1334 ) -> Result<(), RequestManagerError> {
1335 let Some(request) = self.request.clone() else {
1336 return Err(RequestManagerError::RequestNotSet);
1337 };
1338
1339 if let EventRequest::Fact { .. } = request.content() {
1340 let subject_data = get_subject_data(ctx, &self.subject_id).await?;
1341 let Some(subject_data) = subject_data else {
1342 return Err(RequestManagerError::SubjecData);
1343 };
1344
1345 let gov = get_gov(ctx, &self.subject_id).await?;
1346 match subject_data {
1347 SubjectData::Tracker {
1348 schema_id,
1349 namespace,
1350 ..
1351 } => {
1352 if !gov.has_this_role(HashThisRole::Schema {
1353 who: request.signature().signer.clone(),
1354 role: RoleTypes::Issuer,
1355 schema_id,
1356 namespace: Namespace::from(namespace),
1357 }) {
1358 return Err(RequestManagerError::NotIssuer);
1359 }
1360 }
1361 SubjectData::Governance { .. } => {
1362 if !gov.has_this_role(HashThisRole::Gov {
1363 who: request.signature().signer.clone(),
1364 role: RoleTypes::Issuer,
1365 }) {
1366 return Err(RequestManagerError::NotIssuer);
1367 }
1368 }
1369 }
1370 }
1371
1372 Ok(())
1373 }
1374
1375 async fn stops_childs(
1376 &self,
1377 ctx: &mut ActorContext<Self>,
1378 ) -> Result<(), RequestManagerError> {
1379 match self.state {
1380 RequestManagerState::Reboot => {
1381 if let Ok(actor) = ctx.get_child::<Update>("update").await {
1382 actor.ask_stop().await?;
1383 };
1384 if let Ok(actor) = ctx.get_child::<Reboot>("reboot").await {
1385 actor.ask_stop().await?;
1386 };
1387 }
1388 RequestManagerState::Evaluation => {
1389 if let Ok(actor) =
1390 ctx.get_child::<Evaluation>("evaluation").await
1391 {
1392 actor.ask_stop().await?;
1393 };
1394 }
1395 RequestManagerState::Approval { .. } => {
1396 if let Ok(actor) = ctx.get_child::<Approval>("approval").await {
1397 actor.ask_stop().await?;
1398 };
1399 let _ = make_obsolete(ctx, &self.subject_id).await;
1400 }
1401 RequestManagerState::Validation { .. } => {
1402 if let Ok(actor) =
1403 ctx.get_child::<Validation>("validation").await
1404 {
1405 actor.ask_stop().await?;
1406 };
1407 }
1408 RequestManagerState::Distribution { .. } => {
1409 if let Ok(actor) =
1410 ctx.get_child::<Distribution>("distribution").await
1411 {
1412 actor.ask_stop().await?;
1413 };
1414 }
1415 _ => {}
1416 }
1417
1418 Ok(())
1419 }
1420
1421 async fn abort_request(
1422 &mut self,
1423 ctx: &mut ActorContext<Self>,
1424 error: String,
1425 sn: Option<u64>,
1426 who: PublicKey,
1427 ) -> Result<(), RequestManagerError> {
1428 self.stops_childs(ctx).await?;
1429
1430 info!("Aborting {}", self.id);
1431 send_to_tracking(
1432 ctx,
1433 RequestTrackingMessage::UpdateState {
1434 request_id: self.id.clone(),
1435 state: RequestState::Abort {
1436 subject_id: self.subject_id.to_string(),
1437 error,
1438 sn,
1439 who: who.to_string(),
1440 },
1441 },
1442 )
1443 .await?;
1444
1445 self.on_event(RequestManagerEvent::Finish, ctx).await;
1446
1447 self.end_request(ctx).await?;
1448
1449 Ok(())
1450 }
1451
1452 async fn end_request(
1453 &self,
1454 ctx: &ActorContext<Self>,
1455 ) -> Result<(), RequestManagerError> {
1456 let actor = ctx.get_parent::<RequestHandler>().await?;
1457 actor
1458 .tell(RequestHandlerMessage::EndHandling {
1459 subject_id: self.subject_id.clone(),
1460 })
1461 .await?;
1462
1463 Ok(())
1464 }
1465}
1466
/// Messages handled by the `RequestManager` actor.
///
/// Except for `Run`, `FirstRun` and `ManualAbort`, the handler ignores a
/// message whose `request_id` differs from the id of the request currently
/// being managed.
#[derive(Debug, Clone)]
pub enum RequestManagerMessage {
    /// Adopts `request_id` as the current id and resumes processing from the
    /// state the manager currently holds.
    Run {
        request_id: DigestIdentifier,
    },
    /// Starts a new request: adopts `request_id`, persists `command` and
    /// `request` through a `SafeState` event and executes the command.
    FirstRun {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
        request_id: DigestIdentifier,
    },
    /// Aborts the request, reporting `reason`, `sn` and `who` to tracking.
    Abort {
        request_id: DigestIdentifier,
        who: PublicKey,
        reason: String,
        sn: u64,
    },
    /// User-initiated abort. Only honoured while the manager is in the
    /// `Reboot`, `Starting`, `Evaluation`, `Approval` or `Validation` state;
    /// in any other state it is logged and ignored.
    ManualAbort,
    /// Stops the children of the current state and starts a reboot of the
    /// given type, unless the manager is already in the `Reboot` state.
    Reboot {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
        reboot_type: RebootType,
    },
    /// Triggers `init_update` for the given governance.
    RebootUpdate {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Triggers `init_wait` for the given governance.
    RebootWait {
        request_id: DigestIdentifier,
        governance_id: DigestIdentifier,
    },
    /// Ends a reboot: the version is incremented and reported to tracking,
    /// the request signature is checked again and the initial command is
    /// re-executed.
    FinishReboot {
        request_id: DigestIdentifier,
    },
    /// Result of the evaluation phase. Leads to the approval phase when the
    /// evaluator response requires approval, otherwise to validation.
    EvaluationRes {
        request_id: DigestIdentifier,
        eval_req: Box<EvaluationReq>,
        eval_res: EvaluationData,
    },
    /// Result of the approval phase. Only valid while the manager is in the
    /// `Approval` state; leads to validation.
    ApprovalRes {
        request_id: DigestIdentifier,
        appro_res: ApprovalData,
    },
    /// Result of the validation phase. The ledger is built, the subject is
    /// updated and distribution is started (or the request is finished when
    /// no distribution takes place).
    ValidationRes {
        request_id: DigestIdentifier,
        val_req: Box<ValidationReq>,
        val_res: ValidationData,
    },
    /// Stops the children of the current state and finishes the request.
    FinishRequest {
        request_id: DigestIdentifier,
    },
}

// Marker impl: lets `RequestManagerMessage` be used as an actor message.
impl Message for RequestManagerMessage {}
1520
/// Events persisted by the `RequestManager`; `apply` folds them into the
/// actor state.
///
/// NOTE(review): the enum is Borsh/serde-serialised, so reordering or removing
/// variants presumably changes the stored format — confirm before editing.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum RequestManagerEvent {
    /// The request is over: the state becomes `End`, the stored request is
    /// dropped and the request id is reset to its default.
    Finish,
    /// Replaces the current state with `state`.
    UpdateState {
        state: Box<RequestManagerState>,
    },
    /// Sets the version to `version` and resets the state to `Starting`.
    UpdateVersion {
        version: u64,
    },
    /// Stores `command` and `request` for a new request, resetting the
    /// version and both retry counters to zero and the state to `Starting`.
    SafeState {
        command: ReqManInitMessage,
        request: Signed<EventRequest>,
    },
}

// Marker impl: lets `RequestManagerEvent` be used as an actor event.
impl Event for RequestManagerEvent {}
1539
1540#[async_trait]
1541impl Actor for RequestManager {
1542 type Event = RequestManagerEvent;
1543 type Message = RequestManagerMessage;
1544 type Response = ();
1545
1546 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
1547 parent_span.map_or_else(
1548 || info_span!("RequestManager", id),
1549 |parent_span| info_span!(parent: parent_span, "RequestManager", id),
1550 )
1551 }
1552
1553 async fn pre_start(
1554 &mut self,
1555 ctx: &mut ActorContext<Self>,
1556 ) -> Result<(), ActorError> {
1557 if let Err(e) =
1558 self.init_store("request_manager", None, false, ctx).await
1559 {
1560 error!(
1561 error = %e,
1562 subject_id = %self.subject_id,
1563 "Failed to initialize store"
1564 );
1565 return Err(e);
1566 }
1567 Ok(())
1568 }
1569}
1570
1571#[async_trait]
1572impl Handler<Self> for RequestManager {
1573 #[allow(clippy::large_stack_frames)]
1576 async fn handle_message(
1577 &mut self,
1578 _sender: ActorPath,
1579 msg: RequestManagerMessage,
1580 ctx: &mut ave_actors::ActorContext<Self>,
1581 ) -> Result<(), ActorError> {
1582 match msg {
1583 RequestManagerMessage::RebootUpdate {
1584 governance_id,
1585 request_id,
1586 } => {
1587 if request_id == self.id {
1588 info!("Init reboot update {}", self.id);
1589 debug!(
1590 msg_type = "RebootUpdate",
1591 request_id = %self.id,
1592 governance_id = %governance_id,
1593 "Initializing reboot update"
1594 );
1595
1596 if let Err(e) = self.init_update(ctx, &governance_id).await
1597 {
1598 error!(
1599 msg_type = "RebootUpdate",
1600 request_id = %self.id,
1601 governance_id = %governance_id,
1602 error = %e,
1603 "Failed to initialize reboot update"
1604 );
1605 self.match_error(ctx, e).await;
1606 return Ok(());
1607 }
1608 }
1609 }
1610 RequestManagerMessage::RebootWait {
1611 governance_id,
1612 request_id,
1613 } => {
1614 if request_id == self.id {
1615 info!("Init reboot wait {}", self.id);
1616 debug!(
1617 msg_type = "RebootWait",
1618 request_id = %self.id,
1619 governance_id = %governance_id,
1620 "Initializing reboot wait"
1621 );
1622
1623 if let Err(e) = self.init_wait(ctx, &governance_id).await {
1624 error!(
1625 msg_type = "RebootWait",
1626 request_id = %self.id,
1627 governance_id = %governance_id,
1628 error = %e,
1629 "Failed to initialize reboot wait"
1630 );
1631 self.match_error(ctx, e).await;
1632 return Ok(());
1633 }
1634 }
1635 }
1636 RequestManagerMessage::Reboot {
1637 governance_id,
1638 request_id,
1639 reboot_type,
1640 } => {
1641 if request_id == self.id {
1642 if matches!(self.state, RequestManagerState::Reboot) {
1643 debug!(
1644 msg_type = "Reboot",
1645 request_id = %self.id,
1646 governance_id = %governance_id,
1647 reboot_type = ?reboot_type,
1648 "Already in reboot state, ignoring"
1649 );
1650 } else {
1651 debug!(
1652 msg_type = "Reboot",
1653 request_id = %self.id,
1654 governance_id = %governance_id,
1655 reboot_type = ?reboot_type,
1656 "Initiating reboot"
1657 );
1658 if let Err(e) = self.stops_childs(ctx).await {
1659 error!(
1660 msg_type = "Reboot",
1661 request_id = %self.id,
1662 governance_id = %governance_id,
1663 error = %e,
1664 "Failed to stop childs"
1665 );
1666 self.match_error(ctx, e).await;
1667 return Ok(());
1668 };
1669 if let Err(e) = self
1670 .reboot(ctx, reboot_type, governance_id.clone())
1671 .await
1672 {
1673 error!(
1674 msg_type = "Reboot",
1675 request_id = %self.id,
1676 governance_id = %governance_id,
1677 error = %e,
1678 "Failed to initiate reboot"
1679 );
1680 self.match_error(ctx, e).await;
1681 return Ok(());
1682 }
1683 }
1684 }
1685 }
1686 RequestManagerMessage::FinishReboot { request_id } => {
1687 if request_id == self.id {
1688 info!("Init reboot finish {}", self.id);
1689 debug!(
1690 msg_type = "FinishReboot",
1691 request_id = %self.id,
1692 version = self.version,
1693 "Reboot completed, resuming request"
1694 );
1695 self.on_event(
1696 RequestManagerEvent::UpdateVersion {
1697 version: self.version + 1,
1698 },
1699 ctx,
1700 )
1701 .await;
1702
1703 if let Err(e) = send_to_tracking(
1704 ctx,
1705 RequestTrackingMessage::UpdateVersion {
1706 request_id: self.id.clone(),
1707 version: self.version,
1708 },
1709 )
1710 .await
1711 {
1712 error!(
1713 msg_type = "FinishReboot",
1714 request_id = %self.id,
1715 version = self.version,
1716 error = %e,
1717 "Failed to send version update to tracking"
1718 );
1719 return Err(emit_fail(ctx, e).await);
1720 }
1721
1722 if let Err(e) = self.check_signature(ctx).await {
1723 error!(
1724 msg_type = "FinishReboot",
1725 request_id = %self.id,
1726 error = %e,
1727 "Failed to check signatures after reboot"
1728 );
1729 self.match_error(ctx, e).await;
1730 return Ok(());
1731 }
1732
1733 if let Err(e) = self.match_command(ctx).await {
1734 error!(
1735 msg_type = "FinishReboot",
1736 request_id = %self.id,
1737 error = %e,
1738 "Failed to execute command after reboot"
1739 );
1740 self.match_error(ctx, e).await;
1741 return Ok(());
1742 }
1743 }
1744 }
1745 RequestManagerMessage::Abort {
1746 request_id,
1747 who,
1748 reason,
1749 sn,
1750 } => {
1751 if request_id == self.id {
1752 warn!(
1753 msg_type = "Abort",
1754 state = %self.state,
1755 request_id = %self.id,
1756 who = %who,
1757 reason = %reason,
1758 sn = sn,
1759 "Request abort received"
1760 );
1761 if let Err(e) =
1762 self.abort_request(ctx, reason, Some(sn), who).await
1763 {
1764 error!(
1765 msg_type = "Abort",
1766 request_id = %self.id,
1767 error = %e,
1768 "Failed to abort request"
1769 );
1770 self.match_error(ctx, e).await;
1771 return Ok(());
1772 }
1773 }
1774 }
1775 RequestManagerMessage::ManualAbort => {
1776 match &self.state {
1777 RequestManagerState::Reboot
1778 | RequestManagerState::Starting
1779 | RequestManagerState::Evaluation
1780 | RequestManagerState::Approval { .. }
1781 | RequestManagerState::Validation { .. } => {
1782 if let Err(e) = self
1783 .abort_request(
1784 ctx,
1785 "The user manually aborted the request"
1786 .to_owned(),
1787 None,
1788 (*self.our_key).clone(),
1789 )
1790 .await
1791 {
1792 error!(
1793 msg_type = "Abort",
1794 request_id = %self.id,
1795 error = %e,
1796 "Failed to abort request"
1797 );
1798 self.match_error(ctx, e).await;
1799 }
1800 }
1801 _ => {
1802 info!(
1803 "The request is in a state that cannot be aborted {}, state: {}",
1804 self.id, self.state
1805 );
1806 }
1807 }
1808
1809 return Ok(());
1810 }
1811 RequestManagerMessage::FirstRun {
1812 command,
1813 request,
1814 request_id,
1815 } => {
1816 self.id = request_id.clone();
1817 debug!(
1818 msg_type = "FirstRun",
1819 request_id = %request_id,
1820 command = ?command,
1821 "First run of request manager"
1822 );
1823 self.on_event(
1824 RequestManagerEvent::SafeState { command, request },
1825 ctx,
1826 )
1827 .await;
1828
1829 if let Err(e) = self.match_command(ctx).await {
1830 error!(
1831 msg_type = "FirstRun",
1832 request_id = %self.id,
1833 error = %e,
1834 "Failed to execute initial command"
1835 );
1836 self.match_error(ctx, e).await;
1837 return Ok(());
1838 };
1839 }
1840 RequestManagerMessage::Run { request_id } => {
1841 self.id = request_id;
1842
1843 debug!(
1844 msg_type = "Run",
1845 request_id = %self.id,
1846 state = ?self.state,
1847 version = self.version,
1848 "Running request manager"
1849 );
1850 match self.state.clone() {
1851 RequestManagerState::Starting
1852 | RequestManagerState::Reboot => {
1853 if let Err(e) = self.match_command(ctx).await {
1854 error!(
1855 msg_type = "Run",
1856 request_id = %self.id,
1857 state = "Starting/Reboot",
1858 error = %e,
1859 "Failed to execute command"
1860 );
1861 self.match_error(ctx, e).await;
1862 return Ok(())
1863 };
1864 }
1865 RequestManagerState::Evaluation => {
1866 if let Err(e) = self.build_evaluation(ctx).await {
1867 error!(
1868 msg_type = "Run",
1869 request_id = %self.id,
1870 state = "Evaluation",
1871 error = %e,
1872 "Failed to build evaluation"
1873 );
1874 self.match_error(ctx, e).await;
1875 return Ok(())
1876 }
1877 }
1878
1879 RequestManagerState::Approval {
1880 eval_req,
1881 eval_res,
1882 } => {
1883 if let Err(e) = self
1884 .build_approval(ctx, eval_req, eval_res.evaluator_res().expect("If the status is approval, it means that the evaluator's response is valid"))
1885 .await
1886 {
1887 error!(
1888 msg_type = "Run",
1889 request_id = %self.id,
1890 state = "Approval",
1891 error = %e,
1892 "Failed to build approval"
1893 );
1894 self.match_error(ctx, e).await;
1895 return Ok(())
1896 }
1897 }
1898 RequestManagerState::Validation {
1899 request,
1900 quorum,
1901 init_state,
1902 signers,
1903 } => {
1904 if let Err(e) = self
1905 .run_validation(
1906 ctx, *request, quorum, signers, init_state,
1907 )
1908 .await
1909 {
1910 error!(
1911 msg_type = "Run",
1912 request_id = %self.id,
1913 state = "Validation",
1914 error = %e,
1915 "Failed to run validation"
1916 );
1917 self.match_error(ctx, e).await;
1918 return Ok(())
1919 };
1920 }
1921 RequestManagerState::UpdateSubject { ledger } => {
1922 if let Err(e) =
1923 self.update_subject(ctx, ledger.clone()).await
1924 {
1925 error!(
1926 msg_type = "Run",
1927 request_id = %self.id,
1928 state = "UpdateSubject",
1929 error = %e,
1930 "Failed to update subject"
1931 );
1932 self.match_error(ctx, e).await;
1933 return Ok(())
1934 };
1935
1936 match self.build_distribution(ctx, ledger).await {
1937 Ok(in_distribution) => {
1938 if !in_distribution
1939 && let Err(e) =
1940 self.finish_request(ctx).await
1941 {
1942 error!(
1943 msg_type = "Run",
1944 request_id = %self.id,
1945 state = "UpdateSubject",
1946 error = %e,
1947 "Failed to finish request after build distribution"
1948 );
1949 self.match_error(ctx, e).await;
1950 return Ok(())
1951 }
1952 }
1953 Err(e) => {
1954 error!(
1955 msg_type = "Run",
1956 request_id = %self.id,
1957 state = "UpdateSubject",
1958 error = %e,
1959 "Failed to build distribution"
1960 );
1961 self.match_error(ctx, e).await;
1962 return Ok(())
1963 }
1964 };
1965 }
1966 RequestManagerState::Distribution { ledger } => {
1967 match self.build_distribution(ctx, ledger).await {
1968 Ok(in_distribution) => {
1969 if !in_distribution
1970 && let Err(e) =
1971 self.finish_request(ctx).await
1972 {
1973 error!(
1974 msg_type = "Run",
1975 request_id = %self.id,
1976 state = "Distribution",
1977 error = %e,
1978 "Failed to finish request after build distribution"
1979 );
1980 self.match_error(ctx, e).await;
1981 return Ok(())
1982 }
1983 }
1984 Err(e) => {
1985 error!(
1986 msg_type = "Run",
1987 request_id = %self.id,
1988 state = "Distribution",
1989 error = %e,
1990 "Failed to build distribution"
1991 );
1992 self.match_error(ctx, e).await;
1993 return Ok(())
1994 }
1995 };
1996 }
1997 RequestManagerState::End => {
1998 if let Err(e) = self.end_request(ctx).await {
1999 error!(
2000 msg_type = "Run",
2001 request_id = %self.id,
2002 state = "End",
2003 error = %e,
2004 "Failed to end request"
2005 );
2006 self.match_error(ctx, e).await;
2007 return Ok(())
2008 }
2009 }
2010 };
2011 }
2012 RequestManagerMessage::EvaluationRes {
2013 eval_req,
2014 eval_res,
2015 request_id,
2016 } => {
2017 if request_id == self.id {
2018 debug!(
2019 msg_type = "EvaluationRes",
2020 request_id = %self.id,
2021 version = self.version,
2022 "Evaluation result received"
2023 );
2024 if let Err(e) = self.stops_childs(ctx).await {
2025 error!(
2026 msg_type = "EvaluationRes",
2027 request_id = %self.id,
2028 error = %e,
2029 "Failed to stop childs"
2030 );
2031 self.match_error(ctx, e).await;
2032 return Ok(());
2033 };
2034
2035 if let Some(evaluator_res) = eval_res.evaluator_res()
2036 && evaluator_res.appr_required
2037 {
2038 debug!(
2039 msg_type = "EvaluationRes",
2040 request_id = %self.id,
2041 "Approval required, proceeding to approval phase"
2042 );
2043 self.on_event(
2044 RequestManagerEvent::UpdateState {
2045 state: Box::new(
2046 RequestManagerState::Approval {
2047 eval_req: *eval_req.clone(),
2048 eval_res: eval_res.clone(),
2049 },
2050 ),
2051 },
2052 ctx,
2053 )
2054 .await;
2055
2056 if let Err(e) = self
2057 .build_approval(ctx, *eval_req, evaluator_res)
2058 .await
2059 {
2060 error!(
2061 msg_type = "EvaluationRes",
2062 request_id = %self.id,
2063 error = %e,
2064 "Failed to build approval"
2065 );
2066 self.match_error(ctx, e).await;
2067 return Ok(());
2068 }
2069 } else {
2070 debug!(
2071 msg_type = "EvaluationRes",
2072 request_id = %self.id,
2073 "Approval not required, proceeding to validation phase"
2074 );
2075 let (request, quorum, signers, init_state) = match self
2076 .build_validation_req(
2077 ctx,
2078 Some((*eval_req, eval_res)),
2079 None,
2080 )
2081 .await
2082 {
2083 Ok(data) => data,
2084 Err(e) => {
2085 error!(
2086 msg_type = "EvaluationRes",
2087 request_id = %self.id,
2088 error = %e,
2089 "Failed to build validation request"
2090 );
2091 self.match_error(ctx, e).await;
2092 return Ok(());
2093 }
2094 };
2095
2096 if let Err(e) = self
2097 .run_validation(
2098 ctx, request, quorum, signers, init_state,
2099 )
2100 .await
2101 {
2102 error!(
2103 msg_type = "EvaluationRes",
2104 request_id = %self.id,
2105 error = %e,
2106 "Failed to run validation"
2107 );
2108 self.match_error(ctx, e).await;
2109 return Ok(());
2110 };
2111 }
2112 }
2113 }
2114 RequestManagerMessage::ApprovalRes {
2115 appro_res,
2116 request_id,
2117 } => {
2118 if request_id == self.id {
2119 let _ = make_obsolete(ctx, &self.subject_id).await;
2120 debug!(
2121 msg_type = "ApprovalRes",
2122 request_id = %self.id,
2123 version = self.version,
2124 "Approval result received"
2125 );
2126 if let Err(e) = self.stops_childs(ctx).await {
2127 error!(
2128 msg_type = "ApprovalRes",
2129 request_id = %self.id,
2130 error = %e,
2131 "Failed to stop childs"
2132 );
2133 self.match_error(ctx, e).await;
2134 return Ok(());
2135 };
2136
2137 let RequestManagerState::Approval { eval_req, eval_res } =
2138 self.state.clone()
2139 else {
2140 error!(
2141 msg_type = "ApprovalRes",
2142 request_id = %self.id,
2143 state = ?self.state,
2144 "Invalid state for approval response"
2145 );
2146 let e = ActorError::FunctionalCritical {
2147 description: "Invalid request state".to_owned(),
2148 };
2149 return Err(emit_fail(ctx, e).await);
2150 };
2151 let (request, quorum, signers, init_state) = match self
2152 .build_validation_req(
2153 ctx,
2154 Some((eval_req, eval_res)),
2155 Some(appro_res),
2156 )
2157 .await
2158 {
2159 Ok(data) => data,
2160 Err(e) => {
2161 error!(
2162 msg_type = "ApprovalRes",
2163 request_id = %self.id,
2164 error = %e,
2165 "Failed to build validation request"
2166 );
2167 self.match_error(ctx, e).await;
2168 return Ok(());
2169 }
2170 };
2171
2172 if let Err(e) = self
2173 .run_validation(
2174 ctx, request, quorum, signers, init_state,
2175 )
2176 .await
2177 {
2178 error!(
2179 msg_type = "ApprovalRes",
2180 request_id = %self.id,
2181 error = %e,
2182 "Failed to run validation"
2183 );
2184 self.match_error(ctx, e).await;
2185 return Ok(());
2186 };
2187 }
2188 }
2189 RequestManagerMessage::ValidationRes {
2190 val_res,
2191 val_req,
2192 request_id,
2193 } => {
2194 if request_id == self.id {
2195 debug!(
2196 msg_type = "ValidationRes",
2197 request_id = %self.id,
2198 version = self.version,
2199 "Validation result received"
2200 );
2201 if let Err(e) = self.stops_childs(ctx).await {
2202 error!(
2203 msg_type = "ValidationRes",
2204 request_id = %self.id,
2205 error = %e,
2206 "Failed to stop childs"
2207 );
2208 self.match_error(ctx, e).await;
2209 return Ok(());
2210 };
2211
2212 let signed_ledger =
2213 match self.build_ledger(ctx, *val_req, val_res).await {
2214 Ok(signed_ledger) => signed_ledger,
2215 Err(e) => {
2216 error!(
2217 msg_type = "ValidationRes",
2218 request_id = %self.id,
2219 error = %e,
2220 "Failed to build ledger"
2221 );
2222 self.match_error(ctx, e).await;
2223 return Ok(());
2224 }
2225 };
2226
2227 if let Err(e) =
2228 self.update_subject(ctx, signed_ledger.clone()).await
2229 {
2230 error!(
2231 msg_type = "ValidationRes",
2232 request_id = %self.id,
2233 error = %e,
2234 "Failed to update subject"
2235 );
2236 self.match_error(ctx, e).await;
2237 return Ok(());
2238 };
2239
2240 match self.build_distribution(ctx, signed_ledger).await {
2241 Ok(in_distribution) => {
2242 if !in_distribution
2243 && let Err(e) = self.finish_request(ctx).await
2244 {
2245 error!(
2246 msg_type = "ValidationRes",
2247 request_id = %self.id,
2248 error = %e,
2249 "Failed to finish request after build distribution"
2250 );
2251 self.match_error(ctx, e).await;
2252 return Ok(());
2253 }
2254 }
2255 Err(e) => {
2256 error!(
2257 msg_type = "ValidationRes",
2258 request_id = %self.id,
2259 error = %e,
2260 "Failed to build distribution"
2261 );
2262 self.match_error(ctx, e).await;
2263 return Ok(());
2264 }
2265 };
2266 }
2267 }
2268 RequestManagerMessage::FinishRequest { request_id } => {
2269 if request_id == self.id {
2270 debug!(
2271 msg_type = "FinishRequest",
2272 request_id = %self.id,
2273 version = self.version,
2274 "Finishing request"
2275 );
2276
2277 if let Err(e) = self.stops_childs(ctx).await {
2278 error!(
2279 msg_type = "FinishRequest",
2280 request_id = %self.id,
2281 error = %e,
2282 "Failed to stop childs"
2283 );
2284 self.match_error(ctx, e).await;
2285 return Ok(());
2286 };
2287
2288 if let Err(e) = self.finish_request(ctx).await {
2289 error!(
2290 msg_type = "FinishRequest",
2291 request_id = %self.id,
2292 error = %e,
2293 "Failed to finish request"
2294 );
2295 self.match_error(ctx, e).await;
2296 return Ok(());
2297 }
2298 }
2299 }
2300 }
2301
2302 Ok(())
2303 }
2304
2305 async fn on_event(
2306 &mut self,
2307 event: RequestManagerEvent,
2308 ctx: &mut ActorContext<Self>,
2309 ) {
2310 let event_type = match &event {
2311 RequestManagerEvent::Finish => "Finish",
2312 RequestManagerEvent::UpdateState { .. } => "UpdateState",
2313 RequestManagerEvent::UpdateVersion { .. } => "UpdateVersion",
2314 RequestManagerEvent::SafeState { .. } => "SafeState",
2315 };
2316
2317 if let Err(e) = self.persist(&event, ctx).await {
2318 error!(
2319 event_type = event_type,
2320 request_id = %self.id,
2321 error = %e,
2322 "Failed to persist event"
2323 );
2324 emit_fail(ctx, e).await;
2325 };
2326 }
2327
2328 async fn on_child_fault(
2329 &mut self,
2330 error: ActorError,
2331 ctx: &mut ActorContext<Self>,
2332 ) -> ChildAction {
2333 error!(
2334 request_id = %self.id,
2335 version = self.version,
2336 state = ?self.state,
2337 error = %error,
2338 "Child fault in request manager"
2339 );
2340 emit_fail(ctx, error).await;
2341 ChildAction::Stop
2342 }
2343}
2344
2345#[async_trait]
2346impl PersistentActor for RequestManager {
2347 type Persistence = LightPersistence;
2348 type InitParams = InitRequestManager;
2349
2350 fn update(&mut self, state: Self) {
2351 self.command = state.command;
2352 self.request = state.request;
2353 self.state = state.state;
2354 self.version = state.version;
2355 }
2356
2357 fn create_initial(params: Self::InitParams) -> Self {
2358 Self {
2359 retry_diff: 0,
2360 retry_timeout: 0,
2361 our_key: params.our_key,
2362 id: DigestIdentifier::default(),
2363 subject_id: params.subject_id,
2364 command: ReqManInitMessage::Evaluate,
2365 request: None,
2366 state: RequestManagerState::Starting,
2367 version: 0,
2368 helpers: Some(params.helpers),
2369 }
2370 }
2371
2372 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
2374 match event {
2375 RequestManagerEvent::Finish => {
2376 debug!(
2377 event_type = "Finish",
2378 request_id = %self.id,
2379 "Applying finish event"
2380 );
2381 self.state = RequestManagerState::End;
2382 self.request = None;
2383 self.id = DigestIdentifier::default();
2384 }
2385 RequestManagerEvent::UpdateState { state } => {
2386 debug!(
2387 event_type = "UpdateState",
2388 request_id = %self.id,
2389 old_state = ?self.state,
2390 new_state = ?state,
2391 "Applying state update"
2392 );
2393 self.state = *state.clone()
2394 }
2395 RequestManagerEvent::UpdateVersion { version } => {
2396 debug!(
2397 event_type = "UpdateVersion",
2398 request_id = %self.id,
2399 old_version = self.version,
2400 new_version = version,
2401 "Applying version update"
2402 );
2403 self.state = RequestManagerState::Starting;
2404 self.version = *version
2405 }
2406 RequestManagerEvent::SafeState { command, request } => {
2407 debug!(
2408 event_type = "SafeState",
2409 request_id = %self.id,
2410 command = ?command,
2411 "Applying safe state"
2412 );
2413 self.version = 0;
2414 self.retry_diff = 0;
2415 self.retry_timeout = 0;
2416 self.state = RequestManagerState::Starting;
2417 self.request = Some(request.clone());
2418 self.command = command.clone();
2419 }
2420 };
2421
2422 Ok(())
2423 }
2424}
2425
// Marker implementation: nothing is overridden here, so `RequestManager`
// relies entirely on whatever `Storable` provides by default.
#[async_trait]
impl Storable for RequestManager {}