1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message, Response, Sink,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::bridge::request::{
9 ApprovalState, ApprovalStateRes, EventRequestType,
10};
11use ave_common::identity::{
12 DigestIdentifier, HashAlgorithm, PublicKey, Signed, TimeStamp, hash_borsh,
13};
14use ave_common::request::EventRequest;
15use ave_common::response::{
16 RequestState, RequestsInManager, RequestsInManagerSubject,
17};
18
19use borsh::{BorshDeserialize, BorshSerialize};
20use error::RequestHandlerError;
21use manager::{RequestManager, RequestManagerMessage};
22use serde::{Deserialize, Serialize};
23use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use tracing::{Span, error, info_span};
26use types::ReqManInitMessage;
27
28use crate::approval::persist::{
29 ApprPersist, ApprPersistMessage, ApprPersistResponse,
30};
31use crate::approval::request::ApprovalReq;
32use crate::db::Storable;
33use crate::governance::events::GovernanceEvent;
34use crate::governance::model::{HashThisRole, RoleTypes};
35use crate::helpers::db::ExternalDB;
36use crate::helpers::network::service::NetworkSender;
37use crate::metrics::try_core_metrics;
38use crate::model::common::node::{get_subject_data, i_owner_new_owner};
39use crate::model::common::subject::{get_gov, get_version};
40use crate::model::common::{
41 check_subject_creation, emit_fail, send_to_tracking,
42};
43use crate::node::{Node, NodeMessage, NodeResponse, SubjectData};
44use crate::request::manager::InitRequestManager;
45use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
46use crate::system::ConfigHelper;
47
48pub mod error;
49pub mod manager;
50pub mod reboot;
51pub mod tracking;
52pub mod types;
53
/// Pair of identifiers describing an accepted request: the request itself
/// and the subject it targets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestData {
    /// Identifier assigned to the request.
    pub request_id: DigestIdentifier,
    /// Identifier of the subject the request refers to.
    pub subject_id: DigestIdentifier,
}
59
/// Actor state that dispatches event requests to per-subject request
/// managers and keeps the requests that have to wait their turn.
///
/// Only `handling` and `in_queue` are serialized; `helpers` and `our_key`
/// are skipped by serde and are not written by the manual Borsh impls.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestHandler {
    /// Hash algorithm and network sender handed to every request manager.
    /// `None` until supplied; several methods fail when it is missing.
    #[serde(skip)]
    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
    /// Public key of this node, used for signer checks and reported as `who`
    /// in invalid-request tracking states.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Requests currently being handled: subject id -> request id.
    handling: HashMap<DigestIdentifier, DigestIdentifier>,
    /// Requests waiting per subject: subject id -> queue of
    /// (signed request, request id).
    in_queue: HashMap<
        DigestIdentifier,
        VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
    >,
}
72
73impl BorshSerialize for RequestHandler {
74 fn serialize<W: std::io::Write>(
75 &self,
76 writer: &mut W,
77 ) -> std::io::Result<()> {
78 BorshSerialize::serialize(&self.handling, writer)?;
80 BorshSerialize::serialize(&self.in_queue, writer)?;
81 Ok(())
82 }
83}
84
85impl BorshDeserialize for RequestHandler {
86 fn deserialize_reader<R: std::io::Read>(
87 reader: &mut R,
88 ) -> std::io::Result<Self> {
89 let handling =
91 HashMap::<DigestIdentifier, DigestIdentifier>::deserialize_reader(
92 reader,
93 )?;
94 let in_queue = HashMap::<
95 DigestIdentifier,
96 VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
97 >::deserialize_reader(reader)?;
98
99 let our_key = Arc::new(PublicKey::default());
100
101 Ok(Self {
102 helpers: None,
103 our_key,
104 handling,
105 in_queue,
106 })
107 }
108}
109
110impl RequestHandler {
111 async fn check_signature(
112 ctx: &mut ActorContext<Self>,
113 our_key: PublicKey,
114 signer: PublicKey,
115 governance_id: &DigestIdentifier,
116 event_request: &EventRequestType,
117 subject_data: SubjectData,
118 ) -> Result<(), ActorError> {
119 match event_request {
120 EventRequestType::Create
121 | EventRequestType::Transfer
122 | EventRequestType::Confirm
123 | EventRequestType::Reject
124 | EventRequestType::Eol => {
125 if signer != our_key {
126 return Err(ActorError::Functional { description: "In the events of Create, Transfer, Confirm, Reject or EOL, the event must be signed by the node".to_string() });
127 }
128 }
129 EventRequestType::Fact => {
130 let gov = get_gov(ctx, governance_id).await?;
131 match subject_data {
132 SubjectData::Tracker {
133 schema_id,
134 namespace,
135 ..
136 } => {
137 if !gov.has_this_role(HashThisRole::Schema {
138 who: signer,
139 role: RoleTypes::Issuer,
140 schema_id,
141 namespace: Namespace::from(namespace),
142 }) {
143 return Err(ActorError::Functional {
144 description:
145 "In fact events, the signer has to be an issuer"
146 .to_string(),
147 });
148 }
149 }
150 SubjectData::Governance { .. } => {
151 if !gov.has_this_role(HashThisRole::Gov {
152 who: signer,
153 role: RoleTypes::Issuer,
154 }) {
155 return Err(ActorError::Functional {
156 description:
157 "In fact events, the signer has to be an issuer"
158 .to_string(),
159 });
160 }
161 }
162 }
163 }
164 }
165
166 Ok(())
167 }
168
169 async fn queued_event(
170 ctx: &ActorContext<Self>,
171 subject_id: &DigestIdentifier,
172 ) -> Result<(), ActorError> {
173 let request_actor = ctx.reference().await?;
174 request_actor
175 .tell(RequestHandlerMessage::PopQueue {
176 subject_id: subject_id.to_owned(),
177 })
178 .await
179 }
180
181 async fn error_queue_handling(
182 &mut self,
183 ctx: &mut ActorContext<Self>,
184 error: String,
185 subject_id: &DigestIdentifier,
186 request_id: &DigestIdentifier,
187 ) -> Result<(), ActorError> {
188 if let Some(metrics) = try_core_metrics() {
189 metrics.observe_request_invalid();
190 }
191
192 self.on_event(
193 RequestHandlerEvent::Invalid {
194 subject_id: subject_id.to_owned(),
195 },
196 ctx,
197 )
198 .await;
199
200 send_to_tracking(
201 ctx,
202 RequestTrackingMessage::UpdateState {
203 request_id: request_id.clone(),
204 state: RequestState::Invalid {
205 error,
206 sn: None,
207 subject_id: subject_id.to_string(),
208 who: self.our_key.to_string(),
209 },
210 },
211 )
212 .await?;
213
214 Self::queued_event(ctx, subject_id).await
215 }
216
217 async fn change_approval(
218 ctx: &ActorContext<Self>,
219 subject_id: &DigestIdentifier,
220 state: ApprovalStateRes,
221 ) -> Result<(), RequestHandlerError> {
222 if state == ApprovalStateRes::Obsolete {
223 return Err(RequestHandlerError::ObsoleteApproval);
224 }
225
226 let approver_path = ActorPath::from(format!(
227 "/user/node/subject_manager/{}/approver",
228 subject_id
229 ));
230 let approver_actor = ctx
231 .system()
232 .get_actor::<ApprPersist>(&approver_path)
233 .await
234 .map_err(|_| {
235 RequestHandlerError::ApprovalNotFound(subject_id.to_string())
236 })?;
237
238 approver_actor
239 .tell(ApprPersistMessage::ChangeResponse {
240 response: state.clone(),
241 })
242 .await
243 .map_err(|_| RequestHandlerError::ApprovalChangeFailed)
244 }
245
246 async fn get_approval(
247 ctx: &ActorContext<Self>,
248 subject_id: &DigestIdentifier,
249 state: Option<ApprovalState>,
250 ) -> Result<Option<(ApprovalReq, ApprovalState)>, RequestHandlerError> {
251 let approver_path = ActorPath::from(format!(
252 "/user/node/subject_manager/{}/approver",
253 subject_id
254 ));
255 let approver_actor = ctx
256 .system()
257 .get_actor::<ApprPersist>(&approver_path)
258 .await
259 .map_err(|_| {
260 RequestHandlerError::ApprovalNotFound(subject_id.to_string())
261 })?;
262
263 let response = approver_actor
264 .ask(ApprPersistMessage::GetApproval { state })
265 .await
266 .map_err(|_| RequestHandlerError::ApprovalGetFailed)?;
267
268 let res = match response {
269 ApprPersistResponse::Ok => None,
270 ApprPersistResponse::Approval { request, state } => {
271 Some((request, state))
272 }
273 };
274
275 Ok(res)
276 }
277
278 async fn get_all_approvals(
279 ctx: &ActorContext<Self>,
280 state: Option<ApprovalState>,
281 ) -> Result<Vec<(ApprovalReq, ApprovalState)>, ActorError> {
282 let node_path = ActorPath::from("/user/node");
283 let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
284 let response = node_actor.ask(NodeMessage::GetGovernances).await?;
285 let vec = match response {
286 NodeResponse::Governances(govs) => govs,
287 _ => {
288 return Err(ActorError::UnexpectedResponse {
289 path: node_path,
290 expected: "NodeResponse::Governances".to_string(),
291 });
292 }
293 };
294
295 let mut responses = vec![];
296 for governance in vec.iter() {
297 let approver_path = ActorPath::from(format!(
298 "/user/node/subject_manager/{}/approver",
299 governance
300 ));
301 if let Ok(approver_actor) =
302 ctx.system().get_actor::<ApprPersist>(&approver_path).await
303 {
304 let response = approver_actor
305 .ask(ApprPersistMessage::GetApproval {
306 state: state.clone(),
307 })
308 .await?;
309
310 match response {
311 ApprPersistResponse::Ok => {}
312 ApprPersistResponse::Approval { request, state } => {
313 responses.push((request, state))
314 }
315 };
316 };
317 }
318
319 Ok(responses)
320 }
321
322 async fn check_owner_new_owner(
323 ctx: &mut ActorContext<Self>,
324 request: &EventRequest,
325 ) -> Result<(), RequestHandlerError> {
326 match request {
327 EventRequest::Create(..) => {}
328 EventRequest::Fact(..)
329 | EventRequest::Transfer(..)
330 | EventRequest::EOL(..) => {
331 let subject_id = request.get_subject_id();
332 let (i_owner, i_new_owner) =
333 i_owner_new_owner(ctx, &subject_id).await?;
334 if !i_owner {
335 return Err(RequestHandlerError::NotOwner(
336 subject_id.to_string(),
337 ));
338 }
339
340 if i_new_owner.is_some() {
341 return Err(RequestHandlerError::PendingNewOwner(
342 subject_id.to_string(),
343 ));
344 }
345 }
346 EventRequest::Confirm(..) | EventRequest::Reject(..) => {
347 let subject_id = request.get_subject_id();
348 let (i_owner, i_new_owner) =
349 i_owner_new_owner(ctx, &subject_id).await?;
350 if i_owner {
351 return Err(RequestHandlerError::IsOwner(
352 subject_id.to_string(),
353 ));
354 }
355
356 if let Some(new_owner) = i_new_owner {
357 if !new_owner {
358 return Err(RequestHandlerError::NotNewOwner(
359 subject_id.to_string(),
360 ));
361 }
362 } else {
363 return Err(RequestHandlerError::NoNewOwnerPending(
364 subject_id.to_string(),
365 ));
366 }
367 }
368 };
369 Ok(())
370 }
371
372 fn check_event_request(
373 request: &EventRequest,
374 is_gov: bool,
375 ) -> Result<(), RequestHandlerError> {
376 match request {
377 EventRequest::Create(create_request) => {
378 if let Some(name) = &create_request.name
379 && (name.is_empty() || name.len() > 100)
380 {
381 return Err(RequestHandlerError::InvalidName);
382 }
383
384 if let Some(description) = &create_request.description
385 && (description.is_empty() || description.len() > 200)
386 {
387 return Err(RequestHandlerError::InvalidDescription);
388 }
389
390 if !create_request.schema_id.is_valid_in_request() {
391 return Err(RequestHandlerError::InvalidSchemaId);
392 }
393
394 if is_gov {
395 if !create_request.governance_id.is_empty() {
396 return Err(
397 RequestHandlerError::GovernanceIdMustBeEmpty,
398 );
399 }
400
401 if !create_request.namespace.is_empty() {
402 return Err(RequestHandlerError::NamespaceMustBeEmpty);
403 }
404 } else if create_request.governance_id.is_empty() {
405 return Err(RequestHandlerError::GovernanceIdRequired);
406 }
407 }
408 EventRequest::Transfer(transfer_request) => {
409 if transfer_request.new_owner.is_empty() {
410 return Err(RequestHandlerError::TransferNewOwnerEmpty);
411 }
412 }
413 EventRequest::Confirm(confirm_request) => {
414 if is_gov {
415 if let Some(name_old_owner) =
416 &confirm_request.name_old_owner
417 && name_old_owner.is_empty()
418 {
419 return Err(
420 RequestHandlerError::ConfirmNameOldOwnerEmpty,
421 );
422 }
423 } else if confirm_request.name_old_owner.is_some() {
424 return Err(
425 RequestHandlerError::ConfirmTrackerNameOldOwner,
426 );
427 }
428 }
429 EventRequest::Fact(fact_request) => {
430 if is_gov
431 && serde_json::from_value::<GovernanceEvent>(
432 fact_request.payload.0.clone(),
433 )
434 .is_err()
435 {
436 return Err(RequestHandlerError::GovFactInvalidEvent);
437 }
438 }
439 EventRequest::Reject(..) | EventRequest::EOL(..) => {}
440 }
441
442 Ok(())
443 }
444
445 async fn build_subject_data(
446 ctx: &mut ActorContext<Self>,
447 request: &EventRequest,
448 ) -> Result<SubjectData, RequestHandlerError> {
449 let subject_data = match request {
450 EventRequest::Create(create_request) => {
451 if create_request.schema_id.is_gov() {
452 SubjectData::Governance { active: true }
453 } else {
454 SubjectData::Tracker {
455 governance_id: create_request.governance_id.clone(),
456 schema_id: create_request.schema_id.clone(),
457 namespace: create_request.namespace.to_string(),
458 active: true,
459 }
460 }
461 }
462 EventRequest::Fact(..)
463 | EventRequest::Transfer(..)
464 | EventRequest::Confirm(..)
465 | EventRequest::Reject(..)
466 | EventRequest::EOL(..) => {
467 let subject_id = request.get_subject_id();
468 let Some(subject_data) =
469 get_subject_data(ctx, &subject_id).await?
470 else {
471 return Err(RequestHandlerError::SubjectDataNotFound(
472 subject_id.to_string(),
473 ));
474 };
475
476 subject_data
477 }
478 };
479
480 Ok(subject_data)
481 }
482
483 async fn check_creation(
484 ctx: &mut ActorContext<Self>,
485 subject_data: SubjectData,
486 event_request: &EventRequestType,
487 signer: PublicKey,
488 ) -> Result<(), ActorError> {
489 match event_request {
490 EventRequestType::Create | EventRequestType::Confirm => {
491 if let SubjectData::Tracker {
492 governance_id,
493 schema_id,
494 namespace,
495 ..
496 } = subject_data
497 {
498 let version = get_version(ctx, &governance_id).await?;
499 check_subject_creation(
500 ctx,
501 &governance_id,
502 signer,
503 version,
504 namespace,
505 schema_id,
506 )
507 .await?;
508 }
509 }
510 _ => {}
511 }
512
513 Ok(())
514 }
515
516 fn build_request_id_subject_id(
517 hash: HashAlgorithm,
518 request: &Signed<EventRequest>,
519 ) -> Result<(DigestIdentifier, DigestIdentifier), RequestHandlerError> {
520 match &request.content() {
521 EventRequest::Create(..) => {
522 let request_id = hash_borsh(
523 &*hash.hasher(),
524 &(request.clone(), TimeStamp::now().as_nanos()),
525 )
526 .map_err(|e| {
527 RequestHandlerError::RequestIdHash(e.to_string())
528 })?;
529
530 let subject_id =
531 hash_borsh(&*hash.hasher(), request).map_err(|e| {
532 RequestHandlerError::SubjectIdHash(e.to_string())
533 })?;
534
535 Ok((request_id, subject_id))
536 }
537 EventRequest::Fact(..)
538 | EventRequest::Transfer(..)
539 | EventRequest::Confirm(..)
540 | EventRequest::Reject(..)
541 | EventRequest::EOL(..) => {
542 let request_id = hash_borsh(
543 &*hash.hasher(),
544 &(request.clone(), TimeStamp::now().as_nanos()),
545 )
546 .map_err(|e| {
547 RequestHandlerError::RequestIdHash(e.to_string())
548 })?;
549
550 Ok((request_id, request.content().get_subject_id()))
551 }
552 }
553 }
554
555 async fn handle_queue_request(
556 &mut self,
557 ctx: &mut ActorContext<Self>,
558 request: Signed<EventRequest>,
559 request_id: &DigestIdentifier,
560 subject_id: &DigestIdentifier,
561 is_gov: bool,
562 governance_id: Option<DigestIdentifier>,
563 ) -> Result<(), ActorError> {
564 let Some(helpers) = self.helpers.clone() else {
565 let e = " Can not obtain helpers".to_string();
566
567 return Err(ActorError::FunctionalCritical { description: e });
568 };
569
570 let in_handling = self.handling.contains_key(subject_id);
571 let in_queue = self.in_queue.contains_key(subject_id);
572
573 if !in_handling && !in_queue {
574 let command = Self::build_req_manager_init_msg(
575 &EventRequestType::from(request.content()),
576 is_gov,
577 );
578 let init_data = InitRequestManager {
579 our_key: self.our_key.clone(),
580 subject_id: subject_id.clone(),
581 governance_id,
582 helpers,
583 };
584
585 let actor = ctx
586 .create_child(
587 &subject_id.to_string(),
588 RequestManager::initial(init_data),
589 )
590 .await?;
591 actor
592 .tell(RequestManagerMessage::FirstRun {
593 command,
594 request,
595 request_id: request_id.clone(),
596 })
597 .await?;
598
599 self.on_event(
600 RequestHandlerEvent::EventToHandling {
601 subject_id: subject_id.clone(),
602 request_id: request_id.clone(),
603 },
604 ctx,
605 )
606 .await;
607
608 send_to_tracking(
609 ctx,
610 RequestTrackingMessage::UpdateState {
611 request_id: request_id.clone(),
612 state: RequestState::Handling,
613 },
614 )
615 .await?;
616 } else {
617 self.on_event(
618 RequestHandlerEvent::EventToQueue {
619 subject_id: subject_id.clone(),
620 event: request,
621 request_id: request_id.clone(),
622 },
623 ctx,
624 )
625 .await;
626
627 send_to_tracking(
628 ctx,
629 RequestTrackingMessage::UpdateState {
630 request_id: request_id.clone(),
631 state: RequestState::InQueue,
632 },
633 )
634 .await?;
635 }
636
637 Ok(())
638 }
639
640 const fn build_req_manager_init_msg(
641 event_request: &EventRequestType,
642 is_gov: bool,
643 ) -> ReqManInitMessage {
644 match event_request {
645 EventRequestType::Create => ReqManInitMessage::Validate,
646 EventRequestType::Fact => ReqManInitMessage::Evaluate,
647 EventRequestType::Transfer => ReqManInitMessage::Evaluate,
648 EventRequestType::Confirm => {
649 if is_gov {
650 ReqManInitMessage::Evaluate
651 } else {
652 ReqManInitMessage::Validate
653 }
654 }
655 EventRequestType::Reject => ReqManInitMessage::Validate,
656 EventRequestType::Eol => ReqManInitMessage::Validate,
657 }
658 }
659
660 async fn check_in_queue(
661 ctx: &mut ActorContext<Self>,
662 request: &Signed<EventRequest>,
663 our_key: PublicKey,
664 ) -> Result<bool, RequestHandlerError> {
665 if let EventRequest::Create(..) = request.content() {
666 return Err(RequestHandlerError::CreationNotQueued);
667 }
668
669 Self::check_owner_new_owner(ctx, request.content()).await?;
670
671 let subject_data =
672 Self::build_subject_data(ctx, request.content()).await?;
673 let event_request_type = EventRequestType::from(request.content());
674 let signer = request.signature().signer.clone();
675 let governance_id = subject_data
676 .get_governance_id()
677 .unwrap_or_else(|| request.content().get_subject_id());
678 let is_gov = subject_data.get_schema_id().is_gov();
679
680 if !subject_data.get_active() {
681 return Err(RequestHandlerError::SubjectNotActive(
682 request.content().get_subject_id().to_string(),
683 ));
684 }
685
686 Self::check_signature(
687 ctx,
688 our_key,
689 signer.clone(),
690 &governance_id,
691 &event_request_type,
692 subject_data.clone(),
693 )
694 .await?;
695
696 Self::check_creation(ctx, subject_data, &event_request_type, signer)
697 .await?;
698
699 Ok(is_gov)
700 }
701
702 async fn in_queue_to_handling(
703 &mut self,
704 ctx: &mut ActorContext<Self>,
705 request: Signed<EventRequest>,
706 request_id: &DigestIdentifier,
707 is_gov: bool,
708 ) -> Result<(), ActorError> {
709 let command = Self::build_req_manager_init_msg(
710 &EventRequestType::from(request.content()),
711 is_gov,
712 );
713 let subject_id = request.content().get_subject_id();
714
715 let actor = ctx
716 .get_child::<RequestManager>(&subject_id.to_string())
717 .await?;
718
719 actor
720 .tell(RequestManagerMessage::FirstRun {
721 command,
722 request,
723 request_id: request_id.clone(),
724 })
725 .await?;
726
727 self.on_event(
728 RequestHandlerEvent::EventToHandling {
729 subject_id: subject_id.clone(),
730 request_id: request_id.clone(),
731 },
732 ctx,
733 )
734 .await;
735
736 send_to_tracking(
737 ctx,
738 RequestTrackingMessage::UpdateState {
739 request_id: request_id.clone(),
740 state: RequestState::Handling,
741 },
742 )
743 .await
744 }
745
746 async fn end_child(
747 ctx: &ActorContext<Self>,
748 subject_id: &DigestIdentifier,
749 ) -> Result<(), ActorError> {
750 let actor = ctx
751 .get_child::<RequestManager>(&subject_id.to_string())
752 .await?;
753 actor.ask_stop().await
754 }
755
756 async fn manual_abort_request(
757 &self,
758 ctx: &ActorContext<Self>,
759 subject_id: &DigestIdentifier,
760 ) -> Result<(), ActorError> {
761 let actor = ctx
762 .get_child::<RequestManager>(&subject_id.to_string())
763 .await?;
764
765 actor.tell(RequestManagerMessage::ManualAbort).await
766 }
767
768 async fn purge_request_manager(
769 &self,
770 ctx: &mut ActorContext<Self>,
771 subject_id: &DigestIdentifier,
772 ) -> Result<(), ActorError> {
773 let Some((hash, network)) = self.helpers.clone() else {
774 return Err(ActorError::FunctionalCritical {
775 description: "Request handler helpers are not initialized"
776 .to_string(),
777 });
778 };
779
780 let governance_id = get_subject_data(ctx, subject_id)
781 .await?
782 .and_then(|data| data.get_governance_id());
783 let request_manager_init = InitRequestManager {
784 our_key: self.our_key.clone(),
785 subject_id: subject_id.clone(),
786 governance_id,
787 helpers: (hash, network),
788 };
789
790 let actor = match ctx
791 .create_child(
792 &subject_id.to_string(),
793 RequestManager::initial(request_manager_init),
794 )
795 .await
796 {
797 Ok(actor) => actor,
798 Err(ActorError::Exists { .. }) => {
799 ctx.get_child::<RequestManager>(&subject_id.to_string())
800 .await?
801 }
802 Err(err) => return Err(err),
803 };
804
805 actor.ask(RequestManagerMessage::PurgeStorage).await?;
806 actor.ask_stop().await?;
807
808 Ok(())
809 }
810}
811
/// Messages accepted by the [`RequestHandler`] actor.
#[derive(Debug, Clone)]
pub enum RequestHandlerMessage {
    /// Submit a signed event request; it is verified and checked, then
    /// either handled immediately or queued behind the subject's current
    /// request.
    NewRequest {
        request: Signed<EventRequest>,
    },
    /// Query every request currently being handled or queued.
    RequestInManager,
    /// Query the handled and queued requests of a single subject.
    RequestInManagerSubjectId {
        subject_id: DigestIdentifier,
    },
    /// Forward a new approval response to the subject's approver actor.
    ChangeApprovalState {
        subject_id: DigestIdentifier,
        state: ApprovalStateRes,
    },
    /// Query the approval request of one subject; `state` is passed along
    /// to the approver actor.
    GetApproval {
        subject_id: DigestIdentifier,
        state: Option<ApprovalState>,
    },
    /// Query the approval requests of all governances known to the node.
    GetAllApprovals {
        state: Option<ApprovalState>,
    },
    /// Take the next queued request of the subject and start handling it;
    /// when nothing is queued the subject's request manager is stopped.
    PopQueue {
        subject_id: DigestIdentifier,
    },
    /// Signal that handling for the subject has ended; the handler emits a
    /// `FinishHandling` event for it.
    EndHandling {
        subject_id: DigestIdentifier,
    },
    /// Purge the request manager storage of the subject and emit a
    /// `PurgeSubject` event.
    PurgeSubject {
        subject_id: DigestIdentifier,
    },
    /// Send a manual abort to the subject's request manager.
    AbortRequest {
        subject_id: DigestIdentifier,
    },
}

// Marker impl: makes the enum usable as an actor message.
impl Message for RequestHandlerMessage {}
847
/// Responses produced by the [`RequestHandler`] actor.
#[derive(Debug, Clone)]
pub enum RequestHandlerResponse {
    /// Answer to `RequestInManager`.
    RequestInManager(RequestsInManager),
    /// Answer to `RequestInManagerSubjectId`.
    RequestInManagerSubjectId(RequestsInManagerSubject),
    /// Answer to an accepted `NewRequest`, carrying the request and subject
    /// ids.
    Ok(RequestData),
    /// Human-readable message; returned by `ChangeApprovalState`.
    Response(String),
    /// Answer to `GetApproval`; `None` when the approver has nothing to
    /// return.
    Approval(Option<(ApprovalReq, ApprovalState)>),
    /// Answer to `GetAllApprovals`.
    Approvals(Vec<(ApprovalReq, ApprovalState)>),
    /// No payload.
    None,
}

// Marker impl: makes the enum usable as an actor response.
impl Response for RequestHandlerResponse {}
860
/// Events emitted by the [`RequestHandler`] through `on_event`.
///
/// NOTE(review): the enum derives Borsh, so variant and field order are
/// presumably part of the stored format; confirm before reordering.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum RequestHandlerEvent {
    /// A request had to wait because its subject was already busy.
    EventToQueue {
        subject_id: DigestIdentifier,
        event: Signed<EventRequest>,
        request_id: DigestIdentifier,
    },
    /// A queued request failed its checks when it was taken from the queue.
    Invalid {
        subject_id: DigestIdentifier,
    },
    /// Emitted when an `EndHandling` message is received for the subject.
    FinishHandling {
        subject_id: DigestIdentifier,
    },
    /// Emitted after the subject's request manager storage was purged.
    PurgeSubject {
        subject_id: DigestIdentifier,
    },
    /// A request was handed to the subject's request manager.
    EventToHandling {
        subject_id: DigestIdentifier,
        request_id: DigestIdentifier,
    },
}

// Marker impl: makes the enum usable as an actor event.
impl Event for RequestHandlerEvent {}
886
887#[async_trait]
888impl Actor for RequestHandler {
889 type Event = RequestHandlerEvent;
890 type Message = RequestHandlerMessage;
891 type Response = RequestHandlerResponse;
892
893 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
894 parent_span.map_or_else(
895 || info_span!("RequestHandler"),
896 |parent_span| info_span!(parent: parent_span, "RequestHandler"),
897 )
898 }
899
900 async fn pre_start(
901 &mut self,
902 ctx: &mut ActorContext<Self>,
903 ) -> Result<(), ActorError> {
904 if let Err(e) = self.init_store("request", None, false, ctx).await {
905 error!(
906 error = %e,
907 "Failed to initialize store during pre_start"
908 );
909 return Err(e);
910 }
911
912 let Some(config) =
913 ctx.system().get_helper::<ConfigHelper>("config").await
914 else {
915 error!(
916 helper = "config",
917 "Config helper not found during pre_start"
918 );
919 return Err(ActorError::Helper {
920 name: "config".to_owned(),
921 reason: "Not found".to_string(),
922 });
923 };
924
925 if !config.safe_mode {
926 let Some(ext_db): Option<Arc<ExternalDB>> =
927 ctx.system().get_helper("ext_db").await
928 else {
929 error!("External database helper not found");
930 return Err(ActorError::Helper {
931 name: "ext_db".to_string(),
932 reason: "Not found".to_string(),
933 });
934 };
935
936 let tracking = match ctx
937 .create_child(
938 "tracking",
939 RequestTracking::new(config.tracking_size),
940 )
941 .await
942 {
943 Ok(actor) => actor,
944 Err(e) => {
945 error!(
946 error = %e,
947 "Failed to create tracking child during pre_start"
948 );
949 return Err(e);
950 }
951 };
952
953 let sink =
954 Sink::new(tracking.subscribe(), ext_db.get_request_tracking());
955
956 ctx.system().run_sink(sink).await;
957 }
958
959 let Some((hash, network)) = self.helpers.clone() else {
960 let e = " Can not obtain helpers".to_string();
961 error!(
962 error = %e,
963 "Failed to obtain helpers during pre_start"
964 );
965 ctx.system().crash_system();
966 return Err(ActorError::FunctionalCritical { description: e });
967 };
968
969 if config.safe_mode {
970 return Ok(());
971 }
972
973 for (subject_id, request_id) in self.handling.clone() {
974 let governance_id = get_subject_data(ctx, &subject_id)
975 .await?
976 .and_then(|data| data.get_governance_id());
977 let request_manager_init = InitRequestManager {
978 our_key: self.our_key.clone(),
979 subject_id: subject_id.clone(),
980 governance_id,
981 helpers: (hash, network.clone()),
982 };
983
984 let request_manager_actor = match ctx
985 .create_child(
986 &subject_id.to_string(),
987 RequestManager::initial(request_manager_init),
988 )
989 .await
990 {
991 Ok(actor) => actor,
992 Err(e) => {
993 error!(
994 subject_id = %subject_id,
995 error = %e,
996 "Failed to create request manager child during pre_start"
997 );
998 return Err(e);
999 }
1000 };
1001
1002 if let Err(e) = request_manager_actor
1003 .tell(RequestManagerMessage::Run {
1004 request_id: request_id.clone(),
1005 })
1006 .await
1007 {
1008 error!(
1009 subject_id = %subject_id,
1010 request_id = %request_id,
1011 error = %e,
1012 "Failed to send Run message to request manager during pre_start"
1013 );
1014 return Err(e);
1015 }
1016 }
1017
1018 Ok(())
1019 }
1020}
1021
1022#[async_trait]
1023impl Handler<Self> for RequestHandler {
1024 async fn handle_message(
1025 &mut self,
1026 _sender: ActorPath,
1027 msg: RequestHandlerMessage,
1028 ctx: &mut ave_actors::ActorContext<Self>,
1029 ) -> Result<RequestHandlerResponse, ActorError> {
1030 match msg {
1031 RequestHandlerMessage::RequestInManagerSubjectId { subject_id } => {
1032 let handling =
1033 self.handling.get(&subject_id).map(|x| x.to_string());
1034 let in_queue = self.in_queue.get(&subject_id).map(|x| {
1035 x.iter().map(|x| x.1.to_string()).collect::<Vec<String>>()
1036 });
1037
1038 Ok(RequestHandlerResponse::RequestInManagerSubjectId(
1039 RequestsInManagerSubject { handling, in_queue },
1040 ))
1041 }
1042 RequestHandlerMessage::RequestInManager => Ok(
1043 RequestHandlerResponse::RequestInManager(RequestsInManager {
1044 handling: self
1045 .handling
1046 .iter()
1047 .map(|x| (x.0.to_string(), x.1.to_string()))
1048 .collect(),
1049 in_queue: self
1050 .in_queue
1051 .iter()
1052 .map(|x| {
1053 (
1054 x.0.to_string(),
1055 x.1.iter()
1056 .map(|x| x.1.to_string())
1057 .collect::<Vec<String>>(),
1058 )
1059 })
1060 .collect(),
1061 }),
1062 ),
1063 RequestHandlerMessage::AbortRequest { subject_id } => {
1064 self.manual_abort_request(ctx, &subject_id).await?;
1065 Ok(RequestHandlerResponse::None)
1066 }
1067 RequestHandlerMessage::PurgeSubject { subject_id } => {
1068 self.purge_request_manager(ctx, &subject_id).await?;
1069 self.on_event(
1070 RequestHandlerEvent::PurgeSubject {
1071 subject_id: subject_id.clone(),
1072 },
1073 ctx,
1074 )
1075 .await;
1076 Ok(RequestHandlerResponse::None)
1077 }
1078 RequestHandlerMessage::ChangeApprovalState {
1079 subject_id,
1080 state,
1081 } => {
1082 Self::change_approval(ctx, &subject_id, state.clone())
1083 .await
1084 .map_err(|e| {
1085 error!(
1086 error = %e,
1087 "ChangeApprovalState failed"
1088 );
1089 ActorError::from(e)
1090 })?;
1091
1092 Ok(RequestHandlerResponse::Response(format!(
1093 "The approval request for subject {} has changed to {}",
1094 subject_id, state
1095 )))
1096 }
1097 RequestHandlerMessage::GetApproval { subject_id, state } => {
1098 let res = Self::get_approval(ctx, &subject_id, state.clone())
1099 .await
1100 .map_err(ActorError::from)?;
1101
1102 Ok(RequestHandlerResponse::Approval(res))
1103 }
1104 RequestHandlerMessage::GetAllApprovals { state } => {
1105 let res = Self::get_all_approvals(ctx, state.clone())
1106 .await
1107 .map_err(|e| {
1108 error!(
1109 error = %e,
1110 "GetAllApprovals failed"
1111 );
1112 e
1113 })?;
1114
1115 Ok(RequestHandlerResponse::Approvals(res))
1116 }
1117 RequestHandlerMessage::NewRequest { request } => {
1118 if let Err(e) = request.verify() {
1119 let err = RequestHandlerError::SignatureVerification(
1120 e.to_string(),
1121 );
1122 error!(error = %err, "Request signature verification failed");
1123 return Err(ActorError::from(err));
1124 };
1125
1126 let Some((hash, ..)) = self.helpers.clone() else {
1127 let err = RequestHandlerError::HelpersNotInitialized;
1128 error!(
1129 msg_type = "NewRequest",
1130 error = %err,
1131 "Helpers not initialized"
1132 );
1133 return Err(emit_fail(ctx, ActorError::from(err)).await);
1134 };
1135
1136 if let Err(e) =
1137 Self::check_owner_new_owner(ctx, request.content()).await
1138 {
1139 error!(
1140 msg_type = "NewRequest",
1141 error = %e,
1142 "Owner or new owner check failed"
1143 );
1144 return Err(ActorError::from(e));
1145 }
1146
1147 let subject_data = match Self::build_subject_data(
1148 ctx,
1149 request.content(),
1150 )
1151 .await
1152 {
1153 Ok(data) => data,
1154 Err(e) => {
1155 error!(
1156 msg_type = "NewRequest",
1157 error = %e,
1158 "Failed to build subject data"
1159 );
1160 return Err(ActorError::from(e));
1161 }
1162 };
1163 let event_request_type =
1164 EventRequestType::from(request.content());
1165 let signer = request.signature().signer.clone();
1166 let governance_id = subject_data.get_governance_id();
1167 let governance_subject_id = governance_id
1168 .clone()
1169 .unwrap_or_else(|| request.content().get_subject_id());
1170 let is_gov = subject_data.get_schema_id().is_gov();
1171
1172 if !subject_data.get_active() {
1173 let subject_id = request.content().get_subject_id();
1174 error!(
1175 msg_type = "NewRequest",
1176 subject_id = %subject_id,
1177 "Subject is not active"
1178 );
1179 return Err(ActorError::from(
1180 RequestHandlerError::SubjectNotActive(
1181 subject_id.to_string(),
1182 ),
1183 ));
1184 }
1185
1186 if let Err(e) =
1187 Self::check_event_request(request.content(), is_gov)
1188 {
1189 error!(
1190 msg_type = "NewRequest",
1191 error = %e,
1192 "Event request validation failed"
1193 );
1194 return Err(ActorError::from(e));
1195 }
1196
1197 if let Err(e) = Self::check_signature(
1198 ctx,
1199 (*self.our_key).clone(),
1200 signer.clone(),
1201 &governance_subject_id,
1202 &event_request_type,
1203 subject_data.clone(),
1204 )
1205 .await
1206 {
1207 error!(
1208 msg_type = "NewRequest",
1209 governance_id = %governance_subject_id,
1210 error = %e,
1211 "Signature check failed"
1212 );
1213 return Err(e);
1214 }
1215
1216 if let Err(e) = Self::check_creation(
1217 ctx,
1218 subject_data,
1219 &event_request_type,
1220 signer,
1221 )
1222 .await
1223 {
1224 error!(
1225 msg_type = "NewRequest",
1226 error = %e,
1227 "Creation check failed"
1228 );
1229 return Err(e);
1230 }
1231
1232 let (request_id, subject_id) =
1233 match Self::build_request_id_subject_id(hash, &request) {
1234 Ok(ids) => ids,
1235 Err(e) => {
1236 error!(
1237 msg_type = "NewRequest",
1238 error = %e,
1239 "Failed to build request ID and subject ID"
1240 );
1241 return Err(ActorError::from(e));
1242 }
1243 };
1244
1245 if let Err(e) = self
1246 .handle_queue_request(
1247 ctx,
1248 request,
1249 &request_id,
1250 &subject_id,
1251 is_gov,
1252 governance_id,
1253 )
1254 .await
1255 {
1256 error!(
1257 msg_type = "NewRequest",
1258 request_id = %request_id,
1259 subject_id = %subject_id,
1260 error = %e,
1261 "Failed to handle queue request"
1262 );
1263 return Err(e);
1264 }
1265
1266 Ok(RequestHandlerResponse::Ok(RequestData {
1267 request_id,
1268 subject_id,
1269 }))
1270 }
1271 RequestHandlerMessage::PopQueue { subject_id } => {
1272 let (event, request_id) = if let Some(events) =
1273 self.in_queue.get(&subject_id)
1274 {
1275 if let Some((event, request_id)) =
1276 events.clone().pop_front()
1277 {
1278 (event, request_id)
1279 } else {
1280 if let Err(e) = Self::end_child(ctx, &subject_id).await
1281 {
1282 error!(
1283 msg_type = "PopQueue",
1284 subject_id = %subject_id,
1285 error = %e,
1286 "Failed to end child actor when queue is empty"
1287 );
1288 ctx.system().crash_system();
1289 return Err(e);
1290 }
1291 return Ok(RequestHandlerResponse::None);
1292 }
1293 } else {
1294 if let Err(e) = Self::end_child(ctx, &subject_id).await {
1295 error!(
1296 msg_type = "PopQueue",
1297 subject_id = %subject_id,
1298 error = %e,
1299 "Failed to end child actor when no events available"
1300 );
1301 ctx.system().crash_system();
1302 return Err(e);
1303 }
1304 return Ok(RequestHandlerResponse::None);
1305 };
1306
1307 let is_gov = match Self::check_in_queue(
1308 ctx,
1309 &event,
1310 (*self.our_key).clone(),
1311 )
1312 .await
1313 {
1314 Ok(is_gov) => is_gov,
1315 Err(e) => {
1316 if let Err(e) = self
1317 .error_queue_handling(
1318 ctx,
1319 e.to_string(),
1320 &subject_id,
1321 &request_id,
1322 )
1323 .await
1324 {
1325 error!(
1326 msg_type = "PopQueue",
1327 subject_id = %subject_id,
1328 request_id = %request_id,
1329 error = %e,
1330 "Failed to handle queue error"
1331 );
1332 ctx.system().crash_system();
1333 return Err(e);
1334 };
1335
1336 return Ok(RequestHandlerResponse::None);
1337 }
1338 };
1339
1340 if let Err(e) = self
1341 .in_queue_to_handling(ctx, event, &request_id, is_gov)
1342 .await
1343 {
1344 error!(
1345 msg_type = "PopQueue",
1346 request_id = %request_id,
1347 error = %e,
1348 "Failed to transition from queue to handling"
1349 );
1350 ctx.system().crash_system();
1351 return Err(e);
1352 }
1353
1354 Ok(RequestHandlerResponse::None)
1355 }
1356 RequestHandlerMessage::EndHandling { subject_id } => {
1357 self.on_event(
1358 RequestHandlerEvent::FinishHandling {
1359 subject_id: subject_id.clone(),
1360 },
1361 ctx,
1362 )
1363 .await;
1364
1365 if let Err(e) = Self::queued_event(ctx, &subject_id).await {
1366 error!(
1367 msg_type = "EndHandling",
1368 subject_id = %subject_id,
1369 error = %e,
1370 "Failed to enqueue next event"
1371 );
1372 ctx.system().crash_system();
1373 return Err(e);
1374 }
1375
1376 Ok(RequestHandlerResponse::None)
1377 }
1378 }
1379 }
1380
1381 async fn on_child_fault(
1382 &mut self,
1383 error: ActorError,
1384 ctx: &mut ActorContext<Self>,
1385 ) -> ChildAction {
1386 error!(
1387 error = %error,
1388 "Child fault in request handler"
1389 );
1390 ctx.system().crash_system();
1391 ChildAction::Stop
1392 }
1393
1394 async fn on_event(
1395 &mut self,
1396 event: RequestHandlerEvent,
1397 ctx: &mut ActorContext<Self>,
1398 ) {
1399 if let Err(e) = self.persist(&event, ctx).await {
1400 error!(
1401 error = %e,
1402 "Failed to persist event"
1403 );
1404 ctx.system().crash_system();
1405 };
1406 }
1407}
1408
1409#[async_trait]
1410impl Storable for RequestHandler {}
1411
1412#[async_trait]
1413impl PersistentActor for RequestHandler {
1414 type Persistence = LightPersistence;
1415 type InitParams = (Arc<PublicKey>, (HashAlgorithm, Arc<NetworkSender>));
1416
1417 fn update(&mut self, state: Self) {
1418 self.in_queue = state.in_queue;
1419 self.handling = state.handling;
1420 }
1421
1422 fn create_initial(params: Self::InitParams) -> Self {
1423 Self {
1424 our_key: params.0,
1425 helpers: Some(params.1),
1426 handling: HashMap::new(),
1427 in_queue: HashMap::new(),
1428 }
1429 }
1430
1431 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1433 match event {
1434 RequestHandlerEvent::EventToQueue {
1435 subject_id,
1436 event,
1437 request_id,
1438 } => {
1439 self.in_queue
1440 .entry(subject_id.clone())
1441 .or_default()
1442 .push_back((event.clone(), request_id.clone()));
1443 }
1444 RequestHandlerEvent::Invalid { subject_id } => {
1445 if let Some(vec) = self.in_queue.get_mut(subject_id) {
1446 vec.pop_front();
1447 if vec.is_empty() {
1448 self.in_queue.remove(subject_id);
1449 }
1450 }
1451 }
1452 RequestHandlerEvent::EventToHandling {
1453 subject_id,
1454 request_id,
1455 } => {
1456 self.handling.insert(subject_id.clone(), request_id.clone());
1457 if let Some(vec) = self.in_queue.get_mut(subject_id) {
1458 vec.pop_front();
1459 if vec.is_empty() {
1460 self.in_queue.remove(subject_id);
1461 }
1462 }
1463 }
1464 RequestHandlerEvent::FinishHandling { subject_id } => {
1465 self.handling.remove(subject_id);
1466 }
1467 RequestHandlerEvent::PurgeSubject { subject_id } => {
1468 self.handling.remove(subject_id);
1469 self.in_queue.remove(subject_id);
1470 }
1471 };
1472
1473 Ok(())
1474 }
1475}