Skip to main content

ave_core/
lib.rs

1#![recursion_limit = "256"]
2pub mod config;
3pub mod error;
4
5pub mod approval;
6pub mod auth;
7pub mod db;
8pub mod distribution;
9pub mod evaluation;
10pub mod external_db;
11pub mod governance;
12pub mod helpers;
13pub mod manual_distribution;
14pub mod metrics;
15pub mod model;
16pub mod node;
17pub mod request;
18pub mod subject;
19pub mod system;
20pub mod tracker;
21pub mod update;
22pub mod validation;
23
24use std::collections::HashSet;
25use std::str::FromStr;
26use std::sync::Arc;
27
28use auth::{Auth, AuthMessage, AuthResponse, AuthWitness};
29use ave_actors::{ActorError, ActorPath, ActorRef, PersistentActor};
30use ave_common::bridge::request::{
31    AbortsQuery, ApprovalState, ApprovalStateRes, EventRequestType,
32    EventsQuery, SinkEventsQuery,
33};
34use ave_common::identity::keys::KeyPair;
35use ave_common::identity::{DigestIdentifier, PublicKey, Signed};
36use ave_common::request::EventRequest;
37use ave_common::response::{
38    GovsData, LedgerDB, MonitorNetworkState, PaginatorAborts, PaginatorEvents,
39    RequestInfo, RequestInfoExtend, RequestsInManager,
40    RequestsInManagerSubject, SinkEventsPage, SubjectDB, SubjsData,
41};
42use config::Config as AveBaseConfig;
43use error::Error;
44use helpers::network::*;
45use intermediary::Intermediary;
46use manual_distribution::{ManualDistribution, ManualDistributionMessage};
47use ave_network::{
48    MachineSpec, Monitor, MonitorMessage, MonitorResponse, NetworkWorker,
49    NetworkWorkerRuntime,
50};
51
52use node::{Node, NodeMessage, NodeResponse, TransferSubject};
53use prometheus_client::registry::Registry;
54use request::{
55    RequestData, RequestHandler, RequestHandlerMessage, RequestHandlerResponse,
56};
57use system::system;
58use tokio::task::JoinHandle;
59use tokio_util::sync::CancellationToken;
60use tracing::{debug, error, info, warn};
61use validation::{Validation, ValidationMessage};
62
63use crate::approval::request::ApprovalReq;
64use crate::config::SinkAuth;
65use crate::helpers::db::{
66    DatabaseError as ExternalDatabaseError, ExternalDB, ReadStore,
67};
68use crate::model::common::node::SignTypesNode;
69use crate::node::InitParamsNode;
70use crate::node::subject_manager::{
71    SubjectManager, SubjectManagerMessage, SubjectManagerResponse,
72};
73use crate::request::tracking::{
74    RequestTracking, RequestTrackingMessage, RequestTrackingResponse,
75};
76
77#[cfg(all(feature = "sqlite", feature = "rocksdb"))]
78compile_error!("Select only one: 'sqlite' or 'rocksdb'");
79
80#[cfg(not(any(feature = "sqlite", feature = "rocksdb")))]
81compile_error!("You must enable 'sqlite' or 'rocksdb'");
82
83#[cfg(not(feature = "ext-sqlite"))]
84compile_error!("You must enable 'ext-sqlite'");
85
86#[cfg(all(feature = "test", not(test), not(debug_assertions)))]
87compile_error!(
88    "The 'test' feature should only be used during development/testing"
89);
90
91#[derive(Clone)]
92pub struct Api {
93    peer_id: String,
94    public_key: String,
95    safe_mode: bool,
96    deleting_subject: Arc<tokio::sync::Mutex<Option<DigestIdentifier>>>,
97    db: Arc<ExternalDB>,
98    request: ActorRef<RequestHandler>,
99    node: ActorRef<Node>,
100    subject_manager: ActorRef<SubjectManager>,
101    auth: ActorRef<Auth>,
102    monitor: ActorRef<Monitor>,
103    manual_dis: Option<ActorRef<ManualDistribution>>,
104    tracking: Option<ActorRef<RequestTracking>>,
105}
106
107fn preserve_functional_actor_error<F>(err: ActorError, fallback: F) -> Error
108where
109    F: FnOnce(ActorError) -> Error,
110{
111    match err {
112        ActorError::Functional { description } => {
113            Error::ActorError(description)
114        }
115        ActorError::FunctionalCritical { description } => {
116            Error::Internal(description)
117        }
118        ActorError::NotFound { path } => Error::MissingResource {
119            name: path.to_string(),
120            reason: "Actor not found".to_string(),
121        },
122        other => fallback(other),
123    }
124}
125
126fn actor_communication_error(actor: &'static str, err: ActorError) -> Error {
127    preserve_functional_actor_error(err, |_| Error::ActorCommunication {
128        actor: actor.to_string(),
129    })
130}
131
132fn safe_mode_error() -> Error {
133    Error::SafeMode(
134        "node is running in safe mode; mutating operations are disabled"
135            .to_string(),
136    )
137}
138
139fn safe_mode_required_error(operation: &'static str) -> Error {
140    Error::SafeMode(format!(
141        "{operation} is only available while node is running in safe mode"
142    ))
143}
144
145impl Api {
146    async fn begin_subject_deletion(
147        &self,
148        subject_id: &DigestIdentifier,
149    ) -> Result<(), Error> {
150        {
151            let mut deleting = self.deleting_subject.lock().await;
152            if let Some(active_subject_id) = deleting.as_ref() {
153                return Err(Error::InvalidRequestState(format!(
154                    "subject deletion already in progress for '{}'",
155                    active_subject_id
156                )));
157            }
158            *deleting = Some(subject_id.clone());
159        }
160        Ok(())
161    }
162
163    async fn end_subject_deletion(&self, subject_id: &DigestIdentifier) {
164        let mut deleting = self.deleting_subject.lock().await;
165        if deleting.as_ref() == Some(subject_id) {
166            *deleting = None;
167        }
168    }
169
170    fn ensure_mutations_allowed(&self) -> Result<(), Error> {
171        if self.safe_mode {
172            return Err(safe_mode_error());
173        }
174        Ok(())
175    }
176
177    fn ensure_safe_mode_required(
178        &self,
179        operation: &'static str,
180    ) -> Result<(), Error> {
181        if !self.safe_mode {
182            return Err(safe_mode_required_error(operation));
183        }
184        Ok(())
185    }
186
187    async fn subject_data(
188        &self,
189        subject_id: &DigestIdentifier,
190    ) -> Result<node::SubjectData, Error> {
191        let response = self
192            .node
193            .ask(NodeMessage::GetSubjectData(subject_id.clone()))
194            .await
195            .map_err(|e| actor_communication_error("node", e))?;
196
197        let NodeResponse::SubjectData(subject_data) = response else {
198            return Err(Error::UnexpectedResponse {
199                actor: "node".to_string(),
200                expected: "NodeResponse::SubjectData".to_string(),
201                received: "other".to_string(),
202            });
203        };
204
205        subject_data
206            .ok_or_else(|| Error::SubjectNotFound(subject_id.to_string()))
207    }
208
209    async fn governance_trackers(
210        &self,
211        governance_id: &DigestIdentifier,
212    ) -> Result<Vec<DigestIdentifier>, Error> {
213        let response = self
214            .node
215            .ask(NodeMessage::GovernanceTrackers(governance_id.clone()))
216            .await
217            .map_err(|e| actor_communication_error("node", e))?;
218
219        let NodeResponse::GovernanceTrackers(trackers) = response else {
220            return Err(Error::UnexpectedResponse {
221                actor: "node".to_string(),
222                expected: "NodeResponse::GovernanceTrackers".to_string(),
223                received: "other".to_string(),
224            });
225        };
226
227        Ok(trackers)
228    }
229
230    async fn purge_common_subject_state(
231        &self,
232        subject_id: &DigestIdentifier,
233        cleanup_errors: &mut Vec<String>,
234    ) {
235        match self
236            .request
237            .ask(RequestHandlerMessage::PurgeSubject {
238                subject_id: subject_id.clone(),
239            })
240            .await
241        {
242            Ok(RequestHandlerResponse::None) => {}
243            Ok(other) => cleanup_errors
244                .push(format!("request: unexpected response {other:?}")),
245            Err(err) => cleanup_errors.push(format!("request: {err}")),
246        }
247
248        match self
249            .auth
250            .ask(AuthMessage::DeleteAuth {
251                subject_id: subject_id.clone(),
252            })
253            .await
254        {
255            Ok(AuthResponse::None) => {}
256            Ok(other) => cleanup_errors
257                .push(format!("auth: unexpected response {other:?}")),
258            Err(err) => cleanup_errors.push(format!("auth: {err}")),
259        }
260
261        if let Err(err) = self.db.delete_subject(&subject_id.to_string()).await
262        {
263            cleanup_errors.push(format!("external_db: {err}"));
264        }
265    }
266
267    async fn delete_subject_from_node(
268        &self,
269        subject_id: &DigestIdentifier,
270        cleanup_errors: &mut Vec<String>,
271    ) {
272        match self
273            .node
274            .ask(NodeMessage::DeleteSubject(subject_id.clone()))
275            .await
276        {
277            Ok(NodeResponse::Ok) => {}
278            Ok(other) => cleanup_errors
279                .push(format!("node: unexpected response {other:?}")),
280            Err(err) => cleanup_errors.push(format!("node: {err}")),
281        }
282    }
283
284    /// Creates a new `Api`.
285    pub async fn build(
286        keys: KeyPair,
287        config: AveBaseConfig,
288        sink_auth: SinkAuth,
289        registry: &mut Registry,
290        password: &str,
291        graceful_token: CancellationToken,
292        crash_token: CancellationToken,
293    ) -> Result<(Self, Vec<JoinHandle<()>>), Error> {
294        debug!("Creating Api");
295
296        let (system, runner) = system(
297            config.clone(),
298            sink_auth,
299            password,
300            graceful_token.clone(),
301            crash_token.clone(),
302        )
303        .await
304        .map_err(|e| {
305            error!(error = %e, "Failed to create system");
306            e
307        })?;
308
309        let newtork_monitor = Monitor::default();
310        let newtork_monitor_actor = system
311            .create_root_actor("network_monitor", newtork_monitor)
312            .await
313            .map_err(|e| {
314                error!(error = %e, "Can not create network_monitor actor");
315                Error::ActorCreation {
316                    actor: "network_monitor".to_string(),
317                    reason: e.to_string(),
318                }
319            })?;
320
321        let spec = config.spec.map(MachineSpec::from);
322        let network_metrics = ave_network::metrics::register(registry);
323        crate::metrics::register(registry);
324
325        let mut worker: NetworkWorker<NetworkMessage> = NetworkWorker::new(
326            &keys,
327            config.network.clone(),
328            config.safe_mode,
329            NetworkWorkerRuntime {
330                monitor: Some(newtork_monitor_actor.clone()),
331                graceful_token: graceful_token.clone(),
332                crash_token: crash_token.clone(),
333                machine_spec: spec,
334                metrics: Some(network_metrics),
335            },
336        )
337        .map_err(|e| {
338            error!(error = %e, "Can not create networt");
339            Error::Network(e.to_string())
340        })?;
341
342        // Create worker
343        let service = Intermediary::build(
344            worker.service().sender(),
345            system.clone(),
346            graceful_token.clone(),
347            crash_token.clone(),
348        );
349
350        let peer_id = worker.local_peer_id().to_string();
351
352        worker.add_helper_sender(service.sender());
353
354        system.add_helper("network", service.clone()).await;
355
356        let worker_runner = tokio::spawn(async move {
357            let _ = worker.run().await;
358        });
359
360        let public_key = Arc::new(keys.public_key());
361        let node_actor = system
362            .create_root_actor(
363                "node",
364                Node::initial(InitParamsNode {
365                    key_pair: keys.clone(),
366                    hash: config.hash_algorithm,
367                    is_service: config.is_service,
368                    only_clear_events: config.only_clear_events,
369                    ledger_batch_size: config.sync.ledger_batch_size as u64,
370                    public_key: public_key.clone(),
371                }),
372            )
373            .await
374            .map_err(|e| {
375                error!(error = %e, "Init system, can not create node actor");
376                Error::ActorCreation {
377                    actor: "node".to_string(),
378                    reason: e.to_string(),
379                }
380            })?;
381
382        let manual_dis_actor = if config.safe_mode {
383            None
384        } else {
385            Some(
386                system
387                    .get_actor(&ActorPath::from(
388                        "/user/node/manual_distribution",
389                    ))
390                    .await
391                    .map_err(|e| {
392                        error!(
393                            error = %e,
394                            "Failed to get manual_distribution actor"
395                        );
396                        e
397                    })?,
398            )
399        };
400
401        let auth_actor: ActorRef<Auth> = system
402            .get_actor(&ActorPath::from("/user/node/auth"))
403            .await
404            .map_err(|e| {
405                error!(error = %e, "Failed to get auth actor");
406                e
407            })?;
408
409        let subject_manager_actor: ActorRef<SubjectManager> = system
410            .get_actor(&ActorPath::from("/user/node/subject_manager"))
411            .await
412            .map_err(|e| {
413                error!(error = %e, "Failed to get subject_manager actor");
414                e
415            })?;
416
417        let request_actor = system
418            .create_root_actor(
419                "request",
420                RequestHandler::initial((
421                    public_key,
422                    (config.hash_algorithm, service),
423                )),
424            )
425            .await
426            .map_err(|e| {
427                error!(error = %e, "Init system, can not create request actor");
428                Error::ActorCreation {
429                    actor: "request".to_string(),
430                    reason: e.to_string(),
431                }
432            })?;
433
434        let tracking_actor = if config.safe_mode {
435            None
436        } else {
437            Some(
438                system
439                    .get_actor(&ActorPath::from("/user/request/tracking"))
440                    .await
441                    .map_err(|e| {
442                        error!(error = %e, "Failed to get tracking actor");
443                        e
444                    })?,
445            )
446        };
447
448        let Some(ext_db) = system.get_helper::<Arc<ExternalDB>>("ext_db").await
449        else {
450            error!("External database helper not found");
451            return Err(Error::MissingResource {
452                name: "ext_db".to_string(),
453                reason: "External database helper not found".to_string(),
454            });
455        };
456
457        ext_db.register_prometheus_metrics(registry);
458
459        let tasks = Vec::from([runner, worker_runner]);
460
461        Ok((
462            Self {
463                public_key: keys.public_key().to_string(),
464                peer_id,
465                safe_mode: config.safe_mode,
466                deleting_subject: Arc::new(tokio::sync::Mutex::new(None)),
467                db: ext_db,
468                request: request_actor,
469                auth: auth_actor,
470                node: node_actor,
471                subject_manager: subject_manager_actor,
472                monitor: newtork_monitor_actor,
473                manual_dis: manual_dis_actor,
474                tracking: tracking_actor,
475            },
476            tasks,
477        ))
478    }
479
480    ///////// General
481    ////////////////////////////
482
483    pub fn peer_id(&self) -> &str {
484        &self.peer_id
485    }
486
487    pub fn public_key(&self) -> &str {
488        &self.public_key
489    }
490
491    ///////// Network
492    ////////////////////////////
493    pub async fn get_network_state(
494        &self,
495    ) -> Result<MonitorNetworkState, Error> {
496        let response =
497            self.monitor.ask(MonitorMessage::State).await.map_err(|e| {
498                warn!(error = %e, "Unable to retrieve network state");
499                preserve_functional_actor_error(e, |e| {
500                    Error::NetworkState(e.to_string())
501                })
502            })?;
503
504        match response {
505            MonitorResponse::State(state) => Ok(state),
506            _ => {
507                warn!("Unexpected response from network monitor");
508                Err(Error::UnexpectedResponse {
509                    actor: "network_monitor".to_string(),
510                    expected: "State".to_string(),
511                    received: "other".to_string(),
512                })
513            }
514        }
515    }
516
517    ///////// Request
518    ////////////////////////////
519
520    pub async fn get_requests_in_manager(
521        &self,
522    ) -> Result<RequestsInManager, Error> {
523        let response = self
524            .request
525            .ask(RequestHandlerMessage::RequestInManager)
526            .await
527            .map_err(|e| {
528                warn!(error = %e, "Request processing failed");
529                actor_communication_error("request", e)
530            })?;
531
532        match response {
533            RequestHandlerResponse::RequestInManager(request) => Ok(request),
534            _ => {
535                warn!("Unexpected response from request handler");
536                Err(Error::UnexpectedResponse {
537                    actor: "request".to_string(),
538                    expected: "RequestInManager".to_string(),
539                    received: "other".to_string(),
540                })
541            }
542        }
543    }
544
545    pub async fn get_requests_in_manager_subject_id(
546        &self,
547        subject_id: DigestIdentifier,
548    ) -> Result<RequestsInManagerSubject, Error> {
549        let response = self
550            .request
551            .ask(RequestHandlerMessage::RequestInManagerSubjectId {
552                subject_id,
553            })
554            .await
555            .map_err(|e| {
556                warn!(error = %e, "Request processing failed");
557                actor_communication_error("request", e)
558            })?;
559
560        match response {
561            RequestHandlerResponse::RequestInManagerSubjectId(request) => {
562                Ok(request)
563            }
564            _ => {
565                warn!("Unexpected response from request handler");
566                Err(Error::UnexpectedResponse {
567                    actor: "request".to_string(),
568                    expected: "RequestInManagerSubjectId".to_string(),
569                    received: "other".to_string(),
570                })
571            }
572        }
573    }
574
575    pub async fn external_request(
576        &self,
577        request: Signed<EventRequest>,
578    ) -> Result<RequestData, Error> {
579        self.ensure_mutations_allowed()?;
580        let response = self
581            .request
582            .ask(RequestHandlerMessage::NewRequest { request })
583            .await
584            .map_err(|e| {
585                warn!(error = %e, "Request processing failed");
586                actor_communication_error("request", e)
587            })?;
588
589        match response {
590            RequestHandlerResponse::Ok(request_data) => Ok(request_data),
591            _ => {
592                warn!("Unexpected response from request handler");
593                Err(Error::UnexpectedResponse {
594                    actor: "request".to_string(),
595                    expected: "Ok".to_string(),
596                    received: "other".to_string(),
597                })
598            }
599        }
600    }
601
602    pub async fn own_request(
603        &self,
604        request: EventRequest,
605    ) -> Result<RequestData, Error> {
606        self.ensure_mutations_allowed()?;
607        let response = self
608            .node
609            .ask(NodeMessage::SignRequest(Box::new(
610                SignTypesNode::EventRequest(request.clone()),
611            )))
612            .await
613            .map_err(|e| {
614                warn!(error = %e, "Node was unable to sign the request");
615                preserve_functional_actor_error(e, |e| {
616                    Error::SigningFailed(e.to_string())
617                })
618            })?;
619
620        let signature = match response {
621            NodeResponse::SignRequest(signature) => signature,
622            _ => {
623                warn!("Unexpected response from node");
624                return Err(Error::UnexpectedResponse {
625                    actor: "node".to_string(),
626                    expected: "SignRequest".to_string(),
627                    received: "other".to_string(),
628                });
629            }
630        };
631
632        let signed_event_req = Signed::from_parts(request, signature);
633
634        let response = self
635            .request
636            .ask(RequestHandlerMessage::NewRequest {
637                request: signed_event_req,
638            })
639            .await
640            .map_err(|e| {
641                warn!(error = %e, "Failed to send request");
642                actor_communication_error("request", e)
643            })?;
644
645        match response {
646            RequestHandlerResponse::Ok(request_data) => Ok(request_data),
647            _ => {
648                warn!("Unexpected response from request handler");
649                Err(Error::UnexpectedResponse {
650                    actor: "request".to_string(),
651                    expected: "Ok".to_string(),
652                    received: "other".to_string(),
653                })
654            }
655        }
656    }
657
658    pub async fn get_approval(
659        &self,
660        subject_id: DigestIdentifier,
661        state: Option<ApprovalState>,
662    ) -> Result<Option<(ApprovalReq, ApprovalState)>, Error> {
663        let response = self
664            .request
665            .ask(RequestHandlerMessage::GetApproval { state, subject_id })
666            .await
667            .map_err(|e| {
668                warn!(error = %e, "Failed to get approval request");
669                actor_communication_error("request", e)
670            })?;
671
672        match response {
673            RequestHandlerResponse::Approval(data) => Ok(data),
674            _ => {
675                warn!("Unexpected response from request handler");
676                Err(Error::UnexpectedResponse {
677                    actor: "request".to_string(),
678                    expected: "Approval".to_string(),
679                    received: "other".to_string(),
680                })
681            }
682        }
683    }
684
685    pub async fn get_approvals(
686        &self,
687        state: Option<ApprovalState>,
688    ) -> Result<Vec<(ApprovalReq, ApprovalState)>, Error> {
689        let response = self
690            .request
691            .ask(RequestHandlerMessage::GetAllApprovals { state })
692            .await
693            .map_err(|e| {
694                warn!(error = %e, "Failed to get approval requests");
695                actor_communication_error("request", e)
696            })?;
697
698        match response {
699            RequestHandlerResponse::Approvals(data) => Ok(data),
700            _ => {
701                warn!("Unexpected response from request handler");
702                Err(Error::UnexpectedResponse {
703                    actor: "request".to_string(),
704                    expected: "Approvals".to_string(),
705                    received: "other".to_string(),
706                })
707            }
708        }
709    }
710
711    pub async fn approve(
712        &self,
713        subject_id: DigestIdentifier,
714        state: ApprovalStateRes,
715    ) -> Result<String, Error> {
716        self.ensure_mutations_allowed()?;
717        if state == ApprovalStateRes::Obsolete {
718            warn!("Cannot set approval state to Obsolete");
719            return Err(Error::InvalidApprovalState("Obsolete".to_string()));
720        }
721
722        let response = self
723            .request
724            .ask(RequestHandlerMessage::ChangeApprovalState {
725                subject_id,
726                state,
727            })
728            .await
729            .map_err(|e| {
730                warn!(error = %e, "Failed to change approval state");
731                preserve_functional_actor_error(e, |e| {
732                    Error::ApprovalUpdateFailed(e.to_string())
733                })
734            })?;
735
736        match response {
737            RequestHandlerResponse::Response(res) => Ok(res),
738            _ => {
739                warn!("Unexpected response from request handler");
740                Err(Error::UnexpectedResponse {
741                    actor: "request".to_string(),
742                    expected: "Response".to_string(),
743                    received: "other".to_string(),
744                })
745            }
746        }
747    }
748
749    pub async fn manual_request_abort(
750        &self,
751        subject_id: DigestIdentifier,
752    ) -> Result<String, Error> {
753        self.ensure_mutations_allowed()?;
754        self.request
755            .tell(RequestHandlerMessage::AbortRequest { subject_id })
756            .await
757            .map_err(|e| {
758                warn!(error = %e, "Failed to abort request");
759                actor_communication_error("request", e)
760            })?;
761
762        Ok("Trying to abort".to_string())
763    }
764
765    ///////// Tracking
766    ////////////////////////////
767    pub async fn get_request_state(
768        &self,
769        request_id: DigestIdentifier,
770    ) -> Result<RequestInfo, Error> {
771        let Some(tracking) = &self.tracking else {
772            return Err(Error::SafeMode(
773                "request tracking is unavailable while node is running in safe mode"
774                    .to_string(),
775            ));
776        };
777        let response = tracking
778            .ask(RequestTrackingMessage::SearchRequest(request_id.clone()))
779            .await
780            .map_err(|e| {
781                warn!(error = %e, "Failed to get request state");
782                actor_communication_error("tracking", e)
783            })?;
784
785        match response {
786            RequestTrackingResponse::Info(state) => Ok(state),
787            RequestTrackingResponse::NotFound => {
788                Err(Error::RequestNotFound(request_id.to_string()))
789            }
790            _ => {
791                warn!("Unexpected response from tracking");
792                Err(Error::UnexpectedResponse {
793                    actor: "tracking".to_string(),
794                    expected: "Info".to_string(),
795                    received: "other".to_string(),
796                })
797            }
798        }
799    }
800
801    pub async fn all_request_state(
802        &self,
803    ) -> Result<Vec<RequestInfoExtend>, Error> {
804        let Some(tracking) = &self.tracking else {
805            return Err(Error::SafeMode(
806                "request tracking is unavailable while node is running in safe mode"
807                    .to_string(),
808            ));
809        };
810        let response = tracking
811            .ask(RequestTrackingMessage::AllRequests)
812            .await
813            .map_err(|e| {
814                warn!(error = %e, "Failed to get all request states");
815                actor_communication_error("tracking", e)
816            })?;
817
818        match response {
819            RequestTrackingResponse::AllInfo(state) => Ok(state),
820            _ => {
821                warn!("Unexpected response from tracking");
822                Err(Error::UnexpectedResponse {
823                    actor: "tracking".to_string(),
824                    expected: "AllInfo".to_string(),
825                    received: "other".to_string(),
826                })
827            }
828        }
829    }
830
831    ///////// Node
832    ////////////////////////////
833
834    pub async fn get_pending_transfers(
835        &self,
836    ) -> Result<Vec<TransferSubject>, Error> {
837        let response =
838            self.node.ask(NodeMessage::PendingTransfers).await.map_err(
839                |e| {
840                    warn!(error = %e, "Failed to get pending transfers");
841                    actor_communication_error("node", e)
842                },
843            )?;
844
845        let NodeResponse::PendingTransfers(pending) = response else {
846            warn!("Unexpected response from node");
847            return Err(Error::UnexpectedResponse {
848                actor: "node".to_string(),
849                expected: "PendingTransfers".to_string(),
850                received: "other".to_string(),
851            });
852        };
853
854        Ok(pending)
855    }
856
857    ///////// Auth
858    ////////////////////////////
859
860    pub async fn auth_subject(
861        &self,
862        subject_id: DigestIdentifier,
863        witnesses: AuthWitness,
864    ) -> Result<String, Error> {
865        self.ensure_mutations_allowed()?;
866        self.auth
867            .tell(AuthMessage::NewAuth {
868                subject_id,
869                witness: witnesses,
870            })
871            .await
872            .map_err(|e| {
873                warn!(error = %e, "Authentication operation failed");
874                preserve_functional_actor_error(e, |e| {
875                    Error::AuthOperation(e.to_string())
876                })
877            })?;
878
879        Ok("Ok".to_owned())
880    }
881
882    pub async fn all_auth_subjects(
883        &self,
884    ) -> Result<Vec<DigestIdentifier>, Error> {
885        let response =
886            self.auth.ask(AuthMessage::GetAuths).await.map_err(|e| {
887                error!(error = %e, "Failed to get auth subjects");
888                actor_communication_error("auth", e)
889            })?;
890
891        match response {
892            AuthResponse::Auths { subjects } => Ok(subjects),
893            _ => {
894                warn!("Unexpected response from auth");
895                Err(Error::UnexpectedResponse {
896                    actor: "auth".to_string(),
897                    expected: "Auths".to_string(),
898                    received: "other".to_string(),
899                })
900            }
901        }
902    }
903
904    pub async fn witnesses_subject(
905        &self,
906        subject_id: DigestIdentifier,
907    ) -> Result<HashSet<PublicKey>, Error> {
908        let response = self
909            .auth
910            .ask(AuthMessage::GetAuth {
911                subject_id: subject_id.clone(),
912            })
913            .await
914            .map_err(|e| {
915                warn!(error = %e, "Failed to get witnesses for subject");
916                actor_communication_error("auth", e)
917            })?;
918
919        match response {
920            AuthResponse::Witnesses(witnesses) => Ok(witnesses),
921            _ => {
922                warn!("Unexpected response from auth");
923                Err(Error::UnexpectedResponse {
924                    actor: "auth".to_string(),
925                    expected: "Witnesses".to_string(),
926                    received: "other".to_string(),
927                })
928            }
929        }
930    }
931
932    pub async fn delete_auth_subject(
933        &self,
934        subject_id: DigestIdentifier,
935    ) -> Result<String, Error> {
936        self.ensure_mutations_allowed()?;
937        self.auth
938            .tell(AuthMessage::DeleteAuth { subject_id })
939            .await
940            .map_err(|e| {
941                warn!(error = %e, "Failed to delete auth subject");
942                preserve_functional_actor_error(e, |e| {
943                    Error::AuthOperation(e.to_string())
944                })
945            })?;
946
947        Ok("Ok".to_owned())
948    }
949
950    pub async fn update_subject(
951        &self,
952        subject_id: DigestIdentifier,
953    ) -> Result<String, Error> {
954        self.update_subject_with_options(subject_id, false).await
955    }
956
957    pub async fn update_subject_with_options(
958        &self,
959        subject_id: DigestIdentifier,
960        strict: bool,
961    ) -> Result<String, Error> {
962        self.ensure_mutations_allowed()?;
963        let response = self
964            .auth
965            .ask(AuthMessage::Update {
966                subject_id: subject_id.clone(),
967                objective: None,
968                strict,
969            })
970            .await
971            .map_err(|e| {
972                warn!(error = %e, "Failed to update subject");
973                preserve_functional_actor_error(e, |e| {
974                    Error::UpdateFailed(subject_id.to_string(), e.to_string())
975                })
976            })?;
977
978        match response {
979            AuthResponse::None => Ok("Update in progress".to_owned()),
980            _ => {
981                warn!("Unexpected response from auth");
982                Err(Error::UnexpectedResponse {
983                    actor: "auth".to_string(),
984                    expected: "None".to_string(),
985                    received: "other".to_string(),
986                })
987            }
988        }
989    }
990
991    ///////// manual distribution
992    ////////////////////////////
993
994    pub async fn manual_distribution(
995        &self,
996        subject_id: DigestIdentifier,
997    ) -> Result<String, Error> {
998        self.ensure_mutations_allowed()?;
999        let Some(manual_dis) = &self.manual_dis else {
1000            return Err(safe_mode_error());
1001        };
1002        manual_dis
1003            .ask(ManualDistributionMessage::Update(subject_id.clone()))
1004            .await
1005            .map_err(|e| {
1006                warn!(error = %e, "Manual distribution failed");
1007                preserve_functional_actor_error(e, |_| {
1008                    Error::DistributionFailed(subject_id.to_string())
1009                })
1010            })?;
1011
1012        Ok("Manual distribution in progress".to_owned())
1013    }
1014
1015    pub async fn delete_subject(
1016        &self,
1017        subject_id: DigestIdentifier,
1018    ) -> Result<String, Error> {
1019        self.ensure_safe_mode_required("subject deletion")?;
1020        self.begin_subject_deletion(&subject_id).await?;
1021
1022        let result = async {
1023            let subject_data = self.subject_data(&subject_id).await?;
1024
1025            match subject_data {
1026                node::SubjectData::Governance { .. } => {
1027                    info!(
1028                        subject_id = %subject_id,
1029                        subject_type = "governance",
1030                        "Deleting subject"
1031                    );
1032                    let trackers =
1033                        self.governance_trackers(&subject_id).await?;
1034                    if !trackers.is_empty() {
1035                        return Err(Error::GovernanceHasTrackers {
1036                            governance_id: subject_id.to_string(),
1037                            trackers: trackers
1038                                .into_iter()
1039                                .map(|tracker| tracker.to_string())
1040                                .collect(),
1041                        });
1042                    }
1043                    let mut cleanup_errors = Vec::new();
1044
1045                    match self
1046                        .subject_manager
1047                        .ask(SubjectManagerMessage::DeleteGovernance {
1048                            subject_id: subject_id.clone(),
1049                        })
1050                        .await
1051                    {
1052                        Ok(SubjectManagerResponse::DeleteGovernance) => {}
1053                        Ok(other) => cleanup_errors.push(format!(
1054                            "subject_manager: unexpected response {other:?}"
1055                        )),
1056                        Err(err) => cleanup_errors
1057                            .push(format!("subject_manager: {err}")),
1058                    }
1059
1060                    self.purge_common_subject_state(
1061                        &subject_id,
1062                        &mut cleanup_errors,
1063                    )
1064                    .await;
1065
1066                    self.delete_subject_from_node(
1067                        &subject_id,
1068                        &mut cleanup_errors,
1069                    )
1070                    .await;
1071
1072                    if cleanup_errors.is_empty() {
1073                        info!(
1074                            subject_id = %subject_id,
1075                            subject_type = "governance",
1076                            "Subject deleted successfully"
1077                        );
1078                        Ok("Governance deleted successfully".to_owned())
1079                    } else {
1080                        Err(Error::Internal(format!(
1081                            "governance deletion completed partially: {}",
1082                            cleanup_errors.join("; ")
1083                        )))
1084                    }
1085                }
1086                node::SubjectData::Tracker { .. } => {
1087                    info!(
1088                        subject_id = %subject_id,
1089                        subject_type = "tracker",
1090                        "Deleting subject"
1091                    );
1092                    let mut cleanup_errors = Vec::new();
1093
1094                    self.purge_common_subject_state(
1095                        &subject_id,
1096                        &mut cleanup_errors,
1097                    )
1098                    .await;
1099
1100                    match self
1101                        .subject_manager
1102                        .ask(SubjectManagerMessage::DeleteTracker {
1103                            subject_id: subject_id.clone(),
1104                        })
1105                        .await
1106                    {
1107                        Ok(SubjectManagerResponse::DeleteTracker) => {}
1108                        Ok(other) => cleanup_errors.push(format!(
1109                            "subject_manager: unexpected response {other:?}"
1110                        )),
1111                        Err(err) => cleanup_errors
1112                            .push(format!("subject_manager: {err}")),
1113                    }
1114
1115                    self.delete_subject_from_node(
1116                        &subject_id,
1117                        &mut cleanup_errors,
1118                    )
1119                    .await;
1120
1121                    if cleanup_errors.is_empty() {
1122                        info!(
1123                            subject_id = %subject_id,
1124                            subject_type = "tracker",
1125                            "Subject deleted successfully"
1126                        );
1127                        Ok("Tracker deleted successfully".to_owned())
1128                    } else {
1129                        Err(Error::Internal(format!(
1130                            "tracker deletion completed partially: {}",
1131                            cleanup_errors.join("; ")
1132                        )))
1133                    }
1134                }
1135            }
1136        }
1137        .await;
1138
1139        self.end_subject_deletion(&subject_id).await;
1140        result
1141    }
1142
1143    ///////// Register
1144    ////////////////////////////
1145    pub async fn all_govs(
1146        &self,
1147        active: Option<bool>,
1148    ) -> Result<Vec<GovsData>, Error> {
1149        self.db.get_governances(active).await.map_err(|e| {
1150            warn!(error = %e, "Failed to get governances");
1151            Error::QueryFailed(e.to_string())
1152        })
1153    }
1154
1155    pub async fn all_subjs(
1156        &self,
1157        governance_id: DigestIdentifier,
1158        active: Option<bool>,
1159        schema_id: Option<String>,
1160    ) -> Result<Vec<SubjsData>, Error> {
1161        let governance_id = governance_id.to_string();
1162        match self
1163            .db
1164            .get_subjects(&governance_id, active, schema_id)
1165            .await
1166        {
1167            Ok(subjects) => Ok(subjects),
1168            Err(ExternalDatabaseError::GovernanceNotFound(_)) => {
1169                Err(Error::GovernanceNotFound(governance_id))
1170            }
1171            Err(e) => {
1172                warn!(error = %e, "Failed to get subjects");
1173                Err(Error::QueryFailed(e.to_string()))
1174            }
1175        }
1176    }
1177
1178    ///////// Query
1179    ////////////////////////////
1180    pub async fn get_events(
1181        &self,
1182        subject_id: DigestIdentifier,
1183        query: EventsQuery,
1184    ) -> Result<PaginatorEvents, Error> {
1185        let subject_id_str = subject_id.to_string();
1186
1187        match self.db.get_events(&subject_id_str, query).await {
1188            Ok(data) => Ok(data),
1189            Err(ExternalDatabaseError::NoEvents(_)) => {
1190                Err(Error::NoEventsFound(subject_id_str))
1191            }
1192            Err(e) => {
1193                warn!(error = %e, "Failed to get events");
1194                Err(Error::QueryFailed(e.to_string()))
1195            }
1196        }
1197    }
1198
1199    pub async fn get_sink_events(
1200        &self,
1201        subject_id: DigestIdentifier,
1202        query: SinkEventsQuery,
1203    ) -> Result<SinkEventsPage, Error> {
1204        let response = self
1205            .node
1206            .ask(NodeMessage::GetSinkEvents {
1207                subject_id,
1208                from_sn: query.from_sn.unwrap_or(0),
1209                to_sn: query.to_sn,
1210                limit: query.limit.unwrap_or(100),
1211            })
1212            .await
1213            .map_err(|e| {
1214                warn!(error = %e, "Failed to replay sink events");
1215                Error::from(e)
1216            })?;
1217
1218        match response {
1219            NodeResponse::SinkEvents(events) => Ok(events),
1220            _ => Err(Error::UnexpectedResponse {
1221                actor: "node".to_string(),
1222                expected: "SinkEvents".to_string(),
1223                received: "other".to_string(),
1224            }),
1225        }
1226    }
1227
1228    pub async fn get_aborts(
1229        &self,
1230        subject_id: DigestIdentifier,
1231        query: AbortsQuery,
1232    ) -> Result<PaginatorAborts, Error> {
1233        let subject_id_str = subject_id.to_string();
1234        let request_id = if let Some(request_id) = query.request_id.as_ref() {
1235            Some(
1236                DigestIdentifier::from_str(request_id)
1237                    .map_err(|e| Error::InvalidQueryParams(e.to_string()))?
1238                    .to_string(),
1239            )
1240        } else {
1241            None
1242        };
1243        let query = AbortsQuery {
1244            request_id,
1245            sn: query.sn,
1246            quantity: query.quantity,
1247            page: query.page,
1248            reverse: query.reverse,
1249        };
1250
1251        self.db
1252            .get_aborts(&subject_id_str, query)
1253            .await
1254            .map_err(|e| {
1255                warn!(error = %e, "Failed to get aborts");
1256                Error::QueryFailed(e.to_string())
1257            })
1258    }
1259
1260    pub async fn get_event_sn(
1261        &self,
1262        subject_id: DigestIdentifier,
1263        sn: u64,
1264    ) -> Result<LedgerDB, Error> {
1265        let subject_id_str = subject_id.to_string();
1266
1267        match self.db.get_event_sn(&subject_id_str, sn).await {
1268            Ok(data) => Ok(data),
1269            Err(ExternalDatabaseError::EventNotFound { .. }) => {
1270                Err(Error::EventNotFound {
1271                    subject: subject_id_str,
1272                    sn,
1273                })
1274            }
1275            Err(e) => {
1276                warn!(error = %e, "Failed to get event");
1277                Err(Error::QueryFailed(e.to_string()))
1278            }
1279        }
1280    }
1281
1282    pub async fn get_first_or_end_events(
1283        &self,
1284        subject_id: DigestIdentifier,
1285        quantity: Option<u64>,
1286        reverse: Option<bool>,
1287        event_type: Option<EventRequestType>,
1288    ) -> Result<Vec<LedgerDB>, Error> {
1289        let subject_id_str = subject_id.to_string();
1290
1291        match self
1292            .db
1293            .get_first_or_end_events(
1294                &subject_id_str,
1295                quantity,
1296                reverse,
1297                event_type,
1298            )
1299            .await
1300        {
1301            Ok(data) => Ok(data),
1302            Err(ExternalDatabaseError::NoEvents(_)) => {
1303                Err(Error::NoEventsFound(subject_id_str))
1304            }
1305            Err(e) => {
1306                warn!(error = %e, "Failed to get events");
1307                Err(Error::QueryFailed(e.to_string()))
1308            }
1309        }
1310    }
1311
1312    pub async fn get_subject_state(
1313        &self,
1314        subject_id: DigestIdentifier,
1315    ) -> Result<SubjectDB, Error> {
1316        let subject_id_str = subject_id.to_string();
1317
1318        match self.db.get_subject_state(&subject_id_str).await {
1319            Ok(data) => Ok(data),
1320            Err(ExternalDatabaseError::SubjectNotFound(_)) => {
1321                Err(Error::SubjectNotFound(subject_id_str))
1322            }
1323            Err(e) => {
1324                warn!(error = %e, "Failed to get subject state");
1325                Err(Error::QueryFailed(e.to_string()))
1326            }
1327        }
1328    }
1329}
1330
1331#[cfg(test)]
1332mod tests {
1333    use super::*;
1334    use ave_actors::{ActorError, ActorPath};
1335
1336    #[test]
1337    fn preserves_functional_actor_errors() {
1338        let error = preserve_functional_actor_error(
1339            ActorError::Functional {
1340                description: "Is not a Creator".to_string(),
1341            },
1342            |_| Error::ActorCommunication {
1343                actor: "request".to_string(),
1344            },
1345        );
1346
1347        assert!(
1348            matches!(error, Error::ActorError(message) if message == "Is not a Creator")
1349        );
1350    }
1351
1352    #[test]
1353    fn preserves_not_found_actor_errors() {
1354        let error = preserve_functional_actor_error(
1355            ActorError::NotFound {
1356                path: ActorPath::from("/user/request"),
1357            },
1358            |_| Error::ActorCommunication {
1359                actor: "request".to_string(),
1360            },
1361        );
1362
1363        assert!(matches!(
1364            error,
1365            Error::MissingResource { name, .. } if name == "/user/request"
1366        ));
1367    }
1368}