1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message, Response, Sink,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::bridge::request::{
9 ApprovalState, ApprovalStateRes, EventRequestType,
10};
11use ave_common::identity::{
12 DigestIdentifier, HashAlgorithm, PublicKey, Signed, TimeStamp, hash_borsh,
13};
14use ave_common::request::EventRequest;
15use ave_common::response::{
16 RequestState, RequestsInManager, RequestsInManagerSubject,
17};
18
19use borsh::{BorshDeserialize, BorshSerialize};
20use error::RequestHandlerError;
21use manager::{RequestManager, RequestManagerMessage};
22use serde::{Deserialize, Serialize};
23use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use tracing::{Span, error, info_span};
26use types::ReqManInitMessage;
27
28use crate::approval::persist::{
29 ApprPersist, ApprPersistMessage, ApprPersistResponse,
30};
31use crate::approval::request::ApprovalReq;
32use crate::db::Storable;
33use crate::governance::events::GovernanceEvent;
34use crate::governance::model::{HashThisRole, RoleTypes};
35use crate::helpers::db::ExternalDB;
36use crate::helpers::network::service::NetworkSender;
37use crate::model::common::node::{get_subject_data, i_owner_new_owner};
38use crate::model::common::subject::{get_gov, get_version};
39use crate::model::common::{
40 check_subject_creation, emit_fail, send_to_tracking,
41};
42use crate::node::{Node, NodeMessage, NodeResponse, SubjectData};
43use crate::request::manager::InitRequestManager;
44use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
45use crate::system::ConfigHelper;
46
47pub mod error;
48pub mod manager;
49pub mod reboot;
50pub mod tracking;
51pub mod types;
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct RequestData {
55 pub request_id: DigestIdentifier,
56 pub subject_id: DigestIdentifier,
57}
58
59#[derive(Clone, Debug, Serialize, Deserialize)]
60pub struct RequestHandler {
61 #[serde(skip)]
62 helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
63 #[serde(skip)]
64 our_key: Arc<PublicKey>,
65 handling: HashMap<DigestIdentifier, DigestIdentifier>,
66 in_queue: HashMap<
67 DigestIdentifier,
68 VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
69 >,
70}
71
72impl BorshSerialize for RequestHandler {
73 fn serialize<W: std::io::Write>(
74 &self,
75 writer: &mut W,
76 ) -> std::io::Result<()> {
77 BorshSerialize::serialize(&self.handling, writer)?;
79 BorshSerialize::serialize(&self.in_queue, writer)?;
80 Ok(())
81 }
82}
83
84impl BorshDeserialize for RequestHandler {
85 fn deserialize_reader<R: std::io::Read>(
86 reader: &mut R,
87 ) -> std::io::Result<Self> {
88 let handling =
90 HashMap::<DigestIdentifier, DigestIdentifier>::deserialize_reader(
91 reader,
92 )?;
93 let in_queue = HashMap::<
94 DigestIdentifier,
95 VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
96 >::deserialize_reader(reader)?;
97
98 let our_key = Arc::new(PublicKey::default());
99
100 Ok(Self {
101 helpers: None,
102 our_key,
103 handling,
104 in_queue,
105 })
106 }
107}
108
109impl RequestHandler {
110 async fn check_signature(
111 ctx: &mut ActorContext<Self>,
112 our_key: PublicKey,
113 signer: PublicKey,
114 governance_id: &DigestIdentifier,
115 event_request: &EventRequestType,
116 subject_data: SubjectData,
117 ) -> Result<(), ActorError> {
118 match event_request {
119 EventRequestType::Create
120 | EventRequestType::Transfer
121 | EventRequestType::Confirm
122 | EventRequestType::Reject
123 | EventRequestType::Eol => {
124 if signer != our_key {
125 return Err(ActorError::Functional { description: "In the events of Create, Transfer, Confirm, Reject or EOL, the event must be signed by the node".to_string() });
126 }
127 }
128 EventRequestType::Fact => {
129 let gov = get_gov(ctx, governance_id).await?;
130 match subject_data {
131 SubjectData::Tracker {
132 schema_id,
133 namespace,
134 ..
135 } => {
136 if !gov.has_this_role(HashThisRole::Schema {
137 who: signer,
138 role: RoleTypes::Issuer,
139 schema_id,
140 namespace: Namespace::from(namespace),
141 }) {
142 return Err(ActorError::Functional {
143 description:
144 "In fact events, the signer has to be an issuer"
145 .to_string(),
146 });
147 }
148 }
149 SubjectData::Governance { .. } => {
150 if !gov.has_this_role(HashThisRole::Gov {
151 who: signer,
152 role: RoleTypes::Issuer,
153 }) {
154 return Err(ActorError::Functional {
155 description:
156 "In fact events, the signer has to be an issuer"
157 .to_string(),
158 });
159 }
160 }
161 }
162 }
163 }
164
165 Ok(())
166 }
167
168 async fn queued_event(
169 ctx: &ActorContext<Self>,
170 subject_id: &DigestIdentifier,
171 ) -> Result<(), ActorError> {
172 let request_actor = ctx.reference().await?;
173 request_actor
174 .tell(RequestHandlerMessage::PopQueue {
175 subject_id: subject_id.to_owned(),
176 })
177 .await
178 }
179
180 async fn error_queue_handling(
181 &mut self,
182 ctx: &mut ActorContext<Self>,
183 error: String,
184 subject_id: &DigestIdentifier,
185 request_id: &DigestIdentifier,
186 ) -> Result<(), ActorError> {
187 self.on_event(
188 RequestHandlerEvent::Invalid {
189 subject_id: subject_id.to_owned(),
190 },
191 ctx,
192 )
193 .await;
194
195 send_to_tracking(
196 ctx,
197 RequestTrackingMessage::UpdateState {
198 request_id: request_id.clone(),
199 state: RequestState::Invalid {
200 error,
201 sn: None,
202 subject_id: subject_id.to_string(),
203 who: self.our_key.to_string(),
204 },
205 },
206 )
207 .await?;
208
209 Self::queued_event(ctx, subject_id).await
210 }
211
212 async fn change_approval(
213 ctx: &ActorContext<Self>,
214 subject_id: &DigestIdentifier,
215 state: ApprovalStateRes,
216 ) -> Result<(), RequestHandlerError> {
217 if state == ApprovalStateRes::Obsolete {
218 return Err(RequestHandlerError::ObsoleteApproval);
219 }
220
221 let approver_path =
222 ActorPath::from(format!("/user/node/{}/approver", subject_id));
223 let approver_actor = ctx
224 .system()
225 .get_actor::<ApprPersist>(&approver_path)
226 .await
227 .map_err(|_| {
228 RequestHandlerError::ApprovalNotFound(subject_id.to_string())
229 })?;
230
231 approver_actor
232 .tell(ApprPersistMessage::ChangeResponse {
233 response: state.clone(),
234 })
235 .await
236 .map_err(|_| RequestHandlerError::ApprovalChangeFailed)
237 }
238
239 async fn get_approval(
240 ctx: &ActorContext<Self>,
241 subject_id: &DigestIdentifier,
242 state: Option<ApprovalState>,
243 ) -> Result<Option<(ApprovalReq, ApprovalState)>, RequestHandlerError> {
244 let approver_path =
245 ActorPath::from(format!("/user/node/{}/approver", subject_id));
246 let approver_actor = ctx
247 .system()
248 .get_actor::<ApprPersist>(&approver_path)
249 .await
250 .map_err(|_| {
251 RequestHandlerError::ApprovalNotFound(subject_id.to_string())
252 })?;
253
254 let response = approver_actor
255 .ask(ApprPersistMessage::GetApproval { state })
256 .await
257 .map_err(|_| RequestHandlerError::ApprovalGetFailed)?;
258
259 let res = match response {
260 ApprPersistResponse::Ok => None,
261 ApprPersistResponse::Approval { request, state } => {
262 Some((request, state))
263 }
264 };
265
266 Ok(res)
267 }
268
269 async fn get_all_approvals(
270 ctx: &ActorContext<Self>,
271 state: Option<ApprovalState>,
272 ) -> Result<Vec<(ApprovalReq, ApprovalState)>, ActorError> {
273 let node_path = ActorPath::from("/user/node");
274 let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
275 let response = node_actor.ask(NodeMessage::GetGovernances).await?;
276 let vec = match response {
277 NodeResponse::Governances(govs) => govs,
278 _ => {
279 return Err(ActorError::UnexpectedResponse {
280 path: node_path,
281 expected: "NodeResponse::Governances".to_string(),
282 });
283 }
284 };
285
286 let mut responses = vec![];
287 for governance in vec.iter() {
288 let approver_path =
289 ActorPath::from(format!("/user/node/{}/approver", governance));
290 if let Ok(approver_actor) =
291 ctx.system().get_actor::<ApprPersist>(&approver_path).await
292 {
293 let response = approver_actor
294 .ask(ApprPersistMessage::GetApproval {
295 state: state.clone(),
296 })
297 .await?;
298
299 match response {
300 ApprPersistResponse::Ok => {}
301 ApprPersistResponse::Approval { request, state } => {
302 responses.push((request, state))
303 }
304 };
305 };
306 }
307
308 Ok(responses)
309 }
310
311 async fn check_owner_new_owner(
312 ctx: &mut ActorContext<Self>,
313 request: &EventRequest,
314 ) -> Result<(), RequestHandlerError> {
315 match request {
316 EventRequest::Create(..) => {}
317 EventRequest::Fact(..)
318 | EventRequest::Transfer(..)
319 | EventRequest::EOL(..) => {
320 let subject_id = request.get_subject_id();
321 let (i_owner, i_new_owner) =
322 i_owner_new_owner(ctx, &subject_id).await?;
323 if !i_owner {
324 return Err(RequestHandlerError::NotOwner(
325 subject_id.to_string(),
326 ));
327 }
328
329 if i_new_owner.is_some() {
330 return Err(RequestHandlerError::PendingNewOwner(
331 subject_id.to_string(),
332 ));
333 }
334 }
335 EventRequest::Confirm(..) | EventRequest::Reject(..) => {
336 let subject_id = request.get_subject_id();
337 let (i_owner, i_new_owner) =
338 i_owner_new_owner(ctx, &subject_id).await?;
339 if i_owner {
340 return Err(RequestHandlerError::IsOwner(
341 subject_id.to_string(),
342 ));
343 }
344
345 if let Some(new_owner) = i_new_owner {
346 if !new_owner {
347 return Err(RequestHandlerError::NotNewOwner(
348 subject_id.to_string(),
349 ));
350 }
351 } else {
352 return Err(RequestHandlerError::NoNewOwnerPending(
353 subject_id.to_string(),
354 ));
355 }
356 }
357 };
358 Ok(())
359 }
360
361 fn check_event_request(
362 request: &EventRequest,
363 is_gov: bool,
364 ) -> Result<(), RequestHandlerError> {
365 match request {
366 EventRequest::Create(create_request) => {
367 if let Some(name) = &create_request.name
368 && (name.is_empty() || name.len() > 100)
369 {
370 return Err(RequestHandlerError::InvalidName);
371 }
372
373 if let Some(description) = &create_request.description
374 && (description.is_empty() || description.len() > 200)
375 {
376 return Err(RequestHandlerError::InvalidDescription);
377 }
378
379 if !create_request.schema_id.is_valid_in_request() {
380 return Err(RequestHandlerError::InvalidSchemaId);
381 }
382
383 if is_gov {
384 if !create_request.governance_id.is_empty() {
385 return Err(
386 RequestHandlerError::GovernanceIdMustBeEmpty,
387 );
388 }
389
390 if !create_request.namespace.is_empty() {
391 return Err(RequestHandlerError::NamespaceMustBeEmpty);
392 }
393 } else if create_request.governance_id.is_empty() {
394 return Err(RequestHandlerError::GovernanceIdRequired);
395 }
396 }
397 EventRequest::Transfer(transfer_request) => {
398 if transfer_request.new_owner.is_empty() {
399 return Err(RequestHandlerError::TransferNewOwnerEmpty);
400 }
401 }
402 EventRequest::Confirm(confirm_request) => {
403 if is_gov {
404 if let Some(name_old_owner) =
405 &confirm_request.name_old_owner
406 && name_old_owner.is_empty()
407 {
408 return Err(
409 RequestHandlerError::ConfirmNameOldOwnerEmpty,
410 );
411 }
412 } else if confirm_request.name_old_owner.is_some() {
413 return Err(
414 RequestHandlerError::ConfirmTrackerNameOldOwner,
415 );
416 }
417 }
418 EventRequest::Fact(fact_request) => {
419 if is_gov
420 && serde_json::from_value::<GovernanceEvent>(
421 fact_request.payload.0.clone(),
422 )
423 .is_err()
424 {
425 return Err(RequestHandlerError::GovFactInvalidEvent);
426 }
427 }
428 EventRequest::Reject(..) | EventRequest::EOL(..) => {}
429 }
430
431 Ok(())
432 }
433
434 async fn build_subject_data(
435 ctx: &mut ActorContext<Self>,
436 request: &EventRequest,
437 ) -> Result<SubjectData, RequestHandlerError> {
438 let subject_data = match request {
439 EventRequest::Create(create_request) => {
440 if create_request.schema_id.is_gov() {
441 SubjectData::Governance { active: true }
442 } else {
443 SubjectData::Tracker {
444 governance_id: create_request.governance_id.clone(),
445 schema_id: create_request.schema_id.clone(),
446 namespace: create_request.namespace.to_string(),
447 active: true,
448 }
449 }
450 }
451 EventRequest::Fact(..)
452 | EventRequest::Transfer(..)
453 | EventRequest::Confirm(..)
454 | EventRequest::Reject(..)
455 | EventRequest::EOL(..) => {
456 let subject_id = request.get_subject_id();
457 let Some(subject_data) =
458 get_subject_data(ctx, &subject_id).await?
459 else {
460 return Err(RequestHandlerError::SubjectDataNotFound(
461 subject_id.to_string(),
462 ));
463 };
464
465 subject_data
466 }
467 };
468
469 Ok(subject_data)
470 }
471
472 async fn check_creation(
473 ctx: &mut ActorContext<Self>,
474 subject_data: SubjectData,
475 event_request: &EventRequestType,
476 signer: PublicKey,
477 ) -> Result<(), ActorError> {
478 match event_request {
479 EventRequestType::Create | EventRequestType::Confirm => {
480 if let SubjectData::Tracker {
481 governance_id,
482 schema_id,
483 namespace,
484 ..
485 } = subject_data
486 {
487 let version = get_version(ctx, &governance_id).await?;
488 check_subject_creation(
489 ctx,
490 &governance_id,
491 signer,
492 version,
493 namespace,
494 schema_id,
495 )
496 .await?;
497 }
498 }
499 _ => {}
500 }
501
502 Ok(())
503 }
504
505 fn build_request_id_subject_id(
506 hash: HashAlgorithm,
507 request: &Signed<EventRequest>,
508 ) -> Result<(DigestIdentifier, DigestIdentifier), RequestHandlerError> {
509 match &request.content() {
510 EventRequest::Create(..) => {
511 let request_id = hash_borsh(
512 &*hash.hasher(),
513 &(request.clone(), TimeStamp::now().as_nanos()),
514 )
515 .map_err(|e| {
516 RequestHandlerError::RequestIdHash(e.to_string())
517 })?;
518
519 let subject_id =
520 hash_borsh(&*hash.hasher(), request).map_err(|e| {
521 RequestHandlerError::SubjectIdHash(e.to_string())
522 })?;
523
524 Ok((request_id, subject_id))
525 }
526 EventRequest::Fact(..)
527 | EventRequest::Transfer(..)
528 | EventRequest::Confirm(..)
529 | EventRequest::Reject(..)
530 | EventRequest::EOL(..) => {
531 let request_id = hash_borsh(
532 &*hash.hasher(),
533 &(request.clone(), TimeStamp::now().as_nanos()),
534 )
535 .map_err(|e| {
536 RequestHandlerError::RequestIdHash(e.to_string())
537 })?;
538
539 Ok((request_id, request.content().get_subject_id()))
540 }
541 }
542 }
543
544 async fn handle_queue_request(
545 &mut self,
546 ctx: &mut ActorContext<Self>,
547 request: Signed<EventRequest>,
548 request_id: &DigestIdentifier,
549 subject_id: &DigestIdentifier,
550 is_gov: bool,
551 ) -> Result<(), ActorError> {
552 let Some(helpers) = self.helpers.clone() else {
553 let e = " Can not obtain helpers".to_string();
554
555 return Err(ActorError::FunctionalCritical { description: e });
556 };
557
558 let in_handling = self.handling.contains_key(subject_id);
559 let in_queue = self.in_queue.contains_key(subject_id);
560
561 if !in_handling && !in_queue {
562 let command = Self::build_req_manager_init_msg(
563 &EventRequestType::from(request.content()),
564 is_gov,
565 );
566 let init_data = InitRequestManager {
567 our_key: self.our_key.clone(),
568 subject_id: subject_id.clone(),
569 helpers,
570 };
571
572 let actor = ctx
573 .create_child(
574 &subject_id.to_string(),
575 RequestManager::initial(init_data),
576 )
577 .await?;
578 actor
579 .tell(RequestManagerMessage::FirstRun {
580 command,
581 request,
582 request_id: request_id.clone(),
583 })
584 .await?;
585
586 self.on_event(
587 RequestHandlerEvent::EventToHandling {
588 subject_id: subject_id.clone(),
589 request_id: request_id.clone(),
590 },
591 ctx,
592 )
593 .await;
594
595 send_to_tracking(
596 ctx,
597 RequestTrackingMessage::UpdateState {
598 request_id: request_id.clone(),
599 state: RequestState::Handling,
600 },
601 )
602 .await?;
603 } else {
604 self.on_event(
605 RequestHandlerEvent::EventToQueue {
606 subject_id: subject_id.clone(),
607 event: request,
608 request_id: request_id.clone(),
609 },
610 ctx,
611 )
612 .await;
613
614 send_to_tracking(
615 ctx,
616 RequestTrackingMessage::UpdateState {
617 request_id: request_id.clone(),
618 state: RequestState::InQueue,
619 },
620 )
621 .await?;
622 }
623
624 Ok(())
625 }
626
627 const fn build_req_manager_init_msg(
628 event_request: &EventRequestType,
629 is_gov: bool,
630 ) -> ReqManInitMessage {
631 match event_request {
632 EventRequestType::Create => ReqManInitMessage::Validate,
633 EventRequestType::Fact => ReqManInitMessage::Evaluate,
634 EventRequestType::Transfer => ReqManInitMessage::Evaluate,
635 EventRequestType::Confirm => {
636 if is_gov {
637 ReqManInitMessage::Evaluate
638 } else {
639 ReqManInitMessage::Validate
640 }
641 }
642 EventRequestType::Reject => ReqManInitMessage::Validate,
643 EventRequestType::Eol => ReqManInitMessage::Validate,
644 }
645 }
646
647 async fn check_in_queue(
648 ctx: &mut ActorContext<Self>,
649 request: &Signed<EventRequest>,
650 our_key: PublicKey,
651 ) -> Result<bool, RequestHandlerError> {
652 if let EventRequest::Create(..) = request.content() {
653 return Err(RequestHandlerError::CreationNotQueued);
654 }
655
656 Self::check_owner_new_owner(ctx, request.content()).await?;
657
658 let subject_data =
659 Self::build_subject_data(ctx, request.content()).await?;
660 let event_request_type = EventRequestType::from(request.content());
661 let signer = request.signature().signer.clone();
662 let governance_id = subject_data
663 .get_governance_id()
664 .unwrap_or_else(|| request.content().get_subject_id());
665 let is_gov = subject_data.get_schema_id().is_gov();
666
667 if !subject_data.get_active() {
668 return Err(RequestHandlerError::SubjectNotActive(
669 request.content().get_subject_id().to_string(),
670 ));
671 }
672
673 Self::check_signature(
674 ctx,
675 our_key,
676 signer.clone(),
677 &governance_id,
678 &event_request_type,
679 subject_data.clone(),
680 )
681 .await?;
682
683 Self::check_creation(ctx, subject_data, &event_request_type, signer)
684 .await?;
685
686 Ok(is_gov)
687 }
688
689 async fn in_queue_to_handling(
690 &mut self,
691 ctx: &mut ActorContext<Self>,
692 request: Signed<EventRequest>,
693 request_id: &DigestIdentifier,
694 is_gov: bool,
695 ) -> Result<(), ActorError> {
696 let command = Self::build_req_manager_init_msg(
697 &EventRequestType::from(request.content()),
698 is_gov,
699 );
700 let subject_id = request.content().get_subject_id();
701
702 let actor = ctx
703 .get_child::<RequestManager>(&subject_id.to_string())
704 .await?;
705
706 actor
707 .tell(RequestManagerMessage::FirstRun {
708 command,
709 request,
710 request_id: request_id.clone(),
711 })
712 .await?;
713
714 self.on_event(
715 RequestHandlerEvent::EventToHandling {
716 subject_id: subject_id.clone(),
717 request_id: request_id.clone(),
718 },
719 ctx,
720 )
721 .await;
722
723 send_to_tracking(
724 ctx,
725 RequestTrackingMessage::UpdateState {
726 request_id: request_id.clone(),
727 state: RequestState::Handling,
728 },
729 )
730 .await
731 }
732
733 async fn end_child(
734 ctx: &ActorContext<Self>,
735 subject_id: &DigestIdentifier,
736 ) -> Result<(), ActorError> {
737 let actor = ctx
738 .get_child::<RequestManager>(&subject_id.to_string())
739 .await?;
740 actor.ask_stop().await
741 }
742
743 async fn manual_abort_request(
744 &self,
745 ctx: &ActorContext<Self>,
746 subject_id: &DigestIdentifier,
747 ) -> Result<(), ActorError> {
748 let actor = ctx
749 .get_child::<RequestManager>(&subject_id.to_string())
750 .await?;
751
752 actor.tell(RequestManagerMessage::ManualAbort).await
753 }
754}
755
756#[derive(Debug, Clone)]
757pub enum RequestHandlerMessage {
758 NewRequest {
759 request: Signed<EventRequest>,
760 },
761 RequestInManager,
762 RequestInManagerSubjectId {
763 subject_id: DigestIdentifier,
764 },
765 ChangeApprovalState {
766 subject_id: DigestIdentifier,
767 state: ApprovalStateRes,
768 },
769 GetApproval {
770 subject_id: DigestIdentifier,
771 state: Option<ApprovalState>,
772 },
773 GetAllApprovals {
774 state: Option<ApprovalState>,
775 },
776 PopQueue {
777 subject_id: DigestIdentifier,
778 },
779 EndHandling {
780 subject_id: DigestIdentifier,
781 },
782 AbortRequest {
783 subject_id: DigestIdentifier,
784 },
785}
786
787impl Message for RequestHandlerMessage {}
788
789#[derive(Debug, Clone)]
790pub enum RequestHandlerResponse {
791 RequestInManager(RequestsInManager),
792 RequestInManagerSubjectId(RequestsInManagerSubject),
793 Ok(RequestData),
794 Response(String),
795 Approval(Option<(ApprovalReq, ApprovalState)>),
796 Approvals(Vec<(ApprovalReq, ApprovalState)>),
797 None,
798}
799
800impl Response for RequestHandlerResponse {}
801
802#[derive(
803 Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
804)]
805pub enum RequestHandlerEvent {
806 EventToQueue {
807 subject_id: DigestIdentifier,
808 event: Signed<EventRequest>,
809 request_id: DigestIdentifier,
810 },
811 Invalid {
812 subject_id: DigestIdentifier,
813 },
814 FinishHandling {
815 subject_id: DigestIdentifier,
816 },
817 EventToHandling {
818 subject_id: DigestIdentifier,
819 request_id: DigestIdentifier,
820 },
821}
822
823impl Event for RequestHandlerEvent {}
824
825#[async_trait]
826impl Actor for RequestHandler {
827 type Event = RequestHandlerEvent;
828 type Message = RequestHandlerMessage;
829 type Response = RequestHandlerResponse;
830
831 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
832 parent_span.map_or_else(
833 || info_span!("RequestHandler"),
834 |parent_span| info_span!(parent: parent_span, "RequestHandler"),
835 )
836 }
837
838 async fn pre_start(
839 &mut self,
840 ctx: &mut ActorContext<Self>,
841 ) -> Result<(), ActorError> {
842 if let Err(e) = self.init_store("request", None, false, ctx).await {
843 error!(
844 error = %e,
845 "Failed to initialize store during pre_start"
846 );
847 return Err(e);
848 }
849
850 let Some(ext_db): Option<Arc<ExternalDB>> =
851 ctx.system().get_helper("ext_db").await
852 else {
853 error!("External database helper not found");
854 return Err(ActorError::Helper {
855 name: "ext_db".to_string(),
856 reason: "Not found".to_string(),
857 });
858 };
859
860 let tracking_size = if let Some(config) =
861 ctx.system().get_helper::<ConfigHelper>("config").await
862 {
863 config.tracking_size
864 } else {
865 error!(
866 helper = "config",
867 "Config helper not found during pre_start"
868 );
869 return Err(ActorError::Helper {
870 name: "config".to_owned(),
871 reason: "Not found".to_string(),
872 });
873 };
874
875 let tracking = match ctx
876 .create_child("tracking", RequestTracking::new(tracking_size))
877 .await
878 {
879 Ok(actor) => actor,
880 Err(e) => {
881 error!(
882 error = %e,
883 "Failed to create tracking child during pre_start"
884 );
885 return Err(e);
886 }
887 };
888
889 let sink =
890 Sink::new(tracking.subscribe(), ext_db.get_request_tracking());
891
892 ctx.system().run_sink(sink).await;
893
894 let Some((hash, network)) = self.helpers.clone() else {
895 let e = " Can not obtain helpers".to_string();
896 error!(
897 error = %e,
898 "Failed to obtain helpers during pre_start"
899 );
900 ctx.system().crash_system();
901 return Err(ActorError::FunctionalCritical { description: e });
902 };
903
904 for (subject_id, request_id) in self.handling.clone() {
905 let request_manager_init = InitRequestManager {
906 our_key: self.our_key.clone(),
907 subject_id: subject_id.clone(),
908 helpers: (hash, network.clone()),
909 };
910
911 let request_manager_actor = match ctx
912 .create_child(
913 &subject_id.to_string(),
914 RequestManager::initial(request_manager_init),
915 )
916 .await
917 {
918 Ok(actor) => actor,
919 Err(e) => {
920 error!(
921 subject_id = %subject_id,
922 error = %e,
923 "Failed to create request manager child during pre_start"
924 );
925 return Err(e);
926 }
927 };
928
929 if let Err(e) = request_manager_actor
930 .tell(RequestManagerMessage::Run {
931 request_id: request_id.clone(),
932 })
933 .await
934 {
935 error!(
936 subject_id = %subject_id,
937 request_id = %request_id,
938 error = %e,
939 "Failed to send Run message to request manager during pre_start"
940 );
941 return Err(e);
942 }
943 }
944
945 Ok(())
946 }
947}
948
949#[async_trait]
950impl Handler<Self> for RequestHandler {
951 async fn handle_message(
952 &mut self,
953 _sender: ActorPath,
954 msg: RequestHandlerMessage,
955 ctx: &mut ave_actors::ActorContext<Self>,
956 ) -> Result<RequestHandlerResponse, ActorError> {
957 match msg {
958 RequestHandlerMessage::RequestInManagerSubjectId { subject_id } => {
959 let handling =
960 self.handling.get(&subject_id).map(|x| x.to_string());
961 let in_queue = self.in_queue.get(&subject_id).map(|x| {
962 x.iter().map(|x| x.1.to_string()).collect::<Vec<String>>()
963 });
964
965 Ok(RequestHandlerResponse::RequestInManagerSubjectId(
966 RequestsInManagerSubject { handling, in_queue },
967 ))
968 }
969 RequestHandlerMessage::RequestInManager => Ok(
970 RequestHandlerResponse::RequestInManager(RequestsInManager {
971 handling: self
972 .handling
973 .iter()
974 .map(|x| (x.0.to_string(), x.1.to_string()))
975 .collect(),
976 in_queue: self
977 .in_queue
978 .iter()
979 .map(|x| {
980 (
981 x.0.to_string(),
982 x.1.iter()
983 .map(|x| x.1.to_string())
984 .collect::<Vec<String>>(),
985 )
986 })
987 .collect(),
988 }),
989 ),
990 RequestHandlerMessage::AbortRequest { subject_id } => {
991 self.manual_abort_request(ctx, &subject_id).await?;
992 Ok(RequestHandlerResponse::None)
993 }
994 RequestHandlerMessage::ChangeApprovalState {
995 subject_id,
996 state,
997 } => {
998 Self::change_approval(ctx, &subject_id, state.clone())
999 .await
1000 .map_err(|e| {
1001 error!(
1002 error = %e,
1003 "ChangeApprovalState failed"
1004 );
1005 ActorError::from(e)
1006 })?;
1007
1008 Ok(RequestHandlerResponse::Response(format!(
1009 "The approval request for subject {} has changed to {}",
1010 subject_id, state
1011 )))
1012 }
1013 RequestHandlerMessage::GetApproval { subject_id, state } => {
1014 let res = Self::get_approval(ctx, &subject_id, state.clone())
1015 .await
1016 .map_err(ActorError::from)?;
1017
1018 Ok(RequestHandlerResponse::Approval(res))
1019 }
1020 RequestHandlerMessage::GetAllApprovals { state } => {
1021 let res = Self::get_all_approvals(ctx, state.clone())
1022 .await
1023 .map_err(|e| {
1024 error!(
1025 error = %e,
1026 "GetAllApprovals failed"
1027 );
1028 e
1029 })?;
1030
1031 Ok(RequestHandlerResponse::Approvals(res))
1032 }
1033 RequestHandlerMessage::NewRequest { request } => {
1034 if let Err(e) = request.verify() {
1035 let err = RequestHandlerError::SignatureVerification(
1036 e.to_string(),
1037 );
1038 error!(error = %err, "Request signature verification failed");
1039 return Err(ActorError::from(err));
1040 };
1041
1042 let Some((hash, ..)) = self.helpers.clone() else {
1043 let err = RequestHandlerError::HelpersNotInitialized;
1044 error!(
1045 msg_type = "NewRequest",
1046 error = %err,
1047 "Helpers not initialized"
1048 );
1049 return Err(emit_fail(ctx, ActorError::from(err)).await);
1050 };
1051
1052 if let Err(e) =
1053 Self::check_owner_new_owner(ctx, request.content()).await
1054 {
1055 error!(
1056 msg_type = "NewRequest",
1057 error = %e,
1058 "Owner or new owner check failed"
1059 );
1060 return Err(ActorError::from(e));
1061 }
1062
1063 let subject_data = match Self::build_subject_data(
1064 ctx,
1065 request.content(),
1066 )
1067 .await
1068 {
1069 Ok(data) => data,
1070 Err(e) => {
1071 error!(
1072 msg_type = "NewRequest",
1073 error = %e,
1074 "Failed to build subject data"
1075 );
1076 return Err(ActorError::from(e));
1077 }
1078 };
1079 let event_request_type =
1080 EventRequestType::from(request.content());
1081 let signer = request.signature().signer.clone();
1082 let governance_id = subject_data
1083 .get_governance_id()
1084 .unwrap_or_else(|| request.content().get_subject_id());
1085 let is_gov = subject_data.get_schema_id().is_gov();
1086
1087 if !subject_data.get_active() {
1088 let subject_id = request.content().get_subject_id();
1089 error!(
1090 msg_type = "NewRequest",
1091 subject_id = %subject_id,
1092 "Subject is not active"
1093 );
1094 return Err(ActorError::from(
1095 RequestHandlerError::SubjectNotActive(
1096 subject_id.to_string(),
1097 ),
1098 ));
1099 }
1100
1101 if let Err(e) =
1102 Self::check_event_request(request.content(), is_gov)
1103 {
1104 error!(
1105 msg_type = "NewRequest",
1106 error = %e,
1107 "Event request validation failed"
1108 );
1109 return Err(ActorError::from(e));
1110 }
1111
1112 if let Err(e) = Self::check_signature(
1113 ctx,
1114 (*self.our_key).clone(),
1115 signer.clone(),
1116 &governance_id,
1117 &event_request_type,
1118 subject_data.clone(),
1119 )
1120 .await
1121 {
1122 error!(
1123 msg_type = "NewRequest",
1124 governance_id = %governance_id,
1125 error = %e,
1126 "Signature check failed"
1127 );
1128 return Err(e);
1129 }
1130
1131 if let Err(e) = Self::check_creation(
1132 ctx,
1133 subject_data,
1134 &event_request_type,
1135 signer,
1136 )
1137 .await
1138 {
1139 error!(
1140 msg_type = "NewRequest",
1141 error = %e,
1142 "Creation check failed"
1143 );
1144 return Err(e);
1145 }
1146
1147 let (request_id, subject_id) =
1148 match Self::build_request_id_subject_id(hash, &request) {
1149 Ok(ids) => ids,
1150 Err(e) => {
1151 error!(
1152 msg_type = "NewRequest",
1153 error = %e,
1154 "Failed to build request ID and subject ID"
1155 );
1156 return Err(ActorError::from(e));
1157 }
1158 };
1159
1160 if let Err(e) = self
1161 .handle_queue_request(
1162 ctx,
1163 request,
1164 &request_id,
1165 &subject_id,
1166 is_gov,
1167 )
1168 .await
1169 {
1170 error!(
1171 msg_type = "NewRequest",
1172 request_id = %request_id,
1173 subject_id = %subject_id,
1174 error = %e,
1175 "Failed to handle queue request"
1176 );
1177 return Err(e);
1178 }
1179
1180 Ok(RequestHandlerResponse::Ok(RequestData {
1181 request_id,
1182 subject_id,
1183 }))
1184 }
1185 RequestHandlerMessage::PopQueue { subject_id } => {
1186 let (event, request_id) = if let Some(events) =
1187 self.in_queue.get(&subject_id)
1188 {
1189 if let Some((event, request_id)) =
1190 events.clone().pop_front()
1191 {
1192 (event, request_id)
1193 } else {
1194 if let Err(e) = Self::end_child(ctx, &subject_id).await
1195 {
1196 error!(
1197 msg_type = "PopQueue",
1198 subject_id = %subject_id,
1199 error = %e,
1200 "Failed to end child actor when queue is empty"
1201 );
1202 ctx.system().crash_system();
1203 return Err(e);
1204 }
1205 return Ok(RequestHandlerResponse::None);
1206 }
1207 } else {
1208 if let Err(e) = Self::end_child(ctx, &subject_id).await {
1209 error!(
1210 msg_type = "PopQueue",
1211 subject_id = %subject_id,
1212 error = %e,
1213 "Failed to end child actor when no events available"
1214 );
1215 ctx.system().crash_system();
1216 return Err(e);
1217 }
1218 return Ok(RequestHandlerResponse::None);
1219 };
1220
1221 let is_gov = match Self::check_in_queue(
1222 ctx,
1223 &event,
1224 (*self.our_key).clone(),
1225 )
1226 .await
1227 {
1228 Ok(is_gov) => is_gov,
1229 Err(e) => {
1230 if let Err(e) = self
1231 .error_queue_handling(
1232 ctx,
1233 e.to_string(),
1234 &subject_id,
1235 &request_id,
1236 )
1237 .await
1238 {
1239 error!(
1240 msg_type = "PopQueue",
1241 subject_id = %subject_id,
1242 request_id = %request_id,
1243 error = %e,
1244 "Failed to handle queue error"
1245 );
1246 ctx.system().crash_system();
1247 return Err(e);
1248 };
1249
1250 return Ok(RequestHandlerResponse::None);
1251 }
1252 };
1253
1254 if let Err(e) = self
1255 .in_queue_to_handling(ctx, event, &request_id, is_gov)
1256 .await
1257 {
1258 error!(
1259 msg_type = "PopQueue",
1260 request_id = %request_id,
1261 error = %e,
1262 "Failed to transition from queue to handling"
1263 );
1264 ctx.system().crash_system();
1265 return Err(e);
1266 }
1267
1268 Ok(RequestHandlerResponse::None)
1269 }
1270 RequestHandlerMessage::EndHandling { subject_id } => {
1271 self.on_event(
1272 RequestHandlerEvent::FinishHandling {
1273 subject_id: subject_id.clone(),
1274 },
1275 ctx,
1276 )
1277 .await;
1278
1279 if let Err(e) = Self::queued_event(ctx, &subject_id).await {
1280 error!(
1281 msg_type = "EndHandling",
1282 subject_id = %subject_id,
1283 error = %e,
1284 "Failed to enqueue next event"
1285 );
1286 ctx.system().crash_system();
1287 return Err(e);
1288 }
1289
1290 Ok(RequestHandlerResponse::None)
1291 }
1292 }
1293 }
1294
1295 async fn on_child_fault(
1296 &mut self,
1297 error: ActorError,
1298 ctx: &mut ActorContext<Self>,
1299 ) -> ChildAction {
1300 error!(
1301 error = %error,
1302 "Child fault in request handler"
1303 );
1304 ctx.system().crash_system();
1305 ChildAction::Stop
1306 }
1307
1308 async fn on_event(
1309 &mut self,
1310 event: RequestHandlerEvent,
1311 ctx: &mut ActorContext<Self>,
1312 ) {
1313 if let Err(e) = self.persist(&event, ctx).await {
1314 error!(
1315 error = %e,
1316 "Failed to persist event"
1317 );
1318 ctx.system().crash_system();
1319 };
1320 }
1321}
1322
1323#[async_trait]
1324impl Storable for RequestHandler {}
1325
1326#[async_trait]
1327impl PersistentActor for RequestHandler {
1328 type Persistence = LightPersistence;
1329 type InitParams = (Arc<PublicKey>, (HashAlgorithm, Arc<NetworkSender>));
1330
1331 fn update(&mut self, state: Self) {
1332 self.in_queue = state.in_queue;
1333 self.handling = state.handling;
1334 }
1335
1336 fn create_initial(params: Self::InitParams) -> Self {
1337 Self {
1338 our_key: params.0,
1339 helpers: Some(params.1),
1340 handling: HashMap::new(),
1341 in_queue: HashMap::new(),
1342 }
1343 }
1344
1345 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1347 match event {
1348 RequestHandlerEvent::EventToQueue {
1349 subject_id,
1350 event,
1351 request_id,
1352 } => {
1353 self.in_queue
1354 .entry(subject_id.clone())
1355 .or_default()
1356 .push_back((event.clone(), request_id.clone()));
1357 }
1358 RequestHandlerEvent::Invalid { subject_id } => {
1359 if let Some(vec) = self.in_queue.get_mut(subject_id) {
1360 vec.pop_front();
1361 if vec.is_empty() {
1362 self.in_queue.remove(subject_id);
1363 }
1364 }
1365 }
1366 RequestHandlerEvent::EventToHandling {
1367 subject_id,
1368 request_id,
1369 } => {
1370 self.handling.insert(subject_id.clone(), request_id.clone());
1371 if let Some(vec) = self.in_queue.get_mut(subject_id) {
1372 vec.pop_front();
1373 if vec.is_empty() {
1374 self.in_queue.remove(subject_id);
1375 }
1376 }
1377 }
1378 RequestHandlerEvent::FinishHandling { subject_id } => {
1379 self.handling.remove(subject_id);
1380 }
1381 };
1382
1383 Ok(())
1384 }
1385}