1use std::collections::HashSet;
2
3use super::{
4 error::{APIInternalError, ApiError},
5 inner_api::InnerApi,
6 APICommands, ApiResponses, GetAllowedSubjects,
7};
8use super::{GetEvents, GetGovernanceSubjects};
9#[cfg(feature = "approval")]
10use crate::approval::manager::ApprovalAPI;
11use crate::commons::models::request::TapleRequest;
12use crate::commons::models::state::SubjectData;
13use crate::event::manager::EventAPI;
14use crate::ledger::manager::EventManagerAPI;
15use crate::signature::Signature;
16#[cfg(feature = "approval")]
17use crate::ApprovalEntity;
18use crate::ValidationProof;
19use crate::{
20 authorized_subjecs::manager::AuthorizedSubjectsAPI, signature::Signed, Event, EventRequest,
21};
22use crate::{
23 commons::channel::{ChannelData, MpscChannel, SenderEnd},
24 Notification,
25};
26use crate::{identifier::DigestIdentifier, DatabaseCollection, DB};
27use crate::{KeyDerivator, KeyIdentifier};
28use libp2p::PeerId;
29use log::{error, info};
30use tokio_util::sync::CancellationToken;
31
32#[derive(Clone, Debug)]
36pub struct Api {
37 peer_id: PeerId,
38 controller_id: String,
39 public_key: Vec<u8>,
40 sender: SenderEnd<APICommands, ApiResponses>,
41}
42
43impl Api {
44 pub fn new(
45 peer_id: PeerId,
46 controller_id: String,
47 public_key: Vec<u8>,
48 sender: SenderEnd<APICommands, ApiResponses>,
49 ) -> Self {
50 Self {
51 peer_id,
52 controller_id,
53 public_key,
54 sender,
55 }
56 }
57
58 pub fn peer_id(&self) -> &PeerId {
59 &self.peer_id
60 }
61
62 pub fn controller_id(&self) -> &String {
63 &self.controller_id
64 }
65
66 pub fn public_key(&self) -> &Vec<u8> {
67 &self.public_key
68 }
69
70 pub async fn get_request(
71 &self,
72 request_id: DigestIdentifier,
73 ) -> Result<TapleRequest, ApiError> {
74 let response = self.sender.ask(APICommands::GetRequest(request_id)).await;
75 if response.is_err() {
76 log::debug!(
77 "EN EL MODULE INTERFACE ES ERROR {}",
78 response.clone().unwrap_err().to_string()
79 );
80 }
81 let response = response.unwrap();
82 if let ApiResponses::GetRequest(data) = response {
83 data
84 } else {
85 unreachable!()
86 }
87 }
88
89 pub async fn external_request(
91 &self,
92 event_request: Signed<EventRequest>,
93 ) -> Result<DigestIdentifier, ApiError> {
94 let response = self
95 .sender
96 .ask(APICommands::ExternalRequest(event_request))
97 .await
98 .unwrap();
99 if let ApiResponses::HandleExternalRequest(data) = response {
100 data
101 } else {
102 unreachable!()
103 }
104 }
105
106 #[cfg(feature = "approval")]
113 pub async fn get_pending_requests(&self) -> Result<Vec<ApprovalEntity>, ApiError> {
114 let response = self
115 .sender
116 .ask(APICommands::GetPendingRequests)
117 .await
118 .unwrap();
119 if let ApiResponses::GetPendingRequests(data) = response {
120 data
121 } else {
122 unreachable!()
123 }
124 }
125
126 #[cfg(feature = "approval")]
134 pub async fn get_single_request(
135 &self,
136 id: DigestIdentifier,
137 ) -> Result<ApprovalEntity, ApiError> {
138 let response = self
139 .sender
140 .ask(APICommands::GetSingleRequest(id))
141 .await
142 .unwrap();
143 if let ApiResponses::GetSingleRequest(data) = response {
144 data
145 } else {
146 unreachable!()
147 }
148 }
149
150 pub async fn get_subjects(
158 &self,
159 namespace: String,
160 from: Option<String>,
161 quantity: Option<i64>,
162 ) -> Result<Vec<SubjectData>, ApiError> {
163 let response = self
164 .sender
165 .ask(APICommands::GetSubjects(super::GetSubjects {
166 namespace,
167 from,
168 quantity,
169 }))
170 .await
171 .unwrap();
172 if let ApiResponses::GetSubjects(data) = response {
173 data
174 } else {
175 unreachable!()
176 }
177 }
178
179 pub async fn get_subjects_by_governance(
180 &self,
181 governance_id: DigestIdentifier,
182 from: Option<String>,
183 quantity: Option<i64>,
184 ) -> Result<Vec<SubjectData>, ApiError> {
185 let response = self
186 .sender
187 .ask(APICommands::GetSubjectByGovernance(
188 super::GetSubjects {
189 namespace: "".into(),
190 from,
191 quantity,
192 },
193 governance_id,
194 ))
195 .await
196 .unwrap();
197 if let ApiResponses::GetSubjectByGovernance(data) = response {
198 data
199 } else {
200 unreachable!()
201 }
202 }
203
204 pub async fn get_governances(
208 &self,
209 namespace: String,
210 from: Option<String>,
211 quantity: Option<i64>,
212 ) -> Result<Vec<SubjectData>, ApiError> {
213 let response = self
214 .sender
215 .ask(APICommands::GetGovernances(super::GetSubjects {
216 namespace,
217 from,
218 quantity,
219 }))
220 .await
221 .unwrap();
222 if let ApiResponses::GetGovernances(data) = response {
223 data
224 } else {
225 unreachable!()
226 }
227 }
228
229 pub async fn get_event(
230 &self,
231 subject_id: DigestIdentifier,
232 sn: u64,
233 ) -> Result<Signed<Event>, ApiError> {
234 let response = self
235 .sender
236 .ask(APICommands::GetEvent(subject_id, sn))
237 .await
238 .unwrap();
239 if let ApiResponses::GetEvent(data) = response {
240 data
241 } else {
242 unreachable!()
243 }
244 }
245
246 pub async fn get_events(
253 &self,
254 subject_id: DigestIdentifier,
255 from: Option<i64>,
256 quantity: Option<i64>,
257 ) -> Result<Vec<Signed<Event>>, ApiError> {
258 let response = self
259 .sender
260 .ask(APICommands::GetEvents(GetEvents {
261 subject_id,
262 from,
263 quantity,
264 }))
265 .await
266 .unwrap();
267 if let ApiResponses::GetEvents(data) = response {
268 data
269 } else {
270 unreachable!()
271 }
272 }
273
274 pub async fn get_subject(&self, subject_id: DigestIdentifier) -> Result<SubjectData, ApiError> {
279 let response = self
280 .sender
281 .ask(APICommands::GetSubject(super::GetSubject { subject_id }))
282 .await
283 .unwrap();
284 if let ApiResponses::GetSubject(data) = response {
285 data
286 } else {
287 unreachable!()
288 }
289 }
290
291 #[cfg(feature = "approval")]
301 pub async fn approval_request(
302 &self,
303 request_id: DigestIdentifier,
304 acceptance: bool,
305 ) -> Result<ApprovalEntity, ApiError> {
306 let response = self
307 .sender
308 .ask(APICommands::VoteResolve(acceptance, request_id))
309 .await
310 .unwrap();
311 if let ApiResponses::VoteResolve(data) = response {
312 data
313 } else {
314 unreachable!()
315 }
316 }
317
318 pub async fn add_preauthorize_subject(
319 &self,
320 subject_id: &DigestIdentifier,
321 providers: &HashSet<KeyIdentifier>,
322 ) -> Result<(), ApiError> {
323 let response = self
324 .sender
325 .ask(APICommands::SetPreauthorizedSubject(
326 subject_id.clone(),
327 providers.clone(),
328 ))
329 .await
330 .unwrap();
331 if let ApiResponses::SetPreauthorizedSubjectCompleted = response {
332 Ok(())
333 } else {
334 unreachable!()
335 }
336 }
337
338 pub async fn get_all_allowed_subjects_and_providers(
339 &self,
340 from: Option<String>,
341 quantity: Option<i64>,
342 ) -> Result<Vec<(DigestIdentifier, HashSet<KeyIdentifier>)>, ApiError> {
343 let response = self
344 .sender
345 .ask(APICommands::GetAllPreauthorizedSubjects(
346 GetAllowedSubjects { from, quantity },
347 ))
348 .await
349 .unwrap();
350 if let ApiResponses::GetAllPreauthorizedSubjects(data) = response {
351 data
352 } else {
353 unreachable!()
354 }
355 }
356
357 pub async fn add_keys(&self, derivator: KeyDerivator) -> Result<KeyIdentifier, ApiError> {
358 let response = self
359 .sender
360 .ask(APICommands::AddKeys(derivator))
361 .await
362 .unwrap();
363 if let ApiResponses::AddKeys(data) = response {
364 data
365 } else {
366 unreachable!()
367 }
368 }
369
370 pub async fn get_validation_proof(
371 &self,
372 subject_id: DigestIdentifier,
373 ) -> Result<(HashSet<Signature>, ValidationProof), ApiError> {
374 let response = self
375 .sender
376 .ask(APICommands::GetValidationProof(subject_id))
377 .await
378 .unwrap();
379 if let ApiResponses::GetValidationProof(data) = response {
380 data
381 } else {
382 unreachable!()
383 }
384 }
385
386 pub async fn get_governance_subjects(
387 &self,
388 governance_id: DigestIdentifier,
389 from: Option<String>,
390 quantity: Option<i64>,
391 ) -> Result<Vec<SubjectData>, ApiError> {
392 let response = self
393 .sender
394 .ask(APICommands::GetGovernanceSubjects(GetGovernanceSubjects {
395 governance_id,
396 from,
397 quantity,
398 }))
399 .await
400 .unwrap();
401 if let ApiResponses::GetGovernanceSubjects(data) = response {
402 data
403 } else {
404 unreachable!()
405 }
406 }
407
408 #[cfg(feature = "approval")]
409 pub async fn get_approval(
410 &self,
411 request_id: DigestIdentifier,
412 ) -> Result<ApprovalEntity, ApiError> {
413 let response = self
414 .sender
415 .ask(APICommands::GetApproval(request_id))
416 .await
417 .unwrap();
418 if let ApiResponses::GetApproval(data) = response {
419 data
420 } else {
421 unreachable!()
422 }
423 }
424
425 #[cfg(feature = "approval")]
426 pub async fn get_approvals(
427 &self,
428 state: Option<crate::ApprovalState>,
429 from: Option<String>,
430 quantity: Option<i64>,
431 ) -> Result<Vec<ApprovalEntity>, ApiError> {
432 let response = self
433 .sender
434 .ask(APICommands::GetApprovals(super::GetApprovals {
435 state,
436 from,
437 quantity,
438 }))
439 .await
440 .unwrap();
441 if let ApiResponses::GetApprovals(data) = response {
442 data
443 } else {
444 unreachable!()
445 }
446 }
447}
448
449pub struct ApiManager<C: DatabaseCollection> {
450 input: MpscChannel<APICommands, ApiResponses>,
451 inner_api: InnerApi<C>,
452 token: CancellationToken,
453 notification_tx: tokio::sync::mpsc::Sender<Notification>,
454}
455
456impl<C: DatabaseCollection> ApiManager<C> {
457 pub fn new(
458 input: MpscChannel<APICommands, ApiResponses>,
459 event_api: EventAPI,
460 #[cfg(feature = "approval")] approval_api: ApprovalAPI,
461 authorized_subjects_api: AuthorizedSubjectsAPI,
462 ledger_api: EventManagerAPI,
463 token: CancellationToken,
464 notification_tx: tokio::sync::mpsc::Sender<Notification>,
465 db: DB<C>,
466 ) -> Self {
467 Self {
468 input,
469 inner_api: InnerApi::new(
470 event_api,
471 authorized_subjects_api,
472 db,
473 #[cfg(feature = "approval")]
474 approval_api,
475 ledger_api,
476 ),
477 token,
478 notification_tx,
479 }
480 }
481
482 pub async fn run(mut self) {
483 loop {
484 tokio::select! {
485 command = self.input.receive() => {
486 if command.is_some() {
487 let result = self.process_command(command.unwrap()).await;
488 if result.is_err() {
489 error!("API error detected");
490 self.token.cancel();
491 break;
492 }
493 }
494 },
495 _ = self.token.cancelled() => {
496 info!("API module shutdown received");
497 break;
498 }
499 }
500 }
501 info!("Ended");
502 }
503
504 async fn process_command(
505 &mut self,
506 input: ChannelData<APICommands, ApiResponses>,
507 ) -> Result<(), APIInternalError> {
508 match input {
510 ChannelData::AskData(data) => {
511 let (sx, command) = data.get();
512 let response = match command {
513 APICommands::GetSubjects(data) => self.inner_api.get_all_subjects(data),
514 APICommands::GetGovernances(data) => {
515 self.inner_api.get_all_governances(data).await
516 }
517 APICommands::GetEvents(data) => {
518 self.inner_api.get_events_of_subject(data).await
519 }
520 APICommands::GetSubject(data) => self.inner_api.get_single_subject(data).await,
521 APICommands::GetRequest(request_id) => {
522 self.inner_api.get_request(request_id).await
523 }
524 APICommands::GetEvent(subject_id, sn) => {
525 self.inner_api.get_event(subject_id, sn)
526 }
527 #[cfg(feature = "approval")]
528 APICommands::VoteResolve(acceptance, id) => {
529 self.inner_api.emit_vote(id, acceptance).await?
530 }
531 #[cfg(feature = "approval")]
532 APICommands::GetPendingRequests => self.inner_api.get_pending_request().await,
533 #[cfg(feature = "approval")]
534 APICommands::GetSingleRequest(data) => {
535 self.inner_api.get_single_request(data).await
536 }
537 APICommands::ExternalRequest(event_request) => {
538 let response = self.inner_api.handle_external_request(event_request).await;
539 response?
540 }
541 APICommands::SetPreauthorizedSubject(subject_id, providers) => {
542 self.inner_api
543 .set_preauthorized_subject(subject_id, providers)
544 .await?
545 }
546 APICommands::AddKeys(derivator) => {
547 self.inner_api.generate_keys(derivator).await?
548 }
549 APICommands::GetValidationProof(subject_id) => {
550 self.inner_api.get_validation_proof(subject_id).await
551 }
552 APICommands::GetGovernanceSubjects(data) => {
553 self.inner_api.get_governance_subjects(data).await
554 }
555 #[cfg(feature = "approval")]
556 APICommands::GetApproval(request_id) => {
557 self.inner_api.get_approval(request_id).await
558 }
559 #[cfg(feature = "approval")]
560 APICommands::GetApprovals(get_approvals) => {
561 self.inner_api
562 .get_approvals(
563 get_approvals.state,
564 get_approvals.from,
565 get_approvals.quantity,
566 )
567 .await
568 }
569 APICommands::GetAllPreauthorizedSubjects(data) => {
570 self.inner_api
571 .get_all_preauthorized_subjects_and_providers(data)
572 .await?
573 }
574 APICommands::GetSubjectByGovernance(params, gov_id) => {
575 self.inner_api.get_subjects_by_governance(params, gov_id)
576 }
577 };
578 sx.send(response)
579 .map_err(|_| APIInternalError::OneshotUnavailable)?;
580 }
581 ChannelData::TellData(_data) => {
582 panic!("Tell in API")
583 }
584 }
585 Ok(())
586 }
587}