1#![recursion_limit = "256"]
2pub mod config;
3pub mod error;
4
5pub mod approval;
6pub mod auth;
7pub mod db;
8pub mod distribution;
9pub mod evaluation;
10pub mod external_db;
11pub mod governance;
12pub mod helpers;
13pub mod manual_distribution;
14pub mod metrics;
15pub mod model;
16pub mod node;
17pub mod request;
18pub mod subject;
19pub mod system;
20pub mod tracker;
21pub mod update;
22pub mod validation;
23
24use std::collections::HashSet;
25use std::str::FromStr;
26use std::sync::Arc;
27
28use auth::{Auth, AuthMessage, AuthResponse, AuthWitness};
29use ave_actors::{ActorError, ActorPath, ActorRef, PersistentActor};
30use ave_common::bridge::request::{
31 AbortsQuery, ApprovalState, ApprovalStateRes, EventRequestType,
32 EventsQuery, SinkEventsQuery,
33};
34use ave_common::identity::keys::KeyPair;
35use ave_common::identity::{DigestIdentifier, PublicKey, Signed};
36use ave_common::request::EventRequest;
37use ave_common::response::{
38 GovsData, LedgerDB, MonitorNetworkState, PaginatorAborts, PaginatorEvents,
39 RequestInfo, RequestInfoExtend, RequestsInManager,
40 RequestsInManagerSubject, SinkEventsPage, SubjectDB, SubjsData,
41};
42use config::Config as AveBaseConfig;
43use error::Error;
44use helpers::network::*;
45use intermediary::Intermediary;
46use manual_distribution::{ManualDistribution, ManualDistributionMessage};
47use network::{
48 MachineSpec, Monitor, MonitorMessage, MonitorResponse, NetworkWorker,
49};
50
51use node::{Node, NodeMessage, NodeResponse, TransferSubject};
52use prometheus_client::registry::Registry;
53use request::{
54 RequestData, RequestHandler, RequestHandlerMessage, RequestHandlerResponse,
55};
56use system::system;
57use tokio::task::JoinHandle;
58use tokio_util::sync::CancellationToken;
59use tracing::{debug, error, info, warn};
60use validation::{Validation, ValidationMessage};
61
62use crate::approval::request::ApprovalReq;
63use crate::config::SinkAuth;
64use crate::helpers::db::{
65 DatabaseError as ExternalDatabaseError, ExternalDB, ReadStore,
66};
67use crate::model::common::node::SignTypesNode;
68use crate::node::InitParamsNode;
69use crate::node::subject_manager::{
70 SubjectManager, SubjectManagerMessage, SubjectManagerResponse,
71};
72use crate::request::tracking::{
73 RequestTracking, RequestTrackingMessage, RequestTrackingResponse,
74};
75
76#[cfg(all(feature = "sqlite", feature = "rocksdb"))]
77compile_error!("Select only one: 'sqlite' or 'rocksdb'");
78
79#[cfg(not(any(feature = "sqlite", feature = "rocksdb")))]
80compile_error!("You must enable 'sqlite' or 'rocksdb'");
81
82#[cfg(not(feature = "ext-sqlite"))]
83compile_error!("You must enable 'ext-sqlite'");
84
85#[cfg(all(feature = "test", not(test), not(debug_assertions)))]
86compile_error!(
87 "The 'test' feature should only be used during development/testing"
88);
89
90#[derive(Clone)]
91pub struct Api {
92 peer_id: String,
93 public_key: String,
94 safe_mode: bool,
95 deleting_subject: Arc<tokio::sync::Mutex<Option<DigestIdentifier>>>,
96 db: Arc<ExternalDB>,
97 request: ActorRef<RequestHandler>,
98 node: ActorRef<Node>,
99 subject_manager: ActorRef<SubjectManager>,
100 auth: ActorRef<Auth>,
101 monitor: ActorRef<Monitor>,
102 manual_dis: Option<ActorRef<ManualDistribution>>,
103 tracking: Option<ActorRef<RequestTracking>>,
104}
105
106fn preserve_functional_actor_error<F>(err: ActorError, fallback: F) -> Error
107where
108 F: FnOnce(ActorError) -> Error,
109{
110 match err {
111 ActorError::Functional { description } => {
112 Error::ActorError(description)
113 }
114 ActorError::FunctionalCritical { description } => {
115 Error::Internal(description)
116 }
117 ActorError::NotFound { path } => Error::MissingResource {
118 name: path.to_string(),
119 reason: "Actor not found".to_string(),
120 },
121 other => fallback(other),
122 }
123}
124
125fn actor_communication_error(actor: &'static str, err: ActorError) -> Error {
126 preserve_functional_actor_error(err, |_| Error::ActorCommunication {
127 actor: actor.to_string(),
128 })
129}
130
131fn safe_mode_error() -> Error {
132 Error::SafeMode(
133 "node is running in safe mode; mutating operations are disabled"
134 .to_string(),
135 )
136}
137
138fn safe_mode_required_error(operation: &'static str) -> Error {
139 Error::SafeMode(format!(
140 "{operation} is only available while node is running in safe mode"
141 ))
142}
143
144impl Api {
145 async fn begin_subject_deletion(
146 &self,
147 subject_id: &DigestIdentifier,
148 ) -> Result<(), Error> {
149 {
150 let mut deleting = self.deleting_subject.lock().await;
151 if let Some(active_subject_id) = deleting.as_ref() {
152 return Err(Error::InvalidRequestState(format!(
153 "subject deletion already in progress for '{}'",
154 active_subject_id
155 )));
156 }
157 *deleting = Some(subject_id.clone());
158 }
159 Ok(())
160 }
161
162 async fn end_subject_deletion(&self, subject_id: &DigestIdentifier) {
163 let mut deleting = self.deleting_subject.lock().await;
164 if deleting.as_ref() == Some(subject_id) {
165 *deleting = None;
166 }
167 }
168
169 fn ensure_mutations_allowed(&self) -> Result<(), Error> {
170 if self.safe_mode {
171 return Err(safe_mode_error());
172 }
173 Ok(())
174 }
175
176 fn ensure_safe_mode_required(
177 &self,
178 operation: &'static str,
179 ) -> Result<(), Error> {
180 if !self.safe_mode {
181 return Err(safe_mode_required_error(operation));
182 }
183 Ok(())
184 }
185
186 async fn subject_data(
187 &self,
188 subject_id: &DigestIdentifier,
189 ) -> Result<node::SubjectData, Error> {
190 let response = self
191 .node
192 .ask(NodeMessage::GetSubjectData(subject_id.clone()))
193 .await
194 .map_err(|e| actor_communication_error("node", e))?;
195
196 let NodeResponse::SubjectData(subject_data) = response else {
197 return Err(Error::UnexpectedResponse {
198 actor: "node".to_string(),
199 expected: "NodeResponse::SubjectData".to_string(),
200 received: "other".to_string(),
201 });
202 };
203
204 subject_data
205 .ok_or_else(|| Error::SubjectNotFound(subject_id.to_string()))
206 }
207
208 async fn governance_trackers(
209 &self,
210 governance_id: &DigestIdentifier,
211 ) -> Result<Vec<DigestIdentifier>, Error> {
212 let response = self
213 .node
214 .ask(NodeMessage::GovernanceTrackers(governance_id.clone()))
215 .await
216 .map_err(|e| actor_communication_error("node", e))?;
217
218 let NodeResponse::GovernanceTrackers(trackers) = response else {
219 return Err(Error::UnexpectedResponse {
220 actor: "node".to_string(),
221 expected: "NodeResponse::GovernanceTrackers".to_string(),
222 received: "other".to_string(),
223 });
224 };
225
226 Ok(trackers)
227 }
228
229 async fn purge_common_subject_state(
230 &self,
231 subject_id: &DigestIdentifier,
232 cleanup_errors: &mut Vec<String>,
233 ) {
234 match self
235 .request
236 .ask(RequestHandlerMessage::PurgeSubject {
237 subject_id: subject_id.clone(),
238 })
239 .await
240 {
241 Ok(RequestHandlerResponse::None) => {}
242 Ok(other) => cleanup_errors
243 .push(format!("request: unexpected response {other:?}")),
244 Err(err) => cleanup_errors.push(format!("request: {err}")),
245 }
246
247 match self
248 .auth
249 .ask(AuthMessage::DeleteAuth {
250 subject_id: subject_id.clone(),
251 })
252 .await
253 {
254 Ok(AuthResponse::None) => {}
255 Ok(other) => cleanup_errors
256 .push(format!("auth: unexpected response {other:?}")),
257 Err(err) => cleanup_errors.push(format!("auth: {err}")),
258 }
259
260 if let Err(err) = self.db.delete_subject(&subject_id.to_string()).await
261 {
262 cleanup_errors.push(format!("external_db: {err}"));
263 }
264 }
265
266 async fn delete_subject_from_node(
267 &self,
268 subject_id: &DigestIdentifier,
269 cleanup_errors: &mut Vec<String>,
270 ) {
271 match self
272 .node
273 .ask(NodeMessage::DeleteSubject(subject_id.clone()))
274 .await
275 {
276 Ok(NodeResponse::Ok) => {}
277 Ok(other) => cleanup_errors
278 .push(format!("node: unexpected response {other:?}")),
279 Err(err) => cleanup_errors.push(format!("node: {err}")),
280 }
281 }
282
283 pub async fn build(
285 keys: KeyPair,
286 mut config: AveBaseConfig,
287 sink_auth: SinkAuth,
288 registry: &mut Registry,
289 password: &str,
290 graceful_token: CancellationToken,
291 crash_token: CancellationToken,
292 ) -> Result<(Self, Vec<JoinHandle<()>>), Error> {
293 debug!("Creating Api");
294
295 let (system, runner) = system(
296 config.clone(),
297 sink_auth,
298 password,
299 graceful_token.clone(),
300 crash_token.clone(),
301 )
302 .await
303 .map_err(|e| {
304 error!(error = %e, "Failed to create system");
305 e
306 })?;
307
308 config.network.safe_mode = config.safe_mode;
309
310 let newtork_monitor = Monitor::default();
311 let newtork_monitor_actor = system
312 .create_root_actor("network_monitor", newtork_monitor)
313 .await
314 .map_err(|e| {
315 error!(error = %e, "Can not create network_monitor actor");
316 Error::ActorCreation {
317 actor: "network_monitor".to_string(),
318 reason: e.to_string(),
319 }
320 })?;
321
322 let spec = config.spec.map(MachineSpec::from);
323 let network_metrics = network::metrics::register(registry);
324 crate::metrics::register(registry);
325
326 let mut worker: NetworkWorker<NetworkMessage> = NetworkWorker::new(
327 &keys,
328 config.network.clone(),
329 Some(newtork_monitor_actor.clone()),
330 graceful_token.clone(),
331 crash_token.clone(),
332 spec,
333 Some(network_metrics),
334 )
335 .map_err(|e| {
336 error!(error = %e, "Can not create networt");
337 Error::Network(e.to_string())
338 })?;
339
340 let service = Intermediary::build(
342 worker.service().sender(),
343 system.clone(),
344 graceful_token.clone(),
345 crash_token.clone(),
346 );
347
348 let peer_id = worker.local_peer_id().to_string();
349
350 worker.add_helper_sender(service.sender());
351
352 system.add_helper("network", service.clone()).await;
353
354 let worker_runner = tokio::spawn(async move {
355 let _ = worker.run().await;
356 });
357
358 let public_key = Arc::new(keys.public_key());
359 let node_actor = system
360 .create_root_actor(
361 "node",
362 Node::initial(InitParamsNode {
363 key_pair: keys.clone(),
364 hash: config.hash_algorithm,
365 is_service: config.is_service,
366 public_key: public_key.clone(),
367 }),
368 )
369 .await
370 .map_err(|e| {
371 error!(error = %e, "Init system, can not create node actor");
372 Error::ActorCreation {
373 actor: "node".to_string(),
374 reason: e.to_string(),
375 }
376 })?;
377
378 let manual_dis_actor = if config.safe_mode {
379 None
380 } else {
381 Some(
382 system
383 .get_actor(&ActorPath::from(
384 "/user/node/manual_distribution",
385 ))
386 .await
387 .map_err(|e| {
388 error!(
389 error = %e,
390 "Failed to get manual_distribution actor"
391 );
392 e
393 })?,
394 )
395 };
396
397 let auth_actor: ActorRef<Auth> = system
398 .get_actor(&ActorPath::from("/user/node/auth"))
399 .await
400 .map_err(|e| {
401 error!(error = %e, "Failed to get auth actor");
402 e
403 })?;
404
405 let subject_manager_actor: ActorRef<SubjectManager> = system
406 .get_actor(&ActorPath::from("/user/node/subject_manager"))
407 .await
408 .map_err(|e| {
409 error!(error = %e, "Failed to get subject_manager actor");
410 e
411 })?;
412
413 let request_actor = system
414 .create_root_actor(
415 "request",
416 RequestHandler::initial((
417 public_key,
418 (config.hash_algorithm, service),
419 )),
420 )
421 .await
422 .map_err(|e| {
423 error!(error = %e, "Init system, can not create request actor");
424 Error::ActorCreation {
425 actor: "request".to_string(),
426 reason: e.to_string(),
427 }
428 })?;
429
430 let tracking_actor = if config.safe_mode {
431 None
432 } else {
433 Some(
434 system
435 .get_actor(&ActorPath::from("/user/request/tracking"))
436 .await
437 .map_err(|e| {
438 error!(error = %e, "Failed to get tracking actor");
439 e
440 })?,
441 )
442 };
443
444 let Some(ext_db) = system.get_helper::<Arc<ExternalDB>>("ext_db").await
445 else {
446 error!("External database helper not found");
447 return Err(Error::MissingResource {
448 name: "ext_db".to_string(),
449 reason: "External database helper not found".to_string(),
450 });
451 };
452
453 ext_db.register_prometheus_metrics(registry);
454
455 let tasks = Vec::from([runner, worker_runner]);
456
457 Ok((
458 Self {
459 public_key: keys.public_key().to_string(),
460 peer_id,
461 safe_mode: config.safe_mode,
462 deleting_subject: Arc::new(tokio::sync::Mutex::new(None)),
463 db: ext_db,
464 request: request_actor,
465 auth: auth_actor,
466 node: node_actor,
467 subject_manager: subject_manager_actor,
468 monitor: newtork_monitor_actor,
469 manual_dis: manual_dis_actor,
470 tracking: tracking_actor,
471 },
472 tasks,
473 ))
474 }
475
476 pub fn peer_id(&self) -> &str {
480 &self.peer_id
481 }
482
483 pub fn public_key(&self) -> &str {
484 &self.public_key
485 }
486
487 pub async fn get_network_state(
490 &self,
491 ) -> Result<MonitorNetworkState, Error> {
492 let response =
493 self.monitor.ask(MonitorMessage::State).await.map_err(|e| {
494 warn!(error = %e, "Unable to retrieve network state");
495 preserve_functional_actor_error(e, |e| {
496 Error::NetworkState(e.to_string())
497 })
498 })?;
499
500 match response {
501 MonitorResponse::State(state) => Ok(state),
502 _ => {
503 warn!("Unexpected response from network monitor");
504 Err(Error::UnexpectedResponse {
505 actor: "network_monitor".to_string(),
506 expected: "State".to_string(),
507 received: "other".to_string(),
508 })
509 }
510 }
511 }
512
513 pub async fn get_requests_in_manager(
517 &self,
518 ) -> Result<RequestsInManager, Error> {
519 let response = self
520 .request
521 .ask(RequestHandlerMessage::RequestInManager)
522 .await
523 .map_err(|e| {
524 warn!(error = %e, "Request processing failed");
525 actor_communication_error("request", e)
526 })?;
527
528 match response {
529 RequestHandlerResponse::RequestInManager(request) => Ok(request),
530 _ => {
531 warn!("Unexpected response from request handler");
532 Err(Error::UnexpectedResponse {
533 actor: "request".to_string(),
534 expected: "RequestInManager".to_string(),
535 received: "other".to_string(),
536 })
537 }
538 }
539 }
540
541 pub async fn get_requests_in_manager_subject_id(
542 &self,
543 subject_id: DigestIdentifier,
544 ) -> Result<RequestsInManagerSubject, Error> {
545 let response = self
546 .request
547 .ask(RequestHandlerMessage::RequestInManagerSubjectId {
548 subject_id,
549 })
550 .await
551 .map_err(|e| {
552 warn!(error = %e, "Request processing failed");
553 actor_communication_error("request", e)
554 })?;
555
556 match response {
557 RequestHandlerResponse::RequestInManagerSubjectId(request) => {
558 Ok(request)
559 }
560 _ => {
561 warn!("Unexpected response from request handler");
562 Err(Error::UnexpectedResponse {
563 actor: "request".to_string(),
564 expected: "RequestInManagerSubjectId".to_string(),
565 received: "other".to_string(),
566 })
567 }
568 }
569 }
570
571 pub async fn external_request(
572 &self,
573 request: Signed<EventRequest>,
574 ) -> Result<RequestData, Error> {
575 self.ensure_mutations_allowed()?;
576 let response = self
577 .request
578 .ask(RequestHandlerMessage::NewRequest { request })
579 .await
580 .map_err(|e| {
581 warn!(error = %e, "Request processing failed");
582 actor_communication_error("request", e)
583 })?;
584
585 match response {
586 RequestHandlerResponse::Ok(request_data) => Ok(request_data),
587 _ => {
588 warn!("Unexpected response from request handler");
589 Err(Error::UnexpectedResponse {
590 actor: "request".to_string(),
591 expected: "Ok".to_string(),
592 received: "other".to_string(),
593 })
594 }
595 }
596 }
597
598 pub async fn own_request(
599 &self,
600 request: EventRequest,
601 ) -> Result<RequestData, Error> {
602 self.ensure_mutations_allowed()?;
603 let response = self
604 .node
605 .ask(NodeMessage::SignRequest(Box::new(
606 SignTypesNode::EventRequest(request.clone()),
607 )))
608 .await
609 .map_err(|e| {
610 warn!(error = %e, "Node was unable to sign the request");
611 preserve_functional_actor_error(e, |e| {
612 Error::SigningFailed(e.to_string())
613 })
614 })?;
615
616 let signature = match response {
617 NodeResponse::SignRequest(signature) => signature,
618 _ => {
619 warn!("Unexpected response from node");
620 return Err(Error::UnexpectedResponse {
621 actor: "node".to_string(),
622 expected: "SignRequest".to_string(),
623 received: "other".to_string(),
624 });
625 }
626 };
627
628 let signed_event_req = Signed::from_parts(request, signature);
629
630 let response = self
631 .request
632 .ask(RequestHandlerMessage::NewRequest {
633 request: signed_event_req,
634 })
635 .await
636 .map_err(|e| {
637 warn!(error = %e, "Failed to send request");
638 actor_communication_error("request", e)
639 })?;
640
641 match response {
642 RequestHandlerResponse::Ok(request_data) => Ok(request_data),
643 _ => {
644 warn!("Unexpected response from request handler");
645 Err(Error::UnexpectedResponse {
646 actor: "request".to_string(),
647 expected: "Ok".to_string(),
648 received: "other".to_string(),
649 })
650 }
651 }
652 }
653
654 pub async fn get_approval(
655 &self,
656 subject_id: DigestIdentifier,
657 state: Option<ApprovalState>,
658 ) -> Result<Option<(ApprovalReq, ApprovalState)>, Error> {
659 let response = self
660 .request
661 .ask(RequestHandlerMessage::GetApproval { state, subject_id })
662 .await
663 .map_err(|e| {
664 warn!(error = %e, "Failed to get approval request");
665 actor_communication_error("request", e)
666 })?;
667
668 match response {
669 RequestHandlerResponse::Approval(data) => Ok(data),
670 _ => {
671 warn!("Unexpected response from request handler");
672 Err(Error::UnexpectedResponse {
673 actor: "request".to_string(),
674 expected: "Approval".to_string(),
675 received: "other".to_string(),
676 })
677 }
678 }
679 }
680
681 pub async fn get_approvals(
682 &self,
683 state: Option<ApprovalState>,
684 ) -> Result<Vec<(ApprovalReq, ApprovalState)>, Error> {
685 let response = self
686 .request
687 .ask(RequestHandlerMessage::GetAllApprovals { state })
688 .await
689 .map_err(|e| {
690 warn!(error = %e, "Failed to get approval requests");
691 actor_communication_error("request", e)
692 })?;
693
694 match response {
695 RequestHandlerResponse::Approvals(data) => Ok(data),
696 _ => {
697 warn!("Unexpected response from request handler");
698 Err(Error::UnexpectedResponse {
699 actor: "request".to_string(),
700 expected: "Approvals".to_string(),
701 received: "other".to_string(),
702 })
703 }
704 }
705 }
706
707 pub async fn approve(
708 &self,
709 subject_id: DigestIdentifier,
710 state: ApprovalStateRes,
711 ) -> Result<String, Error> {
712 self.ensure_mutations_allowed()?;
713 if state == ApprovalStateRes::Obsolete {
714 warn!("Cannot set approval state to Obsolete");
715 return Err(Error::InvalidApprovalState("Obsolete".to_string()));
716 }
717
718 let response = self
719 .request
720 .ask(RequestHandlerMessage::ChangeApprovalState {
721 subject_id,
722 state,
723 })
724 .await
725 .map_err(|e| {
726 warn!(error = %e, "Failed to change approval state");
727 preserve_functional_actor_error(e, |e| {
728 Error::ApprovalUpdateFailed(e.to_string())
729 })
730 })?;
731
732 match response {
733 RequestHandlerResponse::Response(res) => Ok(res),
734 _ => {
735 warn!("Unexpected response from request handler");
736 Err(Error::UnexpectedResponse {
737 actor: "request".to_string(),
738 expected: "Response".to_string(),
739 received: "other".to_string(),
740 })
741 }
742 }
743 }
744
745 pub async fn manual_request_abort(
746 &self,
747 subject_id: DigestIdentifier,
748 ) -> Result<String, Error> {
749 self.ensure_mutations_allowed()?;
750 self.request
751 .tell(RequestHandlerMessage::AbortRequest { subject_id })
752 .await
753 .map_err(|e| {
754 warn!(error = %e, "Failed to abort request");
755 actor_communication_error("request", e)
756 })?;
757
758 Ok("Trying to abort".to_string())
759 }
760
761 pub async fn get_request_state(
764 &self,
765 request_id: DigestIdentifier,
766 ) -> Result<RequestInfo, Error> {
767 let Some(tracking) = &self.tracking else {
768 return Err(Error::SafeMode(
769 "request tracking is unavailable while node is running in safe mode"
770 .to_string(),
771 ));
772 };
773 let response = tracking
774 .ask(RequestTrackingMessage::SearchRequest(request_id.clone()))
775 .await
776 .map_err(|e| {
777 warn!(error = %e, "Failed to get request state");
778 actor_communication_error("tracking", e)
779 })?;
780
781 match response {
782 RequestTrackingResponse::Info(state) => Ok(state),
783 RequestTrackingResponse::NotFound => {
784 Err(Error::RequestNotFound(request_id.to_string()))
785 }
786 _ => {
787 warn!("Unexpected response from tracking");
788 Err(Error::UnexpectedResponse {
789 actor: "tracking".to_string(),
790 expected: "Info".to_string(),
791 received: "other".to_string(),
792 })
793 }
794 }
795 }
796
797 pub async fn all_request_state(
798 &self,
799 ) -> Result<Vec<RequestInfoExtend>, Error> {
800 let Some(tracking) = &self.tracking else {
801 return Err(Error::SafeMode(
802 "request tracking is unavailable while node is running in safe mode"
803 .to_string(),
804 ));
805 };
806 let response = tracking
807 .ask(RequestTrackingMessage::AllRequests)
808 .await
809 .map_err(|e| {
810 warn!(error = %e, "Failed to get all request states");
811 actor_communication_error("tracking", e)
812 })?;
813
814 match response {
815 RequestTrackingResponse::AllInfo(state) => Ok(state),
816 _ => {
817 warn!("Unexpected response from tracking");
818 Err(Error::UnexpectedResponse {
819 actor: "tracking".to_string(),
820 expected: "AllInfo".to_string(),
821 received: "other".to_string(),
822 })
823 }
824 }
825 }
826
827 pub async fn get_pending_transfers(
831 &self,
832 ) -> Result<Vec<TransferSubject>, Error> {
833 let response =
834 self.node.ask(NodeMessage::PendingTransfers).await.map_err(
835 |e| {
836 warn!(error = %e, "Failed to get pending transfers");
837 actor_communication_error("node", e)
838 },
839 )?;
840
841 let NodeResponse::PendingTransfers(pending) = response else {
842 warn!("Unexpected response from node");
843 return Err(Error::UnexpectedResponse {
844 actor: "node".to_string(),
845 expected: "PendingTransfers".to_string(),
846 received: "other".to_string(),
847 });
848 };
849
850 Ok(pending)
851 }
852
853 pub async fn auth_subject(
857 &self,
858 subject_id: DigestIdentifier,
859 witnesses: AuthWitness,
860 ) -> Result<String, Error> {
861 self.ensure_mutations_allowed()?;
862 self.auth
863 .tell(AuthMessage::NewAuth {
864 subject_id,
865 witness: witnesses,
866 })
867 .await
868 .map_err(|e| {
869 warn!(error = %e, "Authentication operation failed");
870 preserve_functional_actor_error(e, |e| {
871 Error::AuthOperation(e.to_string())
872 })
873 })?;
874
875 Ok("Ok".to_owned())
876 }
877
878 pub async fn all_auth_subjects(
879 &self,
880 ) -> Result<Vec<DigestIdentifier>, Error> {
881 let response =
882 self.auth.ask(AuthMessage::GetAuths).await.map_err(|e| {
883 error!(error = %e, "Failed to get auth subjects");
884 actor_communication_error("auth", e)
885 })?;
886
887 match response {
888 AuthResponse::Auths { subjects } => Ok(subjects),
889 _ => {
890 warn!("Unexpected response from auth");
891 Err(Error::UnexpectedResponse {
892 actor: "auth".to_string(),
893 expected: "Auths".to_string(),
894 received: "other".to_string(),
895 })
896 }
897 }
898 }
899
900 pub async fn witnesses_subject(
901 &self,
902 subject_id: DigestIdentifier,
903 ) -> Result<HashSet<PublicKey>, Error> {
904 let response = self
905 .auth
906 .ask(AuthMessage::GetAuth {
907 subject_id: subject_id.clone(),
908 })
909 .await
910 .map_err(|e| {
911 warn!(error = %e, "Failed to get witnesses for subject");
912 actor_communication_error("auth", e)
913 })?;
914
915 match response {
916 AuthResponse::Witnesses(witnesses) => Ok(witnesses),
917 _ => {
918 warn!("Unexpected response from auth");
919 Err(Error::UnexpectedResponse {
920 actor: "auth".to_string(),
921 expected: "Witnesses".to_string(),
922 received: "other".to_string(),
923 })
924 }
925 }
926 }
927
928 pub async fn delete_auth_subject(
929 &self,
930 subject_id: DigestIdentifier,
931 ) -> Result<String, Error> {
932 self.ensure_mutations_allowed()?;
933 self.auth
934 .tell(AuthMessage::DeleteAuth { subject_id })
935 .await
936 .map_err(|e| {
937 warn!(error = %e, "Failed to delete auth subject");
938 preserve_functional_actor_error(e, |e| {
939 Error::AuthOperation(e.to_string())
940 })
941 })?;
942
943 Ok("Ok".to_owned())
944 }
945
946 pub async fn update_subject(
947 &self,
948 subject_id: DigestIdentifier,
949 ) -> Result<String, Error> {
950 self.ensure_mutations_allowed()?;
951 let response = self
952 .auth
953 .ask(AuthMessage::Update {
954 subject_id: subject_id.clone(),
955 objective: None,
956 })
957 .await
958 .map_err(|e| {
959 warn!(error = %e, "Failed to update subject");
960 preserve_functional_actor_error(e, |e| {
961 Error::UpdateFailed(subject_id.to_string(), e.to_string())
962 })
963 })?;
964
965 match response {
966 AuthResponse::None => Ok("Update in progress".to_owned()),
967 _ => {
968 warn!("Unexpected response from auth");
969 Err(Error::UnexpectedResponse {
970 actor: "auth".to_string(),
971 expected: "None".to_string(),
972 received: "other".to_string(),
973 })
974 }
975 }
976 }
977
978 pub async fn manual_distribution(
982 &self,
983 subject_id: DigestIdentifier,
984 ) -> Result<String, Error> {
985 self.ensure_mutations_allowed()?;
986 let Some(manual_dis) = &self.manual_dis else {
987 return Err(safe_mode_error());
988 };
989 manual_dis
990 .ask(ManualDistributionMessage::Update(subject_id.clone()))
991 .await
992 .map_err(|e| {
993 warn!(error = %e, "Manual distribution failed");
994 preserve_functional_actor_error(e, |_| {
995 Error::DistributionFailed(subject_id.to_string())
996 })
997 })?;
998
999 Ok("Manual distribution in progress".to_owned())
1000 }
1001
1002 pub async fn delete_subject(
1003 &self,
1004 subject_id: DigestIdentifier,
1005 ) -> Result<String, Error> {
1006 self.ensure_safe_mode_required("subject deletion")?;
1007 self.begin_subject_deletion(&subject_id).await?;
1008
1009 let result = async {
1010 let subject_data = self.subject_data(&subject_id).await?;
1011
1012 match subject_data {
1013 node::SubjectData::Governance { .. } => {
1014 info!(
1015 subject_id = %subject_id,
1016 subject_type = "governance",
1017 "Deleting subject"
1018 );
1019 let trackers =
1020 self.governance_trackers(&subject_id).await?;
1021 if !trackers.is_empty() {
1022 return Err(Error::GovernanceHasTrackers {
1023 governance_id: subject_id.to_string(),
1024 trackers: trackers
1025 .into_iter()
1026 .map(|tracker| tracker.to_string())
1027 .collect(),
1028 });
1029 }
1030 let mut cleanup_errors = Vec::new();
1031
1032 match self
1033 .subject_manager
1034 .ask(SubjectManagerMessage::DeleteGovernance {
1035 subject_id: subject_id.clone(),
1036 })
1037 .await
1038 {
1039 Ok(SubjectManagerResponse::DeleteGovernance) => {}
1040 Ok(other) => cleanup_errors.push(format!(
1041 "subject_manager: unexpected response {other:?}"
1042 )),
1043 Err(err) => cleanup_errors
1044 .push(format!("subject_manager: {err}")),
1045 }
1046
1047 self.purge_common_subject_state(
1048 &subject_id,
1049 &mut cleanup_errors,
1050 )
1051 .await;
1052
1053 self.delete_subject_from_node(
1054 &subject_id,
1055 &mut cleanup_errors,
1056 )
1057 .await;
1058
1059 if cleanup_errors.is_empty() {
1060 info!(
1061 subject_id = %subject_id,
1062 subject_type = "governance",
1063 "Subject deleted successfully"
1064 );
1065 Ok("Governance deleted successfully".to_owned())
1066 } else {
1067 Err(Error::Internal(format!(
1068 "governance deletion completed partially: {}",
1069 cleanup_errors.join("; ")
1070 )))
1071 }
1072 }
1073 node::SubjectData::Tracker { .. } => {
1074 info!(
1075 subject_id = %subject_id,
1076 subject_type = "tracker",
1077 "Deleting subject"
1078 );
1079 let mut cleanup_errors = Vec::new();
1080
1081 self.purge_common_subject_state(
1082 &subject_id,
1083 &mut cleanup_errors,
1084 )
1085 .await;
1086
1087 match self
1088 .subject_manager
1089 .ask(SubjectManagerMessage::DeleteTracker {
1090 subject_id: subject_id.clone(),
1091 })
1092 .await
1093 {
1094 Ok(SubjectManagerResponse::DeleteTracker) => {}
1095 Ok(other) => cleanup_errors.push(format!(
1096 "subject_manager: unexpected response {other:?}"
1097 )),
1098 Err(err) => cleanup_errors
1099 .push(format!("subject_manager: {err}")),
1100 }
1101
1102 self.delete_subject_from_node(
1103 &subject_id,
1104 &mut cleanup_errors,
1105 )
1106 .await;
1107
1108 if cleanup_errors.is_empty() {
1109 info!(
1110 subject_id = %subject_id,
1111 subject_type = "tracker",
1112 "Subject deleted successfully"
1113 );
1114 Ok("Tracker deleted successfully".to_owned())
1115 } else {
1116 Err(Error::Internal(format!(
1117 "tracker deletion completed partially: {}",
1118 cleanup_errors.join("; ")
1119 )))
1120 }
1121 }
1122 }
1123 }
1124 .await;
1125
1126 self.end_subject_deletion(&subject_id).await;
1127 result
1128 }
1129
1130 pub async fn all_govs(
1133 &self,
1134 active: Option<bool>,
1135 ) -> Result<Vec<GovsData>, Error> {
1136 self.db.get_governances(active).await.map_err(|e| {
1137 warn!(error = %e, "Failed to get governances");
1138 Error::QueryFailed(e.to_string())
1139 })
1140 }
1141
1142 pub async fn all_subjs(
1143 &self,
1144 governance_id: DigestIdentifier,
1145 active: Option<bool>,
1146 schema_id: Option<String>,
1147 ) -> Result<Vec<SubjsData>, Error> {
1148 let governance_id = governance_id.to_string();
1149 match self
1150 .db
1151 .get_subjects(&governance_id, active, schema_id)
1152 .await
1153 {
1154 Ok(subjects) => Ok(subjects),
1155 Err(ExternalDatabaseError::GovernanceNotFound(_)) => {
1156 Err(Error::GovernanceNotFound(governance_id))
1157 }
1158 Err(e) => {
1159 warn!(error = %e, "Failed to get subjects");
1160 Err(Error::QueryFailed(e.to_string()))
1161 }
1162 }
1163 }
1164
1165 pub async fn get_events(
1168 &self,
1169 subject_id: DigestIdentifier,
1170 query: EventsQuery,
1171 ) -> Result<PaginatorEvents, Error> {
1172 let subject_id_str = subject_id.to_string();
1173
1174 match self.db.get_events(&subject_id_str, query).await {
1175 Ok(data) => Ok(data),
1176 Err(ExternalDatabaseError::NoEvents(_)) => {
1177 Err(Error::NoEventsFound(subject_id_str))
1178 }
1179 Err(e) => {
1180 warn!(error = %e, "Failed to get events");
1181 Err(Error::QueryFailed(e.to_string()))
1182 }
1183 }
1184 }
1185
1186 pub async fn get_sink_events(
1187 &self,
1188 subject_id: DigestIdentifier,
1189 query: SinkEventsQuery,
1190 ) -> Result<SinkEventsPage, Error> {
1191 let response = self
1192 .node
1193 .ask(NodeMessage::GetSinkEvents {
1194 subject_id,
1195 from_sn: query.from_sn.unwrap_or(0),
1196 to_sn: query.to_sn,
1197 limit: query.limit.unwrap_or(100),
1198 })
1199 .await
1200 .map_err(|e| {
1201 warn!(error = %e, "Failed to replay sink events");
1202 Error::from(e)
1203 })?;
1204
1205 match response {
1206 NodeResponse::SinkEvents(events) => Ok(events),
1207 _ => Err(Error::UnexpectedResponse {
1208 actor: "node".to_string(),
1209 expected: "SinkEvents".to_string(),
1210 received: "other".to_string(),
1211 }),
1212 }
1213 }
1214
1215 pub async fn get_aborts(
1216 &self,
1217 subject_id: DigestIdentifier,
1218 query: AbortsQuery,
1219 ) -> Result<PaginatorAborts, Error> {
1220 let subject_id_str = subject_id.to_string();
1221 let request_id = if let Some(request_id) = query.request_id.as_ref() {
1222 Some(
1223 DigestIdentifier::from_str(request_id)
1224 .map_err(|e| Error::InvalidQueryParams(e.to_string()))?
1225 .to_string(),
1226 )
1227 } else {
1228 None
1229 };
1230 let query = AbortsQuery {
1231 request_id,
1232 sn: query.sn,
1233 quantity: query.quantity,
1234 page: query.page,
1235 reverse: query.reverse,
1236 };
1237
1238 self.db
1239 .get_aborts(&subject_id_str, query)
1240 .await
1241 .map_err(|e| {
1242 warn!(error = %e, "Failed to get aborts");
1243 Error::QueryFailed(e.to_string())
1244 })
1245 }
1246
1247 pub async fn get_event_sn(
1248 &self,
1249 subject_id: DigestIdentifier,
1250 sn: u64,
1251 ) -> Result<LedgerDB, Error> {
1252 let subject_id_str = subject_id.to_string();
1253
1254 match self.db.get_event_sn(&subject_id_str, sn).await {
1255 Ok(data) => Ok(data),
1256 Err(ExternalDatabaseError::EventNotFound { .. }) => {
1257 Err(Error::EventNotFound {
1258 subject: subject_id_str,
1259 sn,
1260 })
1261 }
1262 Err(e) => {
1263 warn!(error = %e, "Failed to get event");
1264 Err(Error::QueryFailed(e.to_string()))
1265 }
1266 }
1267 }
1268
1269 pub async fn get_first_or_end_events(
1270 &self,
1271 subject_id: DigestIdentifier,
1272 quantity: Option<u64>,
1273 reverse: Option<bool>,
1274 event_type: Option<EventRequestType>,
1275 ) -> Result<Vec<LedgerDB>, Error> {
1276 let subject_id_str = subject_id.to_string();
1277
1278 match self
1279 .db
1280 .get_first_or_end_events(
1281 &subject_id_str,
1282 quantity,
1283 reverse,
1284 event_type,
1285 )
1286 .await
1287 {
1288 Ok(data) => Ok(data),
1289 Err(ExternalDatabaseError::NoEvents(_)) => {
1290 Err(Error::NoEventsFound(subject_id_str))
1291 }
1292 Err(e) => {
1293 warn!(error = %e, "Failed to get events");
1294 Err(Error::QueryFailed(e.to_string()))
1295 }
1296 }
1297 }
1298
1299 pub async fn get_subject_state(
1300 &self,
1301 subject_id: DigestIdentifier,
1302 ) -> Result<SubjectDB, Error> {
1303 let subject_id_str = subject_id.to_string();
1304
1305 match self.db.get_subject_state(&subject_id_str).await {
1306 Ok(data) => Ok(data),
1307 Err(ExternalDatabaseError::SubjectNotFound(_)) => {
1308 Err(Error::SubjectNotFound(subject_id_str))
1309 }
1310 Err(e) => {
1311 warn!(error = %e, "Failed to get subject state");
1312 Err(Error::QueryFailed(e.to_string()))
1313 }
1314 }
1315 }
1316}
1317
#[cfg(test)]
mod tests {
    use super::*;
    use ave_actors::{ActorError, ActorPath};

    /// Fallback used by both tests; neither test is expected to reach it.
    fn communication_fallback(_: ActorError) -> Error {
        Error::ActorCommunication {
            actor: "request".to_string(),
        }
    }

    /// A `Functional` actor error keeps its description as `Error::ActorError`.
    #[test]
    fn preserves_functional_actor_errors() {
        let source = ActorError::Functional {
            description: "Is not a Creator".to_string(),
        };

        let mapped =
            preserve_functional_actor_error(source, communication_fallback);

        assert!(
            matches!(mapped, Error::ActorError(message) if message == "Is not a Creator")
        );
    }

    /// A `NotFound` actor error becomes `Error::MissingResource` named after
    /// the actor path.
    #[test]
    fn preserves_not_found_actor_errors() {
        let source = ActorError::NotFound {
            path: ActorPath::from("/user/request"),
        };

        let mapped =
            preserve_functional_actor_error(source, communication_fallback);

        assert!(matches!(
            mapped,
            Error::MissingResource { name, .. } if name == "/user/request"
        ));
    }
}