1#![recursion_limit = "256"]
2pub mod config;
3pub mod error;
4
5pub mod approval;
6pub mod auth;
7pub mod db;
8pub mod distribution;
9pub mod evaluation;
10pub mod external_db;
11pub mod governance;
12pub mod helpers;
13pub mod manual_distribution;
14pub mod model;
15pub mod node;
16pub mod request;
17pub mod subject;
18pub mod system;
19pub mod tracker;
20pub mod update;
21pub mod validation;
22
23use std::collections::HashSet;
24use std::str::FromStr;
25use std::sync::Arc;
26
27use auth::{Auth, AuthMessage, AuthResponse, AuthWitness};
28use ave_actors::{ActorError, ActorPath, ActorRef, PersistentActor};
29use ave_common::bridge::request::{
30 AbortsQuery, ApprovalState, ApprovalStateRes, EventRequestType, EventsQuery,
31};
32use ave_common::identity::keys::KeyPair;
33use ave_common::identity::{DigestIdentifier, PublicKey, Signed};
34use ave_common::request::EventRequest;
35use ave_common::response::{
36 GovsData, LedgerDB, MonitorNetworkState, PaginatorAborts, PaginatorEvents,
37 RequestInfo, RequestInfoExtend, RequestsInManager,
38 RequestsInManagerSubject, SubjectDB, SubjsData,
39};
40use config::Config as AveBaseConfig;
41use error::Error;
42use helpers::network::*;
43use intermediary::Intermediary;
44use manual_distribution::{ManualDistribution, ManualDistributionMessage};
45use network::{
46 MachineSpec, Monitor, MonitorMessage, MonitorResponse, NetworkWorker,
47};
48
49use node::{Node, NodeMessage, NodeResponse, TransferSubject};
50use prometheus_client::registry::Registry;
51use request::{
52 RequestData, RequestHandler, RequestHandlerMessage, RequestHandlerResponse,
53};
54use system::system;
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use tracing::{debug, error, warn};
58use validation::{Validation, ValidationMessage};
59
60use crate::approval::request::ApprovalReq;
61use crate::config::SinkAuth;
62use crate::helpers::db::{
63 DatabaseError as ExternalDatabaseError, ExternalDB, ReadStore,
64};
65use crate::model::common::node::SignTypesNode;
66use crate::node::InitParamsNode;
67use crate::request::tracking::{
68 RequestTracking, RequestTrackingMessage, RequestTrackingResponse,
69};
70
71#[cfg(all(feature = "sqlite", feature = "rocksdb"))]
72compile_error!("Select only one: 'sqlite' or 'rocksdb'");
73
74#[cfg(not(any(feature = "sqlite", feature = "rocksdb")))]
75compile_error!("You must enable 'sqlite' or 'rocksdb'");
76
77#[cfg(not(feature = "ext-sqlite"))]
78compile_error!("You must enable 'ext-sqlite'");
79
80#[cfg(all(feature = "test", not(test), not(debug_assertions)))]
81compile_error!(
82 "The 'test' feature should only be used during development/testing"
83);
84
85#[derive(Clone)]
86pub struct Api {
87 peer_id: String,
88 public_key: String,
89 db: Arc<ExternalDB>,
90 request: ActorRef<RequestHandler>,
91 node: ActorRef<Node>,
92 auth: ActorRef<Auth>,
93 monitor: ActorRef<Monitor>,
94 manual_dis: ActorRef<ManualDistribution>,
95 tracking: ActorRef<RequestTracking>,
96}
97
98fn preserve_functional_actor_error<F>(err: ActorError, fallback: F) -> Error
99where
100 F: FnOnce(ActorError) -> Error,
101{
102 match err {
103 ActorError::Functional { description } => {
104 Error::ActorError(description)
105 }
106 ActorError::FunctionalCritical { description } => {
107 Error::Internal(description)
108 }
109 ActorError::NotFound { path } => Error::MissingResource {
110 name: path.to_string(),
111 reason: "Actor not found".to_string(),
112 },
113 other => fallback(other),
114 }
115}
116
117fn actor_communication_error(actor: &'static str, err: ActorError) -> Error {
118 preserve_functional_actor_error(err, |_| Error::ActorCommunication {
119 actor: actor.to_string(),
120 })
121}
122
123impl Api {
124 pub async fn build(
126 keys: KeyPair,
127 config: AveBaseConfig,
128 sink_auth: SinkAuth,
129 registry: &mut Registry,
130 password: &str,
131 graceful_token: CancellationToken,
132 crash_token: CancellationToken,
133 ) -> Result<(Self, Vec<JoinHandle<()>>), Error> {
134 debug!("Creating Api");
135
136 let (system, runner) = system(
137 config.clone(),
138 sink_auth,
139 password,
140 graceful_token.clone(),
141 crash_token.clone(),
142 )
143 .await
144 .map_err(|e| {
145 error!(error = %e, "Failed to create system");
146 e
147 })?;
148
149 let newtork_monitor = Monitor::default();
150 let newtork_monitor_actor = system
151 .create_root_actor("network_monitor", newtork_monitor)
152 .await
153 .map_err(|e| {
154 error!(error = %e, "Can not create network_monitor actor");
155 Error::ActorCreation {
156 actor: "network_monitor".to_string(),
157 reason: e.to_string(),
158 }
159 })?;
160
161 let spec = config.spec.map(MachineSpec::from);
162 let network_metrics = network::metrics::register(registry);
163
164 let mut worker: NetworkWorker<NetworkMessage> = NetworkWorker::new(
165 &keys,
166 config.network.clone(),
167 Some(newtork_monitor_actor.clone()),
168 graceful_token.clone(),
169 crash_token.clone(),
170 spec,
171 Some(network_metrics),
172 )
173 .map_err(|e| {
174 error!(error = %e, "Can not create networt");
175 Error::Network(e.to_string())
176 })?;
177
178 let service = Intermediary::build(
180 worker.service().sender(),
181 system.clone(),
182 graceful_token.clone(),
183 crash_token.clone(),
184 );
185
186 let peer_id = worker.local_peer_id().to_string();
187
188 worker.add_helper_sender(service.sender());
189
190 system.add_helper("network", service.clone()).await;
191
192 let worker_runner = tokio::spawn(async move {
193 let _ = worker.run().await;
194 });
195
196 let public_key = Arc::new(keys.public_key());
197 let node_actor = system
198 .create_root_actor(
199 "node",
200 Node::initial(InitParamsNode {
201 key_pair: keys.clone(),
202 hash: config.hash_algorithm,
203 is_service: config.is_service,
204 public_key: public_key.clone(),
205 }),
206 )
207 .await
208 .map_err(|e| {
209 error!(error = %e, "Init system, can not create node actor");
210 Error::ActorCreation {
211 actor: "node".to_string(),
212 reason: e.to_string(),
213 }
214 })?;
215
216 let manual_dis_actor: ActorRef<ManualDistribution> = system
217 .get_actor(&ActorPath::from("/user/node/manual_distribution"))
218 .await
219 .map_err(|e| {
220 error!(error = %e, "Failed to get manual_distribution actor");
221 e
222 })?;
223
224 let auth_actor: ActorRef<Auth> = system
225 .get_actor(&ActorPath::from("/user/node/auth"))
226 .await
227 .map_err(|e| {
228 error!(error = %e, "Failed to get auth actor");
229 e
230 })?;
231
232 let request_actor = system
233 .create_root_actor(
234 "request",
235 RequestHandler::initial((
236 public_key,
237 (config.hash_algorithm, service),
238 )),
239 )
240 .await
241 .map_err(|e| {
242 error!(error = %e, "Init system, can not create request actor");
243 Error::ActorCreation {
244 actor: "request".to_string(),
245 reason: e.to_string(),
246 }
247 })?;
248
249 let tracking_actor: ActorRef<RequestTracking> = system
250 .get_actor(&ActorPath::from("/user/request/tracking"))
251 .await
252 .map_err(|e| {
253 error!(error = %e, "Failed to get tracking actor");
254 e
255 })?;
256
257 let Some(ext_db) = system.get_helper::<Arc<ExternalDB>>("ext_db").await
258 else {
259 error!("External database helper not found");
260 return Err(Error::MissingResource {
261 name: "ext_db".to_string(),
262 reason: "External database helper not found".to_string(),
263 });
264 };
265
266 ext_db.register_prometheus_metrics(registry);
267
268 let tasks = Vec::from([runner, worker_runner]);
269
270 Ok((
271 Self {
272 public_key: keys.public_key().to_string(),
273 peer_id,
274 db: ext_db,
275 request: request_actor,
276 auth: auth_actor,
277 node: node_actor,
278 monitor: newtork_monitor_actor,
279 manual_dis: manual_dis_actor,
280 tracking: tracking_actor,
281 },
282 tasks,
283 ))
284 }
285
286 pub fn peer_id(&self) -> &str {
290 &self.peer_id
291 }
292
293 pub fn public_key(&self) -> &str {
294 &self.public_key
295 }
296
297 pub async fn get_network_state(
300 &self,
301 ) -> Result<MonitorNetworkState, Error> {
302 let response =
303 self.monitor.ask(MonitorMessage::State).await.map_err(|e| {
304 warn!(error = %e, "Unable to retrieve network state");
305 preserve_functional_actor_error(e, |e| {
306 Error::NetworkState(e.to_string())
307 })
308 })?;
309
310 match response {
311 MonitorResponse::State(state) => Ok(state),
312 _ => {
313 warn!("Unexpected response from network monitor");
314 Err(Error::UnexpectedResponse {
315 actor: "network_monitor".to_string(),
316 expected: "State".to_string(),
317 received: "other".to_string(),
318 })
319 }
320 }
321 }
322
323 pub async fn get_requests_in_manager(
327 &self,
328 ) -> Result<RequestsInManager, Error> {
329 let response = self
330 .request
331 .ask(RequestHandlerMessage::RequestInManager)
332 .await
333 .map_err(|e| {
334 warn!(error = %e, "Request processing failed");
335 actor_communication_error("request", e)
336 })?;
337
338 match response {
339 RequestHandlerResponse::RequestInManager(request) => Ok(request),
340 _ => {
341 warn!("Unexpected response from request handler");
342 Err(Error::UnexpectedResponse {
343 actor: "request".to_string(),
344 expected: "RequestInManager".to_string(),
345 received: "other".to_string(),
346 })
347 }
348 }
349 }
350
351 pub async fn get_requests_in_manager_subject_id(
352 &self,
353 subject_id: DigestIdentifier,
354 ) -> Result<RequestsInManagerSubject, Error> {
355 let response = self
356 .request
357 .ask(RequestHandlerMessage::RequestInManagerSubjectId {
358 subject_id,
359 })
360 .await
361 .map_err(|e| {
362 warn!(error = %e, "Request processing failed");
363 actor_communication_error("request", e)
364 })?;
365
366 match response {
367 RequestHandlerResponse::RequestInManagerSubjectId(request) => {
368 Ok(request)
369 }
370 _ => {
371 warn!("Unexpected response from request handler");
372 Err(Error::UnexpectedResponse {
373 actor: "request".to_string(),
374 expected: "RequestInManagerSubjectId".to_string(),
375 received: "other".to_string(),
376 })
377 }
378 }
379 }
380
381 pub async fn external_request(
382 &self,
383 request: Signed<EventRequest>,
384 ) -> Result<RequestData, Error> {
385 let response = self
386 .request
387 .ask(RequestHandlerMessage::NewRequest { request })
388 .await
389 .map_err(|e| {
390 warn!(error = %e, "Request processing failed");
391 actor_communication_error("request", e)
392 })?;
393
394 match response {
395 RequestHandlerResponse::Ok(request_data) => Ok(request_data),
396 _ => {
397 warn!("Unexpected response from request handler");
398 Err(Error::UnexpectedResponse {
399 actor: "request".to_string(),
400 expected: "Ok".to_string(),
401 received: "other".to_string(),
402 })
403 }
404 }
405 }
406
407 pub async fn own_request(
408 &self,
409 request: EventRequest,
410 ) -> Result<RequestData, Error> {
411 let response = self
412 .node
413 .ask(NodeMessage::SignRequest(SignTypesNode::EventRequest(
414 request.clone(),
415 )))
416 .await
417 .map_err(|e| {
418 warn!(error = %e, "Node was unable to sign the request");
419 preserve_functional_actor_error(e, |e| {
420 Error::SigningFailed(e.to_string())
421 })
422 })?;
423
424 let signature = match response {
425 NodeResponse::SignRequest(signature) => signature,
426 _ => {
427 warn!("Unexpected response from node");
428 return Err(Error::UnexpectedResponse {
429 actor: "node".to_string(),
430 expected: "SignRequest".to_string(),
431 received: "other".to_string(),
432 });
433 }
434 };
435
436 let signed_event_req = Signed::from_parts(request, signature);
437
438 let response = self
439 .request
440 .ask(RequestHandlerMessage::NewRequest {
441 request: signed_event_req,
442 })
443 .await
444 .map_err(|e| {
445 warn!(error = %e, "Failed to send request");
446 actor_communication_error("request", e)
447 })?;
448
449 match response {
450 RequestHandlerResponse::Ok(request_data) => Ok(request_data),
451 _ => {
452 warn!("Unexpected response from request handler");
453 Err(Error::UnexpectedResponse {
454 actor: "request".to_string(),
455 expected: "Ok".to_string(),
456 received: "other".to_string(),
457 })
458 }
459 }
460 }
461
462 pub async fn get_approval(
463 &self,
464 subject_id: DigestIdentifier,
465 state: Option<ApprovalState>,
466 ) -> Result<Option<(ApprovalReq, ApprovalState)>, Error> {
467 let response = self
468 .request
469 .ask(RequestHandlerMessage::GetApproval { state, subject_id })
470 .await
471 .map_err(|e| {
472 warn!(error = %e, "Failed to get approval request");
473 actor_communication_error("request", e)
474 })?;
475
476 match response {
477 RequestHandlerResponse::Approval(data) => Ok(data),
478 _ => {
479 warn!("Unexpected response from request handler");
480 Err(Error::UnexpectedResponse {
481 actor: "request".to_string(),
482 expected: "Approval".to_string(),
483 received: "other".to_string(),
484 })
485 }
486 }
487 }
488
489 pub async fn get_approvals(
490 &self,
491 state: Option<ApprovalState>,
492 ) -> Result<Vec<(ApprovalReq, ApprovalState)>, Error> {
493 let response = self
494 .request
495 .ask(RequestHandlerMessage::GetAllApprovals { state })
496 .await
497 .map_err(|e| {
498 warn!(error = %e, "Failed to get approval requests");
499 actor_communication_error("request", e)
500 })?;
501
502 match response {
503 RequestHandlerResponse::Approvals(data) => Ok(data),
504 _ => {
505 warn!("Unexpected response from request handler");
506 Err(Error::UnexpectedResponse {
507 actor: "request".to_string(),
508 expected: "Approvals".to_string(),
509 received: "other".to_string(),
510 })
511 }
512 }
513 }
514
515 pub async fn approve(
516 &self,
517 subject_id: DigestIdentifier,
518 state: ApprovalStateRes,
519 ) -> Result<String, Error> {
520 if state == ApprovalStateRes::Obsolete {
521 warn!("Cannot set approval state to Obsolete");
522 return Err(Error::InvalidApprovalState("Obsolete".to_string()));
523 }
524
525 let response = self
526 .request
527 .ask(RequestHandlerMessage::ChangeApprovalState {
528 subject_id,
529 state,
530 })
531 .await
532 .map_err(|e| {
533 warn!(error = %e, "Failed to change approval state");
534 preserve_functional_actor_error(e, |e| {
535 Error::ApprovalUpdateFailed(e.to_string())
536 })
537 })?;
538
539 match response {
540 RequestHandlerResponse::Response(res) => Ok(res),
541 _ => {
542 warn!("Unexpected response from request handler");
543 Err(Error::UnexpectedResponse {
544 actor: "request".to_string(),
545 expected: "Response".to_string(),
546 received: "other".to_string(),
547 })
548 }
549 }
550 }
551
552 pub async fn manual_request_abort(
553 &self,
554 subject_id: DigestIdentifier,
555 ) -> Result<String, Error> {
556 self.request
557 .tell(RequestHandlerMessage::AbortRequest { subject_id })
558 .await
559 .map_err(|e| {
560 warn!(error = %e, "Failed to abort request");
561 actor_communication_error("request", e)
562 })?;
563
564 Ok("Trying to abort".to_string())
565 }
566
567 pub async fn get_request_state(
570 &self,
571 request_id: DigestIdentifier,
572 ) -> Result<RequestInfo, Error> {
573 let response = self
574 .tracking
575 .ask(RequestTrackingMessage::SearchRequest(request_id.clone()))
576 .await
577 .map_err(|e| {
578 warn!(error = %e, "Failed to get request state");
579 actor_communication_error("tracking", e)
580 })?;
581
582 match response {
583 RequestTrackingResponse::Info(state) => Ok(state),
584 RequestTrackingResponse::NotFound => {
585 Err(Error::RequestNotFound(request_id.to_string()))
586 }
587 _ => {
588 warn!("Unexpected response from tracking");
589 Err(Error::UnexpectedResponse {
590 actor: "tracking".to_string(),
591 expected: "Info".to_string(),
592 received: "other".to_string(),
593 })
594 }
595 }
596 }
597
598 pub async fn all_request_state(
599 &self,
600 ) -> Result<Vec<RequestInfoExtend>, Error> {
601 let response = self
602 .tracking
603 .ask(RequestTrackingMessage::AllRequests)
604 .await
605 .map_err(|e| {
606 warn!(error = %e, "Failed to get all request states");
607 actor_communication_error("tracking", e)
608 })?;
609
610 match response {
611 RequestTrackingResponse::AllInfo(state) => Ok(state),
612 _ => {
613 warn!("Unexpected response from tracking");
614 Err(Error::UnexpectedResponse {
615 actor: "tracking".to_string(),
616 expected: "AllInfo".to_string(),
617 received: "other".to_string(),
618 })
619 }
620 }
621 }
622
623 pub async fn get_pending_transfers(
627 &self,
628 ) -> Result<Vec<TransferSubject>, Error> {
629 let response =
630 self.node.ask(NodeMessage::PendingTransfers).await.map_err(
631 |e| {
632 warn!(error = %e, "Failed to get pending transfers");
633 actor_communication_error("node", e)
634 },
635 )?;
636
637 let NodeResponse::PendingTransfers(pending) = response else {
638 warn!("Unexpected response from node");
639 return Err(Error::UnexpectedResponse {
640 actor: "node".to_string(),
641 expected: "PendingTransfers".to_string(),
642 received: "other".to_string(),
643 });
644 };
645
646 Ok(pending)
647 }
648
649 pub async fn auth_subject(
653 &self,
654 subject_id: DigestIdentifier,
655 witnesses: AuthWitness,
656 ) -> Result<String, Error> {
657 self.auth
658 .tell(AuthMessage::NewAuth {
659 subject_id,
660 witness: witnesses,
661 })
662 .await
663 .map_err(|e| {
664 warn!(error = %e, "Authentication operation failed");
665 preserve_functional_actor_error(e, |e| {
666 Error::AuthOperation(e.to_string())
667 })
668 })?;
669
670 Ok("Ok".to_owned())
671 }
672
673 pub async fn all_auth_subjects(
674 &self,
675 ) -> Result<Vec<DigestIdentifier>, Error> {
676 let response =
677 self.auth.ask(AuthMessage::GetAuths).await.map_err(|e| {
678 error!(error = %e, "Failed to get auth subjects");
679 actor_communication_error("auth", e)
680 })?;
681
682 match response {
683 AuthResponse::Auths { subjects } => Ok(subjects),
684 _ => {
685 warn!("Unexpected response from auth");
686 Err(Error::UnexpectedResponse {
687 actor: "auth".to_string(),
688 expected: "Auths".to_string(),
689 received: "other".to_string(),
690 })
691 }
692 }
693 }
694
695 pub async fn witnesses_subject(
696 &self,
697 subject_id: DigestIdentifier,
698 ) -> Result<HashSet<PublicKey>, Error> {
699 let response = self
700 .auth
701 .ask(AuthMessage::GetAuth {
702 subject_id: subject_id.clone(),
703 })
704 .await
705 .map_err(|e| {
706 warn!(error = %e, "Failed to get witnesses for subject");
707 actor_communication_error("auth", e)
708 })?;
709
710 match response {
711 AuthResponse::Witnesses(witnesses) => Ok(witnesses),
712 _ => {
713 warn!("Unexpected response from auth");
714 Err(Error::UnexpectedResponse {
715 actor: "auth".to_string(),
716 expected: "Witnesses".to_string(),
717 received: "other".to_string(),
718 })
719 }
720 }
721 }
722
723 pub async fn delete_auth_subject(
724 &self,
725 subject_id: DigestIdentifier,
726 ) -> Result<String, Error> {
727 self.auth
728 .tell(AuthMessage::DeleteAuth { subject_id })
729 .await
730 .map_err(|e| {
731 warn!(error = %e, "Failed to delete auth subject");
732 preserve_functional_actor_error(e, |e| {
733 Error::AuthOperation(e.to_string())
734 })
735 })?;
736
737 Ok("Ok".to_owned())
738 }
739
740 pub async fn update_subject(
741 &self,
742 subject_id: DigestIdentifier,
743 ) -> Result<String, Error> {
744 let response = self
745 .auth
746 .ask(AuthMessage::Update {
747 subject_id: subject_id.clone(),
748 objective: None,
749 })
750 .await
751 .map_err(|e| {
752 warn!(error = %e, "Failed to update subject");
753 preserve_functional_actor_error(e, |e| {
754 Error::UpdateFailed(subject_id.to_string(), e.to_string())
755 })
756 })?;
757
758 match response {
759 AuthResponse::None => Ok("Update in progress".to_owned()),
760 _ => {
761 warn!("Unexpected response from auth");
762 Err(Error::UnexpectedResponse {
763 actor: "auth".to_string(),
764 expected: "None".to_string(),
765 received: "other".to_string(),
766 })
767 }
768 }
769 }
770
771 pub async fn manual_distribution(
775 &self,
776 subject_id: DigestIdentifier,
777 ) -> Result<String, Error> {
778 self.manual_dis
779 .ask(ManualDistributionMessage::Update(subject_id.clone()))
780 .await
781 .map_err(|e| {
782 warn!(error = %e, "Manual distribution failed");
783 preserve_functional_actor_error(e, |_| {
784 Error::DistributionFailed(subject_id.to_string())
785 })
786 })?;
787
788 Ok("Manual distribution in progress".to_owned())
789 }
790
791 pub async fn all_govs(
794 &self,
795 active: Option<bool>,
796 ) -> Result<Vec<GovsData>, Error> {
797 self.db.get_governances(active).await.map_err(|e| {
798 warn!(error = %e, "Failed to get governances");
799 Error::QueryFailed(e.to_string())
800 })
801 }
802
803 pub async fn all_subjs(
804 &self,
805 governance_id: DigestIdentifier,
806 active: Option<bool>,
807 schema_id: Option<String>,
808 ) -> Result<Vec<SubjsData>, Error> {
809 let governance_id = governance_id.to_string();
810 match self
811 .db
812 .get_subjects(&governance_id, active, schema_id)
813 .await
814 {
815 Ok(subjects) => Ok(subjects),
816 Err(ExternalDatabaseError::GovernanceNotFound(_)) => {
817 Err(Error::GovernanceNotFound(governance_id))
818 }
819 Err(e) => {
820 warn!(error = %e, "Failed to get subjects");
821 Err(Error::QueryFailed(e.to_string()))
822 }
823 }
824 }
825
826 pub async fn get_events(
829 &self,
830 subject_id: DigestIdentifier,
831 query: EventsQuery,
832 ) -> Result<PaginatorEvents, Error> {
833 let subject_id_str = subject_id.to_string();
834
835 match self.db.get_events(&subject_id_str, query).await {
836 Ok(data) => Ok(data),
837 Err(ExternalDatabaseError::NoEvents(_)) => {
838 Err(Error::NoEventsFound(subject_id_str))
839 }
840 Err(e) => {
841 warn!(error = %e, "Failed to get events");
842 Err(Error::QueryFailed(e.to_string()))
843 }
844 }
845 }
846
847 pub async fn get_aborts(
848 &self,
849 subject_id: DigestIdentifier,
850 query: AbortsQuery,
851 ) -> Result<PaginatorAborts, Error> {
852 let subject_id_str = subject_id.to_string();
853 let request_id = if let Some(request_id) = query.request_id.as_ref() {
854 Some(
855 DigestIdentifier::from_str(request_id)
856 .map_err(|e| Error::InvalidQueryParams(e.to_string()))?
857 .to_string(),
858 )
859 } else {
860 None
861 };
862 let query = AbortsQuery {
863 request_id,
864 sn: query.sn,
865 quantity: query.quantity,
866 page: query.page,
867 reverse: query.reverse,
868 };
869
870 self.db
871 .get_aborts(&subject_id_str, query)
872 .await
873 .map_err(|e| {
874 warn!(error = %e, "Failed to get aborts");
875 Error::QueryFailed(e.to_string())
876 })
877 }
878
879 pub async fn get_event_sn(
880 &self,
881 subject_id: DigestIdentifier,
882 sn: u64,
883 ) -> Result<LedgerDB, Error> {
884 let subject_id_str = subject_id.to_string();
885
886 match self.db.get_event_sn(&subject_id_str, sn).await {
887 Ok(data) => Ok(data),
888 Err(ExternalDatabaseError::EventNotFound { .. }) => {
889 Err(Error::EventNotFound {
890 subject: subject_id_str,
891 sn,
892 })
893 }
894 Err(e) => {
895 warn!(error = %e, "Failed to get event");
896 Err(Error::QueryFailed(e.to_string()))
897 }
898 }
899 }
900
901 pub async fn get_first_or_end_events(
902 &self,
903 subject_id: DigestIdentifier,
904 quantity: Option<u64>,
905 reverse: Option<bool>,
906 event_type: Option<EventRequestType>,
907 ) -> Result<Vec<LedgerDB>, Error> {
908 let subject_id_str = subject_id.to_string();
909
910 match self
911 .db
912 .get_first_or_end_events(
913 &subject_id_str,
914 quantity,
915 reverse,
916 event_type,
917 )
918 .await
919 {
920 Ok(data) => Ok(data),
921 Err(ExternalDatabaseError::NoEvents(_)) => {
922 Err(Error::NoEventsFound(subject_id_str))
923 }
924 Err(e) => {
925 warn!(error = %e, "Failed to get events");
926 Err(Error::QueryFailed(e.to_string()))
927 }
928 }
929 }
930
931 pub async fn get_subject_state(
932 &self,
933 subject_id: DigestIdentifier,
934 ) -> Result<SubjectDB, Error> {
935 let subject_id_str = subject_id.to_string();
936
937 match self.db.get_subject_state(&subject_id_str).await {
938 Ok(data) => Ok(data),
939 Err(ExternalDatabaseError::SubjectNotFound(_)) => {
940 Err(Error::SubjectNotFound(subject_id_str))
941 }
942 Err(e) => {
943 warn!(error = %e, "Failed to get subject state");
944 Err(Error::QueryFailed(e.to_string()))
945 }
946 }
947 }
948}
949
#[cfg(test)]
mod tests {
    use super::*;
    use ave_actors::{ActorError, ActorPath};

    /// Fallback shared by both tests; the tests expect it never to decide
    /// the result, since both inputs have a dedicated mapping.
    fn communication_fallback(_: ActorError) -> Error {
        Error::ActorCommunication {
            actor: "request".to_string(),
        }
    }

    /// A `Functional` actor error keeps its description and surfaces as
    /// `Error::ActorError`.
    #[test]
    fn preserves_functional_actor_errors() {
        let source = ActorError::Functional {
            description: "Is not a Creator".to_string(),
        };

        let mapped =
            preserve_functional_actor_error(source, communication_fallback);

        assert!(matches!(
            mapped,
            Error::ActorError(message) if message == "Is not a Creator"
        ));
    }

    /// A `NotFound` actor error surfaces as `Error::MissingResource` named
    /// after the actor path.
    #[test]
    fn preserves_not_found_actor_errors() {
        let source = ActorError::NotFound {
            path: ActorPath::from("/user/request"),
        };

        let mapped =
            preserve_functional_actor_error(source, communication_fallback);

        assert!(matches!(
            mapped,
            Error::MissingResource { name, .. } if name == "/user/request"
        ));
    }
}
987}