Skip to main content

ave_core/request/
mod.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message, Response, Sink,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::bridge::request::{
9    ApprovalState, ApprovalStateRes, EventRequestType,
10};
11use ave_common::identity::{
12    DigestIdentifier, HashAlgorithm, PublicKey, Signed, TimeStamp, hash_borsh,
13};
14use ave_common::request::EventRequest;
15use ave_common::response::{
16    RequestState, RequestsInManager, RequestsInManagerSubject,
17};
18
19use borsh::{BorshDeserialize, BorshSerialize};
20use error::RequestHandlerError;
21use manager::{RequestManager, RequestManagerMessage};
22use serde::{Deserialize, Serialize};
23use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use tracing::{Span, error, info_span};
26use types::ReqManInitMessage;
27
28use crate::approval::persist::{
29    ApprPersist, ApprPersistMessage, ApprPersistResponse,
30};
31use crate::approval::request::ApprovalReq;
32use crate::db::Storable;
33use crate::governance::events::GovernanceEvent;
34use crate::governance::model::{HashThisRole, RoleTypes};
35use crate::helpers::db::ExternalDB;
36use crate::helpers::network::service::NetworkSender;
37use crate::model::common::node::{get_subject_data, i_owner_new_owner};
38use crate::model::common::subject::{get_gov, get_version};
39use crate::model::common::{
40    check_subject_creation, emit_fail, send_to_tracking,
41};
42use crate::node::{Node, NodeMessage, NodeResponse, SubjectData};
43use crate::request::manager::InitRequestManager;
44use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
45use crate::system::ConfigHelper;
46
47pub mod error;
48pub mod manager;
49pub mod reboot;
50pub mod tracking;
51pub mod types;
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct RequestData {
55    pub request_id: DigestIdentifier,
56    pub subject_id: DigestIdentifier,
57}
58
59#[derive(Clone, Debug, Serialize, Deserialize)]
60pub struct RequestHandler {
61    #[serde(skip)]
62    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
63    #[serde(skip)]
64    our_key: Arc<PublicKey>,
65    handling: HashMap<DigestIdentifier, DigestIdentifier>,
66    in_queue: HashMap<
67        DigestIdentifier,
68        VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
69    >,
70}
71
72impl BorshSerialize for RequestHandler {
73    fn serialize<W: std::io::Write>(
74        &self,
75        writer: &mut W,
76    ) -> std::io::Result<()> {
77        // Serialize only the fields we want to persist, skipping 'owner'
78        BorshSerialize::serialize(&self.handling, writer)?;
79        BorshSerialize::serialize(&self.in_queue, writer)?;
80        Ok(())
81    }
82}
83
84impl BorshDeserialize for RequestHandler {
85    fn deserialize_reader<R: std::io::Read>(
86        reader: &mut R,
87    ) -> std::io::Result<Self> {
88        // Deserialize the persisted fields
89        let handling =
90            HashMap::<DigestIdentifier, DigestIdentifier>::deserialize_reader(
91                reader,
92            )?;
93        let in_queue = HashMap::<
94            DigestIdentifier,
95            VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
96        >::deserialize_reader(reader)?;
97
98        let our_key = Arc::new(PublicKey::default());
99
100        Ok(Self {
101            helpers: None,
102            our_key,
103            handling,
104            in_queue,
105        })
106    }
107}
108
109impl RequestHandler {
110    async fn check_signature(
111        ctx: &mut ActorContext<Self>,
112        our_key: PublicKey,
113        signer: PublicKey,
114        governance_id: &DigestIdentifier,
115        event_request: &EventRequestType,
116        subject_data: SubjectData,
117    ) -> Result<(), ActorError> {
118        match event_request {
119            EventRequestType::Create
120            | EventRequestType::Transfer
121            | EventRequestType::Confirm
122            | EventRequestType::Reject
123            | EventRequestType::Eol => {
124                if signer != our_key {
125                    return Err(ActorError::Functional { description: "In the events of Create, Transfer, Confirm, Reject or EOL, the event must be signed by the node".to_string() });
126                }
127            }
128            EventRequestType::Fact => {
129                let gov = get_gov(ctx, governance_id).await?;
130                match subject_data {
131                    SubjectData::Tracker {
132                        schema_id,
133                        namespace,
134                        ..
135                    } => {
136                        if !gov.has_this_role(HashThisRole::Schema {
137                            who: signer,
138                            role: RoleTypes::Issuer,
139                            schema_id,
140                            namespace: Namespace::from(namespace),
141                        }) {
142                            return Err(ActorError::Functional {
143                            description:
144                                "In fact events, the signer has to be an issuer"
145                                    .to_string(),
146                        });
147                        }
148                    }
149                    SubjectData::Governance { .. } => {
150                        if !gov.has_this_role(HashThisRole::Gov {
151                            who: signer,
152                            role: RoleTypes::Issuer,
153                        }) {
154                            return Err(ActorError::Functional {
155                            description:
156                                "In fact events, the signer has to be an issuer"
157                                    .to_string(),
158                        });
159                        }
160                    }
161                }
162            }
163        }
164
165        Ok(())
166    }
167
168    async fn queued_event(
169        ctx: &ActorContext<Self>,
170        subject_id: &DigestIdentifier,
171    ) -> Result<(), ActorError> {
172        let request_actor = ctx.reference().await?;
173        request_actor
174            .tell(RequestHandlerMessage::PopQueue {
175                subject_id: subject_id.to_owned(),
176            })
177            .await
178    }
179
180    async fn error_queue_handling(
181        &mut self,
182        ctx: &mut ActorContext<Self>,
183        error: String,
184        subject_id: &DigestIdentifier,
185        request_id: &DigestIdentifier,
186    ) -> Result<(), ActorError> {
187        self.on_event(
188            RequestHandlerEvent::Invalid {
189                subject_id: subject_id.to_owned(),
190            },
191            ctx,
192        )
193        .await;
194
195        send_to_tracking(
196            ctx,
197            RequestTrackingMessage::UpdateState {
198                request_id: request_id.clone(),
199                state: RequestState::Invalid {
200                    error,
201                    sn: None,
202                    subject_id: subject_id.to_string(),
203                    who: self.our_key.to_string(),
204                },
205            },
206        )
207        .await?;
208
209        Self::queued_event(ctx, subject_id).await
210    }
211
212    async fn change_approval(
213        ctx: &ActorContext<Self>,
214        subject_id: &DigestIdentifier,
215        state: ApprovalStateRes,
216    ) -> Result<(), RequestHandlerError> {
217        if state == ApprovalStateRes::Obsolete {
218            return Err(RequestHandlerError::ObsoleteApproval);
219        }
220
221        let approver_path =
222            ActorPath::from(format!("/user/node/{}/approver", subject_id));
223        let approver_actor = ctx
224            .system()
225            .get_actor::<ApprPersist>(&approver_path)
226            .await
227            .map_err(|_| {
228                RequestHandlerError::ApprovalNotFound(subject_id.to_string())
229            })?;
230
231        approver_actor
232            .tell(ApprPersistMessage::ChangeResponse {
233                response: state.clone(),
234            })
235            .await
236            .map_err(|_| RequestHandlerError::ApprovalChangeFailed)
237    }
238
239    async fn get_approval(
240        ctx: &ActorContext<Self>,
241        subject_id: &DigestIdentifier,
242        state: Option<ApprovalState>,
243    ) -> Result<Option<(ApprovalReq, ApprovalState)>, RequestHandlerError> {
244        let approver_path =
245            ActorPath::from(format!("/user/node/{}/approver", subject_id));
246        let approver_actor = ctx
247            .system()
248            .get_actor::<ApprPersist>(&approver_path)
249            .await
250            .map_err(|_| {
251                RequestHandlerError::ApprovalNotFound(subject_id.to_string())
252            })?;
253
254        let response = approver_actor
255            .ask(ApprPersistMessage::GetApproval { state })
256            .await
257            .map_err(|_| RequestHandlerError::ApprovalGetFailed)?;
258
259        let res = match response {
260            ApprPersistResponse::Ok => None,
261            ApprPersistResponse::Approval { request, state } => {
262                Some((request, state))
263            }
264        };
265
266        Ok(res)
267    }
268
269    async fn get_all_approvals(
270        ctx: &ActorContext<Self>,
271        state: Option<ApprovalState>,
272    ) -> Result<Vec<(ApprovalReq, ApprovalState)>, ActorError> {
273        let node_path = ActorPath::from("/user/node");
274        let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
275        let response = node_actor.ask(NodeMessage::GetGovernances).await?;
276        let vec = match response {
277            NodeResponse::Governances(govs) => govs,
278            _ => {
279                return Err(ActorError::UnexpectedResponse {
280                    path: node_path,
281                    expected: "NodeResponse::Governances".to_string(),
282                });
283            }
284        };
285
286        let mut responses = vec![];
287        for governance in vec.iter() {
288            let approver_path =
289                ActorPath::from(format!("/user/node/{}/approver", governance));
290            if let Ok(approver_actor) =
291                ctx.system().get_actor::<ApprPersist>(&approver_path).await
292            {
293                let response = approver_actor
294                    .ask(ApprPersistMessage::GetApproval {
295                        state: state.clone(),
296                    })
297                    .await?;
298
299                match response {
300                    ApprPersistResponse::Ok => {}
301                    ApprPersistResponse::Approval { request, state } => {
302                        responses.push((request, state))
303                    }
304                };
305            };
306        }
307
308        Ok(responses)
309    }
310
311    async fn check_owner_new_owner(
312        ctx: &mut ActorContext<Self>,
313        request: &EventRequest,
314    ) -> Result<(), RequestHandlerError> {
315        match request {
316            EventRequest::Create(..) => {}
317            EventRequest::Fact(..)
318            | EventRequest::Transfer(..)
319            | EventRequest::EOL(..) => {
320                let subject_id = request.get_subject_id();
321                let (i_owner, i_new_owner) =
322                    i_owner_new_owner(ctx, &subject_id).await?;
323                if !i_owner {
324                    return Err(RequestHandlerError::NotOwner(
325                        subject_id.to_string(),
326                    ));
327                }
328
329                if i_new_owner.is_some() {
330                    return Err(RequestHandlerError::PendingNewOwner(
331                        subject_id.to_string(),
332                    ));
333                }
334            }
335            EventRequest::Confirm(..) | EventRequest::Reject(..) => {
336                let subject_id = request.get_subject_id();
337                let (i_owner, i_new_owner) =
338                    i_owner_new_owner(ctx, &subject_id).await?;
339                if i_owner {
340                    return Err(RequestHandlerError::IsOwner(
341                        subject_id.to_string(),
342                    ));
343                }
344
345                if let Some(new_owner) = i_new_owner {
346                    if !new_owner {
347                        return Err(RequestHandlerError::NotNewOwner(
348                            subject_id.to_string(),
349                        ));
350                    }
351                } else {
352                    return Err(RequestHandlerError::NoNewOwnerPending(
353                        subject_id.to_string(),
354                    ));
355                }
356            }
357        };
358        Ok(())
359    }
360
361    fn check_event_request(
362        request: &EventRequest,
363        is_gov: bool,
364    ) -> Result<(), RequestHandlerError> {
365        match request {
366            EventRequest::Create(create_request) => {
367                if let Some(name) = &create_request.name
368                    && (name.is_empty() || name.len() > 100)
369                {
370                    return Err(RequestHandlerError::InvalidName);
371                }
372
373                if let Some(description) = &create_request.description
374                    && (description.is_empty() || description.len() > 200)
375                {
376                    return Err(RequestHandlerError::InvalidDescription);
377                }
378
379                if !create_request.schema_id.is_valid_in_request() {
380                    return Err(RequestHandlerError::InvalidSchemaId);
381                }
382
383                if is_gov {
384                    if !create_request.governance_id.is_empty() {
385                        return Err(
386                            RequestHandlerError::GovernanceIdMustBeEmpty,
387                        );
388                    }
389
390                    if !create_request.namespace.is_empty() {
391                        return Err(RequestHandlerError::NamespaceMustBeEmpty);
392                    }
393                } else if create_request.governance_id.is_empty() {
394                    return Err(RequestHandlerError::GovernanceIdRequired);
395                }
396            }
397            EventRequest::Transfer(transfer_request) => {
398                if transfer_request.new_owner.is_empty() {
399                    return Err(RequestHandlerError::TransferNewOwnerEmpty);
400                }
401            }
402            EventRequest::Confirm(confirm_request) => {
403                if is_gov {
404                    if let Some(name_old_owner) =
405                        &confirm_request.name_old_owner
406                        && name_old_owner.is_empty()
407                    {
408                        return Err(
409                            RequestHandlerError::ConfirmNameOldOwnerEmpty,
410                        );
411                    }
412                } else if confirm_request.name_old_owner.is_some() {
413                    return Err(
414                        RequestHandlerError::ConfirmTrackerNameOldOwner,
415                    );
416                }
417            }
418            EventRequest::Fact(fact_request) => {
419                if is_gov
420                    && serde_json::from_value::<GovernanceEvent>(
421                        fact_request.payload.0.clone(),
422                    )
423                    .is_err()
424                {
425                    return Err(RequestHandlerError::GovFactInvalidEvent);
426                }
427            }
428            EventRequest::Reject(..) | EventRequest::EOL(..) => {}
429        }
430
431        Ok(())
432    }
433
434    async fn build_subject_data(
435        ctx: &mut ActorContext<Self>,
436        request: &EventRequest,
437    ) -> Result<SubjectData, RequestHandlerError> {
438        let subject_data = match request {
439            EventRequest::Create(create_request) => {
440                if create_request.schema_id.is_gov() {
441                    SubjectData::Governance { active: true }
442                } else {
443                    SubjectData::Tracker {
444                        governance_id: create_request.governance_id.clone(),
445                        schema_id: create_request.schema_id.clone(),
446                        namespace: create_request.namespace.to_string(),
447                        active: true,
448                    }
449                }
450            }
451            EventRequest::Fact(..)
452            | EventRequest::Transfer(..)
453            | EventRequest::Confirm(..)
454            | EventRequest::Reject(..)
455            | EventRequest::EOL(..) => {
456                let subject_id = request.get_subject_id();
457                let Some(subject_data) =
458                    get_subject_data(ctx, &subject_id).await?
459                else {
460                    return Err(RequestHandlerError::SubjectDataNotFound(
461                        subject_id.to_string(),
462                    ));
463                };
464
465                subject_data
466            }
467        };
468
469        Ok(subject_data)
470    }
471
472    async fn check_creation(
473        ctx: &mut ActorContext<Self>,
474        subject_data: SubjectData,
475        event_request: &EventRequestType,
476        signer: PublicKey,
477    ) -> Result<(), ActorError> {
478        match event_request {
479            EventRequestType::Create | EventRequestType::Confirm => {
480                if let SubjectData::Tracker {
481                    governance_id,
482                    schema_id,
483                    namespace,
484                    ..
485                } = subject_data
486                {
487                    let version = get_version(ctx, &governance_id).await?;
488                    check_subject_creation(
489                        ctx,
490                        &governance_id,
491                        signer,
492                        version,
493                        namespace,
494                        schema_id,
495                    )
496                    .await?;
497                }
498            }
499            _ => {}
500        }
501
502        Ok(())
503    }
504
505    fn build_request_id_subject_id(
506        hash: HashAlgorithm,
507        request: &Signed<EventRequest>,
508    ) -> Result<(DigestIdentifier, DigestIdentifier), RequestHandlerError> {
509        match &request.content() {
510            EventRequest::Create(..) => {
511                let request_id = hash_borsh(
512                    &*hash.hasher(),
513                    &(request.clone(), TimeStamp::now().as_nanos()),
514                )
515                .map_err(|e| {
516                    RequestHandlerError::RequestIdHash(e.to_string())
517                })?;
518
519                let subject_id =
520                    hash_borsh(&*hash.hasher(), request).map_err(|e| {
521                        RequestHandlerError::SubjectIdHash(e.to_string())
522                    })?;
523
524                Ok((request_id, subject_id))
525            }
526            EventRequest::Fact(..)
527            | EventRequest::Transfer(..)
528            | EventRequest::Confirm(..)
529            | EventRequest::Reject(..)
530            | EventRequest::EOL(..) => {
531                let request_id = hash_borsh(
532                    &*hash.hasher(),
533                    &(request.clone(), TimeStamp::now().as_nanos()),
534                )
535                .map_err(|e| {
536                    RequestHandlerError::RequestIdHash(e.to_string())
537                })?;
538
539                Ok((request_id, request.content().get_subject_id()))
540            }
541        }
542    }
543
544    async fn handle_queue_request(
545        &mut self,
546        ctx: &mut ActorContext<Self>,
547        request: Signed<EventRequest>,
548        request_id: &DigestIdentifier,
549        subject_id: &DigestIdentifier,
550        is_gov: bool,
551    ) -> Result<(), ActorError> {
552        let Some(helpers) = self.helpers.clone() else {
553            let e = " Can not obtain helpers".to_string();
554
555            return Err(ActorError::FunctionalCritical { description: e });
556        };
557
558        let in_handling = self.handling.contains_key(subject_id);
559        let in_queue = self.in_queue.contains_key(subject_id);
560
561        if !in_handling && !in_queue {
562            let command = Self::build_req_manager_init_msg(
563                &EventRequestType::from(request.content()),
564                is_gov,
565            );
566            let init_data = InitRequestManager {
567                our_key: self.our_key.clone(),
568                subject_id: subject_id.clone(),
569                helpers,
570            };
571
572            let actor = ctx
573                .create_child(
574                    &subject_id.to_string(),
575                    RequestManager::initial(init_data),
576                )
577                .await?;
578            actor
579                .tell(RequestManagerMessage::FirstRun {
580                    command,
581                    request,
582                    request_id: request_id.clone(),
583                })
584                .await?;
585
586            self.on_event(
587                RequestHandlerEvent::EventToHandling {
588                    subject_id: subject_id.clone(),
589                    request_id: request_id.clone(),
590                },
591                ctx,
592            )
593            .await;
594
595            send_to_tracking(
596                ctx,
597                RequestTrackingMessage::UpdateState {
598                    request_id: request_id.clone(),
599                    state: RequestState::Handling,
600                },
601            )
602            .await?;
603        } else {
604            self.on_event(
605                RequestHandlerEvent::EventToQueue {
606                    subject_id: subject_id.clone(),
607                    event: request,
608                    request_id: request_id.clone(),
609                },
610                ctx,
611            )
612            .await;
613
614            send_to_tracking(
615                ctx,
616                RequestTrackingMessage::UpdateState {
617                    request_id: request_id.clone(),
618                    state: RequestState::InQueue,
619                },
620            )
621            .await?;
622        }
623
624        Ok(())
625    }
626
627    const fn build_req_manager_init_msg(
628        event_request: &EventRequestType,
629        is_gov: bool,
630    ) -> ReqManInitMessage {
631        match event_request {
632            EventRequestType::Create => ReqManInitMessage::Validate,
633            EventRequestType::Fact => ReqManInitMessage::Evaluate,
634            EventRequestType::Transfer => ReqManInitMessage::Evaluate,
635            EventRequestType::Confirm => {
636                if is_gov {
637                    ReqManInitMessage::Evaluate
638                } else {
639                    ReqManInitMessage::Validate
640                }
641            }
642            EventRequestType::Reject => ReqManInitMessage::Validate,
643            EventRequestType::Eol => ReqManInitMessage::Validate,
644        }
645    }
646
647    async fn check_in_queue(
648        ctx: &mut ActorContext<Self>,
649        request: &Signed<EventRequest>,
650        our_key: PublicKey,
651    ) -> Result<bool, RequestHandlerError> {
652        if let EventRequest::Create(..) = request.content() {
653            return Err(RequestHandlerError::CreationNotQueued);
654        }
655
656        Self::check_owner_new_owner(ctx, request.content()).await?;
657
658        let subject_data =
659            Self::build_subject_data(ctx, request.content()).await?;
660        let event_request_type = EventRequestType::from(request.content());
661        let signer = request.signature().signer.clone();
662        let governance_id = subject_data
663            .get_governance_id()
664            .unwrap_or_else(|| request.content().get_subject_id());
665        let is_gov = subject_data.get_schema_id().is_gov();
666
667        if !subject_data.get_active() {
668            return Err(RequestHandlerError::SubjectNotActive(
669                request.content().get_subject_id().to_string(),
670            ));
671        }
672
673        Self::check_signature(
674            ctx,
675            our_key,
676            signer.clone(),
677            &governance_id,
678            &event_request_type,
679            subject_data.clone(),
680        )
681        .await?;
682
683        Self::check_creation(ctx, subject_data, &event_request_type, signer)
684            .await?;
685
686        Ok(is_gov)
687    }
688
689    async fn in_queue_to_handling(
690        &mut self,
691        ctx: &mut ActorContext<Self>,
692        request: Signed<EventRequest>,
693        request_id: &DigestIdentifier,
694        is_gov: bool,
695    ) -> Result<(), ActorError> {
696        let command = Self::build_req_manager_init_msg(
697            &EventRequestType::from(request.content()),
698            is_gov,
699        );
700        let subject_id = request.content().get_subject_id();
701
702        let actor = ctx
703            .get_child::<RequestManager>(&subject_id.to_string())
704            .await?;
705
706        actor
707            .tell(RequestManagerMessage::FirstRun {
708                command,
709                request,
710                request_id: request_id.clone(),
711            })
712            .await?;
713
714        self.on_event(
715            RequestHandlerEvent::EventToHandling {
716                subject_id: subject_id.clone(),
717                request_id: request_id.clone(),
718            },
719            ctx,
720        )
721        .await;
722
723        send_to_tracking(
724            ctx,
725            RequestTrackingMessage::UpdateState {
726                request_id: request_id.clone(),
727                state: RequestState::Handling,
728            },
729        )
730        .await
731    }
732
733    async fn end_child(
734        ctx: &ActorContext<Self>,
735        subject_id: &DigestIdentifier,
736    ) -> Result<(), ActorError> {
737        let actor = ctx
738            .get_child::<RequestManager>(&subject_id.to_string())
739            .await?;
740        actor.ask_stop().await
741    }
742
743    async fn manual_abort_request(
744        &self,
745        ctx: &ActorContext<Self>,
746        subject_id: &DigestIdentifier,
747    ) -> Result<(), ActorError> {
748        let actor = ctx
749            .get_child::<RequestManager>(&subject_id.to_string())
750            .await?;
751
752        actor.tell(RequestManagerMessage::ManualAbort).await
753    }
754}
755
756#[derive(Debug, Clone)]
757pub enum RequestHandlerMessage {
758    NewRequest {
759        request: Signed<EventRequest>,
760    },
761    RequestInManager,
762    RequestInManagerSubjectId {
763        subject_id: DigestIdentifier,
764    },
765    ChangeApprovalState {
766        subject_id: DigestIdentifier,
767        state: ApprovalStateRes,
768    },
769    GetApproval {
770        subject_id: DigestIdentifier,
771        state: Option<ApprovalState>,
772    },
773    GetAllApprovals {
774        state: Option<ApprovalState>,
775    },
776    PopQueue {
777        subject_id: DigestIdentifier,
778    },
779    EndHandling {
780        subject_id: DigestIdentifier,
781    },
782    AbortRequest {
783        subject_id: DigestIdentifier,
784    },
785}
786
787impl Message for RequestHandlerMessage {}
788
789#[derive(Debug, Clone)]
790pub enum RequestHandlerResponse {
791    RequestInManager(RequestsInManager),
792    RequestInManagerSubjectId(RequestsInManagerSubject),
793    Ok(RequestData),
794    Response(String),
795    Approval(Option<(ApprovalReq, ApprovalState)>),
796    Approvals(Vec<(ApprovalReq, ApprovalState)>),
797    None,
798}
799
800impl Response for RequestHandlerResponse {}
801
802#[derive(
803    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
804)]
805pub enum RequestHandlerEvent {
806    EventToQueue {
807        subject_id: DigestIdentifier,
808        event: Signed<EventRequest>,
809        request_id: DigestIdentifier,
810    },
811    Invalid {
812        subject_id: DigestIdentifier,
813    },
814    FinishHandling {
815        subject_id: DigestIdentifier,
816    },
817    EventToHandling {
818        subject_id: DigestIdentifier,
819        request_id: DigestIdentifier,
820    },
821}
822
823impl Event for RequestHandlerEvent {}
824
825#[async_trait]
826impl Actor for RequestHandler {
827    type Event = RequestHandlerEvent;
828    type Message = RequestHandlerMessage;
829    type Response = RequestHandlerResponse;
830
831    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
832        parent_span.map_or_else(
833            || info_span!("RequestHandler"),
834            |parent_span| info_span!(parent: parent_span, "RequestHandler"),
835        )
836    }
837
838    async fn pre_start(
839        &mut self,
840        ctx: &mut ActorContext<Self>,
841    ) -> Result<(), ActorError> {
842        if let Err(e) = self.init_store("request", None, false, ctx).await {
843            error!(
844                error = %e,
845                "Failed to initialize store during pre_start"
846            );
847            return Err(e);
848        }
849
850        let Some(ext_db): Option<Arc<ExternalDB>> =
851            ctx.system().get_helper("ext_db").await
852        else {
853            error!("External database helper not found");
854            return Err(ActorError::Helper {
855                name: "ext_db".to_string(),
856                reason: "Not found".to_string(),
857            });
858        };
859
860        let tracking_size = if let Some(config) =
861            ctx.system().get_helper::<ConfigHelper>("config").await
862        {
863            config.tracking_size
864        } else {
865            error!(
866                helper = "config",
867                "Config helper not found during pre_start"
868            );
869            return Err(ActorError::Helper {
870                name: "config".to_owned(),
871                reason: "Not found".to_string(),
872            });
873        };
874
875        let tracking = match ctx
876            .create_child("tracking", RequestTracking::new(tracking_size))
877            .await
878        {
879            Ok(actor) => actor,
880            Err(e) => {
881                error!(
882                    error = %e,
883                    "Failed to create tracking child during pre_start"
884                );
885                return Err(e);
886            }
887        };
888
889        let sink =
890            Sink::new(tracking.subscribe(), ext_db.get_request_tracking());
891
892        ctx.system().run_sink(sink).await;
893
894        let Some((hash, network)) = self.helpers.clone() else {
895            let e = " Can not obtain helpers".to_string();
896            error!(
897                error = %e,
898                "Failed to obtain helpers during pre_start"
899            );
900            ctx.system().crash_system();
901            return Err(ActorError::FunctionalCritical { description: e });
902        };
903
904        for (subject_id, request_id) in self.handling.clone() {
905            let request_manager_init = InitRequestManager {
906                our_key: self.our_key.clone(),
907                subject_id: subject_id.clone(),
908                helpers: (hash, network.clone()),
909            };
910
911            let request_manager_actor = match ctx
912                .create_child(
913                    &subject_id.to_string(),
914                    RequestManager::initial(request_manager_init),
915                )
916                .await
917            {
918                Ok(actor) => actor,
919                Err(e) => {
920                    error!(
921                        subject_id = %subject_id,
922                        error = %e,
923                        "Failed to create request manager child during pre_start"
924                    );
925                    return Err(e);
926                }
927            };
928
929            if let Err(e) = request_manager_actor
930                .tell(RequestManagerMessage::Run {
931                    request_id: request_id.clone(),
932                })
933                .await
934            {
935                error!(
936                    subject_id = %subject_id,
937                    request_id = %request_id,
938                    error = %e,
939                    "Failed to send Run message to request manager during pre_start"
940                );
941                return Err(e);
942            }
943        }
944
945        Ok(())
946    }
947}
948
949#[async_trait]
950impl Handler<Self> for RequestHandler {
951    async fn handle_message(
952        &mut self,
953        _sender: ActorPath,
954        msg: RequestHandlerMessage,
955        ctx: &mut ave_actors::ActorContext<Self>,
956    ) -> Result<RequestHandlerResponse, ActorError> {
957        match msg {
958            RequestHandlerMessage::RequestInManagerSubjectId { subject_id } => {
959                let handling =
960                    self.handling.get(&subject_id).map(|x| x.to_string());
961                let in_queue = self.in_queue.get(&subject_id).map(|x| {
962                    x.iter().map(|x| x.1.to_string()).collect::<Vec<String>>()
963                });
964
965                Ok(RequestHandlerResponse::RequestInManagerSubjectId(
966                    RequestsInManagerSubject { handling, in_queue },
967                ))
968            }
969            RequestHandlerMessage::RequestInManager => Ok(
970                RequestHandlerResponse::RequestInManager(RequestsInManager {
971                    handling: self
972                        .handling
973                        .iter()
974                        .map(|x| (x.0.to_string(), x.1.to_string()))
975                        .collect(),
976                    in_queue: self
977                        .in_queue
978                        .iter()
979                        .map(|x| {
980                            (
981                                x.0.to_string(),
982                                x.1.iter()
983                                    .map(|x| x.1.to_string())
984                                    .collect::<Vec<String>>(),
985                            )
986                        })
987                        .collect(),
988                }),
989            ),
990            RequestHandlerMessage::AbortRequest { subject_id } => {
991                self.manual_abort_request(ctx, &subject_id).await?;
992                Ok(RequestHandlerResponse::None)
993            }
994            RequestHandlerMessage::ChangeApprovalState {
995                subject_id,
996                state,
997            } => {
998                Self::change_approval(ctx, &subject_id, state.clone())
999                    .await
1000                    .map_err(|e| {
1001                        error!(
1002                            error = %e,
1003                            "ChangeApprovalState failed"
1004                        );
1005                        ActorError::from(e)
1006                    })?;
1007
1008                Ok(RequestHandlerResponse::Response(format!(
1009                    "The approval request for subject {} has changed to {}",
1010                    subject_id, state
1011                )))
1012            }
1013            RequestHandlerMessage::GetApproval { subject_id, state } => {
1014                let res = Self::get_approval(ctx, &subject_id, state.clone())
1015                    .await
1016                    .map_err(ActorError::from)?;
1017
1018                Ok(RequestHandlerResponse::Approval(res))
1019            }
1020            RequestHandlerMessage::GetAllApprovals { state } => {
1021                let res = Self::get_all_approvals(ctx, state.clone())
1022                    .await
1023                    .map_err(|e| {
1024                        error!(
1025                            error = %e,
1026                            "GetAllApprovals failed"
1027                        );
1028                        e
1029                    })?;
1030
1031                Ok(RequestHandlerResponse::Approvals(res))
1032            }
1033            RequestHandlerMessage::NewRequest { request } => {
1034                if let Err(e) = request.verify() {
1035                    let err = RequestHandlerError::SignatureVerification(
1036                        e.to_string(),
1037                    );
1038                    error!(error = %err, "Request signature verification failed");
1039                    return Err(ActorError::from(err));
1040                };
1041
1042                let Some((hash, ..)) = self.helpers.clone() else {
1043                    let err = RequestHandlerError::HelpersNotInitialized;
1044                    error!(
1045                        msg_type = "NewRequest",
1046                        error = %err,
1047                        "Helpers not initialized"
1048                    );
1049                    return Err(emit_fail(ctx, ActorError::from(err)).await);
1050                };
1051
1052                if let Err(e) =
1053                    Self::check_owner_new_owner(ctx, request.content()).await
1054                {
1055                    error!(
1056                        msg_type = "NewRequest",
1057                        error = %e,
1058                        "Owner or new owner check failed"
1059                    );
1060                    return Err(ActorError::from(e));
1061                }
1062
1063                let subject_data = match Self::build_subject_data(
1064                    ctx,
1065                    request.content(),
1066                )
1067                .await
1068                {
1069                    Ok(data) => data,
1070                    Err(e) => {
1071                        error!(
1072                            msg_type = "NewRequest",
1073                            error = %e,
1074                            "Failed to build subject data"
1075                        );
1076                        return Err(ActorError::from(e));
1077                    }
1078                };
1079                let event_request_type =
1080                    EventRequestType::from(request.content());
1081                let signer = request.signature().signer.clone();
1082                let governance_id = subject_data
1083                    .get_governance_id()
1084                    .unwrap_or_else(|| request.content().get_subject_id());
1085                let is_gov = subject_data.get_schema_id().is_gov();
1086
1087                if !subject_data.get_active() {
1088                    let subject_id = request.content().get_subject_id();
1089                    error!(
1090                        msg_type = "NewRequest",
1091                        subject_id = %subject_id,
1092                        "Subject is not active"
1093                    );
1094                    return Err(ActorError::from(
1095                        RequestHandlerError::SubjectNotActive(
1096                            subject_id.to_string(),
1097                        ),
1098                    ));
1099                }
1100
1101                if let Err(e) =
1102                    Self::check_event_request(request.content(), is_gov)
1103                {
1104                    error!(
1105                        msg_type = "NewRequest",
1106                        error = %e,
1107                        "Event request validation failed"
1108                    );
1109                    return Err(ActorError::from(e));
1110                }
1111
1112                if let Err(e) = Self::check_signature(
1113                    ctx,
1114                    (*self.our_key).clone(),
1115                    signer.clone(),
1116                    &governance_id,
1117                    &event_request_type,
1118                    subject_data.clone(),
1119                )
1120                .await
1121                {
1122                    error!(
1123                        msg_type = "NewRequest",
1124                        governance_id = %governance_id,
1125                        error = %e,
1126                        "Signature check failed"
1127                    );
1128                    return Err(e);
1129                }
1130
1131                if let Err(e) = Self::check_creation(
1132                    ctx,
1133                    subject_data,
1134                    &event_request_type,
1135                    signer,
1136                )
1137                .await
1138                {
1139                    error!(
1140                        msg_type = "NewRequest",
1141                        error = %e,
1142                        "Creation check failed"
1143                    );
1144                    return Err(e);
1145                }
1146
1147                let (request_id, subject_id) =
1148                    match Self::build_request_id_subject_id(hash, &request) {
1149                        Ok(ids) => ids,
1150                        Err(e) => {
1151                            error!(
1152                                msg_type = "NewRequest",
1153                                error = %e,
1154                                "Failed to build request ID and subject ID"
1155                            );
1156                            return Err(ActorError::from(e));
1157                        }
1158                    };
1159
1160                if let Err(e) = self
1161                    .handle_queue_request(
1162                        ctx,
1163                        request,
1164                        &request_id,
1165                        &subject_id,
1166                        is_gov,
1167                    )
1168                    .await
1169                {
1170                    error!(
1171                        msg_type = "NewRequest",
1172                        request_id = %request_id,
1173                        subject_id = %subject_id,
1174                        error = %e,
1175                        "Failed to handle queue request"
1176                    );
1177                    return Err(e);
1178                }
1179
1180                Ok(RequestHandlerResponse::Ok(RequestData {
1181                    request_id,
1182                    subject_id,
1183                }))
1184            }
1185            RequestHandlerMessage::PopQueue { subject_id } => {
1186                let (event, request_id) = if let Some(events) =
1187                    self.in_queue.get(&subject_id)
1188                {
1189                    if let Some((event, request_id)) =
1190                        events.clone().pop_front()
1191                    {
1192                        (event, request_id)
1193                    } else {
1194                        if let Err(e) = Self::end_child(ctx, &subject_id).await
1195                        {
1196                            error!(
1197                                msg_type = "PopQueue",
1198                                subject_id = %subject_id,
1199                                error = %e,
1200                                "Failed to end child actor when queue is empty"
1201                            );
1202                            ctx.system().crash_system();
1203                            return Err(e);
1204                        }
1205                        return Ok(RequestHandlerResponse::None);
1206                    }
1207                } else {
1208                    if let Err(e) = Self::end_child(ctx, &subject_id).await {
1209                        error!(
1210                            msg_type = "PopQueue",
1211                            subject_id = %subject_id,
1212                            error = %e,
1213                            "Failed to end child actor when no events available"
1214                        );
1215                        ctx.system().crash_system();
1216                        return Err(e);
1217                    }
1218                    return Ok(RequestHandlerResponse::None);
1219                };
1220
1221                let is_gov = match Self::check_in_queue(
1222                    ctx,
1223                    &event,
1224                    (*self.our_key).clone(),
1225                )
1226                .await
1227                {
1228                    Ok(is_gov) => is_gov,
1229                    Err(e) => {
1230                        if let Err(e) = self
1231                            .error_queue_handling(
1232                                ctx,
1233                                e.to_string(),
1234                                &subject_id,
1235                                &request_id,
1236                            )
1237                            .await
1238                        {
1239                            error!(
1240                                msg_type = "PopQueue",
1241                                subject_id = %subject_id,
1242                                request_id = %request_id,
1243                                error = %e,
1244                                "Failed to handle queue error"
1245                            );
1246                            ctx.system().crash_system();
1247                            return Err(e);
1248                        };
1249
1250                        return Ok(RequestHandlerResponse::None);
1251                    }
1252                };
1253
1254                if let Err(e) = self
1255                    .in_queue_to_handling(ctx, event, &request_id, is_gov)
1256                    .await
1257                {
1258                    error!(
1259                        msg_type = "PopQueue",
1260                        request_id = %request_id,
1261                        error = %e,
1262                        "Failed to transition from queue to handling"
1263                    );
1264                    ctx.system().crash_system();
1265                    return Err(e);
1266                }
1267
1268                Ok(RequestHandlerResponse::None)
1269            }
1270            RequestHandlerMessage::EndHandling { subject_id } => {
1271                self.on_event(
1272                    RequestHandlerEvent::FinishHandling {
1273                        subject_id: subject_id.clone(),
1274                    },
1275                    ctx,
1276                )
1277                .await;
1278
1279                if let Err(e) = Self::queued_event(ctx, &subject_id).await {
1280                    error!(
1281                        msg_type = "EndHandling",
1282                        subject_id = %subject_id,
1283                        error = %e,
1284                        "Failed to enqueue next event"
1285                    );
1286                    ctx.system().crash_system();
1287                    return Err(e);
1288                }
1289
1290                Ok(RequestHandlerResponse::None)
1291            }
1292        }
1293    }
1294
1295    async fn on_child_fault(
1296        &mut self,
1297        error: ActorError,
1298        ctx: &mut ActorContext<Self>,
1299    ) -> ChildAction {
1300        error!(
1301            error = %error,
1302            "Child fault in request handler"
1303        );
1304        ctx.system().crash_system();
1305        ChildAction::Stop
1306    }
1307
1308    async fn on_event(
1309        &mut self,
1310        event: RequestHandlerEvent,
1311        ctx: &mut ActorContext<Self>,
1312    ) {
1313        if let Err(e) = self.persist(&event, ctx).await {
1314            error!(
1315                error = %e,
1316                "Failed to persist event"
1317            );
1318            ctx.system().crash_system();
1319        };
1320    }
1321}
1322
// `RequestHandler` overrides nothing from `Storable`: the impl body is empty,
// so only what the trait itself provides applies.
#[async_trait]
impl Storable for RequestHandler {}
1325
1326#[async_trait]
1327impl PersistentActor for RequestHandler {
1328    type Persistence = LightPersistence;
1329    type InitParams = (Arc<PublicKey>, (HashAlgorithm, Arc<NetworkSender>));
1330
1331    fn update(&mut self, state: Self) {
1332        self.in_queue = state.in_queue;
1333        self.handling = state.handling;
1334    }
1335
1336    fn create_initial(params: Self::InitParams) -> Self {
1337        Self {
1338            our_key: params.0,
1339            helpers: Some(params.1),
1340            handling: HashMap::new(),
1341            in_queue: HashMap::new(),
1342        }
1343    }
1344
1345    /// Change node state.
1346    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1347        match event {
1348            RequestHandlerEvent::EventToQueue {
1349                subject_id,
1350                event,
1351                request_id,
1352            } => {
1353                self.in_queue
1354                    .entry(subject_id.clone())
1355                    .or_default()
1356                    .push_back((event.clone(), request_id.clone()));
1357            }
1358            RequestHandlerEvent::Invalid { subject_id } => {
1359                if let Some(vec) = self.in_queue.get_mut(subject_id) {
1360                    vec.pop_front();
1361                    if vec.is_empty() {
1362                        self.in_queue.remove(subject_id);
1363                    }
1364                }
1365            }
1366            RequestHandlerEvent::EventToHandling {
1367                subject_id,
1368                request_id,
1369            } => {
1370                self.handling.insert(subject_id.clone(), request_id.clone());
1371                if let Some(vec) = self.in_queue.get_mut(subject_id) {
1372                    vec.pop_front();
1373                    if vec.is_empty() {
1374                        self.in_queue.remove(subject_id);
1375                    }
1376                }
1377            }
1378            RequestHandlerEvent::FinishHandling { subject_id } => {
1379                self.handling.remove(subject_id);
1380            }
1381        };
1382
1383        Ok(())
1384    }
1385}