// ave_core/request/mod.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message, Response, Sink,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::bridge::request::{
9    ApprovalState, ApprovalStateRes, EventRequestType,
10};
11use ave_common::identity::{
12    DigestIdentifier, HashAlgorithm, PublicKey, Signed, TimeStamp, hash_borsh,
13};
14use ave_common::request::EventRequest;
15use ave_common::response::{
16    RequestState, RequestsInManager, RequestsInManagerSubject,
17};
18
19use borsh::{BorshDeserialize, BorshSerialize};
20use error::RequestHandlerError;
21use manager::{RequestManager, RequestManagerMessage};
22use serde::{Deserialize, Serialize};
23use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use tracing::{Span, error, info_span};
26use types::ReqManInitMessage;
27
28use crate::approval::persist::{
29    ApprPersist, ApprPersistMessage, ApprPersistResponse,
30};
31use crate::approval::request::ApprovalReq;
32use crate::db::Storable;
33use crate::governance::events::GovernanceEvent;
34use crate::governance::model::{HashThisRole, RoleTypes};
35use crate::helpers::db::ExternalDB;
36use crate::helpers::network::service::NetworkSender;
37use crate::metrics::try_core_metrics;
38use crate::model::common::node::{get_subject_data, i_owner_new_owner};
39use crate::model::common::subject::{get_gov, get_version};
40use crate::model::common::{
41    check_subject_creation, emit_fail, send_to_tracking,
42};
43use crate::node::{Node, NodeMessage, NodeResponse, SubjectData};
44use crate::request::manager::InitRequestManager;
45use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
46use crate::system::ConfigHelper;
47
48pub mod error;
49pub mod manager;
50pub mod reboot;
51pub mod tracking;
52pub mod types;
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RequestData {
56    pub request_id: DigestIdentifier,
57    pub subject_id: DigestIdentifier,
58}
59
60#[derive(Clone, Debug, Serialize, Deserialize)]
61pub struct RequestHandler {
62    #[serde(skip)]
63    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
64    #[serde(skip)]
65    our_key: Arc<PublicKey>,
66    handling: HashMap<DigestIdentifier, DigestIdentifier>,
67    in_queue: HashMap<
68        DigestIdentifier,
69        VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
70    >,
71}
72
73impl BorshSerialize for RequestHandler {
74    fn serialize<W: std::io::Write>(
75        &self,
76        writer: &mut W,
77    ) -> std::io::Result<()> {
78        // Serialize only the fields we want to persist, skipping 'owner'
79        BorshSerialize::serialize(&self.handling, writer)?;
80        BorshSerialize::serialize(&self.in_queue, writer)?;
81        Ok(())
82    }
83}
84
85impl BorshDeserialize for RequestHandler {
86    fn deserialize_reader<R: std::io::Read>(
87        reader: &mut R,
88    ) -> std::io::Result<Self> {
89        // Deserialize the persisted fields
90        let handling =
91            HashMap::<DigestIdentifier, DigestIdentifier>::deserialize_reader(
92                reader,
93            )?;
94        let in_queue = HashMap::<
95            DigestIdentifier,
96            VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
97        >::deserialize_reader(reader)?;
98
99        let our_key = Arc::new(PublicKey::default());
100
101        Ok(Self {
102            helpers: None,
103            our_key,
104            handling,
105            in_queue,
106        })
107    }
108}
109
110impl RequestHandler {
111    async fn check_signature(
112        ctx: &mut ActorContext<Self>,
113        our_key: PublicKey,
114        signer: PublicKey,
115        governance_id: &DigestIdentifier,
116        event_request: &EventRequestType,
117        subject_data: SubjectData,
118    ) -> Result<(), ActorError> {
119        match event_request {
120            EventRequestType::Create
121            | EventRequestType::Transfer
122            | EventRequestType::Confirm
123            | EventRequestType::Reject
124            | EventRequestType::Eol => {
125                if signer != our_key {
126                    return Err(ActorError::Functional { description: "In the events of Create, Transfer, Confirm, Reject or EOL, the event must be signed by the node".to_string() });
127                }
128            }
129            EventRequestType::Fact => {
130                let gov = get_gov(ctx, governance_id).await?;
131                match subject_data {
132                    SubjectData::Tracker {
133                        schema_id,
134                        namespace,
135                        ..
136                    } => {
137                        if !gov.has_this_role(HashThisRole::Schema {
138                            who: signer,
139                            role: RoleTypes::Issuer,
140                            schema_id,
141                            namespace: Namespace::from(namespace),
142                        }) {
143                            return Err(ActorError::Functional {
144                            description:
145                                "In fact events, the signer has to be an issuer"
146                                    .to_string(),
147                        });
148                        }
149                    }
150                    SubjectData::Governance { .. } => {
151                        if !gov.has_this_role(HashThisRole::Gov {
152                            who: signer,
153                            role: RoleTypes::Issuer,
154                        }) {
155                            return Err(ActorError::Functional {
156                            description:
157                                "In fact events, the signer has to be an issuer"
158                                    .to_string(),
159                        });
160                        }
161                    }
162                }
163            }
164        }
165
166        Ok(())
167    }
168
169    async fn queued_event(
170        ctx: &ActorContext<Self>,
171        subject_id: &DigestIdentifier,
172    ) -> Result<(), ActorError> {
173        let request_actor = ctx.reference().await?;
174        request_actor
175            .tell(RequestHandlerMessage::PopQueue {
176                subject_id: subject_id.to_owned(),
177            })
178            .await
179    }
180
181    async fn error_queue_handling(
182        &mut self,
183        ctx: &mut ActorContext<Self>,
184        error: String,
185        subject_id: &DigestIdentifier,
186        request_id: &DigestIdentifier,
187    ) -> Result<(), ActorError> {
188        if let Some(metrics) = try_core_metrics() {
189            metrics.observe_request_invalid();
190        }
191
192        self.on_event(
193            RequestHandlerEvent::Invalid {
194                subject_id: subject_id.to_owned(),
195            },
196            ctx,
197        )
198        .await;
199
200        send_to_tracking(
201            ctx,
202            RequestTrackingMessage::UpdateState {
203                request_id: request_id.clone(),
204                state: RequestState::Invalid {
205                    error,
206                    sn: None,
207                    subject_id: subject_id.to_string(),
208                    who: self.our_key.to_string(),
209                },
210            },
211        )
212        .await?;
213
214        Self::queued_event(ctx, subject_id).await
215    }
216
217    async fn change_approval(
218        ctx: &ActorContext<Self>,
219        subject_id: &DigestIdentifier,
220        state: ApprovalStateRes,
221    ) -> Result<(), RequestHandlerError> {
222        if state == ApprovalStateRes::Obsolete {
223            return Err(RequestHandlerError::ObsoleteApproval);
224        }
225
226        let approver_path = ActorPath::from(format!(
227            "/user/node/subject_manager/{}/approver",
228            subject_id
229        ));
230        let approver_actor = ctx
231            .system()
232            .get_actor::<ApprPersist>(&approver_path)
233            .await
234            .map_err(|_| {
235                RequestHandlerError::ApprovalNotFound(subject_id.to_string())
236            })?;
237
238        approver_actor
239            .tell(ApprPersistMessage::ChangeResponse {
240                response: state.clone(),
241            })
242            .await
243            .map_err(|_| RequestHandlerError::ApprovalChangeFailed)
244    }
245
246    async fn get_approval(
247        ctx: &ActorContext<Self>,
248        subject_id: &DigestIdentifier,
249        state: Option<ApprovalState>,
250    ) -> Result<Option<(ApprovalReq, ApprovalState)>, RequestHandlerError> {
251        let approver_path = ActorPath::from(format!(
252            "/user/node/subject_manager/{}/approver",
253            subject_id
254        ));
255        let approver_actor = ctx
256            .system()
257            .get_actor::<ApprPersist>(&approver_path)
258            .await
259            .map_err(|_| {
260                RequestHandlerError::ApprovalNotFound(subject_id.to_string())
261            })?;
262
263        let response = approver_actor
264            .ask(ApprPersistMessage::GetApproval { state })
265            .await
266            .map_err(|_| RequestHandlerError::ApprovalGetFailed)?;
267
268        let res = match response {
269            ApprPersistResponse::Ok => None,
270            ApprPersistResponse::Approval { request, state } => {
271                Some((request, state))
272            }
273        };
274
275        Ok(res)
276    }
277
278    async fn get_all_approvals(
279        ctx: &ActorContext<Self>,
280        state: Option<ApprovalState>,
281    ) -> Result<Vec<(ApprovalReq, ApprovalState)>, ActorError> {
282        let node_path = ActorPath::from("/user/node");
283        let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
284        let response = node_actor.ask(NodeMessage::GetGovernances).await?;
285        let vec = match response {
286            NodeResponse::Governances(govs) => govs,
287            _ => {
288                return Err(ActorError::UnexpectedResponse {
289                    path: node_path,
290                    expected: "NodeResponse::Governances".to_string(),
291                });
292            }
293        };
294
295        let mut responses = vec![];
296        for governance in vec.iter() {
297            let approver_path = ActorPath::from(format!(
298                "/user/node/subject_manager/{}/approver",
299                governance
300            ));
301            if let Ok(approver_actor) =
302                ctx.system().get_actor::<ApprPersist>(&approver_path).await
303            {
304                let response = approver_actor
305                    .ask(ApprPersistMessage::GetApproval {
306                        state: state.clone(),
307                    })
308                    .await?;
309
310                match response {
311                    ApprPersistResponse::Ok => {}
312                    ApprPersistResponse::Approval { request, state } => {
313                        responses.push((request, state))
314                    }
315                };
316            };
317        }
318
319        Ok(responses)
320    }
321
322    async fn check_owner_new_owner(
323        ctx: &mut ActorContext<Self>,
324        request: &EventRequest,
325    ) -> Result<(), RequestHandlerError> {
326        match request {
327            EventRequest::Create(..) => {}
328            EventRequest::Fact(..)
329            | EventRequest::Transfer(..)
330            | EventRequest::EOL(..) => {
331                let subject_id = request.get_subject_id();
332                let (i_owner, i_new_owner) =
333                    i_owner_new_owner(ctx, &subject_id).await?;
334                if !i_owner {
335                    return Err(RequestHandlerError::NotOwner(
336                        subject_id.to_string(),
337                    ));
338                }
339
340                if i_new_owner.is_some() {
341                    return Err(RequestHandlerError::PendingNewOwner(
342                        subject_id.to_string(),
343                    ));
344                }
345            }
346            EventRequest::Confirm(..) | EventRequest::Reject(..) => {
347                let subject_id = request.get_subject_id();
348                let (i_owner, i_new_owner) =
349                    i_owner_new_owner(ctx, &subject_id).await?;
350                if i_owner {
351                    return Err(RequestHandlerError::IsOwner(
352                        subject_id.to_string(),
353                    ));
354                }
355
356                if let Some(new_owner) = i_new_owner {
357                    if !new_owner {
358                        return Err(RequestHandlerError::NotNewOwner(
359                            subject_id.to_string(),
360                        ));
361                    }
362                } else {
363                    return Err(RequestHandlerError::NoNewOwnerPending(
364                        subject_id.to_string(),
365                    ));
366                }
367            }
368        };
369        Ok(())
370    }
371
372    fn check_event_request(
373        request: &EventRequest,
374        is_gov: bool,
375    ) -> Result<(), RequestHandlerError> {
376        match request {
377            EventRequest::Create(create_request) => {
378                if let Some(name) = &create_request.name
379                    && (name.is_empty() || name.len() > 100)
380                {
381                    return Err(RequestHandlerError::InvalidName);
382                }
383
384                if let Some(description) = &create_request.description
385                    && (description.is_empty() || description.len() > 200)
386                {
387                    return Err(RequestHandlerError::InvalidDescription);
388                }
389
390                if !create_request.schema_id.is_valid_in_request() {
391                    return Err(RequestHandlerError::InvalidSchemaId);
392                }
393
394                if is_gov {
395                    if !create_request.governance_id.is_empty() {
396                        return Err(
397                            RequestHandlerError::GovernanceIdMustBeEmpty,
398                        );
399                    }
400
401                    if !create_request.namespace.is_empty() {
402                        return Err(RequestHandlerError::NamespaceMustBeEmpty);
403                    }
404                } else if create_request.governance_id.is_empty() {
405                    return Err(RequestHandlerError::GovernanceIdRequired);
406                }
407            }
408            EventRequest::Transfer(transfer_request) => {
409                if transfer_request.new_owner.is_empty() {
410                    return Err(RequestHandlerError::TransferNewOwnerEmpty);
411                }
412            }
413            EventRequest::Confirm(confirm_request) => {
414                if is_gov {
415                    if let Some(name_old_owner) =
416                        &confirm_request.name_old_owner
417                        && name_old_owner.is_empty()
418                    {
419                        return Err(
420                            RequestHandlerError::ConfirmNameOldOwnerEmpty,
421                        );
422                    }
423                } else if confirm_request.name_old_owner.is_some() {
424                    return Err(
425                        RequestHandlerError::ConfirmTrackerNameOldOwner,
426                    );
427                }
428            }
429            EventRequest::Fact(fact_request) => {
430                if is_gov
431                    && serde_json::from_value::<GovernanceEvent>(
432                        fact_request.payload.0.clone(),
433                    )
434                    .is_err()
435                {
436                    return Err(RequestHandlerError::GovFactInvalidEvent);
437                }
438            }
439            EventRequest::Reject(..) | EventRequest::EOL(..) => {}
440        }
441
442        Ok(())
443    }
444
445    async fn build_subject_data(
446        ctx: &mut ActorContext<Self>,
447        request: &EventRequest,
448    ) -> Result<SubjectData, RequestHandlerError> {
449        let subject_data = match request {
450            EventRequest::Create(create_request) => {
451                if create_request.schema_id.is_gov() {
452                    SubjectData::Governance { active: true }
453                } else {
454                    SubjectData::Tracker {
455                        governance_id: create_request.governance_id.clone(),
456                        schema_id: create_request.schema_id.clone(),
457                        namespace: create_request.namespace.to_string(),
458                        active: true,
459                    }
460                }
461            }
462            EventRequest::Fact(..)
463            | EventRequest::Transfer(..)
464            | EventRequest::Confirm(..)
465            | EventRequest::Reject(..)
466            | EventRequest::EOL(..) => {
467                let subject_id = request.get_subject_id();
468                let Some(subject_data) =
469                    get_subject_data(ctx, &subject_id).await?
470                else {
471                    return Err(RequestHandlerError::SubjectDataNotFound(
472                        subject_id.to_string(),
473                    ));
474                };
475
476                subject_data
477            }
478        };
479
480        Ok(subject_data)
481    }
482
483    async fn check_creation(
484        ctx: &mut ActorContext<Self>,
485        subject_data: SubjectData,
486        event_request: &EventRequestType,
487        signer: PublicKey,
488    ) -> Result<(), ActorError> {
489        match event_request {
490            EventRequestType::Create | EventRequestType::Confirm => {
491                if let SubjectData::Tracker {
492                    governance_id,
493                    schema_id,
494                    namespace,
495                    ..
496                } = subject_data
497                {
498                    let version = get_version(ctx, &governance_id).await?;
499                    check_subject_creation(
500                        ctx,
501                        &governance_id,
502                        signer,
503                        version,
504                        namespace,
505                        schema_id,
506                    )
507                    .await?;
508                }
509            }
510            _ => {}
511        }
512
513        Ok(())
514    }
515
516    fn build_request_id_subject_id(
517        hash: HashAlgorithm,
518        request: &Signed<EventRequest>,
519    ) -> Result<(DigestIdentifier, DigestIdentifier), RequestHandlerError> {
520        match &request.content() {
521            EventRequest::Create(..) => {
522                let request_id = hash_borsh(
523                    &*hash.hasher(),
524                    &(request.clone(), TimeStamp::now().as_nanos()),
525                )
526                .map_err(|e| {
527                    RequestHandlerError::RequestIdHash(e.to_string())
528                })?;
529
530                let subject_id =
531                    hash_borsh(&*hash.hasher(), request).map_err(|e| {
532                        RequestHandlerError::SubjectIdHash(e.to_string())
533                    })?;
534
535                Ok((request_id, subject_id))
536            }
537            EventRequest::Fact(..)
538            | EventRequest::Transfer(..)
539            | EventRequest::Confirm(..)
540            | EventRequest::Reject(..)
541            | EventRequest::EOL(..) => {
542                let request_id = hash_borsh(
543                    &*hash.hasher(),
544                    &(request.clone(), TimeStamp::now().as_nanos()),
545                )
546                .map_err(|e| {
547                    RequestHandlerError::RequestIdHash(e.to_string())
548                })?;
549
550                Ok((request_id, request.content().get_subject_id()))
551            }
552        }
553    }
554
555    async fn handle_queue_request(
556        &mut self,
557        ctx: &mut ActorContext<Self>,
558        request: Signed<EventRequest>,
559        request_id: &DigestIdentifier,
560        subject_id: &DigestIdentifier,
561        is_gov: bool,
562        governance_id: Option<DigestIdentifier>,
563    ) -> Result<(), ActorError> {
564        let Some(helpers) = self.helpers.clone() else {
565            let e = " Can not obtain helpers".to_string();
566
567            return Err(ActorError::FunctionalCritical { description: e });
568        };
569
570        let in_handling = self.handling.contains_key(subject_id);
571        let in_queue = self.in_queue.contains_key(subject_id);
572
573        if !in_handling && !in_queue {
574            let command = Self::build_req_manager_init_msg(
575                &EventRequestType::from(request.content()),
576                is_gov,
577            );
578            let init_data = InitRequestManager {
579                our_key: self.our_key.clone(),
580                subject_id: subject_id.clone(),
581                governance_id,
582                helpers,
583            };
584
585            let actor = ctx
586                .create_child(
587                    &subject_id.to_string(),
588                    RequestManager::initial(init_data),
589                )
590                .await?;
591            actor
592                .tell(RequestManagerMessage::FirstRun {
593                    command,
594                    request,
595                    request_id: request_id.clone(),
596                })
597                .await?;
598
599            self.on_event(
600                RequestHandlerEvent::EventToHandling {
601                    subject_id: subject_id.clone(),
602                    request_id: request_id.clone(),
603                },
604                ctx,
605            )
606            .await;
607
608            send_to_tracking(
609                ctx,
610                RequestTrackingMessage::UpdateState {
611                    request_id: request_id.clone(),
612                    state: RequestState::Handling,
613                },
614            )
615            .await?;
616        } else {
617            self.on_event(
618                RequestHandlerEvent::EventToQueue {
619                    subject_id: subject_id.clone(),
620                    event: request,
621                    request_id: request_id.clone(),
622                },
623                ctx,
624            )
625            .await;
626
627            send_to_tracking(
628                ctx,
629                RequestTrackingMessage::UpdateState {
630                    request_id: request_id.clone(),
631                    state: RequestState::InQueue,
632                },
633            )
634            .await?;
635        }
636
637        Ok(())
638    }
639
640    const fn build_req_manager_init_msg(
641        event_request: &EventRequestType,
642        is_gov: bool,
643    ) -> ReqManInitMessage {
644        match event_request {
645            EventRequestType::Create => ReqManInitMessage::Validate,
646            EventRequestType::Fact => ReqManInitMessage::Evaluate,
647            EventRequestType::Transfer => ReqManInitMessage::Evaluate,
648            EventRequestType::Confirm => {
649                if is_gov {
650                    ReqManInitMessage::Evaluate
651                } else {
652                    ReqManInitMessage::Validate
653                }
654            }
655            EventRequestType::Reject => ReqManInitMessage::Validate,
656            EventRequestType::Eol => ReqManInitMessage::Validate,
657        }
658    }
659
660    async fn check_in_queue(
661        ctx: &mut ActorContext<Self>,
662        request: &Signed<EventRequest>,
663        our_key: PublicKey,
664    ) -> Result<bool, RequestHandlerError> {
665        if let EventRequest::Create(..) = request.content() {
666            return Err(RequestHandlerError::CreationNotQueued);
667        }
668
669        Self::check_owner_new_owner(ctx, request.content()).await?;
670
671        let subject_data =
672            Self::build_subject_data(ctx, request.content()).await?;
673        let event_request_type = EventRequestType::from(request.content());
674        let signer = request.signature().signer.clone();
675        let governance_id = subject_data
676            .get_governance_id()
677            .unwrap_or_else(|| request.content().get_subject_id());
678        let is_gov = subject_data.get_schema_id().is_gov();
679
680        if !subject_data.get_active() {
681            return Err(RequestHandlerError::SubjectNotActive(
682                request.content().get_subject_id().to_string(),
683            ));
684        }
685
686        Self::check_signature(
687            ctx,
688            our_key,
689            signer.clone(),
690            &governance_id,
691            &event_request_type,
692            subject_data.clone(),
693        )
694        .await?;
695
696        Self::check_creation(ctx, subject_data, &event_request_type, signer)
697            .await?;
698
699        Ok(is_gov)
700    }
701
702    async fn in_queue_to_handling(
703        &mut self,
704        ctx: &mut ActorContext<Self>,
705        request: Signed<EventRequest>,
706        request_id: &DigestIdentifier,
707        is_gov: bool,
708    ) -> Result<(), ActorError> {
709        let command = Self::build_req_manager_init_msg(
710            &EventRequestType::from(request.content()),
711            is_gov,
712        );
713        let subject_id = request.content().get_subject_id();
714
715        let actor = ctx
716            .get_child::<RequestManager>(&subject_id.to_string())
717            .await?;
718
719        actor
720            .tell(RequestManagerMessage::FirstRun {
721                command,
722                request,
723                request_id: request_id.clone(),
724            })
725            .await?;
726
727        self.on_event(
728            RequestHandlerEvent::EventToHandling {
729                subject_id: subject_id.clone(),
730                request_id: request_id.clone(),
731            },
732            ctx,
733        )
734        .await;
735
736        send_to_tracking(
737            ctx,
738            RequestTrackingMessage::UpdateState {
739                request_id: request_id.clone(),
740                state: RequestState::Handling,
741            },
742        )
743        .await
744    }
745
746    async fn end_child(
747        ctx: &ActorContext<Self>,
748        subject_id: &DigestIdentifier,
749    ) -> Result<(), ActorError> {
750        let actor = ctx
751            .get_child::<RequestManager>(&subject_id.to_string())
752            .await?;
753        actor.ask_stop().await
754    }
755
756    async fn manual_abort_request(
757        &self,
758        ctx: &ActorContext<Self>,
759        subject_id: &DigestIdentifier,
760    ) -> Result<(), ActorError> {
761        let actor = ctx
762            .get_child::<RequestManager>(&subject_id.to_string())
763            .await?;
764
765        actor.tell(RequestManagerMessage::ManualAbort).await
766    }
767
768    async fn purge_request_manager(
769        &self,
770        ctx: &mut ActorContext<Self>,
771        subject_id: &DigestIdentifier,
772    ) -> Result<(), ActorError> {
773        let Some((hash, network)) = self.helpers.clone() else {
774            return Err(ActorError::FunctionalCritical {
775                description: "Request handler helpers are not initialized"
776                    .to_string(),
777            });
778        };
779
780        let governance_id = get_subject_data(ctx, subject_id)
781            .await?
782            .and_then(|data| data.get_governance_id());
783        let request_manager_init = InitRequestManager {
784            our_key: self.our_key.clone(),
785            subject_id: subject_id.clone(),
786            governance_id,
787            helpers: (hash, network),
788        };
789
790        let actor = match ctx
791            .create_child(
792                &subject_id.to_string(),
793                RequestManager::initial(request_manager_init),
794            )
795            .await
796        {
797            Ok(actor) => actor,
798            Err(ActorError::Exists { .. }) => {
799                ctx.get_child::<RequestManager>(&subject_id.to_string())
800                    .await?
801            }
802            Err(err) => return Err(err),
803        };
804
805        actor.ask(RequestManagerMessage::PurgeStorage).await?;
806        actor.ask_stop().await?;
807
808        Ok(())
809    }
810}
811
812#[derive(Debug, Clone)]
813pub enum RequestHandlerMessage {
814    NewRequest {
815        request: Signed<EventRequest>,
816    },
817    RequestInManager,
818    RequestInManagerSubjectId {
819        subject_id: DigestIdentifier,
820    },
821    ChangeApprovalState {
822        subject_id: DigestIdentifier,
823        state: ApprovalStateRes,
824    },
825    GetApproval {
826        subject_id: DigestIdentifier,
827        state: Option<ApprovalState>,
828    },
829    GetAllApprovals {
830        state: Option<ApprovalState>,
831    },
832    PopQueue {
833        subject_id: DigestIdentifier,
834    },
835    EndHandling {
836        subject_id: DigestIdentifier,
837    },
838    PurgeSubject {
839        subject_id: DigestIdentifier,
840    },
841    AbortRequest {
842        subject_id: DigestIdentifier,
843    },
844}
845
846impl Message for RequestHandlerMessage {}
847
848#[derive(Debug, Clone)]
849pub enum RequestHandlerResponse {
850    RequestInManager(RequestsInManager),
851    RequestInManagerSubjectId(RequestsInManagerSubject),
852    Ok(RequestData),
853    Response(String),
854    Approval(Option<(ApprovalReq, ApprovalState)>),
855    Approvals(Vec<(ApprovalReq, ApprovalState)>),
856    None,
857}
858
859impl Response for RequestHandlerResponse {}
860
861#[derive(
862    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
863)]
864pub enum RequestHandlerEvent {
865    EventToQueue {
866        subject_id: DigestIdentifier,
867        event: Signed<EventRequest>,
868        request_id: DigestIdentifier,
869    },
870    Invalid {
871        subject_id: DigestIdentifier,
872    },
873    FinishHandling {
874        subject_id: DigestIdentifier,
875    },
876    PurgeSubject {
877        subject_id: DigestIdentifier,
878    },
879    EventToHandling {
880        subject_id: DigestIdentifier,
881        request_id: DigestIdentifier,
882    },
883}
884
885impl Event for RequestHandlerEvent {}
886
887#[async_trait]
888impl Actor for RequestHandler {
889    type Event = RequestHandlerEvent;
890    type Message = RequestHandlerMessage;
891    type Response = RequestHandlerResponse;
892
893    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
894        parent_span.map_or_else(
895            || info_span!("RequestHandler"),
896            |parent_span| info_span!(parent: parent_span, "RequestHandler"),
897        )
898    }
899
900    async fn pre_start(
901        &mut self,
902        ctx: &mut ActorContext<Self>,
903    ) -> Result<(), ActorError> {
904        if let Err(e) = self.init_store("request", None, false, ctx).await {
905            error!(
906                error = %e,
907                "Failed to initialize store during pre_start"
908            );
909            return Err(e);
910        }
911
912        let Some(config) =
913            ctx.system().get_helper::<ConfigHelper>("config").await
914        else {
915            error!(
916                helper = "config",
917                "Config helper not found during pre_start"
918            );
919            return Err(ActorError::Helper {
920                name: "config".to_owned(),
921                reason: "Not found".to_string(),
922            });
923        };
924
925        if !config.safe_mode {
926            let Some(ext_db): Option<Arc<ExternalDB>> =
927                ctx.system().get_helper("ext_db").await
928            else {
929                error!("External database helper not found");
930                return Err(ActorError::Helper {
931                    name: "ext_db".to_string(),
932                    reason: "Not found".to_string(),
933                });
934            };
935
936            let tracking = match ctx
937                .create_child(
938                    "tracking",
939                    RequestTracking::new(config.tracking_size),
940                )
941                .await
942            {
943                Ok(actor) => actor,
944                Err(e) => {
945                    error!(
946                        error = %e,
947                        "Failed to create tracking child during pre_start"
948                    );
949                    return Err(e);
950                }
951            };
952
953            let sink =
954                Sink::new(tracking.subscribe(), ext_db.get_request_tracking());
955
956            ctx.system().run_sink(sink).await;
957        }
958
959        let Some((hash, network)) = self.helpers.clone() else {
960            let e = " Can not obtain helpers".to_string();
961            error!(
962                error = %e,
963                "Failed to obtain helpers during pre_start"
964            );
965            ctx.system().crash_system();
966            return Err(ActorError::FunctionalCritical { description: e });
967        };
968
969        if config.safe_mode {
970            return Ok(());
971        }
972
973        for (subject_id, request_id) in self.handling.clone() {
974            let governance_id = get_subject_data(ctx, &subject_id)
975                .await?
976                .and_then(|data| data.get_governance_id());
977            let request_manager_init = InitRequestManager {
978                our_key: self.our_key.clone(),
979                subject_id: subject_id.clone(),
980                governance_id,
981                helpers: (hash, network.clone()),
982            };
983
984            let request_manager_actor = match ctx
985                .create_child(
986                    &subject_id.to_string(),
987                    RequestManager::initial(request_manager_init),
988                )
989                .await
990            {
991                Ok(actor) => actor,
992                Err(e) => {
993                    error!(
994                        subject_id = %subject_id,
995                        error = %e,
996                        "Failed to create request manager child during pre_start"
997                    );
998                    return Err(e);
999                }
1000            };
1001
1002            if let Err(e) = request_manager_actor
1003                .tell(RequestManagerMessage::Run {
1004                    request_id: request_id.clone(),
1005                })
1006                .await
1007            {
1008                error!(
1009                    subject_id = %subject_id,
1010                    request_id = %request_id,
1011                    error = %e,
1012                    "Failed to send Run message to request manager during pre_start"
1013                );
1014                return Err(e);
1015            }
1016        }
1017
1018        Ok(())
1019    }
1020}
1021
1022#[async_trait]
1023impl Handler<Self> for RequestHandler {
1024    async fn handle_message(
1025        &mut self,
1026        _sender: ActorPath,
1027        msg: RequestHandlerMessage,
1028        ctx: &mut ave_actors::ActorContext<Self>,
1029    ) -> Result<RequestHandlerResponse, ActorError> {
1030        match msg {
1031            RequestHandlerMessage::RequestInManagerSubjectId { subject_id } => {
1032                let handling =
1033                    self.handling.get(&subject_id).map(|x| x.to_string());
1034                let in_queue = self.in_queue.get(&subject_id).map(|x| {
1035                    x.iter().map(|x| x.1.to_string()).collect::<Vec<String>>()
1036                });
1037
1038                Ok(RequestHandlerResponse::RequestInManagerSubjectId(
1039                    RequestsInManagerSubject { handling, in_queue },
1040                ))
1041            }
1042            RequestHandlerMessage::RequestInManager => Ok(
1043                RequestHandlerResponse::RequestInManager(RequestsInManager {
1044                    handling: self
1045                        .handling
1046                        .iter()
1047                        .map(|x| (x.0.to_string(), x.1.to_string()))
1048                        .collect(),
1049                    in_queue: self
1050                        .in_queue
1051                        .iter()
1052                        .map(|x| {
1053                            (
1054                                x.0.to_string(),
1055                                x.1.iter()
1056                                    .map(|x| x.1.to_string())
1057                                    .collect::<Vec<String>>(),
1058                            )
1059                        })
1060                        .collect(),
1061                }),
1062            ),
1063            RequestHandlerMessage::AbortRequest { subject_id } => {
1064                self.manual_abort_request(ctx, &subject_id).await?;
1065                Ok(RequestHandlerResponse::None)
1066            }
1067            RequestHandlerMessage::PurgeSubject { subject_id } => {
1068                self.purge_request_manager(ctx, &subject_id).await?;
1069                self.on_event(
1070                    RequestHandlerEvent::PurgeSubject {
1071                        subject_id: subject_id.clone(),
1072                    },
1073                    ctx,
1074                )
1075                .await;
1076                Ok(RequestHandlerResponse::None)
1077            }
1078            RequestHandlerMessage::ChangeApprovalState {
1079                subject_id,
1080                state,
1081            } => {
1082                Self::change_approval(ctx, &subject_id, state.clone())
1083                    .await
1084                    .map_err(|e| {
1085                        error!(
1086                            error = %e,
1087                            "ChangeApprovalState failed"
1088                        );
1089                        ActorError::from(e)
1090                    })?;
1091
1092                Ok(RequestHandlerResponse::Response(format!(
1093                    "The approval request for subject {} has changed to {}",
1094                    subject_id, state
1095                )))
1096            }
1097            RequestHandlerMessage::GetApproval { subject_id, state } => {
1098                let res = Self::get_approval(ctx, &subject_id, state.clone())
1099                    .await
1100                    .map_err(ActorError::from)?;
1101
1102                Ok(RequestHandlerResponse::Approval(res))
1103            }
1104            RequestHandlerMessage::GetAllApprovals { state } => {
1105                let res = Self::get_all_approvals(ctx, state.clone())
1106                    .await
1107                    .map_err(|e| {
1108                        error!(
1109                            error = %e,
1110                            "GetAllApprovals failed"
1111                        );
1112                        e
1113                    })?;
1114
1115                Ok(RequestHandlerResponse::Approvals(res))
1116            }
1117            RequestHandlerMessage::NewRequest { request } => {
1118                if let Err(e) = request.verify() {
1119                    let err = RequestHandlerError::SignatureVerification(
1120                        e.to_string(),
1121                    );
1122                    error!(error = %err, "Request signature verification failed");
1123                    return Err(ActorError::from(err));
1124                };
1125
1126                let Some((hash, ..)) = self.helpers.clone() else {
1127                    let err = RequestHandlerError::HelpersNotInitialized;
1128                    error!(
1129                        msg_type = "NewRequest",
1130                        error = %err,
1131                        "Helpers not initialized"
1132                    );
1133                    return Err(emit_fail(ctx, ActorError::from(err)).await);
1134                };
1135
1136                if let Err(e) =
1137                    Self::check_owner_new_owner(ctx, request.content()).await
1138                {
1139                    error!(
1140                        msg_type = "NewRequest",
1141                        error = %e,
1142                        "Owner or new owner check failed"
1143                    );
1144                    return Err(ActorError::from(e));
1145                }
1146
1147                let subject_data = match Self::build_subject_data(
1148                    ctx,
1149                    request.content(),
1150                )
1151                .await
1152                {
1153                    Ok(data) => data,
1154                    Err(e) => {
1155                        error!(
1156                            msg_type = "NewRequest",
1157                            error = %e,
1158                            "Failed to build subject data"
1159                        );
1160                        return Err(ActorError::from(e));
1161                    }
1162                };
1163                let event_request_type =
1164                    EventRequestType::from(request.content());
1165                let signer = request.signature().signer.clone();
1166                let governance_id = subject_data.get_governance_id();
1167                let governance_subject_id = governance_id
1168                    .clone()
1169                    .unwrap_or_else(|| request.content().get_subject_id());
1170                let is_gov = subject_data.get_schema_id().is_gov();
1171
1172                if !subject_data.get_active() {
1173                    let subject_id = request.content().get_subject_id();
1174                    error!(
1175                        msg_type = "NewRequest",
1176                        subject_id = %subject_id,
1177                        "Subject is not active"
1178                    );
1179                    return Err(ActorError::from(
1180                        RequestHandlerError::SubjectNotActive(
1181                            subject_id.to_string(),
1182                        ),
1183                    ));
1184                }
1185
1186                if let Err(e) =
1187                    Self::check_event_request(request.content(), is_gov)
1188                {
1189                    error!(
1190                        msg_type = "NewRequest",
1191                        error = %e,
1192                        "Event request validation failed"
1193                    );
1194                    return Err(ActorError::from(e));
1195                }
1196
1197                if let Err(e) = Self::check_signature(
1198                    ctx,
1199                    (*self.our_key).clone(),
1200                    signer.clone(),
1201                    &governance_subject_id,
1202                    &event_request_type,
1203                    subject_data.clone(),
1204                )
1205                .await
1206                {
1207                    error!(
1208                        msg_type = "NewRequest",
1209                        governance_id = %governance_subject_id,
1210                        error = %e,
1211                        "Signature check failed"
1212                    );
1213                    return Err(e);
1214                }
1215
1216                if let Err(e) = Self::check_creation(
1217                    ctx,
1218                    subject_data,
1219                    &event_request_type,
1220                    signer,
1221                )
1222                .await
1223                {
1224                    error!(
1225                        msg_type = "NewRequest",
1226                        error = %e,
1227                        "Creation check failed"
1228                    );
1229                    return Err(e);
1230                }
1231
1232                let (request_id, subject_id) =
1233                    match Self::build_request_id_subject_id(hash, &request) {
1234                        Ok(ids) => ids,
1235                        Err(e) => {
1236                            error!(
1237                                msg_type = "NewRequest",
1238                                error = %e,
1239                                "Failed to build request ID and subject ID"
1240                            );
1241                            return Err(ActorError::from(e));
1242                        }
1243                    };
1244
1245                if let Err(e) = self
1246                    .handle_queue_request(
1247                        ctx,
1248                        request,
1249                        &request_id,
1250                        &subject_id,
1251                        is_gov,
1252                        governance_id,
1253                    )
1254                    .await
1255                {
1256                    error!(
1257                        msg_type = "NewRequest",
1258                        request_id = %request_id,
1259                        subject_id = %subject_id,
1260                        error = %e,
1261                        "Failed to handle queue request"
1262                    );
1263                    return Err(e);
1264                }
1265
1266                Ok(RequestHandlerResponse::Ok(RequestData {
1267                    request_id,
1268                    subject_id,
1269                }))
1270            }
1271            RequestHandlerMessage::PopQueue { subject_id } => {
1272                let (event, request_id) = if let Some(events) =
1273                    self.in_queue.get(&subject_id)
1274                {
1275                    if let Some((event, request_id)) =
1276                        events.clone().pop_front()
1277                    {
1278                        (event, request_id)
1279                    } else {
1280                        if let Err(e) = Self::end_child(ctx, &subject_id).await
1281                        {
1282                            error!(
1283                                msg_type = "PopQueue",
1284                                subject_id = %subject_id,
1285                                error = %e,
1286                                "Failed to end child actor when queue is empty"
1287                            );
1288                            ctx.system().crash_system();
1289                            return Err(e);
1290                        }
1291                        return Ok(RequestHandlerResponse::None);
1292                    }
1293                } else {
1294                    if let Err(e) = Self::end_child(ctx, &subject_id).await {
1295                        error!(
1296                            msg_type = "PopQueue",
1297                            subject_id = %subject_id,
1298                            error = %e,
1299                            "Failed to end child actor when no events available"
1300                        );
1301                        ctx.system().crash_system();
1302                        return Err(e);
1303                    }
1304                    return Ok(RequestHandlerResponse::None);
1305                };
1306
1307                let is_gov = match Self::check_in_queue(
1308                    ctx,
1309                    &event,
1310                    (*self.our_key).clone(),
1311                )
1312                .await
1313                {
1314                    Ok(is_gov) => is_gov,
1315                    Err(e) => {
1316                        if let Err(e) = self
1317                            .error_queue_handling(
1318                                ctx,
1319                                e.to_string(),
1320                                &subject_id,
1321                                &request_id,
1322                            )
1323                            .await
1324                        {
1325                            error!(
1326                                msg_type = "PopQueue",
1327                                subject_id = %subject_id,
1328                                request_id = %request_id,
1329                                error = %e,
1330                                "Failed to handle queue error"
1331                            );
1332                            ctx.system().crash_system();
1333                            return Err(e);
1334                        };
1335
1336                        return Ok(RequestHandlerResponse::None);
1337                    }
1338                };
1339
1340                if let Err(e) = self
1341                    .in_queue_to_handling(ctx, event, &request_id, is_gov)
1342                    .await
1343                {
1344                    error!(
1345                        msg_type = "PopQueue",
1346                        request_id = %request_id,
1347                        error = %e,
1348                        "Failed to transition from queue to handling"
1349                    );
1350                    ctx.system().crash_system();
1351                    return Err(e);
1352                }
1353
1354                Ok(RequestHandlerResponse::None)
1355            }
1356            RequestHandlerMessage::EndHandling { subject_id } => {
1357                self.on_event(
1358                    RequestHandlerEvent::FinishHandling {
1359                        subject_id: subject_id.clone(),
1360                    },
1361                    ctx,
1362                )
1363                .await;
1364
1365                if let Err(e) = Self::queued_event(ctx, &subject_id).await {
1366                    error!(
1367                        msg_type = "EndHandling",
1368                        subject_id = %subject_id,
1369                        error = %e,
1370                        "Failed to enqueue next event"
1371                    );
1372                    ctx.system().crash_system();
1373                    return Err(e);
1374                }
1375
1376                Ok(RequestHandlerResponse::None)
1377            }
1378        }
1379    }
1380
1381    async fn on_child_fault(
1382        &mut self,
1383        error: ActorError,
1384        ctx: &mut ActorContext<Self>,
1385    ) -> ChildAction {
1386        error!(
1387            error = %error,
1388            "Child fault in request handler"
1389        );
1390        ctx.system().crash_system();
1391        ChildAction::Stop
1392    }
1393
1394    async fn on_event(
1395        &mut self,
1396        event: RequestHandlerEvent,
1397        ctx: &mut ActorContext<Self>,
1398    ) {
1399        if let Err(e) = self.persist(&event, ctx).await {
1400            error!(
1401                error = %e,
1402                "Failed to persist event"
1403            );
1404            ctx.system().crash_system();
1405        };
1406    }
1407}
1408
/// Implements `Storable` for `RequestHandler` with an empty body, so no
/// trait items are overridden here.
// NOTE(review): presumably this is what provides the store set-up used in
// `pre_start` (`init_store`) — confirm against the `Storable` definition.
1409#[async_trait]
1410impl Storable for RequestHandler {}
1411
1412#[async_trait]
1413impl PersistentActor for RequestHandler {
1414    type Persistence = LightPersistence;
1415    type InitParams = (Arc<PublicKey>, (HashAlgorithm, Arc<NetworkSender>));
1416
1417    fn update(&mut self, state: Self) {
1418        self.in_queue = state.in_queue;
1419        self.handling = state.handling;
1420    }
1421
1422    fn create_initial(params: Self::InitParams) -> Self {
1423        Self {
1424            our_key: params.0,
1425            helpers: Some(params.1),
1426            handling: HashMap::new(),
1427            in_queue: HashMap::new(),
1428        }
1429    }
1430
1431    /// Change node state.
1432    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1433        match event {
1434            RequestHandlerEvent::EventToQueue {
1435                subject_id,
1436                event,
1437                request_id,
1438            } => {
1439                self.in_queue
1440                    .entry(subject_id.clone())
1441                    .or_default()
1442                    .push_back((event.clone(), request_id.clone()));
1443            }
1444            RequestHandlerEvent::Invalid { subject_id } => {
1445                if let Some(vec) = self.in_queue.get_mut(subject_id) {
1446                    vec.pop_front();
1447                    if vec.is_empty() {
1448                        self.in_queue.remove(subject_id);
1449                    }
1450                }
1451            }
1452            RequestHandlerEvent::EventToHandling {
1453                subject_id,
1454                request_id,
1455            } => {
1456                self.handling.insert(subject_id.clone(), request_id.clone());
1457                if let Some(vec) = self.in_queue.get_mut(subject_id) {
1458                    vec.pop_front();
1459                    if vec.is_empty() {
1460                        self.in_queue.remove(subject_id);
1461                    }
1462                }
1463            }
1464            RequestHandlerEvent::FinishHandling { subject_id } => {
1465                self.handling.remove(subject_id);
1466            }
1467            RequestHandlerEvent::PurgeSubject { subject_id } => {
1468                self.handling.remove(subject_id);
1469                self.in_queue.remove(subject_id);
1470            }
1471        };
1472
1473        Ok(())
1474    }
1475}