Skip to main content

ave_core/
lib.rs

1#![recursion_limit = "256"]
2pub mod config;
3pub mod error;
4
5pub mod approval;
6pub mod auth;
7pub mod db;
8pub mod distribution;
9pub mod evaluation;
10pub mod external_db;
11pub mod governance;
12pub mod helpers;
13pub mod manual_distribution;
14pub mod model;
15pub mod node;
16pub mod request;
17pub mod subject;
18pub mod system;
19pub mod tracker;
20pub mod update;
21pub mod validation;
22
23use std::collections::HashSet;
24use std::str::FromStr;
25use std::sync::Arc;
26
27use auth::{Auth, AuthMessage, AuthResponse, AuthWitness};
28use ave_actors::{ActorError, ActorPath, ActorRef, PersistentActor};
29use ave_common::bridge::request::{
30    AbortsQuery, ApprovalState, ApprovalStateRes, EventRequestType, EventsQuery,
31};
32use ave_common::identity::keys::KeyPair;
33use ave_common::identity::{DigestIdentifier, PublicKey, Signed};
34use ave_common::request::EventRequest;
35use ave_common::response::{
36    GovsData, LedgerDB, MonitorNetworkState, PaginatorAborts, PaginatorEvents,
37    RequestInfo, RequestInfoExtend, RequestsInManager,
38    RequestsInManagerSubject, SubjectDB, SubjsData,
39};
40use config::Config as AveBaseConfig;
41use error::Error;
42use helpers::network::*;
43use intermediary::Intermediary;
44use manual_distribution::{ManualDistribution, ManualDistributionMessage};
45use network::{
46    MachineSpec, Monitor, MonitorMessage, MonitorResponse, NetworkWorker,
47};
48
49use node::{Node, NodeMessage, NodeResponse, TransferSubject};
50use prometheus_client::registry::Registry;
51use request::{
52    RequestData, RequestHandler, RequestHandlerMessage, RequestHandlerResponse,
53};
54use system::system;
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use tracing::{debug, error, warn};
58use validation::{Validation, ValidationMessage};
59
60use crate::approval::request::ApprovalReq;
61use crate::config::SinkAuth;
62use crate::helpers::db::{
63    DatabaseError as ExternalDatabaseError, ExternalDB, ReadStore,
64};
65use crate::model::common::node::SignTypesNode;
66use crate::node::InitParamsNode;
67use crate::request::tracking::{
68    RequestTracking, RequestTrackingMessage, RequestTrackingResponse,
69};
70
// Build-time feature validation. Each guard below aborts compilation with
// an explicit message when the enabled Cargo features are inconsistent.

// The `sqlite` and `rocksdb` features are mutually exclusive.
#[cfg(all(feature = "sqlite", feature = "rocksdb"))]
compile_error!("Select only one: 'sqlite' or 'rocksdb'");

// At least one of `sqlite` / `rocksdb` must be enabled.
#[cfg(not(any(feature = "sqlite", feature = "rocksdb")))]
compile_error!("You must enable 'sqlite' or 'rocksdb'");

// The `ext-sqlite` feature is mandatory.
#[cfg(not(feature = "ext-sqlite"))]
compile_error!("You must enable 'ext-sqlite'");

// Reject the `test` feature when the crate is built neither as a test
// target nor with debug assertions enabled.
#[cfg(all(feature = "test", not(test), not(debug_assertions)))]
compile_error!(
    "The 'test' feature should only be used during development/testing"
);
84
/// Handle exposing the node's operations: requests, approvals, tracking,
/// authorizations, manual distribution and external database queries.
///
/// Instances are produced by [`Api::build`]. The type is `Clone`; every
/// field is either a string, an `Arc` or an actor reference.
#[derive(Clone)]
pub struct Api {
    /// Local peer id of the network worker, rendered as a string.
    peer_id: String,
    /// Public key of the node's key pair, rendered as a string.
    public_key: String,
    /// External database helper (registered as `ext_db` in the system).
    db: Arc<ExternalDB>,
    /// Root actor `request`.
    request: ActorRef<RequestHandler>,
    /// Root actor `node`.
    node: ActorRef<Node>,
    /// Actor located at `/user/node/auth`.
    auth: ActorRef<Auth>,
    /// Root actor `network_monitor`.
    monitor: ActorRef<Monitor>,
    /// Actor located at `/user/node/manual_distribution`.
    manual_dis: ActorRef<ManualDistribution>,
    /// Actor located at `/user/request/tracking`.
    tracking: ActorRef<RequestTracking>,
}
97
98fn preserve_functional_actor_error<F>(err: ActorError, fallback: F) -> Error
99where
100    F: FnOnce(ActorError) -> Error,
101{
102    match err {
103        ActorError::Functional { description } => {
104            Error::ActorError(description)
105        }
106        ActorError::FunctionalCritical { description } => {
107            Error::Internal(description)
108        }
109        ActorError::NotFound { path } => Error::MissingResource {
110            name: path.to_string(),
111            reason: "Actor not found".to_string(),
112        },
113        other => fallback(other),
114    }
115}
116
117fn actor_communication_error(actor: &'static str, err: ActorError) -> Error {
118    preserve_functional_actor_error(err, |_| Error::ActorCommunication {
119        actor: actor.to_string(),
120    })
121}
122
123impl Api {
124    /// Creates a new `Api`.
125    pub async fn build(
126        keys: KeyPair,
127        config: AveBaseConfig,
128        sink_auth: SinkAuth,
129        registry: &mut Registry,
130        password: &str,
131        graceful_token: CancellationToken,
132        crash_token: CancellationToken,
133    ) -> Result<(Self, Vec<JoinHandle<()>>), Error> {
134        debug!("Creating Api");
135
136        let (system, runner) = system(
137            config.clone(),
138            sink_auth,
139            password,
140            graceful_token.clone(),
141            crash_token.clone(),
142        )
143        .await
144        .map_err(|e| {
145            error!(error = %e, "Failed to create system");
146            e
147        })?;
148
149        let newtork_monitor = Monitor::default();
150        let newtork_monitor_actor = system
151            .create_root_actor("network_monitor", newtork_monitor)
152            .await
153            .map_err(|e| {
154                error!(error = %e, "Can not create network_monitor actor");
155                Error::ActorCreation {
156                    actor: "network_monitor".to_string(),
157                    reason: e.to_string(),
158                }
159            })?;
160
161        let spec = config.spec.map(MachineSpec::from);
162        let network_metrics = network::metrics::register(registry);
163
164        let mut worker: NetworkWorker<NetworkMessage> = NetworkWorker::new(
165            &keys,
166            config.network.clone(),
167            Some(newtork_monitor_actor.clone()),
168            graceful_token.clone(),
169            crash_token.clone(),
170            spec,
171            Some(network_metrics),
172        )
173        .map_err(|e| {
174            error!(error = %e, "Can not create networt");
175            Error::Network(e.to_string())
176        })?;
177
178        // Create worker
179        let service = Intermediary::build(
180            worker.service().sender(),
181            system.clone(),
182            graceful_token.clone(),
183            crash_token.clone(),
184        );
185
186        let peer_id = worker.local_peer_id().to_string();
187
188        worker.add_helper_sender(service.sender());
189
190        system.add_helper("network", service.clone()).await;
191
192        let worker_runner = tokio::spawn(async move {
193            let _ = worker.run().await;
194        });
195
196        let public_key = Arc::new(keys.public_key());
197        let node_actor = system
198            .create_root_actor(
199                "node",
200                Node::initial(InitParamsNode {
201                    key_pair: keys.clone(),
202                    hash: config.hash_algorithm,
203                    is_service: config.is_service,
204                    public_key: public_key.clone(),
205                }),
206            )
207            .await
208            .map_err(|e| {
209                error!(error = %e, "Init system, can not create node actor");
210                Error::ActorCreation {
211                    actor: "node".to_string(),
212                    reason: e.to_string(),
213                }
214            })?;
215
216        let manual_dis_actor: ActorRef<ManualDistribution> = system
217            .get_actor(&ActorPath::from("/user/node/manual_distribution"))
218            .await
219            .map_err(|e| {
220                error!(error = %e, "Failed to get manual_distribution actor");
221                e
222            })?;
223
224        let auth_actor: ActorRef<Auth> = system
225            .get_actor(&ActorPath::from("/user/node/auth"))
226            .await
227            .map_err(|e| {
228                error!(error = %e, "Failed to get auth actor");
229                e
230            })?;
231
232        let request_actor = system
233            .create_root_actor(
234                "request",
235                RequestHandler::initial((
236                    public_key,
237                    (config.hash_algorithm, service),
238                )),
239            )
240            .await
241            .map_err(|e| {
242                error!(error = %e, "Init system, can not create request actor");
243                Error::ActorCreation {
244                    actor: "request".to_string(),
245                    reason: e.to_string(),
246                }
247            })?;
248
249        let tracking_actor: ActorRef<RequestTracking> = system
250            .get_actor(&ActorPath::from("/user/request/tracking"))
251            .await
252            .map_err(|e| {
253                error!(error = %e, "Failed to get tracking actor");
254                e
255            })?;
256
257        let Some(ext_db) = system.get_helper::<Arc<ExternalDB>>("ext_db").await
258        else {
259            error!("External database helper not found");
260            return Err(Error::MissingResource {
261                name: "ext_db".to_string(),
262                reason: "External database helper not found".to_string(),
263            });
264        };
265
266        ext_db.register_prometheus_metrics(registry);
267
268        let tasks = Vec::from([runner, worker_runner]);
269
270        Ok((
271            Self {
272                public_key: keys.public_key().to_string(),
273                peer_id,
274                db: ext_db,
275                request: request_actor,
276                auth: auth_actor,
277                node: node_actor,
278                monitor: newtork_monitor_actor,
279                manual_dis: manual_dis_actor,
280                tracking: tracking_actor,
281            },
282            tasks,
283        ))
284    }
285
286    ///////// General
287    ////////////////////////////
288
289    pub fn peer_id(&self) -> &str {
290        &self.peer_id
291    }
292
293    pub fn public_key(&self) -> &str {
294        &self.public_key
295    }
296
297    ///////// Network
298    ////////////////////////////
299    pub async fn get_network_state(
300        &self,
301    ) -> Result<MonitorNetworkState, Error> {
302        let response =
303            self.monitor.ask(MonitorMessage::State).await.map_err(|e| {
304                warn!(error = %e, "Unable to retrieve network state");
305                preserve_functional_actor_error(e, |e| {
306                    Error::NetworkState(e.to_string())
307                })
308            })?;
309
310        match response {
311            MonitorResponse::State(state) => Ok(state),
312            _ => {
313                warn!("Unexpected response from network monitor");
314                Err(Error::UnexpectedResponse {
315                    actor: "network_monitor".to_string(),
316                    expected: "State".to_string(),
317                    received: "other".to_string(),
318                })
319            }
320        }
321    }
322
323    ///////// Request
324    ////////////////////////////
325
326    pub async fn get_requests_in_manager(
327        &self,
328    ) -> Result<RequestsInManager, Error> {
329        let response = self
330            .request
331            .ask(RequestHandlerMessage::RequestInManager)
332            .await
333            .map_err(|e| {
334                warn!(error = %e, "Request processing failed");
335                actor_communication_error("request", e)
336            })?;
337
338        match response {
339            RequestHandlerResponse::RequestInManager(request) => Ok(request),
340            _ => {
341                warn!("Unexpected response from request handler");
342                Err(Error::UnexpectedResponse {
343                    actor: "request".to_string(),
344                    expected: "RequestInManager".to_string(),
345                    received: "other".to_string(),
346                })
347            }
348        }
349    }
350
351    pub async fn get_requests_in_manager_subject_id(
352        &self,
353        subject_id: DigestIdentifier,
354    ) -> Result<RequestsInManagerSubject, Error> {
355        let response = self
356            .request
357            .ask(RequestHandlerMessage::RequestInManagerSubjectId {
358                subject_id,
359            })
360            .await
361            .map_err(|e| {
362                warn!(error = %e, "Request processing failed");
363                actor_communication_error("request", e)
364            })?;
365
366        match response {
367            RequestHandlerResponse::RequestInManagerSubjectId(request) => {
368                Ok(request)
369            }
370            _ => {
371                warn!("Unexpected response from request handler");
372                Err(Error::UnexpectedResponse {
373                    actor: "request".to_string(),
374                    expected: "RequestInManagerSubjectId".to_string(),
375                    received: "other".to_string(),
376                })
377            }
378        }
379    }
380
381    pub async fn external_request(
382        &self,
383        request: Signed<EventRequest>,
384    ) -> Result<RequestData, Error> {
385        let response = self
386            .request
387            .ask(RequestHandlerMessage::NewRequest { request })
388            .await
389            .map_err(|e| {
390                warn!(error = %e, "Request processing failed");
391                actor_communication_error("request", e)
392            })?;
393
394        match response {
395            RequestHandlerResponse::Ok(request_data) => Ok(request_data),
396            _ => {
397                warn!("Unexpected response from request handler");
398                Err(Error::UnexpectedResponse {
399                    actor: "request".to_string(),
400                    expected: "Ok".to_string(),
401                    received: "other".to_string(),
402                })
403            }
404        }
405    }
406
407    pub async fn own_request(
408        &self,
409        request: EventRequest,
410    ) -> Result<RequestData, Error> {
411        let response = self
412            .node
413            .ask(NodeMessage::SignRequest(SignTypesNode::EventRequest(
414                request.clone(),
415            )))
416            .await
417            .map_err(|e| {
418                warn!(error = %e, "Node was unable to sign the request");
419                preserve_functional_actor_error(e, |e| {
420                    Error::SigningFailed(e.to_string())
421                })
422            })?;
423
424        let signature = match response {
425            NodeResponse::SignRequest(signature) => signature,
426            _ => {
427                warn!("Unexpected response from node");
428                return Err(Error::UnexpectedResponse {
429                    actor: "node".to_string(),
430                    expected: "SignRequest".to_string(),
431                    received: "other".to_string(),
432                });
433            }
434        };
435
436        let signed_event_req = Signed::from_parts(request, signature);
437
438        let response = self
439            .request
440            .ask(RequestHandlerMessage::NewRequest {
441                request: signed_event_req,
442            })
443            .await
444            .map_err(|e| {
445                warn!(error = %e, "Failed to send request");
446                actor_communication_error("request", e)
447            })?;
448
449        match response {
450            RequestHandlerResponse::Ok(request_data) => Ok(request_data),
451            _ => {
452                warn!("Unexpected response from request handler");
453                Err(Error::UnexpectedResponse {
454                    actor: "request".to_string(),
455                    expected: "Ok".to_string(),
456                    received: "other".to_string(),
457                })
458            }
459        }
460    }
461
462    pub async fn get_approval(
463        &self,
464        subject_id: DigestIdentifier,
465        state: Option<ApprovalState>,
466    ) -> Result<Option<(ApprovalReq, ApprovalState)>, Error> {
467        let response = self
468            .request
469            .ask(RequestHandlerMessage::GetApproval { state, subject_id })
470            .await
471            .map_err(|e| {
472                warn!(error = %e, "Failed to get approval request");
473                actor_communication_error("request", e)
474            })?;
475
476        match response {
477            RequestHandlerResponse::Approval(data) => Ok(data),
478            _ => {
479                warn!("Unexpected response from request handler");
480                Err(Error::UnexpectedResponse {
481                    actor: "request".to_string(),
482                    expected: "Approval".to_string(),
483                    received: "other".to_string(),
484                })
485            }
486        }
487    }
488
489    pub async fn get_approvals(
490        &self,
491        state: Option<ApprovalState>,
492    ) -> Result<Vec<(ApprovalReq, ApprovalState)>, Error> {
493        let response = self
494            .request
495            .ask(RequestHandlerMessage::GetAllApprovals { state })
496            .await
497            .map_err(|e| {
498                warn!(error = %e, "Failed to get approval requests");
499                actor_communication_error("request", e)
500            })?;
501
502        match response {
503            RequestHandlerResponse::Approvals(data) => Ok(data),
504            _ => {
505                warn!("Unexpected response from request handler");
506                Err(Error::UnexpectedResponse {
507                    actor: "request".to_string(),
508                    expected: "Approvals".to_string(),
509                    received: "other".to_string(),
510                })
511            }
512        }
513    }
514
515    pub async fn approve(
516        &self,
517        subject_id: DigestIdentifier,
518        state: ApprovalStateRes,
519    ) -> Result<String, Error> {
520        if state == ApprovalStateRes::Obsolete {
521            warn!("Cannot set approval state to Obsolete");
522            return Err(Error::InvalidApprovalState("Obsolete".to_string()));
523        }
524
525        let response = self
526            .request
527            .ask(RequestHandlerMessage::ChangeApprovalState {
528                subject_id,
529                state,
530            })
531            .await
532            .map_err(|e| {
533                warn!(error = %e, "Failed to change approval state");
534                preserve_functional_actor_error(e, |e| {
535                    Error::ApprovalUpdateFailed(e.to_string())
536                })
537            })?;
538
539        match response {
540            RequestHandlerResponse::Response(res) => Ok(res),
541            _ => {
542                warn!("Unexpected response from request handler");
543                Err(Error::UnexpectedResponse {
544                    actor: "request".to_string(),
545                    expected: "Response".to_string(),
546                    received: "other".to_string(),
547                })
548            }
549        }
550    }
551
552    pub async fn manual_request_abort(
553        &self,
554        subject_id: DigestIdentifier,
555    ) -> Result<String, Error> {
556        self.request
557            .tell(RequestHandlerMessage::AbortRequest { subject_id })
558            .await
559            .map_err(|e| {
560                warn!(error = %e, "Failed to abort request");
561                actor_communication_error("request", e)
562            })?;
563
564        Ok("Trying to abort".to_string())
565    }
566
567    ///////// Tracking
568    ////////////////////////////
569    pub async fn get_request_state(
570        &self,
571        request_id: DigestIdentifier,
572    ) -> Result<RequestInfo, Error> {
573        let response = self
574            .tracking
575            .ask(RequestTrackingMessage::SearchRequest(request_id.clone()))
576            .await
577            .map_err(|e| {
578                warn!(error = %e, "Failed to get request state");
579                actor_communication_error("tracking", e)
580            })?;
581
582        match response {
583            RequestTrackingResponse::Info(state) => Ok(state),
584            RequestTrackingResponse::NotFound => {
585                Err(Error::RequestNotFound(request_id.to_string()))
586            }
587            _ => {
588                warn!("Unexpected response from tracking");
589                Err(Error::UnexpectedResponse {
590                    actor: "tracking".to_string(),
591                    expected: "Info".to_string(),
592                    received: "other".to_string(),
593                })
594            }
595        }
596    }
597
598    pub async fn all_request_state(
599        &self,
600    ) -> Result<Vec<RequestInfoExtend>, Error> {
601        let response = self
602            .tracking
603            .ask(RequestTrackingMessage::AllRequests)
604            .await
605            .map_err(|e| {
606                warn!(error = %e, "Failed to get all request states");
607                actor_communication_error("tracking", e)
608            })?;
609
610        match response {
611            RequestTrackingResponse::AllInfo(state) => Ok(state),
612            _ => {
613                warn!("Unexpected response from tracking");
614                Err(Error::UnexpectedResponse {
615                    actor: "tracking".to_string(),
616                    expected: "AllInfo".to_string(),
617                    received: "other".to_string(),
618                })
619            }
620        }
621    }
622
623    ///////// Node
624    ////////////////////////////
625
626    pub async fn get_pending_transfers(
627        &self,
628    ) -> Result<Vec<TransferSubject>, Error> {
629        let response =
630            self.node.ask(NodeMessage::PendingTransfers).await.map_err(
631                |e| {
632                    warn!(error = %e, "Failed to get pending transfers");
633                    actor_communication_error("node", e)
634                },
635            )?;
636
637        let NodeResponse::PendingTransfers(pending) = response else {
638            warn!("Unexpected response from node");
639            return Err(Error::UnexpectedResponse {
640                actor: "node".to_string(),
641                expected: "PendingTransfers".to_string(),
642                received: "other".to_string(),
643            });
644        };
645
646        Ok(pending)
647    }
648
649    ///////// Auth
650    ////////////////////////////
651
652    pub async fn auth_subject(
653        &self,
654        subject_id: DigestIdentifier,
655        witnesses: AuthWitness,
656    ) -> Result<String, Error> {
657        self.auth
658            .tell(AuthMessage::NewAuth {
659                subject_id,
660                witness: witnesses,
661            })
662            .await
663            .map_err(|e| {
664                warn!(error = %e, "Authentication operation failed");
665                preserve_functional_actor_error(e, |e| {
666                    Error::AuthOperation(e.to_string())
667                })
668            })?;
669
670        Ok("Ok".to_owned())
671    }
672
673    pub async fn all_auth_subjects(
674        &self,
675    ) -> Result<Vec<DigestIdentifier>, Error> {
676        let response =
677            self.auth.ask(AuthMessage::GetAuths).await.map_err(|e| {
678                error!(error = %e, "Failed to get auth subjects");
679                actor_communication_error("auth", e)
680            })?;
681
682        match response {
683            AuthResponse::Auths { subjects } => Ok(subjects),
684            _ => {
685                warn!("Unexpected response from auth");
686                Err(Error::UnexpectedResponse {
687                    actor: "auth".to_string(),
688                    expected: "Auths".to_string(),
689                    received: "other".to_string(),
690                })
691            }
692        }
693    }
694
695    pub async fn witnesses_subject(
696        &self,
697        subject_id: DigestIdentifier,
698    ) -> Result<HashSet<PublicKey>, Error> {
699        let response = self
700            .auth
701            .ask(AuthMessage::GetAuth {
702                subject_id: subject_id.clone(),
703            })
704            .await
705            .map_err(|e| {
706                warn!(error = %e, "Failed to get witnesses for subject");
707                actor_communication_error("auth", e)
708            })?;
709
710        match response {
711            AuthResponse::Witnesses(witnesses) => Ok(witnesses),
712            _ => {
713                warn!("Unexpected response from auth");
714                Err(Error::UnexpectedResponse {
715                    actor: "auth".to_string(),
716                    expected: "Witnesses".to_string(),
717                    received: "other".to_string(),
718                })
719            }
720        }
721    }
722
723    pub async fn delete_auth_subject(
724        &self,
725        subject_id: DigestIdentifier,
726    ) -> Result<String, Error> {
727        self.auth
728            .tell(AuthMessage::DeleteAuth { subject_id })
729            .await
730            .map_err(|e| {
731                warn!(error = %e, "Failed to delete auth subject");
732                preserve_functional_actor_error(e, |e| {
733                    Error::AuthOperation(e.to_string())
734                })
735            })?;
736
737        Ok("Ok".to_owned())
738    }
739
740    pub async fn update_subject(
741        &self,
742        subject_id: DigestIdentifier,
743    ) -> Result<String, Error> {
744        let response = self
745            .auth
746            .ask(AuthMessage::Update {
747                subject_id: subject_id.clone(),
748                objective: None,
749            })
750            .await
751            .map_err(|e| {
752                warn!(error = %e, "Failed to update subject");
753                preserve_functional_actor_error(e, |e| {
754                    Error::UpdateFailed(subject_id.to_string(), e.to_string())
755                })
756            })?;
757
758        match response {
759            AuthResponse::None => Ok("Update in progress".to_owned()),
760            _ => {
761                warn!("Unexpected response from auth");
762                Err(Error::UnexpectedResponse {
763                    actor: "auth".to_string(),
764                    expected: "None".to_string(),
765                    received: "other".to_string(),
766                })
767            }
768        }
769    }
770
771    ///////// manual distribution
772    ////////////////////////////
773
774    pub async fn manual_distribution(
775        &self,
776        subject_id: DigestIdentifier,
777    ) -> Result<String, Error> {
778        self.manual_dis
779            .ask(ManualDistributionMessage::Update(subject_id.clone()))
780            .await
781            .map_err(|e| {
782                warn!(error = %e, "Manual distribution failed");
783                preserve_functional_actor_error(e, |_| {
784                    Error::DistributionFailed(subject_id.to_string())
785                })
786            })?;
787
788        Ok("Manual distribution in progress".to_owned())
789    }
790
791    ///////// Register
792    ////////////////////////////
793    pub async fn all_govs(
794        &self,
795        active: Option<bool>,
796    ) -> Result<Vec<GovsData>, Error> {
797        self.db.get_governances(active).await.map_err(|e| {
798            warn!(error = %e, "Failed to get governances");
799            Error::QueryFailed(e.to_string())
800        })
801    }
802
803    pub async fn all_subjs(
804        &self,
805        governance_id: DigestIdentifier,
806        active: Option<bool>,
807        schema_id: Option<String>,
808    ) -> Result<Vec<SubjsData>, Error> {
809        let governance_id = governance_id.to_string();
810        match self
811            .db
812            .get_subjects(&governance_id, active, schema_id)
813            .await
814        {
815            Ok(subjects) => Ok(subjects),
816            Err(ExternalDatabaseError::GovernanceNotFound(_)) => {
817                Err(Error::GovernanceNotFound(governance_id))
818            }
819            Err(e) => {
820                warn!(error = %e, "Failed to get subjects");
821                Err(Error::QueryFailed(e.to_string()))
822            }
823        }
824    }
825
826    ///////// Query
827    ////////////////////////////
828    pub async fn get_events(
829        &self,
830        subject_id: DigestIdentifier,
831        query: EventsQuery,
832    ) -> Result<PaginatorEvents, Error> {
833        let subject_id_str = subject_id.to_string();
834
835        match self.db.get_events(&subject_id_str, query).await {
836            Ok(data) => Ok(data),
837            Err(ExternalDatabaseError::NoEvents(_)) => {
838                Err(Error::NoEventsFound(subject_id_str))
839            }
840            Err(e) => {
841                warn!(error = %e, "Failed to get events");
842                Err(Error::QueryFailed(e.to_string()))
843            }
844        }
845    }
846
847    pub async fn get_aborts(
848        &self,
849        subject_id: DigestIdentifier,
850        query: AbortsQuery,
851    ) -> Result<PaginatorAborts, Error> {
852        let subject_id_str = subject_id.to_string();
853        let request_id = if let Some(request_id) = query.request_id.as_ref() {
854            Some(
855                DigestIdentifier::from_str(request_id)
856                    .map_err(|e| Error::InvalidQueryParams(e.to_string()))?
857                    .to_string(),
858            )
859        } else {
860            None
861        };
862        let query = AbortsQuery {
863            request_id,
864            sn: query.sn,
865            quantity: query.quantity,
866            page: query.page,
867            reverse: query.reverse,
868        };
869
870        self.db
871            .get_aborts(&subject_id_str, query)
872            .await
873            .map_err(|e| {
874                warn!(error = %e, "Failed to get aborts");
875                Error::QueryFailed(e.to_string())
876            })
877    }
878
879    pub async fn get_event_sn(
880        &self,
881        subject_id: DigestIdentifier,
882        sn: u64,
883    ) -> Result<LedgerDB, Error> {
884        let subject_id_str = subject_id.to_string();
885
886        match self.db.get_event_sn(&subject_id_str, sn).await {
887            Ok(data) => Ok(data),
888            Err(ExternalDatabaseError::EventNotFound { .. }) => {
889                Err(Error::EventNotFound {
890                    subject: subject_id_str,
891                    sn,
892                })
893            }
894            Err(e) => {
895                warn!(error = %e, "Failed to get event");
896                Err(Error::QueryFailed(e.to_string()))
897            }
898        }
899    }
900
901    pub async fn get_first_or_end_events(
902        &self,
903        subject_id: DigestIdentifier,
904        quantity: Option<u64>,
905        reverse: Option<bool>,
906        event_type: Option<EventRequestType>,
907    ) -> Result<Vec<LedgerDB>, Error> {
908        let subject_id_str = subject_id.to_string();
909
910        match self
911            .db
912            .get_first_or_end_events(
913                &subject_id_str,
914                quantity,
915                reverse,
916                event_type,
917            )
918            .await
919        {
920            Ok(data) => Ok(data),
921            Err(ExternalDatabaseError::NoEvents(_)) => {
922                Err(Error::NoEventsFound(subject_id_str))
923            }
924            Err(e) => {
925                warn!(error = %e, "Failed to get events");
926                Err(Error::QueryFailed(e.to_string()))
927            }
928        }
929    }
930
931    pub async fn get_subject_state(
932        &self,
933        subject_id: DigestIdentifier,
934    ) -> Result<SubjectDB, Error> {
935        let subject_id_str = subject_id.to_string();
936
937        match self.db.get_subject_state(&subject_id_str).await {
938            Ok(data) => Ok(data),
939            Err(ExternalDatabaseError::SubjectNotFound(_)) => {
940                Err(Error::SubjectNotFound(subject_id_str))
941            }
942            Err(e) => {
943                warn!(error = %e, "Failed to get subject state");
944                Err(Error::QueryFailed(e.to_string()))
945            }
946        }
947    }
948}
949
#[cfg(test)]
mod tests {
    use super::*;
    use ave_actors::{ActorError, ActorPath};

    /// Fallback shared by the tests; it must never be the result for the
    /// variants that have a dedicated mapping.
    fn communication_fallback(_: ActorError) -> Error {
        Error::ActorCommunication {
            actor: "request".to_string(),
        }
    }

    /// `Functional` errors keep their description as `Error::ActorError`.
    #[test]
    fn preserves_functional_actor_errors() {
        let source = ActorError::Functional {
            description: "Is not a Creator".to_string(),
        };

        let mapped =
            preserve_functional_actor_error(source, communication_fallback);

        assert!(
            matches!(mapped, Error::ActorError(message) if message == "Is not a Creator")
        );
    }

    /// `NotFound` errors become `Error::MissingResource` named after the
    /// actor path.
    #[test]
    fn preserves_not_found_actor_errors() {
        let source = ActorError::NotFound {
            path: ActorPath::from("/user/request"),
        };

        let mapped =
            preserve_functional_actor_error(source, communication_fallback);

        assert!(matches!(
            mapped,
            Error::MissingResource { name, .. } if name == "/user/request"
        ));
    }
}