Skip to main content

ave_core/request/
mod.rs

1use async_trait::async_trait;
2use ave_actors::{
3    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
4    Message, Response, Sink,
5};
6use ave_actors::{LightPersistence, PersistentActor};
7use ave_common::Namespace;
8use ave_common::bridge::request::{
9    ApprovalState, ApprovalStateRes, EventRequestType,
10};
11use ave_common::identity::{
12    DigestIdentifier, HashAlgorithm, PublicKey, Signed, TimeStamp, hash_borsh,
13};
14use ave_common::request::EventRequest;
15use ave_common::response::{
16    RequestState, RequestsInManager, RequestsInManagerSubject,
17};
18
19use borsh::{BorshDeserialize, BorshSerialize};
20use error::RequestHandlerError;
21use manager::{RequestManager, RequestManagerMessage};
22use serde::{Deserialize, Serialize};
23use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use tracing::{Span, error, info_span};
26use types::ReqManInitMessage;
27
28use crate::approval::persist::{
29    ApprPersist, ApprPersistMessage, ApprPersistResponse,
30};
31use crate::approval::request::ApprovalReq;
32use crate::db::Storable;
33use crate::governance::events::GovernanceEvent;
34use crate::governance::model::{HashThisRole, RoleTypes};
35use crate::helpers::db::ExternalDB;
36use crate::helpers::network::service::NetworkSender;
37use crate::metrics::try_core_metrics;
38use crate::model::common::node::{get_subject_data, i_owner_new_owner};
39use crate::model::common::subject::{
40    get_gov, get_tracker_visibility_state, get_version,
41};
42use crate::model::common::{
43    check_subject_creation, emit_fail, send_to_tracking,
44    viewpoints::validate_fact_viewpoints,
45};
46use crate::node::{Node, NodeMessage, NodeResponse, SubjectData};
47use crate::request::manager::InitRequestManager;
48use crate::request::tracking::{RequestTracking, RequestTrackingMessage};
49use crate::system::ConfigHelper;
50
51pub mod error;
52pub mod manager;
53pub mod reboot;
54pub mod tracking;
55pub mod types;
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct RequestData {
59    pub request_id: DigestIdentifier,
60    pub subject_id: DigestIdentifier,
61}
62
63#[derive(Clone, Debug, Serialize, Deserialize)]
64pub struct RequestHandler {
65    #[serde(skip)]
66    helpers: Option<(HashAlgorithm, Arc<NetworkSender>)>,
67    #[serde(skip)]
68    our_key: Arc<PublicKey>,
69    handling: HashMap<DigestIdentifier, DigestIdentifier>,
70    in_queue: HashMap<
71        DigestIdentifier,
72        VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
73    >,
74}
75
76impl BorshSerialize for RequestHandler {
77    fn serialize<W: std::io::Write>(
78        &self,
79        writer: &mut W,
80    ) -> std::io::Result<()> {
81        // Serialize only the fields we want to persist, skipping 'owner'
82        BorshSerialize::serialize(&self.handling, writer)?;
83        BorshSerialize::serialize(&self.in_queue, writer)?;
84        Ok(())
85    }
86}
87
88impl BorshDeserialize for RequestHandler {
89    fn deserialize_reader<R: std::io::Read>(
90        reader: &mut R,
91    ) -> std::io::Result<Self> {
92        // Deserialize the persisted fields
93        let handling =
94            HashMap::<DigestIdentifier, DigestIdentifier>::deserialize_reader(
95                reader,
96            )?;
97        let in_queue = HashMap::<
98            DigestIdentifier,
99            VecDeque<(Signed<EventRequest>, DigestIdentifier)>,
100        >::deserialize_reader(reader)?;
101
102        let our_key = Arc::new(PublicKey::default());
103
104        Ok(Self {
105            helpers: None,
106            our_key,
107            handling,
108            in_queue,
109        })
110    }
111}
112
113impl RequestHandler {
114    async fn check_signer_authorization(
115        ctx: &mut ActorContext<Self>,
116        our_key: PublicKey,
117        signer: PublicKey,
118        governance_id: &DigestIdentifier,
119        event_request: &EventRequestType,
120        subject_data: SubjectData,
121    ) -> Result<(), ActorError> {
122        let gov = if matches!(subject_data, SubjectData::Tracker { .. })
123            || matches!(event_request, EventRequestType::Fact)
124        {
125            Some(get_gov(ctx, governance_id).await?)
126        } else {
127            None
128        };
129
130        if let SubjectData::Tracker {
131            schema_id,
132            namespace,
133            ..
134        } = &subject_data
135        {
136            let Some(gov) = gov.as_ref() else {
137                return Err(ActorError::FunctionalCritical {
138                    description:
139                        "Governance required for tracker signer authorization"
140                            .to_owned(),
141                });
142            };
143
144            if !gov.has_this_role(HashThisRole::Schema {
145                who: our_key.clone(),
146                role: RoleTypes::Creator,
147                schema_id: schema_id.clone(),
148                namespace: Namespace::from(namespace.clone()),
149            }) {
150                return Err(ActorError::Functional {
151                    description:
152                        "In tracker events, the node has to be a creator"
153                            .to_string(),
154                });
155            }
156        }
157
158        match event_request {
159            EventRequestType::Create
160            | EventRequestType::Transfer
161            | EventRequestType::Confirm
162            | EventRequestType::Reject
163            | EventRequestType::Eol => {
164                if signer != our_key {
165                    return Err(ActorError::Functional { description: "In the events of Create, Transfer, Confirm, Reject or EOL, the event must be signed by the node".to_string() });
166                }
167            }
168            EventRequestType::Fact => match subject_data {
169                SubjectData::Tracker {
170                    schema_id,
171                    namespace,
172                    ..
173                } => {
174                    let Some(gov) = gov.as_ref() else {
175                        return Err(ActorError::FunctionalCritical {
176                            description:
177                                "Governance required for fact signer authorization"
178                                    .to_owned(),
179                        });
180                    };
181
182                    if !gov.has_this_role(HashThisRole::Schema {
183                        who: signer,
184                        role: RoleTypes::Issuer,
185                        schema_id,
186                        namespace: Namespace::from(namespace),
187                    }) {
188                        return Err(ActorError::Functional {
189                            description:
190                                "In fact events, the signer has to be an issuer"
191                                    .to_string(),
192                        });
193                    }
194                }
195                SubjectData::Governance { .. } => {
196                    let Some(gov) = gov.as_ref() else {
197                        return Err(ActorError::FunctionalCritical {
198                            description:
199                                "Governance required for fact signer authorization"
200                                    .to_owned(),
201                        });
202                    };
203
204                    if !gov.has_this_role(HashThisRole::Gov {
205                        who: signer,
206                        role: RoleTypes::Issuer,
207                    }) {
208                        return Err(ActorError::Functional {
209                            description:
210                                "In fact events, the signer has to be an issuer"
211                                    .to_string(),
212                        });
213                    }
214                }
215            },
216        }
217
218        Ok(())
219    }
220
221    async fn queued_event(
222        ctx: &ActorContext<Self>,
223        subject_id: &DigestIdentifier,
224    ) -> Result<(), ActorError> {
225        let request_actor = ctx.reference().await?;
226        request_actor
227            .tell(RequestHandlerMessage::PopQueue {
228                subject_id: subject_id.to_owned(),
229            })
230            .await
231    }
232
233    async fn error_queue_handling(
234        &mut self,
235        ctx: &mut ActorContext<Self>,
236        error: String,
237        subject_id: &DigestIdentifier,
238        request_id: &DigestIdentifier,
239    ) -> Result<(), ActorError> {
240        if let Some(metrics) = try_core_metrics() {
241            metrics.observe_request_invalid();
242        }
243
244        self.on_event(
245            RequestHandlerEvent::Invalid {
246                subject_id: subject_id.to_owned(),
247            },
248            ctx,
249        )
250        .await;
251
252        send_to_tracking(
253            ctx,
254            RequestTrackingMessage::UpdateState {
255                request_id: request_id.clone(),
256                state: RequestState::Invalid {
257                    error,
258                    sn: None,
259                    subject_id: subject_id.to_string(),
260                    who: self.our_key.to_string(),
261                },
262            },
263        )
264        .await?;
265
266        Self::queued_event(ctx, subject_id).await
267    }
268
269    async fn change_approval(
270        ctx: &ActorContext<Self>,
271        subject_id: &DigestIdentifier,
272        state: ApprovalStateRes,
273    ) -> Result<(), RequestHandlerError> {
274        if state == ApprovalStateRes::Obsolete {
275            return Err(RequestHandlerError::ObsoleteApproval);
276        }
277
278        let approver_path = ActorPath::from(format!(
279            "/user/node/subject_manager/{}/approver",
280            subject_id
281        ));
282        let approver_actor = ctx
283            .system()
284            .get_actor::<ApprPersist>(&approver_path)
285            .await
286            .map_err(|_| {
287                RequestHandlerError::ApprovalNotFound(subject_id.to_string())
288            })?;
289
290        approver_actor
291            .tell(ApprPersistMessage::ChangeResponse {
292                response: state.clone(),
293            })
294            .await
295            .map_err(|_| RequestHandlerError::ApprovalChangeFailed)
296    }
297
298    async fn get_approval(
299        ctx: &ActorContext<Self>,
300        subject_id: &DigestIdentifier,
301        state: Option<ApprovalState>,
302    ) -> Result<Option<(ApprovalReq, ApprovalState)>, RequestHandlerError> {
303        let approver_path = ActorPath::from(format!(
304            "/user/node/subject_manager/{}/approver",
305            subject_id
306        ));
307        let approver_actor = ctx
308            .system()
309            .get_actor::<ApprPersist>(&approver_path)
310            .await
311            .map_err(|_| {
312                RequestHandlerError::ApprovalNotFound(subject_id.to_string())
313            })?;
314
315        let response = approver_actor
316            .ask(ApprPersistMessage::GetApproval { state })
317            .await
318            .map_err(|_| RequestHandlerError::ApprovalGetFailed)?;
319
320        let res = match response {
321            ApprPersistResponse::Ok => None,
322            ApprPersistResponse::Approval { request, state } => {
323                Some((request, state))
324            }
325        };
326
327        Ok(res)
328    }
329
330    async fn get_all_approvals(
331        ctx: &ActorContext<Self>,
332        state: Option<ApprovalState>,
333    ) -> Result<Vec<(ApprovalReq, ApprovalState)>, ActorError> {
334        let node_path = ActorPath::from("/user/node");
335        let node_actor = ctx.system().get_actor::<Node>(&node_path).await?;
336        let response = node_actor.ask(NodeMessage::GetGovernances).await?;
337        let vec = match response {
338            NodeResponse::Governances(govs) => govs,
339            _ => {
340                return Err(ActorError::UnexpectedResponse {
341                    path: node_path,
342                    expected: "NodeResponse::Governances".to_string(),
343                });
344            }
345        };
346
347        let mut responses = vec![];
348        for governance in vec.iter() {
349            let approver_path = ActorPath::from(format!(
350                "/user/node/subject_manager/{}/approver",
351                governance
352            ));
353            if let Ok(approver_actor) =
354                ctx.system().get_actor::<ApprPersist>(&approver_path).await
355            {
356                let response = approver_actor
357                    .ask(ApprPersistMessage::GetApproval {
358                        state: state.clone(),
359                    })
360                    .await?;
361
362                match response {
363                    ApprPersistResponse::Ok => {}
364                    ApprPersistResponse::Approval { request, state } => {
365                        responses.push((request, state))
366                    }
367                };
368            };
369        }
370
371        Ok(responses)
372    }
373
374    async fn check_owner_new_owner(
375        ctx: &mut ActorContext<Self>,
376        request: &EventRequest,
377    ) -> Result<(), RequestHandlerError> {
378        match request {
379            EventRequest::Create(..) => {}
380            EventRequest::Fact(..)
381            | EventRequest::Transfer(..)
382            | EventRequest::EOL(..) => {
383                let subject_id = request.get_subject_id();
384                let (i_owner, i_new_owner) =
385                    i_owner_new_owner(ctx, &subject_id).await?;
386                if !i_owner {
387                    return Err(RequestHandlerError::NotOwner(
388                        subject_id.to_string(),
389                    ));
390                }
391
392                if i_new_owner.is_some() {
393                    return Err(RequestHandlerError::PendingNewOwner(
394                        subject_id.to_string(),
395                    ));
396                }
397            }
398            EventRequest::Confirm(..) | EventRequest::Reject(..) => {
399                let subject_id = request.get_subject_id();
400                let (i_owner, i_new_owner) =
401                    i_owner_new_owner(ctx, &subject_id).await?;
402                if i_owner {
403                    return Err(RequestHandlerError::IsOwner(
404                        subject_id.to_string(),
405                    ));
406                }
407
408                if let Some(new_owner) = i_new_owner {
409                    if !new_owner {
410                        return Err(RequestHandlerError::NotNewOwner(
411                            subject_id.to_string(),
412                        ));
413                    }
414                } else {
415                    return Err(RequestHandlerError::NoNewOwnerPending(
416                        subject_id.to_string(),
417                    ));
418                }
419            }
420        };
421        Ok(())
422    }
423
424    fn check_event_request(
425        request: &EventRequest,
426        is_gov: bool,
427    ) -> Result<(), RequestHandlerError> {
428        match request {
429            EventRequest::Create(create_request) => {
430                if let Some(name) = &create_request.name
431                    && (name.is_empty() || name.len() > 100)
432                {
433                    return Err(RequestHandlerError::InvalidName);
434                }
435
436                if let Some(description) = &create_request.description
437                    && (description.is_empty() || description.len() > 200)
438                {
439                    return Err(RequestHandlerError::InvalidDescription);
440                }
441
442                if !create_request.schema_id.is_valid_in_request() {
443                    return Err(RequestHandlerError::InvalidSchemaId);
444                }
445
446                if is_gov {
447                    if !create_request.governance_id.is_empty() {
448                        return Err(
449                            RequestHandlerError::GovernanceIdMustBeEmpty,
450                        );
451                    }
452
453                    if !create_request.namespace.is_empty() {
454                        return Err(RequestHandlerError::NamespaceMustBeEmpty);
455                    }
456                } else if create_request.governance_id.is_empty() {
457                    return Err(RequestHandlerError::GovernanceIdRequired);
458                }
459            }
460            EventRequest::Transfer(transfer_request) => {
461                if transfer_request.new_owner.is_empty() {
462                    return Err(RequestHandlerError::TransferNewOwnerEmpty);
463                }
464            }
465            EventRequest::Confirm(confirm_request) => {
466                if is_gov {
467                    if let Some(name_old_owner) =
468                        &confirm_request.name_old_owner
469                        && name_old_owner.is_empty()
470                    {
471                        return Err(
472                            RequestHandlerError::ConfirmNameOldOwnerEmpty,
473                        );
474                    }
475                } else if confirm_request.name_old_owner.is_some() {
476                    return Err(
477                        RequestHandlerError::ConfirmTrackerNameOldOwner,
478                    );
479                }
480            }
481            EventRequest::Fact(fact_request) => {
482                if is_gov && !fact_request.viewpoints.is_empty() {
483                    return Err(
484                        RequestHandlerError::GovFactViewpointsNotAllowed,
485                    );
486                }
487
488                if is_gov
489                    && serde_json::from_value::<GovernanceEvent>(
490                        fact_request.payload.0.clone(),
491                    )
492                    .is_err()
493                {
494                    return Err(RequestHandlerError::GovFactInvalidEvent);
495                }
496            }
497            EventRequest::Reject(..) | EventRequest::EOL(..) => {}
498        }
499
500        Ok(())
501    }
502
503    async fn check_fact_viewpoints(
504        ctx: &mut ActorContext<Self>,
505        request: &EventRequest,
506        subject_data: &SubjectData,
507    ) -> Result<(), RequestHandlerError> {
508        let EventRequest::Fact(fact_request) = request else {
509            return Ok(());
510        };
511
512        match subject_data {
513            SubjectData::Governance { .. } => validate_fact_viewpoints(
514                &fact_request.viewpoints,
515                &ave_common::SchemaType::Governance,
516                None,
517            )
518            .map_err(RequestHandlerError::InvalidTrackerFactViewpoints),
519            SubjectData::Tracker {
520                governance_id,
521                schema_id,
522                ..
523            } => {
524                let governance = get_gov(ctx, governance_id).await?;
525                let Some(schema) = governance.schemas.get(schema_id) else {
526                    return Err(RequestHandlerError::Actor(
527                        ActorError::FunctionalCritical {
528                            description: format!(
529                                "schema {} not found in governance {} while validating fact viewpoints",
530                                schema_id, governance_id
531                            ),
532                        },
533                    ));
534                };
535
536                validate_fact_viewpoints(
537                    &fact_request.viewpoints,
538                    schema_id,
539                    Some(&schema.viewpoints),
540                )
541                .map_err(RequestHandlerError::InvalidTrackerFactViewpoints)
542            }
543        }
544    }
545
546    async fn check_tracker_ledger_full(
547        ctx: &mut ActorContext<Self>,
548        request: &EventRequest,
549        subject_data: &SubjectData,
550    ) -> Result<(), RequestHandlerError> {
551        let SubjectData::Tracker { governance_id, .. } = subject_data else {
552            return Ok(());
553        };
554
555        if matches!(request, EventRequest::Create(..)) {
556            return Ok(());
557        }
558
559        let subject_id = request.get_subject_id();
560        let visibility_state =
561            get_tracker_visibility_state(ctx, governance_id, &subject_id)
562                .await?;
563
564        if !visibility_state.is_full() {
565            return Err(RequestHandlerError::TrackerLedgerNotFull(
566                subject_id.to_string(),
567            ));
568        }
569
570        Ok(())
571    }
572
573    async fn build_subject_data(
574        ctx: &mut ActorContext<Self>,
575        request: &EventRequest,
576    ) -> Result<SubjectData, RequestHandlerError> {
577        let subject_data = match request {
578            EventRequest::Create(create_request) => {
579                if create_request.schema_id.is_gov() {
580                    SubjectData::Governance { active: true }
581                } else {
582                    SubjectData::Tracker {
583                        governance_id: create_request.governance_id.clone(),
584                        schema_id: create_request.schema_id.clone(),
585                        namespace: create_request.namespace.to_string(),
586                        active: true,
587                    }
588                }
589            }
590            EventRequest::Fact(..)
591            | EventRequest::Transfer(..)
592            | EventRequest::Confirm(..)
593            | EventRequest::Reject(..)
594            | EventRequest::EOL(..) => {
595                let subject_id = request.get_subject_id();
596                let Some(subject_data) =
597                    get_subject_data(ctx, &subject_id).await?
598                else {
599                    return Err(RequestHandlerError::SubjectDataNotFound(
600                        subject_id.to_string(),
601                    ));
602                };
603
604                subject_data
605            }
606        };
607
608        Ok(subject_data)
609    }
610
611    async fn check_creation(
612        ctx: &mut ActorContext<Self>,
613        subject_data: SubjectData,
614        event_request: &EventRequestType,
615        signer: PublicKey,
616    ) -> Result<(), ActorError> {
617        match event_request {
618            EventRequestType::Create | EventRequestType::Confirm => {
619                if let SubjectData::Tracker {
620                    governance_id,
621                    schema_id,
622                    namespace,
623                    ..
624                } = subject_data
625                {
626                    let version = get_version(ctx, &governance_id).await?;
627                    check_subject_creation(
628                        ctx,
629                        &governance_id,
630                        signer,
631                        version,
632                        namespace,
633                        schema_id,
634                    )
635                    .await?;
636                }
637            }
638            _ => {}
639        }
640
641        Ok(())
642    }
643
644    fn build_request_id_subject_id(
645        hash: HashAlgorithm,
646        request: &Signed<EventRequest>,
647    ) -> Result<(DigestIdentifier, DigestIdentifier), RequestHandlerError> {
648        match &request.content() {
649            EventRequest::Create(..) => {
650                let request_id = hash_borsh(
651                    &*hash.hasher(),
652                    &(request.clone(), TimeStamp::now().as_nanos()),
653                )
654                .map_err(|e| {
655                    RequestHandlerError::RequestIdHash(e.to_string())
656                })?;
657
658                let subject_id =
659                    hash_borsh(&*hash.hasher(), request).map_err(|e| {
660                        RequestHandlerError::SubjectIdHash(e.to_string())
661                    })?;
662
663                Ok((request_id, subject_id))
664            }
665            EventRequest::Fact(..)
666            | EventRequest::Transfer(..)
667            | EventRequest::Confirm(..)
668            | EventRequest::Reject(..)
669            | EventRequest::EOL(..) => {
670                let request_id = hash_borsh(
671                    &*hash.hasher(),
672                    &(request.clone(), TimeStamp::now().as_nanos()),
673                )
674                .map_err(|e| {
675                    RequestHandlerError::RequestIdHash(e.to_string())
676                })?;
677
678                Ok((request_id, request.content().get_subject_id()))
679            }
680        }
681    }
682
683    async fn handle_queue_request(
684        &mut self,
685        ctx: &mut ActorContext<Self>,
686        request: Signed<EventRequest>,
687        request_id: &DigestIdentifier,
688        subject_id: &DigestIdentifier,
689        is_gov: bool,
690        governance_id: Option<DigestIdentifier>,
691    ) -> Result<(), ActorError> {
692        let Some(helpers) = self.helpers.clone() else {
693            let e = " Can not obtain helpers".to_string();
694
695            return Err(ActorError::FunctionalCritical { description: e });
696        };
697
698        let in_handling = self.handling.contains_key(subject_id);
699        let in_queue = self.in_queue.contains_key(subject_id);
700
701        if !in_handling && !in_queue {
702            let command = Self::build_req_manager_init_msg(
703                &EventRequestType::from(request.content()),
704                is_gov,
705            );
706            let init_data = InitRequestManager {
707                our_key: self.our_key.clone(),
708                subject_id: subject_id.clone(),
709                governance_id,
710                helpers,
711            };
712
713            let actor = ctx
714                .create_child(
715                    &subject_id.to_string(),
716                    RequestManager::initial(init_data),
717                )
718                .await?;
719            actor
720                .tell(RequestManagerMessage::FirstRun {
721                    command,
722                    request,
723                    request_id: request_id.clone(),
724                })
725                .await?;
726
727            self.on_event(
728                RequestHandlerEvent::EventToHandling {
729                    subject_id: subject_id.clone(),
730                    request_id: request_id.clone(),
731                },
732                ctx,
733            )
734            .await;
735
736            send_to_tracking(
737                ctx,
738                RequestTrackingMessage::UpdateState {
739                    request_id: request_id.clone(),
740                    state: RequestState::Handling,
741                },
742            )
743            .await?;
744        } else {
745            self.on_event(
746                RequestHandlerEvent::EventToQueue {
747                    subject_id: subject_id.clone(),
748                    event: request,
749                    request_id: request_id.clone(),
750                },
751                ctx,
752            )
753            .await;
754
755            send_to_tracking(
756                ctx,
757                RequestTrackingMessage::UpdateState {
758                    request_id: request_id.clone(),
759                    state: RequestState::InQueue,
760                },
761            )
762            .await?;
763        }
764
765        Ok(())
766    }
767
768    const fn build_req_manager_init_msg(
769        event_request: &EventRequestType,
770        is_gov: bool,
771    ) -> ReqManInitMessage {
772        match event_request {
773            EventRequestType::Create => ReqManInitMessage::Validate,
774            EventRequestType::Fact => ReqManInitMessage::Evaluate,
775            EventRequestType::Transfer => ReqManInitMessage::Evaluate,
776            EventRequestType::Confirm => {
777                if is_gov {
778                    ReqManInitMessage::Evaluate
779                } else {
780                    ReqManInitMessage::Validate
781                }
782            }
783            EventRequestType::Reject => ReqManInitMessage::Validate,
784            EventRequestType::Eol => ReqManInitMessage::Validate,
785        }
786    }
787
788    async fn check_in_queue(
789        ctx: &mut ActorContext<Self>,
790        request: &Signed<EventRequest>,
791        our_key: PublicKey,
792    ) -> Result<bool, RequestHandlerError> {
793        if let EventRequest::Create(..) = request.content() {
794            return Err(RequestHandlerError::CreationNotQueued);
795        }
796
797        Self::check_owner_new_owner(ctx, request.content()).await?;
798
799        let subject_data =
800            Self::build_subject_data(ctx, request.content()).await?;
801        let event_request_type = EventRequestType::from(request.content());
802        let signer = request.signature().signer.clone();
803        let governance_id = subject_data
804            .get_governance_id()
805            .unwrap_or_else(|| request.content().get_subject_id());
806        let is_gov = subject_data.get_schema_id().is_gov();
807
808        if !subject_data.get_active() {
809            return Err(RequestHandlerError::SubjectNotActive(
810                request.content().get_subject_id().to_string(),
811            ));
812        }
813
814        Self::check_tracker_ledger_full(ctx, request.content(), &subject_data)
815            .await?;
816
817        Self::check_signer_authorization(
818            ctx,
819            our_key,
820            signer.clone(),
821            &governance_id,
822            &event_request_type,
823            subject_data.clone(),
824        )
825        .await?;
826
827        Self::check_fact_viewpoints(ctx, request.content(), &subject_data)
828            .await?;
829
830        Self::check_creation(ctx, subject_data, &event_request_type, signer)
831            .await?;
832
833        Ok(is_gov)
834    }
835
836    async fn in_queue_to_handling(
837        &mut self,
838        ctx: &mut ActorContext<Self>,
839        request: Signed<EventRequest>,
840        request_id: &DigestIdentifier,
841        is_gov: bool,
842    ) -> Result<(), ActorError> {
843        let command = Self::build_req_manager_init_msg(
844            &EventRequestType::from(request.content()),
845            is_gov,
846        );
847        let subject_id = request.content().get_subject_id();
848
849        let actor = ctx
850            .get_child::<RequestManager>(&subject_id.to_string())
851            .await?;
852
853        actor
854            .tell(RequestManagerMessage::FirstRun {
855                command,
856                request,
857                request_id: request_id.clone(),
858            })
859            .await?;
860
861        self.on_event(
862            RequestHandlerEvent::EventToHandling {
863                subject_id: subject_id.clone(),
864                request_id: request_id.clone(),
865            },
866            ctx,
867        )
868        .await;
869
870        send_to_tracking(
871            ctx,
872            RequestTrackingMessage::UpdateState {
873                request_id: request_id.clone(),
874                state: RequestState::Handling,
875            },
876        )
877        .await
878    }
879
880    async fn end_child(
881        ctx: &ActorContext<Self>,
882        subject_id: &DigestIdentifier,
883    ) -> Result<(), ActorError> {
884        let actor = ctx
885            .get_child::<RequestManager>(&subject_id.to_string())
886            .await?;
887        actor.ask_stop().await
888    }
889
890    async fn manual_abort_request(
891        &self,
892        ctx: &ActorContext<Self>,
893        subject_id: &DigestIdentifier,
894    ) -> Result<(), ActorError> {
895        let actor = ctx
896            .get_child::<RequestManager>(&subject_id.to_string())
897            .await?;
898
899        actor.tell(RequestManagerMessage::ManualAbort).await
900    }
901
902    async fn purge_request_manager(
903        &self,
904        ctx: &mut ActorContext<Self>,
905        subject_id: &DigestIdentifier,
906    ) -> Result<(), ActorError> {
907        let Some((hash, network)) = self.helpers.clone() else {
908            return Err(ActorError::FunctionalCritical {
909                description: "Request handler helpers are not initialized"
910                    .to_string(),
911            });
912        };
913
914        let governance_id = get_subject_data(ctx, subject_id)
915            .await?
916            .and_then(|data| data.get_governance_id());
917        let request_manager_init = InitRequestManager {
918            our_key: self.our_key.clone(),
919            subject_id: subject_id.clone(),
920            governance_id,
921            helpers: (hash, network),
922        };
923
924        let actor = match ctx
925            .create_child(
926                &subject_id.to_string(),
927                RequestManager::initial(request_manager_init),
928            )
929            .await
930        {
931            Ok(actor) => actor,
932            Err(ActorError::Exists { .. }) => {
933                ctx.get_child::<RequestManager>(&subject_id.to_string())
934                    .await?
935            }
936            Err(err) => return Err(err),
937        };
938
939        actor.ask(RequestManagerMessage::PurgeStorage).await?;
940        actor.ask_stop().await?;
941
942        Ok(())
943    }
944}
945
946#[derive(Debug, Clone)]
947pub enum RequestHandlerMessage {
948    NewRequest {
949        request: Signed<EventRequest>,
950    },
951    RequestInManager,
952    RequestInManagerSubjectId {
953        subject_id: DigestIdentifier,
954    },
955    ChangeApprovalState {
956        subject_id: DigestIdentifier,
957        state: ApprovalStateRes,
958    },
959    GetApproval {
960        subject_id: DigestIdentifier,
961        state: Option<ApprovalState>,
962    },
963    GetAllApprovals {
964        state: Option<ApprovalState>,
965    },
966    PopQueue {
967        subject_id: DigestIdentifier,
968    },
969    EndHandling {
970        subject_id: DigestIdentifier,
971    },
972    PurgeSubject {
973        subject_id: DigestIdentifier,
974    },
975    AbortRequest {
976        subject_id: DigestIdentifier,
977    },
978}
979
980impl Message for RequestHandlerMessage {}
981
982#[derive(Debug, Clone)]
983pub enum RequestHandlerResponse {
984    RequestInManager(RequestsInManager),
985    RequestInManagerSubjectId(RequestsInManagerSubject),
986    Ok(RequestData),
987    Response(String),
988    Approval(Option<(ApprovalReq, ApprovalState)>),
989    Approvals(Vec<(ApprovalReq, ApprovalState)>),
990    None,
991}
992
993impl Response for RequestHandlerResponse {}
994
995#[derive(
996    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
997)]
998pub enum RequestHandlerEvent {
999    EventToQueue {
1000        subject_id: DigestIdentifier,
1001        event: Signed<EventRequest>,
1002        request_id: DigestIdentifier,
1003    },
1004    Invalid {
1005        subject_id: DigestIdentifier,
1006    },
1007    FinishHandling {
1008        subject_id: DigestIdentifier,
1009    },
1010    PurgeSubject {
1011        subject_id: DigestIdentifier,
1012    },
1013    EventToHandling {
1014        subject_id: DigestIdentifier,
1015        request_id: DigestIdentifier,
1016    },
1017}
1018
1019impl Event for RequestHandlerEvent {}
1020
1021#[async_trait]
1022impl Actor for RequestHandler {
1023    type Event = RequestHandlerEvent;
1024    type Message = RequestHandlerMessage;
1025    type Response = RequestHandlerResponse;
1026
1027    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
1028        parent_span.map_or_else(
1029            || info_span!("RequestHandler"),
1030            |parent_span| info_span!(parent: parent_span, "RequestHandler"),
1031        )
1032    }
1033
1034    async fn pre_start(
1035        &mut self,
1036        ctx: &mut ActorContext<Self>,
1037    ) -> Result<(), ActorError> {
1038        if let Err(e) = self.init_store("request", None, false, ctx).await {
1039            error!(
1040                error = %e,
1041                "Failed to initialize store during pre_start"
1042            );
1043            return Err(e);
1044        }
1045
1046        let Some(config) =
1047            ctx.system().get_helper::<ConfigHelper>("config").await
1048        else {
1049            error!(
1050                helper = "config",
1051                "Config helper not found during pre_start"
1052            );
1053            return Err(ActorError::Helper {
1054                name: "config".to_owned(),
1055                reason: "Not found".to_string(),
1056            });
1057        };
1058
1059        if !config.safe_mode {
1060            let Some(ext_db): Option<Arc<ExternalDB>> =
1061                ctx.system().get_helper("ext_db").await
1062            else {
1063                error!("External database helper not found");
1064                return Err(ActorError::Helper {
1065                    name: "ext_db".to_string(),
1066                    reason: "Not found".to_string(),
1067                });
1068            };
1069
1070            let tracking = match ctx
1071                .create_child(
1072                    "tracking",
1073                    RequestTracking::new(config.tracking_size),
1074                )
1075                .await
1076            {
1077                Ok(actor) => actor,
1078                Err(e) => {
1079                    error!(
1080                        error = %e,
1081                        "Failed to create tracking child during pre_start"
1082                    );
1083                    return Err(e);
1084                }
1085            };
1086
1087            let sink =
1088                Sink::new(tracking.subscribe(), ext_db.get_request_tracking());
1089
1090            ctx.system().run_sink(sink).await;
1091        }
1092
1093        let Some((hash, network)) = self.helpers.clone() else {
1094            let e = " Can not obtain helpers".to_string();
1095            error!(
1096                error = %e,
1097                "Failed to obtain helpers during pre_start"
1098            );
1099            ctx.system().crash_system();
1100            return Err(ActorError::FunctionalCritical { description: e });
1101        };
1102
1103        if config.safe_mode {
1104            return Ok(());
1105        }
1106
1107        for (subject_id, request_id) in self.handling.clone() {
1108            let governance_id = get_subject_data(ctx, &subject_id)
1109                .await?
1110                .and_then(|data| data.get_governance_id());
1111            let request_manager_init = InitRequestManager {
1112                our_key: self.our_key.clone(),
1113                subject_id: subject_id.clone(),
1114                governance_id,
1115                helpers: (hash, network.clone()),
1116            };
1117
1118            let request_manager_actor = match ctx
1119                .create_child(
1120                    &subject_id.to_string(),
1121                    RequestManager::initial(request_manager_init),
1122                )
1123                .await
1124            {
1125                Ok(actor) => actor,
1126                Err(e) => {
1127                    error!(
1128                        subject_id = %subject_id,
1129                        error = %e,
1130                        "Failed to create request manager child during pre_start"
1131                    );
1132                    return Err(e);
1133                }
1134            };
1135
1136            if let Err(e) = request_manager_actor
1137                .tell(RequestManagerMessage::Run {
1138                    request_id: request_id.clone(),
1139                })
1140                .await
1141            {
1142                error!(
1143                    subject_id = %subject_id,
1144                    request_id = %request_id,
1145                    error = %e,
1146                    "Failed to send Run message to request manager during pre_start"
1147                );
1148                return Err(e);
1149            }
1150        }
1151
1152        Ok(())
1153    }
1154}
1155
1156#[async_trait]
1157impl Handler<Self> for RequestHandler {
1158    async fn handle_message(
1159        &mut self,
1160        _sender: ActorPath,
1161        msg: RequestHandlerMessage,
1162        ctx: &mut ave_actors::ActorContext<Self>,
1163    ) -> Result<RequestHandlerResponse, ActorError> {
1164        match msg {
1165            RequestHandlerMessage::RequestInManagerSubjectId { subject_id } => {
1166                let handling =
1167                    self.handling.get(&subject_id).map(|x| x.to_string());
1168                let in_queue = self.in_queue.get(&subject_id).map(|x| {
1169                    x.iter().map(|x| x.1.to_string()).collect::<Vec<String>>()
1170                });
1171
1172                Ok(RequestHandlerResponse::RequestInManagerSubjectId(
1173                    RequestsInManagerSubject { handling, in_queue },
1174                ))
1175            }
1176            RequestHandlerMessage::RequestInManager => Ok(
1177                RequestHandlerResponse::RequestInManager(RequestsInManager {
1178                    handling: self
1179                        .handling
1180                        .iter()
1181                        .map(|x| (x.0.to_string(), x.1.to_string()))
1182                        .collect(),
1183                    in_queue: self
1184                        .in_queue
1185                        .iter()
1186                        .map(|x| {
1187                            (
1188                                x.0.to_string(),
1189                                x.1.iter()
1190                                    .map(|x| x.1.to_string())
1191                                    .collect::<Vec<String>>(),
1192                            )
1193                        })
1194                        .collect(),
1195                }),
1196            ),
1197            RequestHandlerMessage::AbortRequest { subject_id } => {
1198                self.manual_abort_request(ctx, &subject_id).await?;
1199                Ok(RequestHandlerResponse::None)
1200            }
1201            RequestHandlerMessage::PurgeSubject { subject_id } => {
1202                self.purge_request_manager(ctx, &subject_id).await?;
1203                self.on_event(
1204                    RequestHandlerEvent::PurgeSubject {
1205                        subject_id: subject_id.clone(),
1206                    },
1207                    ctx,
1208                )
1209                .await;
1210                Ok(RequestHandlerResponse::None)
1211            }
1212            RequestHandlerMessage::ChangeApprovalState {
1213                subject_id,
1214                state,
1215            } => {
1216                Self::change_approval(ctx, &subject_id, state.clone())
1217                    .await
1218                    .map_err(|e| {
1219                        error!(
1220                            error = %e,
1221                            "ChangeApprovalState failed"
1222                        );
1223                        ActorError::from(e)
1224                    })?;
1225
1226                Ok(RequestHandlerResponse::Response(format!(
1227                    "The approval request for subject {} has changed to {}",
1228                    subject_id, state
1229                )))
1230            }
1231            RequestHandlerMessage::GetApproval { subject_id, state } => {
1232                let res = Self::get_approval(ctx, &subject_id, state.clone())
1233                    .await
1234                    .map_err(ActorError::from)?;
1235
1236                Ok(RequestHandlerResponse::Approval(res))
1237            }
1238            RequestHandlerMessage::GetAllApprovals { state } => {
1239                let res = Self::get_all_approvals(ctx, state.clone())
1240                    .await
1241                    .map_err(|e| {
1242                        error!(
1243                            error = %e,
1244                            "GetAllApprovals failed"
1245                        );
1246                        e
1247                    })?;
1248
1249                Ok(RequestHandlerResponse::Approvals(res))
1250            }
1251            RequestHandlerMessage::NewRequest { request } => {
1252                if let Err(e) = request.verify() {
1253                    let err = RequestHandlerError::SignatureVerification(
1254                        e.to_string(),
1255                    );
1256                    error!(error = %err, "Request signature verification failed");
1257                    return Err(ActorError::from(err));
1258                };
1259
1260                let Some((hash, ..)) = self.helpers.clone() else {
1261                    let err = RequestHandlerError::HelpersNotInitialized;
1262                    error!(
1263                        msg_type = "NewRequest",
1264                        error = %err,
1265                        "Helpers not initialized"
1266                    );
1267                    return Err(emit_fail(ctx, ActorError::from(err)).await);
1268                };
1269
1270                if let Err(e) =
1271                    Self::check_owner_new_owner(ctx, request.content()).await
1272                {
1273                    error!(
1274                        msg_type = "NewRequest",
1275                        error = %e,
1276                        "Owner or new owner check failed"
1277                    );
1278                    return Err(ActorError::from(e));
1279                }
1280
1281                let subject_data = match Self::build_subject_data(
1282                    ctx,
1283                    request.content(),
1284                )
1285                .await
1286                {
1287                    Ok(data) => data,
1288                    Err(e) => {
1289                        error!(
1290                            msg_type = "NewRequest",
1291                            error = %e,
1292                            "Failed to build subject data"
1293                        );
1294                        return Err(ActorError::from(e));
1295                    }
1296                };
1297                let event_request_type =
1298                    EventRequestType::from(request.content());
1299                let signer = request.signature().signer.clone();
1300                let governance_id = subject_data.get_governance_id();
1301                let governance_subject_id = governance_id
1302                    .clone()
1303                    .unwrap_or_else(|| request.content().get_subject_id());
1304                let is_gov = subject_data.get_schema_id().is_gov();
1305
1306                if !subject_data.get_active() {
1307                    let subject_id = request.content().get_subject_id();
1308                    error!(
1309                        msg_type = "NewRequest",
1310                        subject_id = %subject_id,
1311                        "Subject is not active"
1312                    );
1313                    return Err(ActorError::from(
1314                        RequestHandlerError::SubjectNotActive(
1315                            subject_id.to_string(),
1316                        ),
1317                    ));
1318                }
1319
1320                if let Err(e) = Self::check_tracker_ledger_full(
1321                    ctx,
1322                    request.content(),
1323                    &subject_data,
1324                )
1325                .await
1326                {
1327                    error!(
1328                        msg_type = "NewRequest",
1329                        error = %e,
1330                        "Tracker full ledger check failed"
1331                    );
1332                    return Err(ActorError::from(e));
1333                }
1334
1335                if let Err(e) =
1336                    Self::check_event_request(request.content(), is_gov)
1337                {
1338                    error!(
1339                        msg_type = "NewRequest",
1340                        error = %e,
1341                        "Event request validation failed"
1342                    );
1343                    return Err(ActorError::from(e));
1344                }
1345
1346                if let Err(e) = Self::check_fact_viewpoints(
1347                    ctx,
1348                    request.content(),
1349                    &subject_data,
1350                )
1351                .await
1352                {
1353                    error!(
1354                        msg_type = "NewRequest",
1355                        error = %e,
1356                        "Fact viewpoints validation failed"
1357                    );
1358                    return Err(ActorError::from(e));
1359                }
1360
1361                if let Err(e) = Self::check_signer_authorization(
1362                    ctx,
1363                    (*self.our_key).clone(),
1364                    signer.clone(),
1365                    &governance_subject_id,
1366                    &event_request_type,
1367                    subject_data.clone(),
1368                )
1369                .await
1370                {
1371                    error!(
1372                        msg_type = "NewRequest",
1373                        governance_id = %governance_subject_id,
1374                        error = %e,
1375                        "Signature check failed"
1376                    );
1377                    return Err(e);
1378                }
1379
1380                if let Err(e) = Self::check_creation(
1381                    ctx,
1382                    subject_data,
1383                    &event_request_type,
1384                    signer,
1385                )
1386                .await
1387                {
1388                    error!(
1389                        msg_type = "NewRequest",
1390                        error = %e,
1391                        "Creation check failed"
1392                    );
1393                    return Err(e);
1394                }
1395
1396                let (request_id, subject_id) =
1397                    match Self::build_request_id_subject_id(hash, &request) {
1398                        Ok(ids) => ids,
1399                        Err(e) => {
1400                            error!(
1401                                msg_type = "NewRequest",
1402                                error = %e,
1403                                "Failed to build request ID and subject ID"
1404                            );
1405                            return Err(ActorError::from(e));
1406                        }
1407                    };
1408
1409                if let Err(e) = self
1410                    .handle_queue_request(
1411                        ctx,
1412                        request,
1413                        &request_id,
1414                        &subject_id,
1415                        is_gov,
1416                        governance_id,
1417                    )
1418                    .await
1419                {
1420                    error!(
1421                        msg_type = "NewRequest",
1422                        request_id = %request_id,
1423                        subject_id = %subject_id,
1424                        error = %e,
1425                        "Failed to handle queue request"
1426                    );
1427                    return Err(e);
1428                }
1429
1430                Ok(RequestHandlerResponse::Ok(RequestData {
1431                    request_id,
1432                    subject_id,
1433                }))
1434            }
1435            RequestHandlerMessage::PopQueue { subject_id } => {
1436                let (event, request_id) = if let Some(events) =
1437                    self.in_queue.get(&subject_id)
1438                {
1439                    if let Some((event, request_id)) =
1440                        events.clone().pop_front()
1441                    {
1442                        (event, request_id)
1443                    } else {
1444                        if let Err(e) = Self::end_child(ctx, &subject_id).await
1445                        {
1446                            error!(
1447                                msg_type = "PopQueue",
1448                                subject_id = %subject_id,
1449                                error = %e,
1450                                "Failed to end child actor when queue is empty"
1451                            );
1452                            ctx.system().crash_system();
1453                            return Err(e);
1454                        }
1455                        return Ok(RequestHandlerResponse::None);
1456                    }
1457                } else {
1458                    if let Err(e) = Self::end_child(ctx, &subject_id).await {
1459                        error!(
1460                            msg_type = "PopQueue",
1461                            subject_id = %subject_id,
1462                            error = %e,
1463                            "Failed to end child actor when no events available"
1464                        );
1465                        ctx.system().crash_system();
1466                        return Err(e);
1467                    }
1468                    return Ok(RequestHandlerResponse::None);
1469                };
1470
1471                let is_gov = match Self::check_in_queue(
1472                    ctx,
1473                    &event,
1474                    (*self.our_key).clone(),
1475                )
1476                .await
1477                {
1478                    Ok(is_gov) => is_gov,
1479                    Err(e) => {
1480                        if let Err(e) = self
1481                            .error_queue_handling(
1482                                ctx,
1483                                e.to_string(),
1484                                &subject_id,
1485                                &request_id,
1486                            )
1487                            .await
1488                        {
1489                            error!(
1490                                msg_type = "PopQueue",
1491                                subject_id = %subject_id,
1492                                request_id = %request_id,
1493                                error = %e,
1494                                "Failed to handle queue error"
1495                            );
1496                            ctx.system().crash_system();
1497                            return Err(e);
1498                        };
1499
1500                        return Ok(RequestHandlerResponse::None);
1501                    }
1502                };
1503
1504                if let Err(e) = self
1505                    .in_queue_to_handling(ctx, event, &request_id, is_gov)
1506                    .await
1507                {
1508                    error!(
1509                        msg_type = "PopQueue",
1510                        request_id = %request_id,
1511                        error = %e,
1512                        "Failed to transition from queue to handling"
1513                    );
1514                    ctx.system().crash_system();
1515                    return Err(e);
1516                }
1517
1518                Ok(RequestHandlerResponse::None)
1519            }
1520            RequestHandlerMessage::EndHandling { subject_id } => {
1521                self.on_event(
1522                    RequestHandlerEvent::FinishHandling {
1523                        subject_id: subject_id.clone(),
1524                    },
1525                    ctx,
1526                )
1527                .await;
1528
1529                if let Err(e) = Self::queued_event(ctx, &subject_id).await {
1530                    error!(
1531                        msg_type = "EndHandling",
1532                        subject_id = %subject_id,
1533                        error = %e,
1534                        "Failed to enqueue next event"
1535                    );
1536                    ctx.system().crash_system();
1537                    return Err(e);
1538                }
1539
1540                Ok(RequestHandlerResponse::None)
1541            }
1542        }
1543    }
1544
1545    async fn on_child_fault(
1546        &mut self,
1547        error: ActorError,
1548        ctx: &mut ActorContext<Self>,
1549    ) -> ChildAction {
1550        error!(
1551            error = %error,
1552            "Child fault in request handler"
1553        );
1554        ctx.system().crash_system();
1555        ChildAction::Stop
1556    }
1557
1558    async fn on_event(
1559        &mut self,
1560        event: RequestHandlerEvent,
1561        ctx: &mut ActorContext<Self>,
1562    ) {
1563        if let Err(e) = self.persist(&event, ctx).await {
1564            error!(
1565                error = %e,
1566                "Failed to persist event"
1567            );
1568            ctx.system().crash_system();
1569        };
1570    }
1571}
1572
// `RequestHandler` opts into `Storable` with an empty impl, so every item
// comes from the trait's default definitions.
#[async_trait]
impl Storable for RequestHandler {}
1575
1576#[async_trait]
1577impl PersistentActor for RequestHandler {
1578    type Persistence = LightPersistence;
1579    type InitParams = (Arc<PublicKey>, (HashAlgorithm, Arc<NetworkSender>));
1580
1581    fn update(&mut self, state: Self) {
1582        self.in_queue = state.in_queue;
1583        self.handling = state.handling;
1584    }
1585
1586    fn create_initial(params: Self::InitParams) -> Self {
1587        Self {
1588            our_key: params.0,
1589            helpers: Some(params.1),
1590            handling: HashMap::new(),
1591            in_queue: HashMap::new(),
1592        }
1593    }
1594
1595    /// Change node state.
1596    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1597        match event {
1598            RequestHandlerEvent::EventToQueue {
1599                subject_id,
1600                event,
1601                request_id,
1602            } => {
1603                self.in_queue
1604                    .entry(subject_id.clone())
1605                    .or_default()
1606                    .push_back((event.clone(), request_id.clone()));
1607            }
1608            RequestHandlerEvent::Invalid { subject_id } => {
1609                if let Some(vec) = self.in_queue.get_mut(subject_id) {
1610                    vec.pop_front();
1611                    if vec.is_empty() {
1612                        self.in_queue.remove(subject_id);
1613                    }
1614                }
1615            }
1616            RequestHandlerEvent::EventToHandling {
1617                subject_id,
1618                request_id,
1619            } => {
1620                self.handling.insert(subject_id.clone(), request_id.clone());
1621                if let Some(vec) = self.in_queue.get_mut(subject_id) {
1622                    vec.pop_front();
1623                    if vec.is_empty() {
1624                        self.in_queue.remove(subject_id);
1625                    }
1626                }
1627            }
1628            RequestHandlerEvent::FinishHandling { subject_id } => {
1629                self.handling.remove(subject_id);
1630            }
1631            RequestHandlerEvent::PurgeSubject { subject_id } => {
1632                self.handling.remove(subject_id);
1633                self.in_queue.remove(subject_id);
1634            }
1635        };
1636
1637        Ok(())
1638    }
1639}