1use async_trait::async_trait;
2use ave_actors::{
3 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4 Message, Response, Sink,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::bridge::request::{
9 ApprovalState, ApprovalStateRes, EventRequestType,
10};
11use ave_common::identity::{
12 DigestIdentifier, HashAlgorithm, PublicKey, Signed, TimeStamp, hash_borsh,
13};
14use ave_common::request::EventRequest;
15use ave_common::response::{
16 RequestState, RequestsInManager, RequestsInManagerSubject,
17};
18
19use borsh::{BorshDeserialize, BorshSerialize};
20use error::RequestHandlerError;
21use manager::{RequestManager, RequestManagerMessage};
22use serde::{Deserialize, Serialize};
23use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use tracing::{Span, error, info_span};
26use types::ReqManInitMessage;
27
28use crate::approval::persist::{
29 ApprPersist, ApprPersistMessage, ApprPersistResponse,
30};
31use crate::approval::request::ApprovalReq;
32use crate::db::Storable;
33use crate::governance::events::GovernanceEvent;
34use crate::governance::model::{HashThisRole, RoleTypes};
35use crate::helpers::db::ExternalDB;
36use crate::helpers::network::service::NetworkSender;
37use crate::metrics::try_core_metrics;
38use crate::model::common::node::{get_subject_data, i_owner_new_owner};
39use crate::model::common::subject::{
40 get_gov, get_tracker_visibility_state, get_version,
41};
42use crate::model::common::{
43 check_subject_creation, emit_fail, send_to_tracking,
44 viewpoints::validate_fact_viewpoints,
45};
46use crate::node::{Node, NodeMessage, NodeResponse, SubjectData};
47use crate::request::manager::InitRequestManager;
48use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
49use crate::system::ConfigHelper;
50
51pub mod error;
52pub mod manager;
53pub mod reboot;
54pub mod tracking;
55pub mod types;
56
/// Pair of identifiers describing a request and the subject it refers to.
///
/// Carried by [`RequestHandlerResponse::Ok`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestData {
    /// Identifier of the request.
    pub request_id: DigestIdentifier,
    /// Identifier of the subject the request refers to.
    pub subject_id: DigestIdentifier,
}
62
/// Actor state that tracks, per subject, which request is being handled and
/// which requests are waiting in a queue.
///
/// Only `handling` and `in_queue` are part of the serialized state; the two
/// `#[serde(skip)]` fields are also left out of the manual Borsh
/// implementations for this type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestHandler {
    /// Hash algorithm and network sender handed to the request managers.
    /// Not serialized; Borsh deserialization yields `None`.
    #[serde(skip)]
    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
    /// Public key reported as `who` in tracking updates and passed to the
    /// request managers. Not serialized; Borsh deserialization yields
    /// `PublicKey::default()`.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Subject id -> id of the request currently being handled for it.
    handling: HashMap<DigestIdentifier, DigestIdentifier>,
    /// Subject id -> pending `(signed request, request id)` pairs.
    in_queue: HashMap<
        DigestIdentifier,
        VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
    >,
}
75
76impl BorshSerialize for RequestHandler {
77 fn serialize<W: std::io::Write>(
78 &self,
79 writer: &mut W,
80 ) -> std::io::Result<()> {
81 BorshSerialize::serialize(&self.handling, writer)?;
83 BorshSerialize::serialize(&self.in_queue, writer)?;
84 Ok(())
85 }
86}
87
88impl BorshDeserialize for RequestHandler {
89 fn deserialize_reader<R: std::io::Read>(
90 reader: &mut R,
91 ) -> std::io::Result<Self> {
92 let handling =
94 HashMap::<DigestIdentifier, DigestIdentifier>::deserialize_reader(
95 reader,
96 )?;
97 let in_queue = HashMap::<
98 DigestIdentifier,
99 VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
100 >::deserialize_reader(reader)?;
101
102 let our_key = Arc::new(PublicKey::default());
103
104 Ok(Self {
105 helpers: None,
106 our_key,
107 handling,
108 in_queue,
109 })
110 }
111}
112
113impl RequestHandler {
114 async fn check_signer_authorization(
115 ctx: &mut ActorContext<Self>,
116 our_key: PublicKey,
117 signer: PublicKey,
118 governance_id: &DigestIdentifier,
119 event_request: &EventRequestType,
120 subject_data: SubjectData,
121 ) -> Result<(), ActorError> {
122 let gov = if matches!(subject_data, SubjectData::Tracker { .. })
123 || matches!(event_request, EventRequestType::Fact)
124 {
125 Some(get_gov(ctx, governance_id).await?)
126 } else {
127 None
128 };
129
130 if let SubjectData::Tracker {
131 schema_id,
132 namespace,
133 ..
134 } = &subject_data
135 {
136 let Some(gov) = gov.as_ref() else {
137 return Err(ActorError::FunctionalCritical {
138 description:
139 "Governance required for tracker signer authorization"
140 .to_owned(),
141 });
142 };
143
144 if !gov.has_this_role(HashThisRole::Schema {
145 who: our_key.clone(),
146 role: RoleTypes::Creator,
147 schema_id: schema_id.clone(),
148 namespace: Namespace::from(namespace.clone()),
149 }) {
150 return Err(ActorError::Functional {
151 description:
152 "In tracker events, the node has to be a creator"
153 .to_string(),
154 });
155 }
156 }
157
158 match event_request {
159 EventRequestType::Create
160 | EventRequestType::Transfer
161 | EventRequestType::Confirm
162 | EventRequestType::Reject
163 | EventRequestType::Eol => {
164 if signer != our_key {
165 return Err(ActorError::Functional { description: "In the events of Create, Transfer, Confirm, Reject or EOL, the event must be signed by the node".to_string() });
166 }
167 }
168 EventRequestType::Fact => match subject_data {
169 SubjectData::Tracker {
170 schema_id,
171 namespace,
172 ..
173 } => {
174 let Some(gov) = gov.as_ref() else {
175 return Err(ActorError::FunctionalCritical {
176 description:
177 "Governance required for fact signer authorization"
178 .to_owned(),
179 });
180 };
181
182 if !gov.has_this_role(HashThisRole::Schema {
183 who: signer,
184 role: RoleTypes::Issuer,
185 schema_id,
186 namespace: Namespace::from(namespace),
187 }) {
188 return Err(ActorError::Functional {
189 description:
190 "In fact events, the signer has to be an issuer"
191 .to_string(),
192 });
193 }
194 }
195 SubjectData::Governance { .. } => {
196 let Some(gov) = gov.as_ref() else {
197 return Err(ActorError::FunctionalCritical {
198 description:
199 "Governance required for fact signer authorization"
200 .to_owned(),
201 });
202 };
203
204 if !gov.has_this_role(HashThisRole::Gov {
205 who: signer,
206 role: RoleTypes::Issuer,
207 }) {
208 return Err(ActorError::Functional {
209 description:
210 "In fact events, the signer has to be an issuer"
211 .to_string(),
212 });
213 }
214 }
215 },
216 }
217
218 Ok(())
219 }
220
221 async fn queued_event(
222 ctx: &ActorContext<Self>,
223 subject_id: &DigestIdentifier,
224 ) -> Result<(), ActorError> {
225 let request_actor = ctx.reference().await?;
226 request_actor
227 .tell(RequestHandlerMessage::PopQueue {
228 subject_id: subject_id.to_owned(),
229 })
230 .await
231 }
232
233 async fn error_queue_handling(
234 &mut self,
235 ctx: &mut ActorContext<Self>,
236 error: String,
237 subject_id: &DigestIdentifier,
238 request_id: &DigestIdentifier,
239 ) -> Result<(), ActorError> {
240 if let Some(metrics) = try_core_metrics() {
241 metrics.observe_request_invalid();
242 }
243
244 self.on_event(
245 RequestHandlerEvent::Invalid {
246 subject_id: subject_id.to_owned(),
247 },
248 ctx,
249 )
250 .await;
251
252 send_to_tracking(
253 ctx,
254 RequestTrackingMessage::UpdateState {
255 request_id: request_id.clone(),
256 state: RequestState::Invalid {
257 error,
258 sn: None,
259 subject_id: subject_id.to_string(),
260 who: self.our_key.to_string(),
261 },
262 },
263 )
264 .await?;
265
266 Self::queued_event(ctx, subject_id).await
267 }
268
269 async fn change_approval(
270 ctx: &ActorContext<Self>,
271 subject_id: &DigestIdentifier,
272 state: ApprovalStateRes,
273 ) -> Result<(), RequestHandlerError> {
274 if state == ApprovalStateRes::Obsolete {
275 return Err(RequestHandlerError::ObsoleteApproval);
276 }
277
278 let approver_path = ActorPath::from(format!(
279 "/user/node/subject_manager/{}/approver",
280 subject_id
281 ));
282 let approver_actor = ctx
283 .system()
284 .get_actor::<ApprPersist>(&approver_path)
285 .await
286 .map_err(|_| {
287 RequestHandlerError::ApprovalNotFound(subject_id.to_string())
288 })?;
289
290 approver_actor
291 .tell(ApprPersistMessage::ChangeResponse {
292 response: state.clone(),
293 })
294 .await
295 .map_err(|_| RequestHandlerError::ApprovalChangeFailed)
296 }
297
298 async fn get_approval(
299 ctx: &ActorContext<Self>,
300 subject_id: &DigestIdentifier,
301 state: Option<ApprovalState>,
302 ) -> Result<Option<(ApprovalReq, ApprovalState)>, RequestHandlerError> {
303 let approver_path = ActorPath::from(format!(
304 "/user/node/subject_manager/{}/approver",
305 subject_id
306 ));
307 let approver_actor = ctx
308 .system()
309 .get_actor::<ApprPersist>(&approver_path)
310 .await
311 .map_err(|_| {
312 RequestHandlerError::ApprovalNotFound(subject_id.to_string())
313 })?;
314
315 let response = approver_actor
316 .ask(ApprPersistMessage::GetApproval { state })
317 .await
318 .map_err(|_| RequestHandlerError::ApprovalGetFailed)?;
319
320 let res = match response {
321 ApprPersistResponse::Ok => None,
322 ApprPersistResponse::Approval { request, state } => {
323 Some((request, state))
324 }
325 };
326
327 Ok(res)
328 }
329
330 async fn get_all_approvals(
331 ctx: &ActorContext<Self>,
332 state: Option<ApprovalState>,
333 ) -> Result<Vec<(ApprovalReq, ApprovalState)>, ActorError> {
334 let node_path = ActorPath::from("/user/node");
335 let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
336 let response = node_actor.ask(NodeMessage::GetGovernances).await?;
337 let vec = match response {
338 NodeResponse::Governances(govs) => govs,
339 _ => {
340 return Err(ActorError::UnexpectedResponse {
341 path: node_path,
342 expected: "NodeResponse::Governances".to_string(),
343 });
344 }
345 };
346
347 let mut responses = vec![];
348 for governance in vec.iter() {
349 let approver_path = ActorPath::from(format!(
350 "/user/node/subject_manager/{}/approver",
351 governance
352 ));
353 if let Ok(approver_actor) =
354 ctx.system().get_actor::<ApprPersist>(&approver_path).await
355 {
356 let response = approver_actor
357 .ask(ApprPersistMessage::GetApproval {
358 state: state.clone(),
359 })
360 .await?;
361
362 match response {
363 ApprPersistResponse::Ok => {}
364 ApprPersistResponse::Approval { request, state } => {
365 responses.push((request, state))
366 }
367 };
368 };
369 }
370
371 Ok(responses)
372 }
373
374 async fn check_owner_new_owner(
375 ctx: &mut ActorContext<Self>,
376 request: &EventRequest,
377 ) -> Result<(), RequestHandlerError> {
378 match request {
379 EventRequest::Create(..) => {}
380 EventRequest::Fact(..)
381 | EventRequest::Transfer(..)
382 | EventRequest::EOL(..) => {
383 let subject_id = request.get_subject_id();
384 let (i_owner, i_new_owner) =
385 i_owner_new_owner(ctx, &subject_id).await?;
386 if !i_owner {
387 return Err(RequestHandlerError::NotOwner(
388 subject_id.to_string(),
389 ));
390 }
391
392 if i_new_owner.is_some() {
393 return Err(RequestHandlerError::PendingNewOwner(
394 subject_id.to_string(),
395 ));
396 }
397 }
398 EventRequest::Confirm(..) | EventRequest::Reject(..) => {
399 let subject_id = request.get_subject_id();
400 let (i_owner, i_new_owner) =
401 i_owner_new_owner(ctx, &subject_id).await?;
402 if i_owner {
403 return Err(RequestHandlerError::IsOwner(
404 subject_id.to_string(),
405 ));
406 }
407
408 if let Some(new_owner) = i_new_owner {
409 if !new_owner {
410 return Err(RequestHandlerError::NotNewOwner(
411 subject_id.to_string(),
412 ));
413 }
414 } else {
415 return Err(RequestHandlerError::NoNewOwnerPending(
416 subject_id.to_string(),
417 ));
418 }
419 }
420 };
421 Ok(())
422 }
423
424 fn check_event_request(
425 request: &EventRequest,
426 is_gov: bool,
427 ) -> Result<(), RequestHandlerError> {
428 match request {
429 EventRequest::Create(create_request) => {
430 if let Some(name) = &create_request.name
431 && (name.is_empty() || name.len() > 100)
432 {
433 return Err(RequestHandlerError::InvalidName);
434 }
435
436 if let Some(description) = &create_request.description
437 && (description.is_empty() || description.len() > 200)
438 {
439 return Err(RequestHandlerError::InvalidDescription);
440 }
441
442 if !create_request.schema_id.is_valid_in_request() {
443 return Err(RequestHandlerError::InvalidSchemaId);
444 }
445
446 if is_gov {
447 if !create_request.governance_id.is_empty() {
448 return Err(
449 RequestHandlerError::GovernanceIdMustBeEmpty,
450 );
451 }
452
453 if !create_request.namespace.is_empty() {
454 return Err(RequestHandlerError::NamespaceMustBeEmpty);
455 }
456 } else if create_request.governance_id.is_empty() {
457 return Err(RequestHandlerError::GovernanceIdRequired);
458 }
459 }
460 EventRequest::Transfer(transfer_request) => {
461 if transfer_request.new_owner.is_empty() {
462 return Err(RequestHandlerError::TransferNewOwnerEmpty);
463 }
464 }
465 EventRequest::Confirm(confirm_request) => {
466 if is_gov {
467 if let Some(name_old_owner) =
468 &confirm_request.name_old_owner
469 && name_old_owner.is_empty()
470 {
471 return Err(
472 RequestHandlerError::ConfirmNameOldOwnerEmpty,
473 );
474 }
475 } else if confirm_request.name_old_owner.is_some() {
476 return Err(
477 RequestHandlerError::ConfirmTrackerNameOldOwner,
478 );
479 }
480 }
481 EventRequest::Fact(fact_request) => {
482 if is_gov && !fact_request.viewpoints.is_empty() {
483 return Err(
484 RequestHandlerError::GovFactViewpointsNotAllowed,
485 );
486 }
487
488 if is_gov
489 && serde_json::from_value::<GovernanceEvent>(
490 fact_request.payload.0.clone(),
491 )
492 .is_err()
493 {
494 return Err(RequestHandlerError::GovFactInvalidEvent);
495 }
496 }
497 EventRequest::Reject(..) | EventRequest::EOL(..) => {}
498 }
499
500 Ok(())
501 }
502
503 async fn check_fact_viewpoints(
504 ctx: &mut ActorContext<Self>,
505 request: &EventRequest,
506 subject_data: &SubjectData,
507 ) -> Result<(), RequestHandlerError> {
508 let EventRequest::Fact(fact_request) = request else {
509 return Ok(());
510 };
511
512 match subject_data {
513 SubjectData::Governance { .. } => validate_fact_viewpoints(
514 &fact_request.viewpoints,
515 &ave_common::SchemaType::Governance,
516 None,
517 )
518 .map_err(RequestHandlerError::InvalidTrackerFactViewpoints),
519 SubjectData::Tracker {
520 governance_id,
521 schema_id,
522 ..
523 } => {
524 let governance = get_gov(ctx, governance_id).await?;
525 let Some(schema) = governance.schemas.get(schema_id) else {
526 return Err(RequestHandlerError::Actor(
527 ActorError::FunctionalCritical {
528 description: format!(
529 "schema {} not found in governance {} while validating fact viewpoints",
530 schema_id, governance_id
531 ),
532 },
533 ));
534 };
535
536 validate_fact_viewpoints(
537 &fact_request.viewpoints,
538 schema_id,
539 Some(&schema.viewpoints),
540 )
541 .map_err(RequestHandlerError::InvalidTrackerFactViewpoints)
542 }
543 }
544 }
545
546 async fn check_tracker_ledger_full(
547 ctx: &mut ActorContext<Self>,
548 request: &EventRequest,
549 subject_data: &SubjectData,
550 ) -> Result<(), RequestHandlerError> {
551 let SubjectData::Tracker { governance_id, .. } = subject_data else {
552 return Ok(());
553 };
554
555 if matches!(request, EventRequest::Create(..)) {
556 return Ok(());
557 }
558
559 let subject_id = request.get_subject_id();
560 let visibility_state =
561 get_tracker_visibility_state(ctx, governance_id, &subject_id)
562 .await?;
563
564 if !visibility_state.is_full() {
565 return Err(RequestHandlerError::TrackerLedgerNotFull(
566 subject_id.to_string(),
567 ));
568 }
569
570 Ok(())
571 }
572
573 async fn build_subject_data(
574 ctx: &mut ActorContext<Self>,
575 request: &EventRequest,
576 ) -> Result<SubjectData, RequestHandlerError> {
577 let subject_data = match request {
578 EventRequest::Create(create_request) => {
579 if create_request.schema_id.is_gov() {
580 SubjectData::Governance { active: true }
581 } else {
582 SubjectData::Tracker {
583 governance_id: create_request.governance_id.clone(),
584 schema_id: create_request.schema_id.clone(),
585 namespace: create_request.namespace.to_string(),
586 active: true,
587 }
588 }
589 }
590 EventRequest::Fact(..)
591 | EventRequest::Transfer(..)
592 | EventRequest::Confirm(..)
593 | EventRequest::Reject(..)
594 | EventRequest::EOL(..) => {
595 let subject_id = request.get_subject_id();
596 let Some(subject_data) =
597 get_subject_data(ctx, &subject_id).await?
598 else {
599 return Err(RequestHandlerError::SubjectDataNotFound(
600 subject_id.to_string(),
601 ));
602 };
603
604 subject_data
605 }
606 };
607
608 Ok(subject_data)
609 }
610
611 async fn check_creation(
612 ctx: &mut ActorContext<Self>,
613 subject_data: SubjectData,
614 event_request: &EventRequestType,
615 signer: PublicKey,
616 ) -> Result<(), ActorError> {
617 match event_request {
618 EventRequestType::Create | EventRequestType::Confirm => {
619 if let SubjectData::Tracker {
620 governance_id,
621 schema_id,
622 namespace,
623 ..
624 } = subject_data
625 {
626 let version = get_version(ctx, &governance_id).await?;
627 check_subject_creation(
628 ctx,
629 &governance_id,
630 signer,
631 version,
632 namespace,
633 schema_id,
634 )
635 .await?;
636 }
637 }
638 _ => {}
639 }
640
641 Ok(())
642 }
643
644 fn build_request_id_subject_id(
645 hash: HashAlgorithm,
646 request: &Signed<EventRequest>,
647 ) -> Result<(DigestIdentifier, DigestIdentifier), RequestHandlerError> {
648 match &request.content() {
649 EventRequest::Create(..) => {
650 let request_id = hash_borsh(
651 &*hash.hasher(),
652 &(request.clone(), TimeStamp::now().as_nanos()),
653 )
654 .map_err(|e| {
655 RequestHandlerError::RequestIdHash(e.to_string())
656 })?;
657
658 let subject_id =
659 hash_borsh(&*hash.hasher(), request).map_err(|e| {
660 RequestHandlerError::SubjectIdHash(e.to_string())
661 })?;
662
663 Ok((request_id, subject_id))
664 }
665 EventRequest::Fact(..)
666 | EventRequest::Transfer(..)
667 | EventRequest::Confirm(..)
668 | EventRequest::Reject(..)
669 | EventRequest::EOL(..) => {
670 let request_id = hash_borsh(
671 &*hash.hasher(),
672 &(request.clone(), TimeStamp::now().as_nanos()),
673 )
674 .map_err(|e| {
675 RequestHandlerError::RequestIdHash(e.to_string())
676 })?;
677
678 Ok((request_id, request.content().get_subject_id()))
679 }
680 }
681 }
682
683 async fn handle_queue_request(
684 &mut self,
685 ctx: &mut ActorContext<Self>,
686 request: Signed<EventRequest>,
687 request_id: &DigestIdentifier,
688 subject_id: &DigestIdentifier,
689 is_gov: bool,
690 governance_id: Option<DigestIdentifier>,
691 ) -> Result<(), ActorError> {
692 let Some(helpers) = self.helpers.clone() else {
693 let e = " Can not obtain helpers".to_string();
694
695 return Err(ActorError::FunctionalCritical { description: e });
696 };
697
698 let in_handling = self.handling.contains_key(subject_id);
699 let in_queue = self.in_queue.contains_key(subject_id);
700
701 if !in_handling && !in_queue {
702 let command = Self::build_req_manager_init_msg(
703 &EventRequestType::from(request.content()),
704 is_gov,
705 );
706 let init_data = InitRequestManager {
707 our_key: self.our_key.clone(),
708 subject_id: subject_id.clone(),
709 governance_id,
710 helpers,
711 };
712
713 let actor = ctx
714 .create_child(
715 &subject_id.to_string(),
716 RequestManager::initial(init_data),
717 )
718 .await?;
719 actor
720 .tell(RequestManagerMessage::FirstRun {
721 command,
722 request,
723 request_id: request_id.clone(),
724 })
725 .await?;
726
727 self.on_event(
728 RequestHandlerEvent::EventToHandling {
729 subject_id: subject_id.clone(),
730 request_id: request_id.clone(),
731 },
732 ctx,
733 )
734 .await;
735
736 send_to_tracking(
737 ctx,
738 RequestTrackingMessage::UpdateState {
739 request_id: request_id.clone(),
740 state: RequestState::Handling,
741 },
742 )
743 .await?;
744 } else {
745 self.on_event(
746 RequestHandlerEvent::EventToQueue {
747 subject_id: subject_id.clone(),
748 event: request,
749 request_id: request_id.clone(),
750 },
751 ctx,
752 )
753 .await;
754
755 send_to_tracking(
756 ctx,
757 RequestTrackingMessage::UpdateState {
758 request_id: request_id.clone(),
759 state: RequestState::InQueue,
760 },
761 )
762 .await?;
763 }
764
765 Ok(())
766 }
767
768 const fn build_req_manager_init_msg(
769 event_request: &EventRequestType,
770 is_gov: bool,
771 ) -> ReqManInitMessage {
772 match event_request {
773 EventRequestType::Create => ReqManInitMessage::Validate,
774 EventRequestType::Fact => ReqManInitMessage::Evaluate,
775 EventRequestType::Transfer => ReqManInitMessage::Evaluate,
776 EventRequestType::Confirm => {
777 if is_gov {
778 ReqManInitMessage::Evaluate
779 } else {
780 ReqManInitMessage::Validate
781 }
782 }
783 EventRequestType::Reject => ReqManInitMessage::Validate,
784 EventRequestType::Eol => ReqManInitMessage::Validate,
785 }
786 }
787
788 async fn check_in_queue(
789 ctx: &mut ActorContext<Self>,
790 request: &Signed<EventRequest>,
791 our_key: PublicKey,
792 ) -> Result<bool, RequestHandlerError> {
793 if let EventRequest::Create(..) = request.content() {
794 return Err(RequestHandlerError::CreationNotQueued);
795 }
796
797 Self::check_owner_new_owner(ctx, request.content()).await?;
798
799 let subject_data =
800 Self::build_subject_data(ctx, request.content()).await?;
801 let event_request_type = EventRequestType::from(request.content());
802 let signer = request.signature().signer.clone();
803 let governance_id = subject_data
804 .get_governance_id()
805 .unwrap_or_else(|| request.content().get_subject_id());
806 let is_gov = subject_data.get_schema_id().is_gov();
807
808 if !subject_data.get_active() {
809 return Err(RequestHandlerError::SubjectNotActive(
810 request.content().get_subject_id().to_string(),
811 ));
812 }
813
814 Self::check_tracker_ledger_full(ctx, request.content(), &subject_data)
815 .await?;
816
817 Self::check_signer_authorization(
818 ctx,
819 our_key,
820 signer.clone(),
821 &governance_id,
822 &event_request_type,
823 subject_data.clone(),
824 )
825 .await?;
826
827 Self::check_fact_viewpoints(ctx, request.content(), &subject_data)
828 .await?;
829
830 Self::check_creation(ctx, subject_data, &event_request_type, signer)
831 .await?;
832
833 Ok(is_gov)
834 }
835
836 async fn in_queue_to_handling(
837 &mut self,
838 ctx: &mut ActorContext<Self>,
839 request: Signed<EventRequest>,
840 request_id: &DigestIdentifier,
841 is_gov: bool,
842 ) -> Result<(), ActorError> {
843 let command = Self::build_req_manager_init_msg(
844 &EventRequestType::from(request.content()),
845 is_gov,
846 );
847 let subject_id = request.content().get_subject_id();
848
849 let actor = ctx
850 .get_child::<RequestManager>(&subject_id.to_string())
851 .await?;
852
853 actor
854 .tell(RequestManagerMessage::FirstRun {
855 command,
856 request,
857 request_id: request_id.clone(),
858 })
859 .await?;
860
861 self.on_event(
862 RequestHandlerEvent::EventToHandling {
863 subject_id: subject_id.clone(),
864 request_id: request_id.clone(),
865 },
866 ctx,
867 )
868 .await;
869
870 send_to_tracking(
871 ctx,
872 RequestTrackingMessage::UpdateState {
873 request_id: request_id.clone(),
874 state: RequestState::Handling,
875 },
876 )
877 .await
878 }
879
880 async fn end_child(
881 ctx: &ActorContext<Self>,
882 subject_id: &DigestIdentifier,
883 ) -> Result<(), ActorError> {
884 let actor = ctx
885 .get_child::<RequestManager>(&subject_id.to_string())
886 .await?;
887 actor.ask_stop().await
888 }
889
890 async fn manual_abort_request(
891 &self,
892 ctx: &ActorContext<Self>,
893 subject_id: &DigestIdentifier,
894 ) -> Result<(), ActorError> {
895 let actor = ctx
896 .get_child::<RequestManager>(&subject_id.to_string())
897 .await?;
898
899 actor.tell(RequestManagerMessage::ManualAbort).await
900 }
901
902 async fn purge_request_manager(
903 &self,
904 ctx: &mut ActorContext<Self>,
905 subject_id: &DigestIdentifier,
906 ) -> Result<(), ActorError> {
907 let Some((hash, network)) = self.helpers.clone() else {
908 return Err(ActorError::FunctionalCritical {
909 description: "Request handler helpers are not initialized"
910 .to_string(),
911 });
912 };
913
914 let governance_id = get_subject_data(ctx, subject_id)
915 .await?
916 .and_then(|data| data.get_governance_id());
917 let request_manager_init = InitRequestManager {
918 our_key: self.our_key.clone(),
919 subject_id: subject_id.clone(),
920 governance_id,
921 helpers: (hash, network),
922 };
923
924 let actor = match ctx
925 .create_child(
926 &subject_id.to_string(),
927 RequestManager::initial(request_manager_init),
928 )
929 .await
930 {
931 Ok(actor) => actor,
932 Err(ActorError::Exists { .. }) => {
933 ctx.get_child::<RequestManager>(&subject_id.to_string())
934 .await?
935 }
936 Err(err) => return Err(err),
937 };
938
939 actor.ask(RequestManagerMessage::PurgeStorage).await?;
940 actor.ask_stop().await?;
941
942 Ok(())
943 }
944}
945
/// Messages accepted by the [`RequestHandler`] actor.
#[derive(Debug, Clone)]
pub enum RequestHandlerMessage {
    /// Submit a new signed event request; its signature is verified before
    /// any other check runs.
    NewRequest {
        request: Signed<EventRequest>,
    },
    /// Ask for all requests being handled and queued, across subjects.
    RequestInManager,
    /// Ask for the request being handled and the queued requests of a
    /// single subject.
    RequestInManagerSubjectId {
        subject_id: DigestIdentifier,
    },
    /// Forward an approval decision to the approver of `subject_id`.
    ChangeApprovalState {
        subject_id: DigestIdentifier,
        state: ApprovalStateRes,
    },
    /// Ask the approver of `subject_id` for its approval request, passing
    /// `state` along in the query.
    GetApproval {
        subject_id: DigestIdentifier,
        state: Option<ApprovalState>,
    },
    /// Same as `GetApproval`, but for every governance known to the node.
    GetAllApprovals {
        state: Option<ApprovalState>,
    },
    /// Sent by the handler to itself after a request is marked invalid.
    /// NOTE(review): presumably advances the subject's queue - the handling
    /// code is outside this excerpt, confirm there.
    PopQueue {
        subject_id: DigestIdentifier,
    },
    /// NOTE(review): presumably signals that handling for the subject has
    /// finished - the handling code is outside this excerpt, confirm there.
    EndHandling {
        subject_id: DigestIdentifier,
    },
    /// Purge the request manager storage of the subject and record a
    /// `PurgeSubject` event.
    PurgeSubject {
        subject_id: DigestIdentifier,
    },
    /// Send `ManualAbort` to the request manager of the subject.
    AbortRequest {
        subject_id: DigestIdentifier,
    },
}

impl Message for RequestHandlerMessage {}
981
/// Responses produced by the [`RequestHandler`] actor.
#[derive(Debug, Clone)]
pub enum RequestHandlerResponse {
    /// Answer to `RequestInManager`.
    RequestInManager(RequestsInManager),
    /// Answer to `RequestInManagerSubjectId`.
    RequestInManagerSubjectId(RequestsInManagerSubject),
    /// Request and subject identifiers.
    /// NOTE(review): presumably the answer to an accepted `NewRequest` -
    /// that code path is outside this excerpt, confirm there.
    Ok(RequestData),
    /// Human-readable text answer, e.g. for `ChangeApprovalState`.
    Response(String),
    /// Answer to `GetApproval`.
    Approval(Option<(ApprovalReq, ApprovalState)>),
    /// Answer to `GetAllApprovals`.
    Approvals(Vec<(ApprovalReq, ApprovalState)>),
    /// No payload, e.g. for `AbortRequest` and `PurgeSubject`.
    None,
}

impl Response for RequestHandlerResponse {}
994
/// Events applied to the [`RequestHandler`] through `on_event`.
///
/// NOTE(review): how each event mutates `handling` / `in_queue` is defined
/// outside this excerpt; the notes below only state where each event is
/// emitted.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum RequestHandlerEvent {
    /// Emitted when a request arrives for a subject that already has a
    /// request in handling or in queue.
    EventToQueue {
        subject_id: DigestIdentifier,
        event: Signed<EventRequest>,
        request_id: DigestIdentifier,
    },
    /// Emitted when a request is marked invalid.
    Invalid {
        subject_id: DigestIdentifier,
    },
    /// Not emitted in this excerpt.
    FinishHandling {
        subject_id: DigestIdentifier,
    },
    /// Emitted after the request manager storage of a subject was purged.
    PurgeSubject {
        subject_id: DigestIdentifier,
    },
    /// Emitted when a request is handed to a request manager.
    EventToHandling {
        subject_id: DigestIdentifier,
        request_id: DigestIdentifier,
    },
}

impl Event for RequestHandlerEvent {}
1020
/// Actor wiring for the request handler: associated types, tracing span and
/// start-up logic.
#[async_trait]
impl Actor for RequestHandler {
    type Event = RequestHandlerEvent;
    type Message = RequestHandlerMessage;
    type Response = RequestHandlerResponse;

    /// Builds the `RequestHandler` info span, as a child of `parent_span`
    /// when one is given.
    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
        parent_span.map_or_else(
            || info_span!("RequestHandler"),
            |parent_span| info_span!(parent: parent_span, "RequestHandler"),
        )
    }

    /// Start-up sequence of the handler.
    ///
    /// 1. Initializes the store under the name `"request"`.
    /// 2. Fetches the `"config"` helper.
    /// 3. Outside safe mode: fetches the `"ext_db"` helper, creates the
    ///    `"tracking"` child and runs a sink from the tracking child's
    ///    subscription into the external database's request tracking.
    /// 4. Requires `helpers` to be set; if they are missing the system is
    ///    crashed and an error is returned.
    /// 5. In safe mode, stops here.
    /// 6. Otherwise, for every entry in `handling`, creates a request
    ///    manager child for the subject and sends it `Run` with the stored
    ///    request id.
    ///
    /// # Errors
    /// Returns the first failure of any step above; each failure is logged
    /// before being returned, except a failing `get_subject_data` lookup,
    /// which is propagated with `?` without logging here.
    async fn pre_start(
        &mut self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<(), ActorError> {
        if let Err(e) = self.init_store("request", None, false, ctx).await {
            error!(
                error = %e,
                "Failed to initialize store during pre_start"
            );
            return Err(e);
        }

        let Some(config) =
            ctx.system().get_helper::<ConfigHelper>("config").await
        else {
            error!(
                helper = "config",
                "Config helper not found during pre_start"
            );
            return Err(ActorError::Helper {
                name: "config".to_owned(),
                reason: "Not found".to_string(),
            });
        };

        // Request tracking is only set up outside safe mode.
        if !config.safe_mode {
            let Some(ext_db): Option<Arc<ExternalDB>> =
                ctx.system().get_helper("ext_db").await
            else {
                error!("External database helper not found");
                return Err(ActorError::Helper {
                    name: "ext_db".to_string(),
                    reason: "Not found".to_string(),
                });
            };

            let tracking = match ctx
                .create_child(
                    "tracking",
                    RequestTracking::new(config.tracking_size),
                )
                .await
            {
                Ok(actor) => actor,
                Err(e) => {
                    error!(
                        error = %e,
                        "Failed to create tracking child during pre_start"
                    );
                    return Err(e);
                }
            };

            let sink =
                Sink::new(tracking.subscribe(), ext_db.get_request_tracking());

            ctx.system().run_sink(sink).await;
        }

        // Missing helpers are treated as fatal: the whole system is crashed.
        let Some((hash, network)) = self.helpers.clone() else {
            let e = " Can not obtain helpers".to_string();
            error!(
                error = %e,
                "Failed to obtain helpers during pre_start"
            );
            ctx.system().crash_system();
            return Err(ActorError::FunctionalCritical { description: e });
        };

        // In safe mode the requests in `handling` are not resumed.
        if config.safe_mode {
            return Ok(());
        }

        // Resume every request that was being handled.
        for (subject_id, request_id) in self.handling.clone() {
            let governance_id = get_subject_data(ctx, &subject_id)
                .await?
                .and_then(|data| data.get_governance_id());
            let request_manager_init = InitRequestManager {
                our_key: self.our_key.clone(),
                subject_id: subject_id.clone(),
                governance_id,
                helpers: (hash, network.clone()),
            };

            let request_manager_actor = match ctx
                .create_child(
                    &subject_id.to_string(),
                    RequestManager::initial(request_manager_init),
                )
                .await
            {
                Ok(actor) => actor,
                Err(e) => {
                    error!(
                        subject_id = %subject_id,
                        error = %e,
                        "Failed to create request manager child during pre_start"
                    );
                    return Err(e);
                }
            };

            if let Err(e) = request_manager_actor
                .tell(RequestManagerMessage::Run {
                    request_id: request_id.clone(),
                })
                .await
            {
                error!(
                    subject_id = %subject_id,
                    request_id = %request_id,
                    error = %e,
                    "Failed to send Run message to request manager during pre_start"
                );
                return Err(e);
            }
        }

        Ok(())
    }
}
1155
1156#[async_trait]
1157impl Handler<Self> for RequestHandler {
1158 async fn handle_message(
1159 &mut self,
1160 _sender: ActorPath,
1161 msg: RequestHandlerMessage,
1162 ctx: &mut ave_actors::ActorContext<Self>,
1163 ) -> Result<RequestHandlerResponse, ActorError> {
1164 match msg {
1165 RequestHandlerMessage::RequestInManagerSubjectId { subject_id } => {
1166 let handling =
1167 self.handling.get(&subject_id).map(|x| x.to_string());
1168 let in_queue = self.in_queue.get(&subject_id).map(|x| {
1169 x.iter().map(|x| x.1.to_string()).collect::<Vec<String>>()
1170 });
1171
1172 Ok(RequestHandlerResponse::RequestInManagerSubjectId(
1173 RequestsInManagerSubject { handling, in_queue },
1174 ))
1175 }
1176 RequestHandlerMessage::RequestInManager => Ok(
1177 RequestHandlerResponse::RequestInManager(RequestsInManager {
1178 handling: self
1179 .handling
1180 .iter()
1181 .map(|x| (x.0.to_string(), x.1.to_string()))
1182 .collect(),
1183 in_queue: self
1184 .in_queue
1185 .iter()
1186 .map(|x| {
1187 (
1188 x.0.to_string(),
1189 x.1.iter()
1190 .map(|x| x.1.to_string())
1191 .collect::<Vec<String>>(),
1192 )
1193 })
1194 .collect(),
1195 }),
1196 ),
1197 RequestHandlerMessage::AbortRequest { subject_id } => {
1198 self.manual_abort_request(ctx, &subject_id).await?;
1199 Ok(RequestHandlerResponse::None)
1200 }
1201 RequestHandlerMessage::PurgeSubject { subject_id } => {
1202 self.purge_request_manager(ctx, &subject_id).await?;
1203 self.on_event(
1204 RequestHandlerEvent::PurgeSubject {
1205 subject_id: subject_id.clone(),
1206 },
1207 ctx,
1208 )
1209 .await;
1210 Ok(RequestHandlerResponse::None)
1211 }
1212 RequestHandlerMessage::ChangeApprovalState {
1213 subject_id,
1214 state,
1215 } => {
1216 Self::change_approval(ctx, &subject_id, state.clone())
1217 .await
1218 .map_err(|e| {
1219 error!(
1220 error = %e,
1221 "ChangeApprovalState failed"
1222 );
1223 ActorError::from(e)
1224 })?;
1225
1226 Ok(RequestHandlerResponse::Response(format!(
1227 "The approval request for subject {} has changed to {}",
1228 subject_id, state
1229 )))
1230 }
1231 RequestHandlerMessage::GetApproval { subject_id, state } => {
1232 let res = Self::get_approval(ctx, &subject_id, state.clone())
1233 .await
1234 .map_err(ActorError::from)?;
1235
1236 Ok(RequestHandlerResponse::Approval(res))
1237 }
1238 RequestHandlerMessage::GetAllApprovals { state } => {
1239 let res = Self::get_all_approvals(ctx, state.clone())
1240 .await
1241 .map_err(|e| {
1242 error!(
1243 error = %e,
1244 "GetAllApprovals failed"
1245 );
1246 e
1247 })?;
1248
1249 Ok(RequestHandlerResponse::Approvals(res))
1250 }
1251 RequestHandlerMessage::NewRequest { request } => {
1252 if let Err(e) = request.verify() {
1253 let err = RequestHandlerError::SignatureVerification(
1254 e.to_string(),
1255 );
1256 error!(error = %err, "Request signature verification failed");
1257 return Err(ActorError::from(err));
1258 };
1259
1260 let Some((hash, ..)) = self.helpers.clone() else {
1261 let err = RequestHandlerError::HelpersNotInitialized;
1262 error!(
1263 msg_type = "NewRequest",
1264 error = %err,
1265 "Helpers not initialized"
1266 );
1267 return Err(emit_fail(ctx, ActorError::from(err)).await);
1268 };
1269
1270 if let Err(e) =
1271 Self::check_owner_new_owner(ctx, request.content()).await
1272 {
1273 error!(
1274 msg_type = "NewRequest",
1275 error = %e,
1276 "Owner or new owner check failed"
1277 );
1278 return Err(ActorError::from(e));
1279 }
1280
1281 let subject_data = match Self::build_subject_data(
1282 ctx,
1283 request.content(),
1284 )
1285 .await
1286 {
1287 Ok(data) => data,
1288 Err(e) => {
1289 error!(
1290 msg_type = "NewRequest",
1291 error = %e,
1292 "Failed to build subject data"
1293 );
1294 return Err(ActorError::from(e));
1295 }
1296 };
1297 let event_request_type =
1298 EventRequestType::from(request.content());
1299 let signer = request.signature().signer.clone();
1300 let governance_id = subject_data.get_governance_id();
1301 let governance_subject_id = governance_id
1302 .clone()
1303 .unwrap_or_else(|| request.content().get_subject_id());
1304 let is_gov = subject_data.get_schema_id().is_gov();
1305
1306 if !subject_data.get_active() {
1307 let subject_id = request.content().get_subject_id();
1308 error!(
1309 msg_type = "NewRequest",
1310 subject_id = %subject_id,
1311 "Subject is not active"
1312 );
1313 return Err(ActorError::from(
1314 RequestHandlerError::SubjectNotActive(
1315 subject_id.to_string(),
1316 ),
1317 ));
1318 }
1319
1320 if let Err(e) = Self::check_tracker_ledger_full(
1321 ctx,
1322 request.content(),
1323 &subject_data,
1324 )
1325 .await
1326 {
1327 error!(
1328 msg_type = "NewRequest",
1329 error = %e,
1330 "Tracker full ledger check failed"
1331 );
1332 return Err(ActorError::from(e));
1333 }
1334
1335 if let Err(e) =
1336 Self::check_event_request(request.content(), is_gov)
1337 {
1338 error!(
1339 msg_type = "NewRequest",
1340 error = %e,
1341 "Event request validation failed"
1342 );
1343 return Err(ActorError::from(e));
1344 }
1345
1346 if let Err(e) = Self::check_fact_viewpoints(
1347 ctx,
1348 request.content(),
1349 &subject_data,
1350 )
1351 .await
1352 {
1353 error!(
1354 msg_type = "NewRequest",
1355 error = %e,
1356 "Fact viewpoints validation failed"
1357 );
1358 return Err(ActorError::from(e));
1359 }
1360
1361 if let Err(e) = Self::check_signer_authorization(
1362 ctx,
1363 (*self.our_key).clone(),
1364 signer.clone(),
1365 &governance_subject_id,
1366 &event_request_type,
1367 subject_data.clone(),
1368 )
1369 .await
1370 {
1371 error!(
1372 msg_type = "NewRequest",
1373 governance_id = %governance_subject_id,
1374 error = %e,
1375 "Signature check failed"
1376 );
1377 return Err(e);
1378 }
1379
1380 if let Err(e) = Self::check_creation(
1381 ctx,
1382 subject_data,
1383 &event_request_type,
1384 signer,
1385 )
1386 .await
1387 {
1388 error!(
1389 msg_type = "NewRequest",
1390 error = %e,
1391 "Creation check failed"
1392 );
1393 return Err(e);
1394 }
1395
1396 let (request_id, subject_id) =
1397 match Self::build_request_id_subject_id(hash, &request) {
1398 Ok(ids) => ids,
1399 Err(e) => {
1400 error!(
1401 msg_type = "NewRequest",
1402 error = %e,
1403 "Failed to build request ID and subject ID"
1404 );
1405 return Err(ActorError::from(e));
1406 }
1407 };
1408
1409 if let Err(e) = self
1410 .handle_queue_request(
1411 ctx,
1412 request,
1413 &request_id,
1414 &subject_id,
1415 is_gov,
1416 governance_id,
1417 )
1418 .await
1419 {
1420 error!(
1421 msg_type = "NewRequest",
1422 request_id = %request_id,
1423 subject_id = %subject_id,
1424 error = %e,
1425 "Failed to handle queue request"
1426 );
1427 return Err(e);
1428 }
1429
1430 Ok(RequestHandlerResponse::Ok(RequestData {
1431 request_id,
1432 subject_id,
1433 }))
1434 }
1435 RequestHandlerMessage::PopQueue { subject_id } => {
1436 let (event, request_id) = if let Some(events) =
1437 self.in_queue.get(&subject_id)
1438 {
1439 if let Some((event, request_id)) =
1440 events.clone().pop_front()
1441 {
1442 (event, request_id)
1443 } else {
1444 if let Err(e) = Self::end_child(ctx, &subject_id).await
1445 {
1446 error!(
1447 msg_type = "PopQueue",
1448 subject_id = %subject_id,
1449 error = %e,
1450 "Failed to end child actor when queue is empty"
1451 );
1452 ctx.system().crash_system();
1453 return Err(e);
1454 }
1455 return Ok(RequestHandlerResponse::None);
1456 }
1457 } else {
1458 if let Err(e) = Self::end_child(ctx, &subject_id).await {
1459 error!(
1460 msg_type = "PopQueue",
1461 subject_id = %subject_id,
1462 error = %e,
1463 "Failed to end child actor when no events available"
1464 );
1465 ctx.system().crash_system();
1466 return Err(e);
1467 }
1468 return Ok(RequestHandlerResponse::None);
1469 };
1470
1471 let is_gov = match Self::check_in_queue(
1472 ctx,
1473 &event,
1474 (*self.our_key).clone(),
1475 )
1476 .await
1477 {
1478 Ok(is_gov) => is_gov,
1479 Err(e) => {
1480 if let Err(e) = self
1481 .error_queue_handling(
1482 ctx,
1483 e.to_string(),
1484 &subject_id,
1485 &request_id,
1486 )
1487 .await
1488 {
1489 error!(
1490 msg_type = "PopQueue",
1491 subject_id = %subject_id,
1492 request_id = %request_id,
1493 error = %e,
1494 "Failed to handle queue error"
1495 );
1496 ctx.system().crash_system();
1497 return Err(e);
1498 };
1499
1500 return Ok(RequestHandlerResponse::None);
1501 }
1502 };
1503
1504 if let Err(e) = self
1505 .in_queue_to_handling(ctx, event, &request_id, is_gov)
1506 .await
1507 {
1508 error!(
1509 msg_type = "PopQueue",
1510 request_id = %request_id,
1511 error = %e,
1512 "Failed to transition from queue to handling"
1513 );
1514 ctx.system().crash_system();
1515 return Err(e);
1516 }
1517
1518 Ok(RequestHandlerResponse::None)
1519 }
1520 RequestHandlerMessage::EndHandling { subject_id } => {
1521 self.on_event(
1522 RequestHandlerEvent::FinishHandling {
1523 subject_id: subject_id.clone(),
1524 },
1525 ctx,
1526 )
1527 .await;
1528
1529 if let Err(e) = Self::queued_event(ctx, &subject_id).await {
1530 error!(
1531 msg_type = "EndHandling",
1532 subject_id = %subject_id,
1533 error = %e,
1534 "Failed to enqueue next event"
1535 );
1536 ctx.system().crash_system();
1537 return Err(e);
1538 }
1539
1540 Ok(RequestHandlerResponse::None)
1541 }
1542 }
1543 }
1544
1545 async fn on_child_fault(
1546 &mut self,
1547 error: ActorError,
1548 ctx: &mut ActorContext<Self>,
1549 ) -> ChildAction {
1550 error!(
1551 error = %error,
1552 "Child fault in request handler"
1553 );
1554 ctx.system().crash_system();
1555 ChildAction::Stop
1556 }
1557
1558 async fn on_event(
1559 &mut self,
1560 event: RequestHandlerEvent,
1561 ctx: &mut ActorContext<Self>,
1562 ) {
1563 if let Err(e) = self.persist(&event, ctx).await {
1564 error!(
1565 error = %e,
1566 "Failed to persist event"
1567 );
1568 ctx.system().crash_system();
1569 };
1570 }
1571}
1572
1573#[async_trait]
1574impl Storable for RequestHandler {}
1575
1576#[async_trait]
1577impl PersistentActor for RequestHandler {
1578 type Persistence = LightPersistence;
1579 type InitParams = (Arc<PublicKey>, (HashAlgorithm, Arc<NetworkSender>));
1580
1581 fn update(&mut self, state: Self) {
1582 self.in_queue = state.in_queue;
1583 self.handling = state.handling;
1584 }
1585
1586 fn create_initial(params: Self::InitParams) -> Self {
1587 Self {
1588 our_key: params.0,
1589 helpers: Some(params.1),
1590 handling: HashMap::new(),
1591 in_queue: HashMap::new(),
1592 }
1593 }
1594
1595 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1597 match event {
1598 RequestHandlerEvent::EventToQueue {
1599 subject_id,
1600 event,
1601 request_id,
1602 } => {
1603 self.in_queue
1604 .entry(subject_id.clone())
1605 .or_default()
1606 .push_back((event.clone(), request_id.clone()));
1607 }
1608 RequestHandlerEvent::Invalid { subject_id } => {
1609 if let Some(vec) = self.in_queue.get_mut(subject_id) {
1610 vec.pop_front();
1611 if vec.is_empty() {
1612 self.in_queue.remove(subject_id);
1613 }
1614 }
1615 }
1616 RequestHandlerEvent::EventToHandling {
1617 subject_id,
1618 request_id,
1619 } => {
1620 self.handling.insert(subject_id.clone(), request_id.clone());
1621 if let Some(vec) = self.in_queue.get_mut(subject_id) {
1622 vec.pop_front();
1623 if vec.is_empty() {
1624 self.in_queue.remove(subject_id);
1625 }
1626 }
1627 }
1628 RequestHandlerEvent::FinishHandling { subject_id } => {
1629 self.handling.remove(subject_id);
1630 }
1631 RequestHandlerEvent::PurgeSubject { subject_id } => {
1632 self.handling.remove(subject_id);
1633 self.in_queue.remove(subject_id);
1634 }
1635 };
1636
1637 Ok(())
1638 }
1639}