Skip to main content

ave_core/
lib.rs

1#![recursion_limit = "256"]
2pub mod config;
3pub mod error;
4
5pub mod approval;
6pub mod auth;
7pub mod db;
8pub mod distribution;
9pub mod evaluation;
10pub mod external_db;
11pub mod governance;
12pub mod helpers;
13pub mod manual_distribution;
14pub mod metrics;
15pub mod model;
16pub mod node;
17pub mod request;
18pub mod subject;
19pub mod system;
20pub mod tracker;
21pub mod update;
22pub mod validation;
23
24use std::collections::HashSet;
25use std::str::FromStr;
26use std::sync::Arc;
27
28use auth::{Auth, AuthMessage, AuthResponse, AuthWitness};
29use ave_actors::{ActorError, ActorPath, ActorRef, PersistentActor};
30use ave_common::bridge::request::{
31    AbortsQuery, ApprovalState, ApprovalStateRes, EventRequestType,
32    EventsQuery, SinkEventsQuery,
33};
34use ave_common::identity::keys::KeyPair;
35use ave_common::identity::{DigestIdentifier, PublicKey, Signed};
36use ave_common::request::EventRequest;
37use ave_common::response::{
38    GovsData, LedgerDB, MonitorNetworkState, PaginatorAborts, PaginatorEvents,
39    RequestInfo, RequestInfoExtend, RequestsInManager,
40    RequestsInManagerSubject, SinkEventsPage, SubjectDB, SubjsData,
41};
42use config::Config as AveBaseConfig;
43use error::Error;
44use helpers::network::*;
45use intermediary::Intermediary;
46use manual_distribution::{ManualDistribution, ManualDistributionMessage};
47use network::{
48    MachineSpec, Monitor, MonitorMessage, MonitorResponse, NetworkWorker,
49};
50
51use node::{Node, NodeMessage, NodeResponse, TransferSubject};
52use prometheus_client::registry::Registry;
53use request::{
54    RequestData, RequestHandler, RequestHandlerMessage, RequestHandlerResponse,
55};
56use system::system;
57use tokio::task::JoinHandle;
58use tokio_util::sync::CancellationToken;
59use tracing::{debug, error, info, warn};
60use validation::{Validation, ValidationMessage};
61
62use crate::approval::request::ApprovalReq;
63use crate::config::SinkAuth;
64use crate::helpers::db::{
65    DatabaseError as ExternalDatabaseError, ExternalDB, ReadStore,
66};
67use crate::model::common::node::SignTypesNode;
68use crate::node::InitParamsNode;
69use crate::node::subject_manager::{
70    SubjectManager, SubjectManagerMessage, SubjectManagerResponse,
71};
72use crate::request::tracking::{
73    RequestTracking, RequestTrackingMessage, RequestTrackingResponse,
74};
75
// Build-time feature validation. Each guard aborts compilation with an
// explicit message when the enabled Cargo features are inconsistent.

// 'sqlite' and 'rocksdb' are mutually exclusive.
#[cfg(all(feature = "sqlite", feature = "rocksdb"))]
compile_error!("Select only one: 'sqlite' or 'rocksdb'");

// At least one of 'sqlite' / 'rocksdb' has to be enabled.
#[cfg(not(any(feature = "sqlite", feature = "rocksdb")))]
compile_error!("You must enable 'sqlite' or 'rocksdb'");

// 'ext-sqlite' is mandatory.
#[cfg(not(feature = "ext-sqlite"))]
compile_error!("You must enable 'ext-sqlite'");

// The 'test' feature is rejected when the crate is compiled neither as a test
// build nor with debug assertions enabled.
#[cfg(all(feature = "test", not(test), not(debug_assertions)))]
compile_error!(
    "The 'test' feature should only be used during development/testing"
);
89
/// Handle bundling the actor references and helpers the public API methods
/// talk to. Instances are produced by `Api::build`.
#[derive(Clone)]
pub struct Api {
    /// Local peer id of the network worker, rendered as a string.
    peer_id: String,
    /// Public key of the node key pair, rendered as a string.
    public_key: String,
    /// Copy of `config.safe_mode`; when `true`, mutating operations are
    /// rejected by `ensure_mutations_allowed`.
    safe_mode: bool,
    /// Identifier of the subject whose deletion is currently in progress, or
    /// `None` when no deletion is running.
    deleting_subject: Arc<tokio::sync::Mutex<Option<DigestIdentifier>>>,
    /// External database helper, fetched from the system under `"ext_db"`.
    db: Arc<ExternalDB>,
    /// Root actor `request`.
    request: ActorRef<RequestHandler>,
    /// Root actor `node`.
    node: ActorRef<Node>,
    /// Actor at `/user/node/subject_manager`.
    subject_manager: ActorRef<SubjectManager>,
    /// Actor at `/user/node/auth`.
    auth: ActorRef<Auth>,
    /// Root actor `network_monitor`.
    monitor: ActorRef<Monitor>,
    /// Actor at `/user/node/manual_distribution`; `None` in safe mode.
    manual_dis: Option<ActorRef<ManualDistribution>>,
    /// Actor at `/user/request/tracking`; `None` in safe mode.
    tracking: Option<ActorRef<RequestTracking>>,
}
105
106fn preserve_functional_actor_error<F>(err: ActorError, fallback: F) -> Error
107where
108    F: FnOnce(ActorError) -> Error,
109{
110    match err {
111        ActorError::Functional { description } => {
112            Error::ActorError(description)
113        }
114        ActorError::FunctionalCritical { description } => {
115            Error::Internal(description)
116        }
117        ActorError::NotFound { path } => Error::MissingResource {
118            name: path.to_string(),
119            reason: "Actor not found".to_string(),
120        },
121        other => fallback(other),
122    }
123}
124
125fn actor_communication_error(actor: &'static str, err: ActorError) -> Error {
126    preserve_functional_actor_error(err, |_| Error::ActorCommunication {
127        actor: actor.to_string(),
128    })
129}
130
131fn safe_mode_error() -> Error {
132    Error::SafeMode(
133        "node is running in safe mode; mutating operations are disabled"
134            .to_string(),
135    )
136}
137
138fn safe_mode_required_error(operation: &'static str) -> Error {
139    Error::SafeMode(format!(
140        "{operation} is only available while node is running in safe mode"
141    ))
142}
143
144impl Api {
145    async fn begin_subject_deletion(
146        &self,
147        subject_id: &DigestIdentifier,
148    ) -> Result<(), Error> {
149        {
150            let mut deleting = self.deleting_subject.lock().await;
151            if let Some(active_subject_id) = deleting.as_ref() {
152                return Err(Error::InvalidRequestState(format!(
153                    "subject deletion already in progress for '{}'",
154                    active_subject_id
155                )));
156            }
157            *deleting = Some(subject_id.clone());
158        }
159        Ok(())
160    }
161
162    async fn end_subject_deletion(&self, subject_id: &DigestIdentifier) {
163        let mut deleting = self.deleting_subject.lock().await;
164        if deleting.as_ref() == Some(subject_id) {
165            *deleting = None;
166        }
167    }
168
169    fn ensure_mutations_allowed(&self) -> Result<(), Error> {
170        if self.safe_mode {
171            return Err(safe_mode_error());
172        }
173        Ok(())
174    }
175
176    fn ensure_safe_mode_required(
177        &self,
178        operation: &'static str,
179    ) -> Result<(), Error> {
180        if !self.safe_mode {
181            return Err(safe_mode_required_error(operation));
182        }
183        Ok(())
184    }
185
186    async fn subject_data(
187        &self,
188        subject_id: &DigestIdentifier,
189    ) -> Result<node::SubjectData, Error> {
190        let response = self
191            .node
192            .ask(NodeMessage::GetSubjectData(subject_id.clone()))
193            .await
194            .map_err(|e| actor_communication_error("node", e))?;
195
196        let NodeResponse::SubjectData(subject_data) = response else {
197            return Err(Error::UnexpectedResponse {
198                actor: "node".to_string(),
199                expected: "NodeResponse::SubjectData".to_string(),
200                received: "other".to_string(),
201            });
202        };
203
204        subject_data
205            .ok_or_else(|| Error::SubjectNotFound(subject_id.to_string()))
206    }
207
208    async fn governance_trackers(
209        &self,
210        governance_id: &DigestIdentifier,
211    ) -> Result<Vec<DigestIdentifier>, Error> {
212        let response = self
213            .node
214            .ask(NodeMessage::GovernanceTrackers(governance_id.clone()))
215            .await
216            .map_err(|e| actor_communication_error("node", e))?;
217
218        let NodeResponse::GovernanceTrackers(trackers) = response else {
219            return Err(Error::UnexpectedResponse {
220                actor: "node".to_string(),
221                expected: "NodeResponse::GovernanceTrackers".to_string(),
222                received: "other".to_string(),
223            });
224        };
225
226        Ok(trackers)
227    }
228
229    async fn purge_common_subject_state(
230        &self,
231        subject_id: &DigestIdentifier,
232        cleanup_errors: &mut Vec<String>,
233    ) {
234        match self
235            .request
236            .ask(RequestHandlerMessage::PurgeSubject {
237                subject_id: subject_id.clone(),
238            })
239            .await
240        {
241            Ok(RequestHandlerResponse::None) => {}
242            Ok(other) => cleanup_errors
243                .push(format!("request: unexpected response {other:?}")),
244            Err(err) => cleanup_errors.push(format!("request: {err}")),
245        }
246
247        match self
248            .auth
249            .ask(AuthMessage::DeleteAuth {
250                subject_id: subject_id.clone(),
251            })
252            .await
253        {
254            Ok(AuthResponse::None) => {}
255            Ok(other) => cleanup_errors
256                .push(format!("auth: unexpected response {other:?}")),
257            Err(err) => cleanup_errors.push(format!("auth: {err}")),
258        }
259
260        if let Err(err) = self.db.delete_subject(&subject_id.to_string()).await
261        {
262            cleanup_errors.push(format!("external_db: {err}"));
263        }
264    }
265
266    async fn delete_subject_from_node(
267        &self,
268        subject_id: &DigestIdentifier,
269        cleanup_errors: &mut Vec<String>,
270    ) {
271        match self
272            .node
273            .ask(NodeMessage::DeleteSubject(subject_id.clone()))
274            .await
275        {
276            Ok(NodeResponse::Ok) => {}
277            Ok(other) => cleanup_errors
278                .push(format!("node: unexpected response {other:?}")),
279            Err(err) => cleanup_errors.push(format!("node: {err}")),
280        }
281    }
282
283    /// Creates a new `Api`.
284    pub async fn build(
285        keys: KeyPair,
286        mut config: AveBaseConfig,
287        sink_auth: SinkAuth,
288        registry: &mut Registry,
289        password: &str,
290        graceful_token: CancellationToken,
291        crash_token: CancellationToken,
292    ) -> Result<(Self, Vec<JoinHandle<()>>), Error> {
293        debug!("Creating Api");
294
295        let (system, runner) = system(
296            config.clone(),
297            sink_auth,
298            password,
299            graceful_token.clone(),
300            crash_token.clone(),
301        )
302        .await
303        .map_err(|e| {
304            error!(error = %e, "Failed to create system");
305            e
306        })?;
307
308        config.network.safe_mode = config.safe_mode;
309
310        let newtork_monitor = Monitor::default();
311        let newtork_monitor_actor = system
312            .create_root_actor("network_monitor", newtork_monitor)
313            .await
314            .map_err(|e| {
315                error!(error = %e, "Can not create network_monitor actor");
316                Error::ActorCreation {
317                    actor: "network_monitor".to_string(),
318                    reason: e.to_string(),
319                }
320            })?;
321
322        let spec = config.spec.map(MachineSpec::from);
323        let network_metrics = network::metrics::register(registry);
324        crate::metrics::register(registry);
325
326        let mut worker: NetworkWorker<NetworkMessage> = NetworkWorker::new(
327            &keys,
328            config.network.clone(),
329            Some(newtork_monitor_actor.clone()),
330            graceful_token.clone(),
331            crash_token.clone(),
332            spec,
333            Some(network_metrics),
334        )
335        .map_err(|e| {
336            error!(error = %e, "Can not create networt");
337            Error::Network(e.to_string())
338        })?;
339
340        // Create worker
341        let service = Intermediary::build(
342            worker.service().sender(),
343            system.clone(),
344            graceful_token.clone(),
345            crash_token.clone(),
346        );
347
348        let peer_id = worker.local_peer_id().to_string();
349
350        worker.add_helper_sender(service.sender());
351
352        system.add_helper("network", service.clone()).await;
353
354        let worker_runner = tokio::spawn(async move {
355            let _ = worker.run().await;
356        });
357
358        let public_key = Arc::new(keys.public_key());
359        let node_actor = system
360            .create_root_actor(
361                "node",
362                Node::initial(InitParamsNode {
363                    key_pair: keys.clone(),
364                    hash: config.hash_algorithm,
365                    is_service: config.is_service,
366                    public_key: public_key.clone(),
367                }),
368            )
369            .await
370            .map_err(|e| {
371                error!(error = %e, "Init system, can not create node actor");
372                Error::ActorCreation {
373                    actor: "node".to_string(),
374                    reason: e.to_string(),
375                }
376            })?;
377
378        let manual_dis_actor = if config.safe_mode {
379            None
380        } else {
381            Some(
382                system
383                    .get_actor(&ActorPath::from(
384                        "/user/node/manual_distribution",
385                    ))
386                    .await
387                    .map_err(|e| {
388                        error!(
389                            error = %e,
390                            "Failed to get manual_distribution actor"
391                        );
392                        e
393                    })?,
394            )
395        };
396
397        let auth_actor: ActorRef<Auth> = system
398            .get_actor(&ActorPath::from("/user/node/auth"))
399            .await
400            .map_err(|e| {
401                error!(error = %e, "Failed to get auth actor");
402                e
403            })?;
404
405        let subject_manager_actor: ActorRef<SubjectManager> = system
406            .get_actor(&ActorPath::from("/user/node/subject_manager"))
407            .await
408            .map_err(|e| {
409                error!(error = %e, "Failed to get subject_manager actor");
410                e
411            })?;
412
413        let request_actor = system
414            .create_root_actor(
415                "request",
416                RequestHandler::initial((
417                    public_key,
418                    (config.hash_algorithm, service),
419                )),
420            )
421            .await
422            .map_err(|e| {
423                error!(error = %e, "Init system, can not create request actor");
424                Error::ActorCreation {
425                    actor: "request".to_string(),
426                    reason: e.to_string(),
427                }
428            })?;
429
430        let tracking_actor = if config.safe_mode {
431            None
432        } else {
433            Some(
434                system
435                    .get_actor(&ActorPath::from("/user/request/tracking"))
436                    .await
437                    .map_err(|e| {
438                        error!(error = %e, "Failed to get tracking actor");
439                        e
440                    })?,
441            )
442        };
443
444        let Some(ext_db) = system.get_helper::<Arc<ExternalDB>>("ext_db").await
445        else {
446            error!("External database helper not found");
447            return Err(Error::MissingResource {
448                name: "ext_db".to_string(),
449                reason: "External database helper not found".to_string(),
450            });
451        };
452
453        ext_db.register_prometheus_metrics(registry);
454
455        let tasks = Vec::from([runner, worker_runner]);
456
457        Ok((
458            Self {
459                public_key: keys.public_key().to_string(),
460                peer_id,
461                safe_mode: config.safe_mode,
462                deleting_subject: Arc::new(tokio::sync::Mutex::new(None)),
463                db: ext_db,
464                request: request_actor,
465                auth: auth_actor,
466                node: node_actor,
467                subject_manager: subject_manager_actor,
468                monitor: newtork_monitor_actor,
469                manual_dis: manual_dis_actor,
470                tracking: tracking_actor,
471            },
472            tasks,
473        ))
474    }
475
476    ///////// General
477    ////////////////////////////
478
479    pub fn peer_id(&self) -> &str {
480        &self.peer_id
481    }
482
483    pub fn public_key(&self) -> &str {
484        &self.public_key
485    }
486
487    ///////// Network
488    ////////////////////////////
489    pub async fn get_network_state(
490        &self,
491    ) -> Result<MonitorNetworkState, Error> {
492        let response =
493            self.monitor.ask(MonitorMessage::State).await.map_err(|e| {
494                warn!(error = %e, "Unable to retrieve network state");
495                preserve_functional_actor_error(e, |e| {
496                    Error::NetworkState(e.to_string())
497                })
498            })?;
499
500        match response {
501            MonitorResponse::State(state) => Ok(state),
502            _ => {
503                warn!("Unexpected response from network monitor");
504                Err(Error::UnexpectedResponse {
505                    actor: "network_monitor".to_string(),
506                    expected: "State".to_string(),
507                    received: "other".to_string(),
508                })
509            }
510        }
511    }
512
513    ///////// Request
514    ////////////////////////////
515
516    pub async fn get_requests_in_manager(
517        &self,
518    ) -> Result<RequestsInManager, Error> {
519        let response = self
520            .request
521            .ask(RequestHandlerMessage::RequestInManager)
522            .await
523            .map_err(|e| {
524                warn!(error = %e, "Request processing failed");
525                actor_communication_error("request", e)
526            })?;
527
528        match response {
529            RequestHandlerResponse::RequestInManager(request) => Ok(request),
530            _ => {
531                warn!("Unexpected response from request handler");
532                Err(Error::UnexpectedResponse {
533                    actor: "request".to_string(),
534                    expected: "RequestInManager".to_string(),
535                    received: "other".to_string(),
536                })
537            }
538        }
539    }
540
541    pub async fn get_requests_in_manager_subject_id(
542        &self,
543        subject_id: DigestIdentifier,
544    ) -> Result<RequestsInManagerSubject, Error> {
545        let response = self
546            .request
547            .ask(RequestHandlerMessage::RequestInManagerSubjectId {
548                subject_id,
549            })
550            .await
551            .map_err(|e| {
552                warn!(error = %e, "Request processing failed");
553                actor_communication_error("request", e)
554            })?;
555
556        match response {
557            RequestHandlerResponse::RequestInManagerSubjectId(request) => {
558                Ok(request)
559            }
560            _ => {
561                warn!("Unexpected response from request handler");
562                Err(Error::UnexpectedResponse {
563                    actor: "request".to_string(),
564                    expected: "RequestInManagerSubjectId".to_string(),
565                    received: "other".to_string(),
566                })
567            }
568        }
569    }
570
571    pub async fn external_request(
572        &self,
573        request: Signed<EventRequest>,
574    ) -> Result<RequestData, Error> {
575        self.ensure_mutations_allowed()?;
576        let response = self
577            .request
578            .ask(RequestHandlerMessage::NewRequest { request })
579            .await
580            .map_err(|e| {
581                warn!(error = %e, "Request processing failed");
582                actor_communication_error("request", e)
583            })?;
584
585        match response {
586            RequestHandlerResponse::Ok(request_data) => Ok(request_data),
587            _ => {
588                warn!("Unexpected response from request handler");
589                Err(Error::UnexpectedResponse {
590                    actor: "request".to_string(),
591                    expected: "Ok".to_string(),
592                    received: "other".to_string(),
593                })
594            }
595        }
596    }
597
598    pub async fn own_request(
599        &self,
600        request: EventRequest,
601    ) -> Result<RequestData, Error> {
602        self.ensure_mutations_allowed()?;
603        let response = self
604            .node
605            .ask(NodeMessage::SignRequest(Box::new(
606                SignTypesNode::EventRequest(request.clone()),
607            )))
608            .await
609            .map_err(|e| {
610                warn!(error = %e, "Node was unable to sign the request");
611                preserve_functional_actor_error(e, |e| {
612                    Error::SigningFailed(e.to_string())
613                })
614            })?;
615
616        let signature = match response {
617            NodeResponse::SignRequest(signature) => signature,
618            _ => {
619                warn!("Unexpected response from node");
620                return Err(Error::UnexpectedResponse {
621                    actor: "node".to_string(),
622                    expected: "SignRequest".to_string(),
623                    received: "other".to_string(),
624                });
625            }
626        };
627
628        let signed_event_req = Signed::from_parts(request, signature);
629
630        let response = self
631            .request
632            .ask(RequestHandlerMessage::NewRequest {
633                request: signed_event_req,
634            })
635            .await
636            .map_err(|e| {
637                warn!(error = %e, "Failed to send request");
638                actor_communication_error("request", e)
639            })?;
640
641        match response {
642            RequestHandlerResponse::Ok(request_data) => Ok(request_data),
643            _ => {
644                warn!("Unexpected response from request handler");
645                Err(Error::UnexpectedResponse {
646                    actor: "request".to_string(),
647                    expected: "Ok".to_string(),
648                    received: "other".to_string(),
649                })
650            }
651        }
652    }
653
654    pub async fn get_approval(
655        &self,
656        subject_id: DigestIdentifier,
657        state: Option<ApprovalState>,
658    ) -> Result<Option<(ApprovalReq, ApprovalState)>, Error> {
659        let response = self
660            .request
661            .ask(RequestHandlerMessage::GetApproval { state, subject_id })
662            .await
663            .map_err(|e| {
664                warn!(error = %e, "Failed to get approval request");
665                actor_communication_error("request", e)
666            })?;
667
668        match response {
669            RequestHandlerResponse::Approval(data) => Ok(data),
670            _ => {
671                warn!("Unexpected response from request handler");
672                Err(Error::UnexpectedResponse {
673                    actor: "request".to_string(),
674                    expected: "Approval".to_string(),
675                    received: "other".to_string(),
676                })
677            }
678        }
679    }
680
681    pub async fn get_approvals(
682        &self,
683        state: Option<ApprovalState>,
684    ) -> Result<Vec<(ApprovalReq, ApprovalState)>, Error> {
685        let response = self
686            .request
687            .ask(RequestHandlerMessage::GetAllApprovals { state })
688            .await
689            .map_err(|e| {
690                warn!(error = %e, "Failed to get approval requests");
691                actor_communication_error("request", e)
692            })?;
693
694        match response {
695            RequestHandlerResponse::Approvals(data) => Ok(data),
696            _ => {
697                warn!("Unexpected response from request handler");
698                Err(Error::UnexpectedResponse {
699                    actor: "request".to_string(),
700                    expected: "Approvals".to_string(),
701                    received: "other".to_string(),
702                })
703            }
704        }
705    }
706
707    pub async fn approve(
708        &self,
709        subject_id: DigestIdentifier,
710        state: ApprovalStateRes,
711    ) -> Result<String, Error> {
712        self.ensure_mutations_allowed()?;
713        if state == ApprovalStateRes::Obsolete {
714            warn!("Cannot set approval state to Obsolete");
715            return Err(Error::InvalidApprovalState("Obsolete".to_string()));
716        }
717
718        let response = self
719            .request
720            .ask(RequestHandlerMessage::ChangeApprovalState {
721                subject_id,
722                state,
723            })
724            .await
725            .map_err(|e| {
726                warn!(error = %e, "Failed to change approval state");
727                preserve_functional_actor_error(e, |e| {
728                    Error::ApprovalUpdateFailed(e.to_string())
729                })
730            })?;
731
732        match response {
733            RequestHandlerResponse::Response(res) => Ok(res),
734            _ => {
735                warn!("Unexpected response from request handler");
736                Err(Error::UnexpectedResponse {
737                    actor: "request".to_string(),
738                    expected: "Response".to_string(),
739                    received: "other".to_string(),
740                })
741            }
742        }
743    }
744
745    pub async fn manual_request_abort(
746        &self,
747        subject_id: DigestIdentifier,
748    ) -> Result<String, Error> {
749        self.ensure_mutations_allowed()?;
750        self.request
751            .tell(RequestHandlerMessage::AbortRequest { subject_id })
752            .await
753            .map_err(|e| {
754                warn!(error = %e, "Failed to abort request");
755                actor_communication_error("request", e)
756            })?;
757
758        Ok("Trying to abort".to_string())
759    }
760
761    ///////// Tracking
762    ////////////////////////////
763    pub async fn get_request_state(
764        &self,
765        request_id: DigestIdentifier,
766    ) -> Result<RequestInfo, Error> {
767        let Some(tracking) = &self.tracking else {
768            return Err(Error::SafeMode(
769                "request tracking is unavailable while node is running in safe mode"
770                    .to_string(),
771            ));
772        };
773        let response = tracking
774            .ask(RequestTrackingMessage::SearchRequest(request_id.clone()))
775            .await
776            .map_err(|e| {
777                warn!(error = %e, "Failed to get request state");
778                actor_communication_error("tracking", e)
779            })?;
780
781        match response {
782            RequestTrackingResponse::Info(state) => Ok(state),
783            RequestTrackingResponse::NotFound => {
784                Err(Error::RequestNotFound(request_id.to_string()))
785            }
786            _ => {
787                warn!("Unexpected response from tracking");
788                Err(Error::UnexpectedResponse {
789                    actor: "tracking".to_string(),
790                    expected: "Info".to_string(),
791                    received: "other".to_string(),
792                })
793            }
794        }
795    }
796
797    pub async fn all_request_state(
798        &self,
799    ) -> Result<Vec<RequestInfoExtend>, Error> {
800        let Some(tracking) = &self.tracking else {
801            return Err(Error::SafeMode(
802                "request tracking is unavailable while node is running in safe mode"
803                    .to_string(),
804            ));
805        };
806        let response = tracking
807            .ask(RequestTrackingMessage::AllRequests)
808            .await
809            .map_err(|e| {
810                warn!(error = %e, "Failed to get all request states");
811                actor_communication_error("tracking", e)
812            })?;
813
814        match response {
815            RequestTrackingResponse::AllInfo(state) => Ok(state),
816            _ => {
817                warn!("Unexpected response from tracking");
818                Err(Error::UnexpectedResponse {
819                    actor: "tracking".to_string(),
820                    expected: "AllInfo".to_string(),
821                    received: "other".to_string(),
822                })
823            }
824        }
825    }
826
827    ///////// Node
828    ////////////////////////////
829
830    pub async fn get_pending_transfers(
831        &self,
832    ) -> Result<Vec<TransferSubject>, Error> {
833        let response =
834            self.node.ask(NodeMessage::PendingTransfers).await.map_err(
835                |e| {
836                    warn!(error = %e, "Failed to get pending transfers");
837                    actor_communication_error("node", e)
838                },
839            )?;
840
841        let NodeResponse::PendingTransfers(pending) = response else {
842            warn!("Unexpected response from node");
843            return Err(Error::UnexpectedResponse {
844                actor: "node".to_string(),
845                expected: "PendingTransfers".to_string(),
846                received: "other".to_string(),
847            });
848        };
849
850        Ok(pending)
851    }
852
853    ///////// Auth
854    ////////////////////////////
855
856    pub async fn auth_subject(
857        &self,
858        subject_id: DigestIdentifier,
859        witnesses: AuthWitness,
860    ) -> Result<String, Error> {
861        self.ensure_mutations_allowed()?;
862        self.auth
863            .tell(AuthMessage::NewAuth {
864                subject_id,
865                witness: witnesses,
866            })
867            .await
868            .map_err(|e| {
869                warn!(error = %e, "Authentication operation failed");
870                preserve_functional_actor_error(e, |e| {
871                    Error::AuthOperation(e.to_string())
872                })
873            })?;
874
875        Ok("Ok".to_owned())
876    }
877
878    pub async fn all_auth_subjects(
879        &self,
880    ) -> Result<Vec<DigestIdentifier>, Error> {
881        let response =
882            self.auth.ask(AuthMessage::GetAuths).await.map_err(|e| {
883                error!(error = %e, "Failed to get auth subjects");
884                actor_communication_error("auth", e)
885            })?;
886
887        match response {
888            AuthResponse::Auths { subjects } => Ok(subjects),
889            _ => {
890                warn!("Unexpected response from auth");
891                Err(Error::UnexpectedResponse {
892                    actor: "auth".to_string(),
893                    expected: "Auths".to_string(),
894                    received: "other".to_string(),
895                })
896            }
897        }
898    }
899
900    pub async fn witnesses_subject(
901        &self,
902        subject_id: DigestIdentifier,
903    ) -> Result<HashSet<PublicKey>, Error> {
904        let response = self
905            .auth
906            .ask(AuthMessage::GetAuth {
907                subject_id: subject_id.clone(),
908            })
909            .await
910            .map_err(|e| {
911                warn!(error = %e, "Failed to get witnesses for subject");
912                actor_communication_error("auth", e)
913            })?;
914
915        match response {
916            AuthResponse::Witnesses(witnesses) => Ok(witnesses),
917            _ => {
918                warn!("Unexpected response from auth");
919                Err(Error::UnexpectedResponse {
920                    actor: "auth".to_string(),
921                    expected: "Witnesses".to_string(),
922                    received: "other".to_string(),
923                })
924            }
925        }
926    }
927
928    pub async fn delete_auth_subject(
929        &self,
930        subject_id: DigestIdentifier,
931    ) -> Result<String, Error> {
932        self.ensure_mutations_allowed()?;
933        self.auth
934            .tell(AuthMessage::DeleteAuth { subject_id })
935            .await
936            .map_err(|e| {
937                warn!(error = %e, "Failed to delete auth subject");
938                preserve_functional_actor_error(e, |e| {
939                    Error::AuthOperation(e.to_string())
940                })
941            })?;
942
943        Ok("Ok".to_owned())
944    }
945
946    pub async fn update_subject(
947        &self,
948        subject_id: DigestIdentifier,
949    ) -> Result<String, Error> {
950        self.ensure_mutations_allowed()?;
951        let response = self
952            .auth
953            .ask(AuthMessage::Update {
954                subject_id: subject_id.clone(),
955                objective: None,
956            })
957            .await
958            .map_err(|e| {
959                warn!(error = %e, "Failed to update subject");
960                preserve_functional_actor_error(e, |e| {
961                    Error::UpdateFailed(subject_id.to_string(), e.to_string())
962                })
963            })?;
964
965        match response {
966            AuthResponse::None => Ok("Update in progress".to_owned()),
967            _ => {
968                warn!("Unexpected response from auth");
969                Err(Error::UnexpectedResponse {
970                    actor: "auth".to_string(),
971                    expected: "None".to_string(),
972                    received: "other".to_string(),
973                })
974            }
975        }
976    }
977
978    ///////// manual distribution
979    ////////////////////////////
980
981    pub async fn manual_distribution(
982        &self,
983        subject_id: DigestIdentifier,
984    ) -> Result<String, Error> {
985        self.ensure_mutations_allowed()?;
986        let Some(manual_dis) = &self.manual_dis else {
987            return Err(safe_mode_error());
988        };
989        manual_dis
990            .ask(ManualDistributionMessage::Update(subject_id.clone()))
991            .await
992            .map_err(|e| {
993                warn!(error = %e, "Manual distribution failed");
994                preserve_functional_actor_error(e, |_| {
995                    Error::DistributionFailed(subject_id.to_string())
996                })
997            })?;
998
999        Ok("Manual distribution in progress".to_owned())
1000    }
1001
1002    pub async fn delete_subject(
1003        &self,
1004        subject_id: DigestIdentifier,
1005    ) -> Result<String, Error> {
1006        self.ensure_safe_mode_required("subject deletion")?;
1007        self.begin_subject_deletion(&subject_id).await?;
1008
1009        let result = async {
1010            let subject_data = self.subject_data(&subject_id).await?;
1011
1012            match subject_data {
1013                node::SubjectData::Governance { .. } => {
1014                    info!(
1015                        subject_id = %subject_id,
1016                        subject_type = "governance",
1017                        "Deleting subject"
1018                    );
1019                    let trackers =
1020                        self.governance_trackers(&subject_id).await?;
1021                    if !trackers.is_empty() {
1022                        return Err(Error::GovernanceHasTrackers {
1023                            governance_id: subject_id.to_string(),
1024                            trackers: trackers
1025                                .into_iter()
1026                                .map(|tracker| tracker.to_string())
1027                                .collect(),
1028                        });
1029                    }
1030                    let mut cleanup_errors = Vec::new();
1031
1032                    match self
1033                        .subject_manager
1034                        .ask(SubjectManagerMessage::DeleteGovernance {
1035                            subject_id: subject_id.clone(),
1036                        })
1037                        .await
1038                    {
1039                        Ok(SubjectManagerResponse::DeleteGovernance) => {}
1040                        Ok(other) => cleanup_errors.push(format!(
1041                            "subject_manager: unexpected response {other:?}"
1042                        )),
1043                        Err(err) => cleanup_errors
1044                            .push(format!("subject_manager: {err}")),
1045                    }
1046
1047                    self.purge_common_subject_state(
1048                        &subject_id,
1049                        &mut cleanup_errors,
1050                    )
1051                    .await;
1052
1053                    self.delete_subject_from_node(
1054                        &subject_id,
1055                        &mut cleanup_errors,
1056                    )
1057                    .await;
1058
1059                    if cleanup_errors.is_empty() {
1060                        info!(
1061                            subject_id = %subject_id,
1062                            subject_type = "governance",
1063                            "Subject deleted successfully"
1064                        );
1065                        Ok("Governance deleted successfully".to_owned())
1066                    } else {
1067                        Err(Error::Internal(format!(
1068                            "governance deletion completed partially: {}",
1069                            cleanup_errors.join("; ")
1070                        )))
1071                    }
1072                }
1073                node::SubjectData::Tracker { .. } => {
1074                    info!(
1075                        subject_id = %subject_id,
1076                        subject_type = "tracker",
1077                        "Deleting subject"
1078                    );
1079                    let mut cleanup_errors = Vec::new();
1080
1081                    self.purge_common_subject_state(
1082                        &subject_id,
1083                        &mut cleanup_errors,
1084                    )
1085                    .await;
1086
1087                    match self
1088                        .subject_manager
1089                        .ask(SubjectManagerMessage::DeleteTracker {
1090                            subject_id: subject_id.clone(),
1091                        })
1092                        .await
1093                    {
1094                        Ok(SubjectManagerResponse::DeleteTracker) => {}
1095                        Ok(other) => cleanup_errors.push(format!(
1096                            "subject_manager: unexpected response {other:?}"
1097                        )),
1098                        Err(err) => cleanup_errors
1099                            .push(format!("subject_manager: {err}")),
1100                    }
1101
1102                    self.delete_subject_from_node(
1103                        &subject_id,
1104                        &mut cleanup_errors,
1105                    )
1106                    .await;
1107
1108                    if cleanup_errors.is_empty() {
1109                        info!(
1110                            subject_id = %subject_id,
1111                            subject_type = "tracker",
1112                            "Subject deleted successfully"
1113                        );
1114                        Ok("Tracker deleted successfully".to_owned())
1115                    } else {
1116                        Err(Error::Internal(format!(
1117                            "tracker deletion completed partially: {}",
1118                            cleanup_errors.join("; ")
1119                        )))
1120                    }
1121                }
1122            }
1123        }
1124        .await;
1125
1126        self.end_subject_deletion(&subject_id).await;
1127        result
1128    }
1129
1130    ///////// Register
1131    ////////////////////////////
1132    pub async fn all_govs(
1133        &self,
1134        active: Option<bool>,
1135    ) -> Result<Vec<GovsData>, Error> {
1136        self.db.get_governances(active).await.map_err(|e| {
1137            warn!(error = %e, "Failed to get governances");
1138            Error::QueryFailed(e.to_string())
1139        })
1140    }
1141
1142    pub async fn all_subjs(
1143        &self,
1144        governance_id: DigestIdentifier,
1145        active: Option<bool>,
1146        schema_id: Option<String>,
1147    ) -> Result<Vec<SubjsData>, Error> {
1148        let governance_id = governance_id.to_string();
1149        match self
1150            .db
1151            .get_subjects(&governance_id, active, schema_id)
1152            .await
1153        {
1154            Ok(subjects) => Ok(subjects),
1155            Err(ExternalDatabaseError::GovernanceNotFound(_)) => {
1156                Err(Error::GovernanceNotFound(governance_id))
1157            }
1158            Err(e) => {
1159                warn!(error = %e, "Failed to get subjects");
1160                Err(Error::QueryFailed(e.to_string()))
1161            }
1162        }
1163    }
1164
1165    ///////// Query
1166    ////////////////////////////
1167    pub async fn get_events(
1168        &self,
1169        subject_id: DigestIdentifier,
1170        query: EventsQuery,
1171    ) -> Result<PaginatorEvents, Error> {
1172        let subject_id_str = subject_id.to_string();
1173
1174        match self.db.get_events(&subject_id_str, query).await {
1175            Ok(data) => Ok(data),
1176            Err(ExternalDatabaseError::NoEvents(_)) => {
1177                Err(Error::NoEventsFound(subject_id_str))
1178            }
1179            Err(e) => {
1180                warn!(error = %e, "Failed to get events");
1181                Err(Error::QueryFailed(e.to_string()))
1182            }
1183        }
1184    }
1185
1186    pub async fn get_sink_events(
1187        &self,
1188        subject_id: DigestIdentifier,
1189        query: SinkEventsQuery,
1190    ) -> Result<SinkEventsPage, Error> {
1191        let response = self
1192            .node
1193            .ask(NodeMessage::GetSinkEvents {
1194                subject_id,
1195                from_sn: query.from_sn.unwrap_or(0),
1196                to_sn: query.to_sn,
1197                limit: query.limit.unwrap_or(100),
1198            })
1199            .await
1200            .map_err(|e| {
1201                warn!(error = %e, "Failed to replay sink events");
1202                Error::from(e)
1203            })?;
1204
1205        match response {
1206            NodeResponse::SinkEvents(events) => Ok(events),
1207            _ => Err(Error::UnexpectedResponse {
1208                actor: "node".to_string(),
1209                expected: "SinkEvents".to_string(),
1210                received: "other".to_string(),
1211            }),
1212        }
1213    }
1214
1215    pub async fn get_aborts(
1216        &self,
1217        subject_id: DigestIdentifier,
1218        query: AbortsQuery,
1219    ) -> Result<PaginatorAborts, Error> {
1220        let subject_id_str = subject_id.to_string();
1221        let request_id = if let Some(request_id) = query.request_id.as_ref() {
1222            Some(
1223                DigestIdentifier::from_str(request_id)
1224                    .map_err(|e| Error::InvalidQueryParams(e.to_string()))?
1225                    .to_string(),
1226            )
1227        } else {
1228            None
1229        };
1230        let query = AbortsQuery {
1231            request_id,
1232            sn: query.sn,
1233            quantity: query.quantity,
1234            page: query.page,
1235            reverse: query.reverse,
1236        };
1237
1238        self.db
1239            .get_aborts(&subject_id_str, query)
1240            .await
1241            .map_err(|e| {
1242                warn!(error = %e, "Failed to get aborts");
1243                Error::QueryFailed(e.to_string())
1244            })
1245    }
1246
1247    pub async fn get_event_sn(
1248        &self,
1249        subject_id: DigestIdentifier,
1250        sn: u64,
1251    ) -> Result<LedgerDB, Error> {
1252        let subject_id_str = subject_id.to_string();
1253
1254        match self.db.get_event_sn(&subject_id_str, sn).await {
1255            Ok(data) => Ok(data),
1256            Err(ExternalDatabaseError::EventNotFound { .. }) => {
1257                Err(Error::EventNotFound {
1258                    subject: subject_id_str,
1259                    sn,
1260                })
1261            }
1262            Err(e) => {
1263                warn!(error = %e, "Failed to get event");
1264                Err(Error::QueryFailed(e.to_string()))
1265            }
1266        }
1267    }
1268
1269    pub async fn get_first_or_end_events(
1270        &self,
1271        subject_id: DigestIdentifier,
1272        quantity: Option<u64>,
1273        reverse: Option<bool>,
1274        event_type: Option<EventRequestType>,
1275    ) -> Result<Vec<LedgerDB>, Error> {
1276        let subject_id_str = subject_id.to_string();
1277
1278        match self
1279            .db
1280            .get_first_or_end_events(
1281                &subject_id_str,
1282                quantity,
1283                reverse,
1284                event_type,
1285            )
1286            .await
1287        {
1288            Ok(data) => Ok(data),
1289            Err(ExternalDatabaseError::NoEvents(_)) => {
1290                Err(Error::NoEventsFound(subject_id_str))
1291            }
1292            Err(e) => {
1293                warn!(error = %e, "Failed to get events");
1294                Err(Error::QueryFailed(e.to_string()))
1295            }
1296        }
1297    }
1298
1299    pub async fn get_subject_state(
1300        &self,
1301        subject_id: DigestIdentifier,
1302    ) -> Result<SubjectDB, Error> {
1303        let subject_id_str = subject_id.to_string();
1304
1305        match self.db.get_subject_state(&subject_id_str).await {
1306            Ok(data) => Ok(data),
1307            Err(ExternalDatabaseError::SubjectNotFound(_)) => {
1308                Err(Error::SubjectNotFound(subject_id_str))
1309            }
1310            Err(e) => {
1311                warn!(error = %e, "Failed to get subject state");
1312                Err(Error::QueryFailed(e.to_string()))
1313            }
1314        }
1315    }
1316}
1317
#[cfg(test)]
mod tests {
    use super::*;
    use ave_actors::{ActorError, ActorPath};

    /// A `Functional` actor error must surface as `Error::ActorError`
    /// carrying the original description, not as the fallback error.
    #[test]
    fn preserves_functional_actor_errors() {
        let functional = ActorError::Functional {
            description: "Is not a Creator".to_string(),
        };

        let mapped = preserve_functional_actor_error(functional, |_| {
            Error::ActorCommunication {
                actor: "request".to_string(),
            }
        });

        let preserved = matches!(
            mapped,
            Error::ActorError(message) if message == "Is not a Creator"
        );
        assert!(preserved);
    }

    /// A `NotFound` actor error must surface as `Error::MissingResource`
    /// named after the actor path, not as the fallback error.
    #[test]
    fn preserves_not_found_actor_errors() {
        let not_found = ActorError::NotFound {
            path: ActorPath::from("/user/request"),
        };

        let mapped = preserve_functional_actor_error(not_found, |_| {
            Error::ActorCommunication {
                actor: "request".to_string(),
            }
        });

        let preserved = matches!(
            mapped,
            Error::MissingResource { name, .. } if name == "/user/request"
        );
        assert!(preserved);
    }
}