1#![recursion_limit = "256"]
2pub mod config;
3pub mod error;
4
5pub mod approval;
6pub mod auth;
7pub mod db;
8pub mod distribution;
9pub mod evaluation;
10pub mod external_db;
11pub mod governance;
12pub mod helpers;
13pub mod manual_distribution;
14pub mod metrics;
15pub mod model;
16pub mod node;
17pub mod request;
18pub mod subject;
19pub mod system;
20pub mod tracker;
21pub mod update;
22pub mod validation;
23
24use std::collections::HashSet;
25use std::str::FromStr;
26use std::sync::Arc;
27
28use auth::{Auth, AuthMessage, AuthResponse, AuthWitness};
29use ave_actors::{ActorError, ActorPath, ActorRef, PersistentActor};
30use ave_common::bridge::request::{
31 AbortsQuery, ApprovalState, ApprovalStateRes, EventRequestType,
32 EventsQuery, SinkEventsQuery,
33};
34use ave_common::identity::keys::KeyPair;
35use ave_common::identity::{DigestIdentifier, PublicKey, Signed};
36use ave_common::request::EventRequest;
37use ave_common::response::{
38 GovsData, LedgerDB, MonitorNetworkState, PaginatorAborts, PaginatorEvents,
39 RequestInfo, RequestInfoExtend, RequestsInManager,
40 RequestsInManagerSubject, SinkEventsPage, SubjectDB, SubjsData,
41};
42use ave_network::{
43 MachineSpec, Monitor, MonitorMessage, MonitorResponse, NetworkWorker,
44 NetworkWorkerRuntime,
45};
46use config::Config as AveBaseConfig;
47use error::Error;
48use helpers::network::*;
49use intermediary::Intermediary;
50use manual_distribution::{ManualDistribution, ManualDistributionMessage};
51
52use node::{Node, NodeMessage, NodeResponse, TransferSubject};
53use prometheus_client::registry::Registry;
54use request::{
55 RequestData, RequestHandler, RequestHandlerMessage, RequestHandlerResponse,
56};
57use system::system;
58use tokio::task::JoinHandle;
59use tokio_util::sync::CancellationToken;
60use tracing::{debug, error, info, warn};
61use validation::{Validation, ValidationMessage};
62
63use crate::approval::request::ApprovalReq;
64use crate::config::SinkAuth;
65use crate::helpers::db::{
66 DatabaseError as ExternalDatabaseError, ExternalDB, ReadStore,
67};
68use crate::model::common::node::SignTypesNode;
69use crate::node::InitParamsNode;
70use crate::node::subject_manager::{
71 SubjectManager, SubjectManagerMessage, SubjectManagerResponse,
72};
73use crate::request::tracking::{
74 RequestTracking, RequestTrackingMessage, RequestTrackingResponse,
75};
76
// The `sqlite` and `rocksdb` features are mutually exclusive.
#[cfg(all(feature = "sqlite", feature = "rocksdb"))]
compile_error!("Select only one: 'sqlite' or 'rocksdb'");

// At least one of `sqlite` / `rocksdb` has to be enabled.
#[cfg(not(any(feature = "sqlite", feature = "rocksdb")))]
compile_error!("You must enable 'sqlite' or 'rocksdb'");

// The `ext-sqlite` feature is mandatory.
#[cfg(not(feature = "ext-sqlite"))]
compile_error!("You must enable 'ext-sqlite'");

// Reject the `test` feature in builds that are neither test builds nor
// builds with debug assertions enabled.
#[cfg(all(feature = "test", not(test), not(debug_assertions)))]
compile_error!(
    "The 'test' feature should only be used during development/testing"
);
90
/// Cloneable handle used to talk to the node: it bundles the actor
/// references and the external database helper obtained in [`Api::build`].
#[derive(Clone)]
pub struct Api {
    /// Local peer id of the network worker, rendered as a string.
    peer_id: String,
    /// Public key of the node key pair, rendered as a string.
    public_key: String,
    /// Copy of `config.safe_mode`; gates mutating operations and subject
    /// deletion.
    safe_mode: bool,
    /// Subject whose deletion is currently in progress, if any. Shared
    /// between clones of this handle.
    deleting_subject: Arc<tokio::sync::Mutex<Option<DigestIdentifier>>>,
    /// External database helper used by the query methods.
    db: Arc<ExternalDB>,
    /// Root `request` actor.
    request: ActorRef<RequestHandler>,
    /// Root `node` actor.
    node: ActorRef<Node>,
    /// Actor at `/user/node/subject_manager`.
    subject_manager: ActorRef<SubjectManager>,
    /// Actor at `/user/node/auth`.
    auth: ActorRef<Auth>,
    /// Root `network_monitor` actor.
    monitor: ActorRef<Monitor>,
    /// Actor at `/user/node/manual_distribution`; `None` in safe mode.
    manual_dis: Option<ActorRef<ManualDistribution>>,
    /// Actor at `/user/request/tracking`; `None` in safe mode.
    tracking: Option<ActorRef<RequestTracking>>,
}
106
107fn preserve_functional_actor_error<F>(err: ActorError, fallback: F) -> Error
108where
109 F: FnOnce(ActorError) -> Error,
110{
111 match err {
112 ActorError::Functional { description } => {
113 Error::ActorError(description)
114 }
115 ActorError::FunctionalCritical { description } => {
116 Error::Internal(description)
117 }
118 ActorError::NotFound { path } => Error::MissingResource {
119 name: path.to_string(),
120 reason: "Actor not found".to_string(),
121 },
122 other => fallback(other),
123 }
124}
125
126fn actor_communication_error(actor: &'static str, err: ActorError) -> Error {
127 preserve_functional_actor_error(err, |_| Error::ActorCommunication {
128 actor: actor.to_string(),
129 })
130}
131
132fn safe_mode_error() -> Error {
133 Error::SafeMode(
134 "node is running in safe mode; mutating operations are disabled"
135 .to_string(),
136 )
137}
138
139fn safe_mode_required_error(operation: &'static str) -> Error {
140 Error::SafeMode(format!(
141 "{operation} is only available while node is running in safe mode"
142 ))
143}
144
145impl Api {
146 async fn begin_subject_deletion(
147 &self,
148 subject_id: &DigestIdentifier,
149 ) -> Result<(), Error> {
150 {
151 let mut deleting = self.deleting_subject.lock().await;
152 if let Some(active_subject_id) = deleting.as_ref() {
153 return Err(Error::InvalidRequestState(format!(
154 "subject deletion already in progress for '{}'",
155 active_subject_id
156 )));
157 }
158 *deleting = Some(subject_id.clone());
159 }
160 Ok(())
161 }
162
163 async fn end_subject_deletion(&self, subject_id: &DigestIdentifier) {
164 let mut deleting = self.deleting_subject.lock().await;
165 if deleting.as_ref() == Some(subject_id) {
166 *deleting = None;
167 }
168 }
169
170 fn ensure_mutations_allowed(&self) -> Result<(), Error> {
171 if self.safe_mode {
172 return Err(safe_mode_error());
173 }
174 Ok(())
175 }
176
177 fn ensure_safe_mode_required(
178 &self,
179 operation: &'static str,
180 ) -> Result<(), Error> {
181 if !self.safe_mode {
182 return Err(safe_mode_required_error(operation));
183 }
184 Ok(())
185 }
186
187 async fn subject_data(
188 &self,
189 subject_id: &DigestIdentifier,
190 ) -> Result<node::SubjectData, Error> {
191 let response = self
192 .node
193 .ask(NodeMessage::GetSubjectData(subject_id.clone()))
194 .await
195 .map_err(|e| actor_communication_error("node", e))?;
196
197 let NodeResponse::SubjectData(subject_data) = response else {
198 return Err(Error::UnexpectedResponse {
199 actor: "node".to_string(),
200 expected: "NodeResponse::SubjectData".to_string(),
201 received: "other".to_string(),
202 });
203 };
204
205 subject_data
206 .ok_or_else(|| Error::SubjectNotFound(subject_id.to_string()))
207 }
208
209 async fn governance_trackers(
210 &self,
211 governance_id: &DigestIdentifier,
212 ) -> Result<Vec<DigestIdentifier>, Error> {
213 let response = self
214 .node
215 .ask(NodeMessage::GovernanceTrackers(governance_id.clone()))
216 .await
217 .map_err(|e| actor_communication_error("node", e))?;
218
219 let NodeResponse::GovernanceTrackers(trackers) = response else {
220 return Err(Error::UnexpectedResponse {
221 actor: "node".to_string(),
222 expected: "NodeResponse::GovernanceTrackers".to_string(),
223 received: "other".to_string(),
224 });
225 };
226
227 Ok(trackers)
228 }
229
230 async fn purge_common_subject_state(
231 &self,
232 subject_id: &DigestIdentifier,
233 cleanup_errors: &mut Vec<String>,
234 ) {
235 match self
236 .request
237 .ask(RequestHandlerMessage::PurgeSubject {
238 subject_id: subject_id.clone(),
239 })
240 .await
241 {
242 Ok(RequestHandlerResponse::None) => {}
243 Ok(other) => cleanup_errors
244 .push(format!("request: unexpected response {other:?}")),
245 Err(err) => cleanup_errors.push(format!("request: {err}")),
246 }
247
248 match self
249 .auth
250 .ask(AuthMessage::DeleteAuth {
251 subject_id: subject_id.clone(),
252 })
253 .await
254 {
255 Ok(AuthResponse::None) => {}
256 Ok(other) => cleanup_errors
257 .push(format!("auth: unexpected response {other:?}")),
258 Err(err) => cleanup_errors.push(format!("auth: {err}")),
259 }
260
261 if let Err(err) = self.db.delete_subject(&subject_id.to_string()).await
262 {
263 cleanup_errors.push(format!("external_db: {err}"));
264 }
265 }
266
267 async fn delete_subject_from_node(
268 &self,
269 subject_id: &DigestIdentifier,
270 cleanup_errors: &mut Vec<String>,
271 ) {
272 match self
273 .node
274 .ask(NodeMessage::DeleteSubject(subject_id.clone()))
275 .await
276 {
277 Ok(NodeResponse::Ok) => {}
278 Ok(other) => cleanup_errors
279 .push(format!("node: unexpected response {other:?}")),
280 Err(err) => cleanup_errors.push(format!("node: {err}")),
281 }
282 }
283
284 pub async fn build(
286 keys: KeyPair,
287 config: AveBaseConfig,
288 sink_auth: SinkAuth,
289 registry: &mut Registry,
290 password: &str,
291 graceful_token: CancellationToken,
292 crash_token: CancellationToken,
293 ) -> Result<(Self, Vec<JoinHandle<()>>), Error> {
294 debug!("Creating Api");
295
296 let (system, runner) = system(
297 config.clone(),
298 sink_auth,
299 password,
300 graceful_token.clone(),
301 crash_token.clone(),
302 )
303 .await
304 .map_err(|e| {
305 error!(error = %e, "Failed to create system");
306 e
307 })?;
308
309 let newtork_monitor = Monitor::default();
310 let newtork_monitor_actor = system
311 .create_root_actor("network_monitor", newtork_monitor)
312 .await
313 .map_err(|e| {
314 error!(error = %e, "Can not create network_monitor actor");
315 Error::ActorCreation {
316 actor: "network_monitor".to_string(),
317 reason: e.to_string(),
318 }
319 })?;
320
321 let spec = config.spec.map(MachineSpec::from);
322 let network_metrics = ave_network::metrics::register(registry);
323 crate::metrics::register(registry);
324
325 let mut worker: NetworkWorker<NetworkMessage> = NetworkWorker::new(
326 &keys,
327 config.network.clone(),
328 config.safe_mode,
329 NetworkWorkerRuntime {
330 monitor: Some(newtork_monitor_actor.clone()),
331 graceful_token: graceful_token.clone(),
332 crash_token: crash_token.clone(),
333 machine_spec: spec,
334 metrics: Some(network_metrics),
335 },
336 )
337 .map_err(|e| {
338 error!(error = %e, "Can not create networt");
339 Error::Network(e.to_string())
340 })?;
341
342 let service = Intermediary::build(
344 worker.service().sender(),
345 system.clone(),
346 graceful_token.clone(),
347 crash_token.clone(),
348 );
349
350 let peer_id = worker.local_peer_id().to_string();
351
352 worker.add_helper_sender(service.sender());
353
354 system.add_helper("network", service.clone()).await;
355
356 let worker_runner = tokio::spawn(async move {
357 let _ = worker.run().await;
358 });
359
360 let public_key = Arc::new(keys.public_key());
361 let node_actor = system
362 .create_root_actor(
363 "node",
364 Node::initial(InitParamsNode {
365 key_pair: keys.clone(),
366 hash: config.hash_algorithm,
367 is_service: config.is_service,
368 only_clear_events: config.only_clear_events,
369 ledger_batch_size: config.sync.ledger_batch_size as u64,
370 public_key: public_key.clone(),
371 }),
372 )
373 .await
374 .map_err(|e| {
375 error!(error = %e, "Init system, can not create node actor");
376 Error::ActorCreation {
377 actor: "node".to_string(),
378 reason: e.to_string(),
379 }
380 })?;
381
382 let manual_dis_actor = if config.safe_mode {
383 None
384 } else {
385 Some(
386 system
387 .get_actor(&ActorPath::from(
388 "/user/node/manual_distribution",
389 ))
390 .await
391 .map_err(|e| {
392 error!(
393 error = %e,
394 "Failed to get manual_distribution actor"
395 );
396 e
397 })?,
398 )
399 };
400
401 let auth_actor: ActorRef<Auth> = system
402 .get_actor(&ActorPath::from("/user/node/auth"))
403 .await
404 .map_err(|e| {
405 error!(error = %e, "Failed to get auth actor");
406 e
407 })?;
408
409 let subject_manager_actor: ActorRef<SubjectManager> = system
410 .get_actor(&ActorPath::from("/user/node/subject_manager"))
411 .await
412 .map_err(|e| {
413 error!(error = %e, "Failed to get subject_manager actor");
414 e
415 })?;
416
417 let request_actor = system
418 .create_root_actor(
419 "request",
420 RequestHandler::initial((
421 public_key,
422 (config.hash_algorithm, service),
423 )),
424 )
425 .await
426 .map_err(|e| {
427 error!(error = %e, "Init system, can not create request actor");
428 Error::ActorCreation {
429 actor: "request".to_string(),
430 reason: e.to_string(),
431 }
432 })?;
433
434 let tracking_actor = if config.safe_mode {
435 None
436 } else {
437 Some(
438 system
439 .get_actor(&ActorPath::from("/user/request/tracking"))
440 .await
441 .map_err(|e| {
442 error!(error = %e, "Failed to get tracking actor");
443 e
444 })?,
445 )
446 };
447
448 let Some(ext_db) = system.get_helper::<Arc<ExternalDB>>("ext_db").await
449 else {
450 error!("External database helper not found");
451 return Err(Error::MissingResource {
452 name: "ext_db".to_string(),
453 reason: "External database helper not found".to_string(),
454 });
455 };
456
457 ext_db.register_prometheus_metrics(registry);
458
459 let tasks = Vec::from([runner, worker_runner]);
460
461 Ok((
462 Self {
463 public_key: keys.public_key().to_string(),
464 peer_id,
465 safe_mode: config.safe_mode,
466 deleting_subject: Arc::new(tokio::sync::Mutex::new(None)),
467 db: ext_db,
468 request: request_actor,
469 auth: auth_actor,
470 node: node_actor,
471 subject_manager: subject_manager_actor,
472 monitor: newtork_monitor_actor,
473 manual_dis: manual_dis_actor,
474 tracking: tracking_actor,
475 },
476 tasks,
477 ))
478 }
479
480 pub fn peer_id(&self) -> &str {
484 &self.peer_id
485 }
486
487 pub fn public_key(&self) -> &str {
488 &self.public_key
489 }
490
491 pub async fn get_network_state(
494 &self,
495 ) -> Result<MonitorNetworkState, Error> {
496 let response =
497 self.monitor.ask(MonitorMessage::State).await.map_err(|e| {
498 warn!(error = %e, "Unable to retrieve network state");
499 preserve_functional_actor_error(e, |e| {
500 Error::NetworkState(e.to_string())
501 })
502 })?;
503
504 match response {
505 MonitorResponse::State(state) => Ok(state),
506 _ => {
507 warn!("Unexpected response from network monitor");
508 Err(Error::UnexpectedResponse {
509 actor: "network_monitor".to_string(),
510 expected: "State".to_string(),
511 received: "other".to_string(),
512 })
513 }
514 }
515 }
516
517 pub async fn get_requests_in_manager(
521 &self,
522 ) -> Result<RequestsInManager, Error> {
523 let response = self
524 .request
525 .ask(RequestHandlerMessage::RequestInManager)
526 .await
527 .map_err(|e| {
528 warn!(error = %e, "Request processing failed");
529 actor_communication_error("request", e)
530 })?;
531
532 match response {
533 RequestHandlerResponse::RequestInManager(request) => Ok(request),
534 _ => {
535 warn!("Unexpected response from request handler");
536 Err(Error::UnexpectedResponse {
537 actor: "request".to_string(),
538 expected: "RequestInManager".to_string(),
539 received: "other".to_string(),
540 })
541 }
542 }
543 }
544
545 pub async fn get_requests_in_manager_subject_id(
546 &self,
547 subject_id: DigestIdentifier,
548 ) -> Result<RequestsInManagerSubject, Error> {
549 let response = self
550 .request
551 .ask(RequestHandlerMessage::RequestInManagerSubjectId {
552 subject_id,
553 })
554 .await
555 .map_err(|e| {
556 warn!(error = %e, "Request processing failed");
557 actor_communication_error("request", e)
558 })?;
559
560 match response {
561 RequestHandlerResponse::RequestInManagerSubjectId(request) => {
562 Ok(request)
563 }
564 _ => {
565 warn!("Unexpected response from request handler");
566 Err(Error::UnexpectedResponse {
567 actor: "request".to_string(),
568 expected: "RequestInManagerSubjectId".to_string(),
569 received: "other".to_string(),
570 })
571 }
572 }
573 }
574
575 pub async fn external_request(
576 &self,
577 request: Signed<EventRequest>,
578 ) -> Result<RequestData, Error> {
579 self.ensure_mutations_allowed()?;
580 let response = self
581 .request
582 .ask(RequestHandlerMessage::NewRequest { request })
583 .await
584 .map_err(|e| {
585 warn!(error = %e, "Request processing failed");
586 actor_communication_error("request", e)
587 })?;
588
589 match response {
590 RequestHandlerResponse::Ok(request_data) => Ok(request_data),
591 _ => {
592 warn!("Unexpected response from request handler");
593 Err(Error::UnexpectedResponse {
594 actor: "request".to_string(),
595 expected: "Ok".to_string(),
596 received: "other".to_string(),
597 })
598 }
599 }
600 }
601
602 pub async fn own_request(
603 &self,
604 request: EventRequest,
605 ) -> Result<RequestData, Error> {
606 self.ensure_mutations_allowed()?;
607 let response = self
608 .node
609 .ask(NodeMessage::SignRequest(Box::new(
610 SignTypesNode::EventRequest(request.clone()),
611 )))
612 .await
613 .map_err(|e| {
614 warn!(error = %e, "Node was unable to sign the request");
615 preserve_functional_actor_error(e, |e| {
616 Error::SigningFailed(e.to_string())
617 })
618 })?;
619
620 let signature = match response {
621 NodeResponse::SignRequest(signature) => signature,
622 _ => {
623 warn!("Unexpected response from node");
624 return Err(Error::UnexpectedResponse {
625 actor: "node".to_string(),
626 expected: "SignRequest".to_string(),
627 received: "other".to_string(),
628 });
629 }
630 };
631
632 let signed_event_req = Signed::from_parts(request, signature);
633
634 let response = self
635 .request
636 .ask(RequestHandlerMessage::NewRequest {
637 request: signed_event_req,
638 })
639 .await
640 .map_err(|e| {
641 warn!(error = %e, "Failed to send request");
642 actor_communication_error("request", e)
643 })?;
644
645 match response {
646 RequestHandlerResponse::Ok(request_data) => Ok(request_data),
647 _ => {
648 warn!("Unexpected response from request handler");
649 Err(Error::UnexpectedResponse {
650 actor: "request".to_string(),
651 expected: "Ok".to_string(),
652 received: "other".to_string(),
653 })
654 }
655 }
656 }
657
658 pub async fn get_approval(
659 &self,
660 subject_id: DigestIdentifier,
661 state: Option<ApprovalState>,
662 ) -> Result<Option<(ApprovalReq, ApprovalState)>, Error> {
663 let response = self
664 .request
665 .ask(RequestHandlerMessage::GetApproval { state, subject_id })
666 .await
667 .map_err(|e| {
668 warn!(error = %e, "Failed to get approval request");
669 actor_communication_error("request", e)
670 })?;
671
672 match response {
673 RequestHandlerResponse::Approval(data) => Ok(data),
674 _ => {
675 warn!("Unexpected response from request handler");
676 Err(Error::UnexpectedResponse {
677 actor: "request".to_string(),
678 expected: "Approval".to_string(),
679 received: "other".to_string(),
680 })
681 }
682 }
683 }
684
685 pub async fn get_approvals(
686 &self,
687 state: Option<ApprovalState>,
688 ) -> Result<Vec<(ApprovalReq, ApprovalState)>, Error> {
689 let response = self
690 .request
691 .ask(RequestHandlerMessage::GetAllApprovals { state })
692 .await
693 .map_err(|e| {
694 warn!(error = %e, "Failed to get approval requests");
695 actor_communication_error("request", e)
696 })?;
697
698 match response {
699 RequestHandlerResponse::Approvals(data) => Ok(data),
700 _ => {
701 warn!("Unexpected response from request handler");
702 Err(Error::UnexpectedResponse {
703 actor: "request".to_string(),
704 expected: "Approvals".to_string(),
705 received: "other".to_string(),
706 })
707 }
708 }
709 }
710
711 pub async fn approve(
712 &self,
713 subject_id: DigestIdentifier,
714 state: ApprovalStateRes,
715 ) -> Result<String, Error> {
716 self.ensure_mutations_allowed()?;
717 if state == ApprovalStateRes::Obsolete {
718 warn!("Cannot set approval state to Obsolete");
719 return Err(Error::InvalidApprovalState("Obsolete".to_string()));
720 }
721
722 let response = self
723 .request
724 .ask(RequestHandlerMessage::ChangeApprovalState {
725 subject_id,
726 state,
727 })
728 .await
729 .map_err(|e| {
730 warn!(error = %e, "Failed to change approval state");
731 preserve_functional_actor_error(e, |e| {
732 Error::ApprovalUpdateFailed(e.to_string())
733 })
734 })?;
735
736 match response {
737 RequestHandlerResponse::Response(res) => Ok(res),
738 _ => {
739 warn!("Unexpected response from request handler");
740 Err(Error::UnexpectedResponse {
741 actor: "request".to_string(),
742 expected: "Response".to_string(),
743 received: "other".to_string(),
744 })
745 }
746 }
747 }
748
749 pub async fn manual_request_abort(
750 &self,
751 subject_id: DigestIdentifier,
752 ) -> Result<String, Error> {
753 self.ensure_mutations_allowed()?;
754 self.request
755 .tell(RequestHandlerMessage::AbortRequest { subject_id })
756 .await
757 .map_err(|e| {
758 warn!(error = %e, "Failed to abort request");
759 actor_communication_error("request", e)
760 })?;
761
762 Ok("Trying to abort".to_string())
763 }
764
765 pub async fn get_request_state(
768 &self,
769 request_id: DigestIdentifier,
770 ) -> Result<RequestInfo, Error> {
771 let Some(tracking) = &self.tracking else {
772 return Err(Error::SafeMode(
773 "request tracking is unavailable while node is running in safe mode"
774 .to_string(),
775 ));
776 };
777 let response = tracking
778 .ask(RequestTrackingMessage::SearchRequest(request_id.clone()))
779 .await
780 .map_err(|e| {
781 warn!(error = %e, "Failed to get request state");
782 actor_communication_error("tracking", e)
783 })?;
784
785 match response {
786 RequestTrackingResponse::Info(state) => Ok(state),
787 RequestTrackingResponse::NotFound => {
788 Err(Error::RequestNotFound(request_id.to_string()))
789 }
790 _ => {
791 warn!("Unexpected response from tracking");
792 Err(Error::UnexpectedResponse {
793 actor: "tracking".to_string(),
794 expected: "Info".to_string(),
795 received: "other".to_string(),
796 })
797 }
798 }
799 }
800
801 pub async fn all_request_state(
802 &self,
803 ) -> Result<Vec<RequestInfoExtend>, Error> {
804 let Some(tracking) = &self.tracking else {
805 return Err(Error::SafeMode(
806 "request tracking is unavailable while node is running in safe mode"
807 .to_string(),
808 ));
809 };
810 let response = tracking
811 .ask(RequestTrackingMessage::AllRequests)
812 .await
813 .map_err(|e| {
814 warn!(error = %e, "Failed to get all request states");
815 actor_communication_error("tracking", e)
816 })?;
817
818 match response {
819 RequestTrackingResponse::AllInfo(state) => Ok(state),
820 _ => {
821 warn!("Unexpected response from tracking");
822 Err(Error::UnexpectedResponse {
823 actor: "tracking".to_string(),
824 expected: "AllInfo".to_string(),
825 received: "other".to_string(),
826 })
827 }
828 }
829 }
830
831 pub async fn get_pending_transfers(
835 &self,
836 ) -> Result<Vec<TransferSubject>, Error> {
837 let response =
838 self.node.ask(NodeMessage::PendingTransfers).await.map_err(
839 |e| {
840 warn!(error = %e, "Failed to get pending transfers");
841 actor_communication_error("node", e)
842 },
843 )?;
844
845 let NodeResponse::PendingTransfers(pending) = response else {
846 warn!("Unexpected response from node");
847 return Err(Error::UnexpectedResponse {
848 actor: "node".to_string(),
849 expected: "PendingTransfers".to_string(),
850 received: "other".to_string(),
851 });
852 };
853
854 Ok(pending)
855 }
856
857 pub async fn auth_subject(
861 &self,
862 subject_id: DigestIdentifier,
863 witnesses: AuthWitness,
864 ) -> Result<String, Error> {
865 self.ensure_mutations_allowed()?;
866 self.auth
867 .tell(AuthMessage::NewAuth {
868 subject_id,
869 witness: witnesses,
870 })
871 .await
872 .map_err(|e| {
873 warn!(error = %e, "Authentication operation failed");
874 preserve_functional_actor_error(e, |e| {
875 Error::AuthOperation(e.to_string())
876 })
877 })?;
878
879 Ok("Ok".to_owned())
880 }
881
882 pub async fn all_auth_subjects(
883 &self,
884 ) -> Result<Vec<DigestIdentifier>, Error> {
885 let response =
886 self.auth.ask(AuthMessage::GetAuths).await.map_err(|e| {
887 error!(error = %e, "Failed to get auth subjects");
888 actor_communication_error("auth", e)
889 })?;
890
891 match response {
892 AuthResponse::Auths { subjects } => Ok(subjects),
893 _ => {
894 warn!("Unexpected response from auth");
895 Err(Error::UnexpectedResponse {
896 actor: "auth".to_string(),
897 expected: "Auths".to_string(),
898 received: "other".to_string(),
899 })
900 }
901 }
902 }
903
904 pub async fn witnesses_subject(
905 &self,
906 subject_id: DigestIdentifier,
907 ) -> Result<HashSet<PublicKey>, Error> {
908 let response = self
909 .auth
910 .ask(AuthMessage::GetAuth {
911 subject_id: subject_id.clone(),
912 })
913 .await
914 .map_err(|e| {
915 warn!(error = %e, "Failed to get witnesses for subject");
916 actor_communication_error("auth", e)
917 })?;
918
919 match response {
920 AuthResponse::Witnesses(witnesses) => Ok(witnesses),
921 _ => {
922 warn!("Unexpected response from auth");
923 Err(Error::UnexpectedResponse {
924 actor: "auth".to_string(),
925 expected: "Witnesses".to_string(),
926 received: "other".to_string(),
927 })
928 }
929 }
930 }
931
932 pub async fn delete_auth_subject(
933 &self,
934 subject_id: DigestIdentifier,
935 ) -> Result<String, Error> {
936 self.ensure_mutations_allowed()?;
937 self.auth
938 .tell(AuthMessage::DeleteAuth { subject_id })
939 .await
940 .map_err(|e| {
941 warn!(error = %e, "Failed to delete auth subject");
942 preserve_functional_actor_error(e, |e| {
943 Error::AuthOperation(e.to_string())
944 })
945 })?;
946
947 Ok("Ok".to_owned())
948 }
949
950 pub async fn update_subject(
951 &self,
952 subject_id: DigestIdentifier,
953 ) -> Result<String, Error> {
954 self.update_subject_with_options(subject_id, false).await
955 }
956
957 pub async fn update_subject_with_options(
958 &self,
959 subject_id: DigestIdentifier,
960 strict: bool,
961 ) -> Result<String, Error> {
962 self.ensure_mutations_allowed()?;
963 let response = self
964 .auth
965 .ask(AuthMessage::Update {
966 subject_id: subject_id.clone(),
967 objective: None,
968 strict,
969 })
970 .await
971 .map_err(|e| {
972 warn!(error = %e, "Failed to update subject");
973 preserve_functional_actor_error(e, |e| {
974 Error::UpdateFailed(subject_id.to_string(), e.to_string())
975 })
976 })?;
977
978 match response {
979 AuthResponse::None => Ok("Update in progress".to_owned()),
980 _ => {
981 warn!("Unexpected response from auth");
982 Err(Error::UnexpectedResponse {
983 actor: "auth".to_string(),
984 expected: "None".to_string(),
985 received: "other".to_string(),
986 })
987 }
988 }
989 }
990
991 pub async fn manual_distribution(
995 &self,
996 subject_id: DigestIdentifier,
997 ) -> Result<String, Error> {
998 self.ensure_mutations_allowed()?;
999 let Some(manual_dis) = &self.manual_dis else {
1000 return Err(safe_mode_error());
1001 };
1002 manual_dis
1003 .ask(ManualDistributionMessage::Update(subject_id.clone()))
1004 .await
1005 .map_err(|e| {
1006 warn!(error = %e, "Manual distribution failed");
1007 preserve_functional_actor_error(e, |_| {
1008 Error::DistributionFailed(subject_id.to_string())
1009 })
1010 })?;
1011
1012 Ok("Manual distribution in progress".to_owned())
1013 }
1014
    /// Deletes a subject (governance or tracker) from this node.
    ///
    /// Only available while the node runs in safe mode, and only one
    /// deletion may be in progress at a time. A governance is refused while
    /// the node still reports trackers for it.
    ///
    /// The individual cleanup steps do not abort the sequence: their
    /// failures are collected and, if any occurred, reported together as a
    /// single [`Error::Internal`] describing a partial deletion.
    ///
    /// # Errors
    ///
    /// * [`Error::SafeMode`] when the node is not in safe mode.
    /// * [`Error::InvalidRequestState`] when another deletion is in progress.
    /// * [`Error::SubjectNotFound`] when the node has no data for the subject.
    /// * [`Error::GovernanceHasTrackers`] when the governance has trackers.
    /// * [`Error::Internal`] when one or more cleanup steps failed.
    pub async fn delete_subject(
        &self,
        subject_id: DigestIdentifier,
    ) -> Result<String, Error> {
        self.ensure_safe_mode_required("subject deletion")?;
        self.begin_subject_deletion(&subject_id).await?;

        // The work runs in an inner async block so that every early return
        // below still reaches `end_subject_deletion` afterwards.
        // NOTE(review): if this future is dropped while the inner block is
        // pending, the in-progress marker appears to stay set — confirm
        // whether callers can cancel this call.
        let result = async {
            let subject_data = self.subject_data(&subject_id).await?;

            match subject_data {
                node::SubjectData::Governance { .. } => {
                    info!(
                        subject_id = %subject_id,
                        subject_type = "governance",
                        "Deleting subject"
                    );
                    // Refuse before touching any state if trackers exist.
                    let trackers =
                        self.governance_trackers(&subject_id).await?;
                    if !trackers.is_empty() {
                        return Err(Error::GovernanceHasTrackers {
                            governance_id: subject_id.to_string(),
                            trackers: trackers
                                .into_iter()
                                .map(|tracker| tracker.to_string())
                                .collect(),
                        });
                    }
                    let mut cleanup_errors = Vec::new();

                    // Governance order: subject manager, then common state,
                    // then the node.
                    match self
                        .subject_manager
                        .ask(SubjectManagerMessage::DeleteGovernance {
                            subject_id: subject_id.clone(),
                        })
                        .await
                    {
                        Ok(SubjectManagerResponse::DeleteGovernance) => {}
                        Ok(other) => cleanup_errors.push(format!(
                            "subject_manager: unexpected response {other:?}"
                        )),
                        Err(err) => cleanup_errors
                            .push(format!("subject_manager: {err}")),
                    }

                    self.purge_common_subject_state(
                        &subject_id,
                        &mut cleanup_errors,
                    )
                    .await;

                    self.delete_subject_from_node(
                        &subject_id,
                        &mut cleanup_errors,
                    )
                    .await;

                    if cleanup_errors.is_empty() {
                        info!(
                            subject_id = %subject_id,
                            subject_type = "governance",
                            "Subject deleted successfully"
                        );
                        Ok("Governance deleted successfully".to_owned())
                    } else {
                        Err(Error::Internal(format!(
                            "governance deletion completed partially: {}",
                            cleanup_errors.join("; ")
                        )))
                    }
                }
                node::SubjectData::Tracker { .. } => {
                    info!(
                        subject_id = %subject_id,
                        subject_type = "tracker",
                        "Deleting subject"
                    );
                    let mut cleanup_errors = Vec::new();

                    // Tracker order: common state, then subject manager,
                    // then the node.
                    self.purge_common_subject_state(
                        &subject_id,
                        &mut cleanup_errors,
                    )
                    .await;

                    match self
                        .subject_manager
                        .ask(SubjectManagerMessage::DeleteTracker {
                            subject_id: subject_id.clone(),
                        })
                        .await
                    {
                        Ok(SubjectManagerResponse::DeleteTracker) => {}
                        Ok(other) => cleanup_errors.push(format!(
                            "subject_manager: unexpected response {other:?}"
                        )),
                        Err(err) => cleanup_errors
                            .push(format!("subject_manager: {err}")),
                    }

                    self.delete_subject_from_node(
                        &subject_id,
                        &mut cleanup_errors,
                    )
                    .await;

                    if cleanup_errors.is_empty() {
                        info!(
                            subject_id = %subject_id,
                            subject_type = "tracker",
                            "Subject deleted successfully"
                        );
                        Ok("Tracker deleted successfully".to_owned())
                    } else {
                        Err(Error::Internal(format!(
                            "tracker deletion completed partially: {}",
                            cleanup_errors.join("; ")
                        )))
                    }
                }
            }
        }
        .await;

        // Clear the marker whether the deletion succeeded or failed.
        self.end_subject_deletion(&subject_id).await;
        result
    }
1142
1143 pub async fn all_govs(
1146 &self,
1147 active: Option<bool>,
1148 ) -> Result<Vec<GovsData>, Error> {
1149 self.db.get_governances(active).await.map_err(|e| {
1150 warn!(error = %e, "Failed to get governances");
1151 Error::QueryFailed(e.to_string())
1152 })
1153 }
1154
1155 pub async fn all_subjs(
1156 &self,
1157 governance_id: DigestIdentifier,
1158 active: Option<bool>,
1159 schema_id: Option<String>,
1160 ) -> Result<Vec<SubjsData>, Error> {
1161 let governance_id = governance_id.to_string();
1162 match self
1163 .db
1164 .get_subjects(&governance_id, active, schema_id)
1165 .await
1166 {
1167 Ok(subjects) => Ok(subjects),
1168 Err(ExternalDatabaseError::GovernanceNotFound(_)) => {
1169 Err(Error::GovernanceNotFound(governance_id))
1170 }
1171 Err(e) => {
1172 warn!(error = %e, "Failed to get subjects");
1173 Err(Error::QueryFailed(e.to_string()))
1174 }
1175 }
1176 }
1177
1178 pub async fn get_events(
1181 &self,
1182 subject_id: DigestIdentifier,
1183 query: EventsQuery,
1184 ) -> Result<PaginatorEvents, Error> {
1185 let subject_id_str = subject_id.to_string();
1186
1187 match self.db.get_events(&subject_id_str, query).await {
1188 Ok(data) => Ok(data),
1189 Err(ExternalDatabaseError::NoEvents(_)) => {
1190 Err(Error::NoEventsFound(subject_id_str))
1191 }
1192 Err(e) => {
1193 warn!(error = %e, "Failed to get events");
1194 Err(Error::QueryFailed(e.to_string()))
1195 }
1196 }
1197 }
1198
1199 pub async fn get_sink_events(
1200 &self,
1201 subject_id: DigestIdentifier,
1202 query: SinkEventsQuery,
1203 ) -> Result<SinkEventsPage, Error> {
1204 let response = self
1205 .node
1206 .ask(NodeMessage::GetSinkEvents {
1207 subject_id,
1208 from_sn: query.from_sn.unwrap_or(0),
1209 to_sn: query.to_sn,
1210 limit: query.limit.unwrap_or(100),
1211 })
1212 .await
1213 .map_err(|e| {
1214 warn!(error = %e, "Failed to replay sink events");
1215 Error::from(e)
1216 })?;
1217
1218 match response {
1219 NodeResponse::SinkEvents(events) => Ok(events),
1220 _ => Err(Error::UnexpectedResponse {
1221 actor: "node".to_string(),
1222 expected: "SinkEvents".to_string(),
1223 received: "other".to_string(),
1224 }),
1225 }
1226 }
1227
1228 pub async fn get_aborts(
1229 &self,
1230 subject_id: DigestIdentifier,
1231 query: AbortsQuery,
1232 ) -> Result<PaginatorAborts, Error> {
1233 let subject_id_str = subject_id.to_string();
1234 let request_id = if let Some(request_id) = query.request_id.as_ref() {
1235 Some(
1236 DigestIdentifier::from_str(request_id)
1237 .map_err(|e| Error::InvalidQueryParams(e.to_string()))?
1238 .to_string(),
1239 )
1240 } else {
1241 None
1242 };
1243 let query = AbortsQuery {
1244 request_id,
1245 sn: query.sn,
1246 quantity: query.quantity,
1247 page: query.page,
1248 reverse: query.reverse,
1249 };
1250
1251 self.db
1252 .get_aborts(&subject_id_str, query)
1253 .await
1254 .map_err(|e| {
1255 warn!(error = %e, "Failed to get aborts");
1256 Error::QueryFailed(e.to_string())
1257 })
1258 }
1259
1260 pub async fn get_event_sn(
1261 &self,
1262 subject_id: DigestIdentifier,
1263 sn: u64,
1264 ) -> Result<LedgerDB, Error> {
1265 let subject_id_str = subject_id.to_string();
1266
1267 match self.db.get_event_sn(&subject_id_str, sn).await {
1268 Ok(data) => Ok(data),
1269 Err(ExternalDatabaseError::EventNotFound { .. }) => {
1270 Err(Error::EventNotFound {
1271 subject: subject_id_str,
1272 sn,
1273 })
1274 }
1275 Err(e) => {
1276 warn!(error = %e, "Failed to get event");
1277 Err(Error::QueryFailed(e.to_string()))
1278 }
1279 }
1280 }
1281
1282 pub async fn get_first_or_end_events(
1283 &self,
1284 subject_id: DigestIdentifier,
1285 quantity: Option<u64>,
1286 reverse: Option<bool>,
1287 event_type: Option<EventRequestType>,
1288 ) -> Result<Vec<LedgerDB>, Error> {
1289 let subject_id_str = subject_id.to_string();
1290
1291 match self
1292 .db
1293 .get_first_or_end_events(
1294 &subject_id_str,
1295 quantity,
1296 reverse,
1297 event_type,
1298 )
1299 .await
1300 {
1301 Ok(data) => Ok(data),
1302 Err(ExternalDatabaseError::NoEvents(_)) => {
1303 Err(Error::NoEventsFound(subject_id_str))
1304 }
1305 Err(e) => {
1306 warn!(error = %e, "Failed to get events");
1307 Err(Error::QueryFailed(e.to_string()))
1308 }
1309 }
1310 }
1311
1312 pub async fn get_subject_state(
1313 &self,
1314 subject_id: DigestIdentifier,
1315 ) -> Result<SubjectDB, Error> {
1316 let subject_id_str = subject_id.to_string();
1317
1318 match self.db.get_subject_state(&subject_id_str).await {
1319 Ok(data) => Ok(data),
1320 Err(ExternalDatabaseError::SubjectNotFound(_)) => {
1321 Err(Error::SubjectNotFound(subject_id_str))
1322 }
1323 Err(e) => {
1324 warn!(error = %e, "Failed to get subject state");
1325 Err(Error::QueryFailed(e.to_string()))
1326 }
1327 }
1328 }
1329}
1330
#[cfg(test)]
mod tests {
    use super::*;
    use ave_actors::{ActorError, ActorPath};

    /// Fallback shared by the tests; neither test expects it to be used.
    fn request_communication_fallback(_: ActorError) -> Error {
        Error::ActorCommunication {
            actor: "request".to_string(),
        }
    }

    /// A `Functional` actor error keeps its description and becomes
    /// `Error::ActorError`.
    #[test]
    fn preserves_functional_actor_errors() {
        let source = ActorError::Functional {
            description: "Is not a Creator".to_string(),
        };

        let mapped = preserve_functional_actor_error(
            source,
            request_communication_fallback,
        );

        assert!(
            matches!(mapped, Error::ActorError(message) if message == "Is not a Creator")
        );
    }

    /// A `NotFound` actor error becomes `Error::MissingResource` named after
    /// the actor path.
    #[test]
    fn preserves_not_found_actor_errors() {
        let source = ActorError::NotFound {
            path: ActorPath::from("/user/request"),
        };

        let mapped = preserve_functional_actor_error(
            source,
            request_communication_fallback,
        );

        assert!(matches!(
            mapped,
            Error::MissingResource { name, .. } if name == "/user/request"
        ));
    }
}