taple_core/api/
api.rs

1use std::collections::HashSet;
2
3use super::{
4    error::{APIInternalError, ApiError},
5    inner_api::InnerApi,
6    APICommands, ApiResponses, GetAllowedSubjects,
7};
8use super::{GetEvents, GetGovernanceSubjects};
9#[cfg(feature = "approval")]
10use crate::approval::manager::ApprovalAPI;
11use crate::commons::models::request::TapleRequest;
12use crate::commons::models::state::SubjectData;
13use crate::event::manager::EventAPI;
14use crate::ledger::manager::EventManagerAPI;
15use crate::signature::Signature;
16#[cfg(feature = "approval")]
17use crate::ApprovalEntity;
18use crate::ValidationProof;
19use crate::{
20    authorized_subjecs::manager::AuthorizedSubjectsAPI, signature::Signed, Event, EventRequest,
21};
22use crate::{
23    commons::channel::{ChannelData, MpscChannel, SenderEnd},
24    Notification,
25};
26use crate::{identifier::DigestIdentifier, DatabaseCollection, DB};
27use crate::{KeyDerivator, KeyIdentifier};
28use libp2p::PeerId;
29use log::{error, info};
30use tokio_util::sync::CancellationToken;
31
32/// Object that allows interaction with a TAPLE node.
33///
34/// It has methods to perform all available read and write operations.
35#[derive(Clone, Debug)]
36pub struct Api {
37    peer_id: PeerId,
38    controller_id: String,
39    public_key: Vec<u8>,
40    sender: SenderEnd<APICommands, ApiResponses>,
41}
42
43impl Api {
44    pub fn new(
45        peer_id: PeerId,
46        controller_id: String,
47        public_key: Vec<u8>,
48        sender: SenderEnd<APICommands, ApiResponses>,
49    ) -> Self {
50        Self {
51            peer_id,
52            controller_id,
53            public_key,
54            sender,
55        }
56    }
57
58    pub fn peer_id(&self) -> &PeerId {
59        &self.peer_id
60    }
61
62    pub fn controller_id(&self) -> &String {
63        &self.controller_id
64    }
65
66    pub fn public_key(&self) -> &Vec<u8> {
67        &self.public_key
68    }
69
70    pub async fn get_request(
71        &self,
72        request_id: DigestIdentifier,
73    ) -> Result<TapleRequest, ApiError> {
74        let response = self.sender.ask(APICommands::GetRequest(request_id)).await;
75        if response.is_err() {
76            log::debug!(
77                "EN EL MODULE INTERFACE ES ERROR {}",
78                response.clone().unwrap_err().to_string()
79            );
80        }
81        let response = response.unwrap();
82        if let ApiResponses::GetRequest(data) = response {
83            data
84        } else {
85            unreachable!()
86        }
87    }
88
89    /// Allows to make a request to the node from an external Invoker
90    pub async fn external_request(
91        &self,
92        event_request: Signed<EventRequest>,
93    ) -> Result<DigestIdentifier, ApiError> {
94        let response = self
95            .sender
96            .ask(APICommands::ExternalRequest(event_request))
97            .await
98            .unwrap();
99        if let ApiResponses::HandleExternalRequest(data) = response {
100            data
101        } else {
102            unreachable!()
103        }
104    }
105
106    /// It allows to obtain all the voting requests pending to be resolved in the node.
107    /// These requests are received from other nodes in the network when they try to update
108    /// a governance subject. It is necessary to vote their agreement or disagreement with
109    /// the proposed changes in order for the events to be implemented.
110    /// # Possible errors
111    /// • [ApiError::InternalError] if an internal error occurs during operation execution.
112    #[cfg(feature = "approval")]
113    pub async fn get_pending_requests(&self) -> Result<Vec<ApprovalEntity>, ApiError> {
114        let response = self
115            .sender
116            .ask(APICommands::GetPendingRequests)
117            .await
118            .unwrap();
119        if let ApiResponses::GetPendingRequests(data) = response {
120            data
121        } else {
122            unreachable!()
123        }
124    }
125
126    /// It allows to obtain a single voting request pending to be resolved in the node.
127    /// This request is received from other nodes in the network when they try to update
128    /// a governance subject. It is necessary to vote its agreement or disagreement with
129    /// the proposed changes in order for the events to be implemented.
130    /// # Possible errors
131    /// • [ApiError::InternalError] if an internal error occurs during operation execution.
132    /// • [ApiError::NotFound] if the requested request does not exist.
133    #[cfg(feature = "approval")]
134    pub async fn get_single_request(
135        &self,
136        id: DigestIdentifier,
137    ) -> Result<ApprovalEntity, ApiError> {
138        let response = self
139            .sender
140            .ask(APICommands::GetSingleRequest(id))
141            .await
142            .unwrap();
143        if let ApiResponses::GetSingleRequest(data) = response {
144            data
145        } else {
146            unreachable!()
147        }
148    }
149
150    /// Allows to get all subjects that are known to the current node, regardless of their governance.
151    /// Paging can be performed using the optional arguments `from` and `quantity`.
152    /// Regarding the first one, note that it admits negative values, in which case the paging is
153    /// performed in the opposite direction starting from the end of the collection. Note that this method
154    /// also returns the subjects that model governance.
155    /// # Possible errors
156    /// • [ApiError::InternalError] if an internal error occurred during the execution of the operation.
157    pub async fn get_subjects(
158        &self,
159        namespace: String,
160        from: Option<String>,
161        quantity: Option<i64>,
162    ) -> Result<Vec<SubjectData>, ApiError> {
163        let response = self
164            .sender
165            .ask(APICommands::GetSubjects(super::GetSubjects {
166                namespace,
167                from,
168                quantity,
169            }))
170            .await
171            .unwrap();
172        if let ApiResponses::GetSubjects(data) = response {
173            data
174        } else {
175            unreachable!()
176        }
177    }
178
179    pub async fn get_subjects_by_governance(
180        &self,
181        governance_id: DigestIdentifier,
182        from: Option<String>,
183        quantity: Option<i64>,
184    ) -> Result<Vec<SubjectData>, ApiError> {
185        let response = self
186            .sender
187            .ask(APICommands::GetSubjectByGovernance(
188                super::GetSubjects {
189                    namespace: "".into(),
190                    from,
191                    quantity,
192                },
193                governance_id,
194            ))
195            .await
196            .unwrap();
197        if let ApiResponses::GetSubjectByGovernance(data) = response {
198            data
199        } else {
200            unreachable!()
201        }
202    }
203
204    /// It allows to obtain all the subjects that model existing governance in the node.
205    /// # Possible errors
206    /// • [ApiError::InternalError] if an internal error occurred during the execution of the operation.
207    pub async fn get_governances(
208        &self,
209        namespace: String,
210        from: Option<String>,
211        quantity: Option<i64>,
212    ) -> Result<Vec<SubjectData>, ApiError> {
213        let response = self
214            .sender
215            .ask(APICommands::GetGovernances(super::GetSubjects {
216                namespace,
217                from,
218                quantity,
219            }))
220            .await
221            .unwrap();
222        if let ApiResponses::GetGovernances(data) = response {
223            data
224        } else {
225            unreachable!()
226        }
227    }
228
229    pub async fn get_event(
230        &self,
231        subject_id: DigestIdentifier,
232        sn: u64,
233    ) -> Result<Signed<Event>, ApiError> {
234        let response = self
235            .sender
236            .ask(APICommands::GetEvent(subject_id, sn))
237            .await
238            .unwrap();
239        if let ApiResponses::GetEvent(data) = response {
240            data
241        } else {
242            unreachable!()
243        }
244    }
245
246    /// Allows to obtain events from a specific subject previously existing in the node.
247    /// Paging can be performed by means of the optional arguments `from` and `quantity`.
248    /// Regarding the former, it should be noted that negative values are allowed, in which case
249    /// the paging is performed in the opposite direction starting from the end of the string.
250    /// # Possible errors
251    /// • [ApiError::InvalidParameters] if the specified subject identifier does not match a valid [DigestIdentifier].
252    pub async fn get_events(
253        &self,
254        subject_id: DigestIdentifier,
255        from: Option<i64>,
256        quantity: Option<i64>,
257    ) -> Result<Vec<Signed<Event>>, ApiError> {
258        let response = self
259            .sender
260            .ask(APICommands::GetEvents(GetEvents {
261                subject_id,
262                from,
263                quantity,
264            }))
265            .await
266            .unwrap();
267        if let ApiResponses::GetEvents(data) = response {
268            data
269        } else {
270            unreachable!()
271        }
272    }
273
274    /// Allows to obtain a specified subject by specifying its identifier.
275    /// # Possible errors
276    /// • [ApiError::InvalidParameters] if the specified identifier does not match a valid [DigestIdentifier].<br />
277    /// • [ApiError::NotFound] if the subject does not exist.
278    pub async fn get_subject(&self, subject_id: DigestIdentifier) -> Result<SubjectData, ApiError> {
279        let response = self
280            .sender
281            .ask(APICommands::GetSubject(super::GetSubject { subject_id }))
282            .await
283            .unwrap();
284        if let ApiResponses::GetSubject(data) = response {
285            data
286        } else {
287            unreachable!()
288        }
289    }
290
291    /// Allows to vote on a voting request that previously exists in the system.
292    /// This vote will be sent to the corresponding node in charge of its collection.
293    /// # Possible errors
294    /// • [ApiError::InternalError] if an internal error occurs during operation execution.<br />
295    /// • [ApiError::NotFound] if the request does not exist in the system.<br />
296    /// • [ApiError::InvalidParameters] if the specified request identifier does not match a valid [DigestIdentifier].<br />
297    /// • [ApiError::VoteNotNeeded] if the node's vote is no longer required. <br />
298    /// This occurs when the acceptance of the changes proposed by the petition has already been resolved by the rest of
299    /// the nodes in the network or when the node cannot participate in the voting process because it lacks the voting role.
300    #[cfg(feature = "approval")]
301    pub async fn approval_request(
302        &self,
303        request_id: DigestIdentifier,
304        acceptance: bool,
305    ) -> Result<ApprovalEntity, ApiError> {
306        let response = self
307            .sender
308            .ask(APICommands::VoteResolve(acceptance, request_id))
309            .await
310            .unwrap();
311        if let ApiResponses::VoteResolve(data) = response {
312            data
313        } else {
314            unreachable!()
315        }
316    }
317
318    pub async fn add_preauthorize_subject(
319        &self,
320        subject_id: &DigestIdentifier,
321        providers: &HashSet<KeyIdentifier>,
322    ) -> Result<(), ApiError> {
323        let response = self
324            .sender
325            .ask(APICommands::SetPreauthorizedSubject(
326                subject_id.clone(),
327                providers.clone(),
328            ))
329            .await
330            .unwrap();
331        if let ApiResponses::SetPreauthorizedSubjectCompleted = response {
332            Ok(())
333        } else {
334            unreachable!()
335        }
336    }
337
338    pub async fn get_all_allowed_subjects_and_providers(
339        &self,
340        from: Option<String>,
341        quantity: Option<i64>,
342    ) -> Result<Vec<(DigestIdentifier, HashSet<KeyIdentifier>)>, ApiError> {
343        let response = self
344            .sender
345            .ask(APICommands::GetAllPreauthorizedSubjects(
346                GetAllowedSubjects { from, quantity },
347            ))
348            .await
349            .unwrap();
350        if let ApiResponses::GetAllPreauthorizedSubjects(data) = response {
351            data
352        } else {
353            unreachable!()
354        }
355    }
356
357    pub async fn add_keys(&self, derivator: KeyDerivator) -> Result<KeyIdentifier, ApiError> {
358        let response = self
359            .sender
360            .ask(APICommands::AddKeys(derivator))
361            .await
362            .unwrap();
363        if let ApiResponses::AddKeys(data) = response {
364            data
365        } else {
366            unreachable!()
367        }
368    }
369
370    pub async fn get_validation_proof(
371        &self,
372        subject_id: DigestIdentifier,
373    ) -> Result<(HashSet<Signature>, ValidationProof), ApiError> {
374        let response = self
375            .sender
376            .ask(APICommands::GetValidationProof(subject_id))
377            .await
378            .unwrap();
379        if let ApiResponses::GetValidationProof(data) = response {
380            data
381        } else {
382            unreachable!()
383        }
384    }
385
386    pub async fn get_governance_subjects(
387        &self,
388        governance_id: DigestIdentifier,
389        from: Option<String>,
390        quantity: Option<i64>,
391    ) -> Result<Vec<SubjectData>, ApiError> {
392        let response = self
393            .sender
394            .ask(APICommands::GetGovernanceSubjects(GetGovernanceSubjects {
395                governance_id,
396                from,
397                quantity,
398            }))
399            .await
400            .unwrap();
401        if let ApiResponses::GetGovernanceSubjects(data) = response {
402            data
403        } else {
404            unreachable!()
405        }
406    }
407
408    #[cfg(feature = "approval")]
409    pub async fn get_approval(
410        &self,
411        request_id: DigestIdentifier,
412    ) -> Result<ApprovalEntity, ApiError> {
413        let response = self
414            .sender
415            .ask(APICommands::GetApproval(request_id))
416            .await
417            .unwrap();
418        if let ApiResponses::GetApproval(data) = response {
419            data
420        } else {
421            unreachable!()
422        }
423    }
424
425    #[cfg(feature = "approval")]
426    pub async fn get_approvals(
427        &self,
428        state: Option<crate::ApprovalState>,
429        from: Option<String>,
430        quantity: Option<i64>,
431    ) -> Result<Vec<ApprovalEntity>, ApiError> {
432        let response = self
433            .sender
434            .ask(APICommands::GetApprovals(super::GetApprovals {
435                state,
436                from,
437                quantity,
438            }))
439            .await
440            .unwrap();
441        if let ApiResponses::GetApprovals(data) = response {
442            data
443        } else {
444            unreachable!()
445        }
446    }
447}
448
449pub struct ApiManager<C: DatabaseCollection> {
450    input: MpscChannel<APICommands, ApiResponses>,
451    inner_api: InnerApi<C>,
452    token: CancellationToken,
453    notification_tx: tokio::sync::mpsc::Sender<Notification>,
454}
455
456impl<C: DatabaseCollection> ApiManager<C> {
457    pub fn new(
458        input: MpscChannel<APICommands, ApiResponses>,
459        event_api: EventAPI,
460        #[cfg(feature = "approval")] approval_api: ApprovalAPI,
461        authorized_subjects_api: AuthorizedSubjectsAPI,
462        ledger_api: EventManagerAPI,
463        token: CancellationToken,
464        notification_tx: tokio::sync::mpsc::Sender<Notification>,
465        db: DB<C>,
466    ) -> Self {
467        Self {
468            input,
469            inner_api: InnerApi::new(
470                event_api,
471                authorized_subjects_api,
472                db,
473                #[cfg(feature = "approval")]
474                approval_api,
475                ledger_api,
476            ),
477            token,
478            notification_tx,
479        }
480    }
481
482    pub async fn run(mut self) {
483        loop {
484            tokio::select! {
485                command = self.input.receive() => {
486                    if command.is_some() {
487                        let result = self.process_command(command.unwrap()).await;
488                        if result.is_err() {
489                            error!("API error detected");
490                            self.token.cancel();
491                            break;
492                        }
493                    }
494                },
495                _ = self.token.cancelled() => {
496                    info!("API module shutdown received");
497                    break;
498                }
499            }
500        }
501        info!("Ended");
502    }
503
504    async fn process_command(
505        &mut self,
506        input: ChannelData<APICommands, ApiResponses>,
507    ) -> Result<(), APIInternalError> {
508        // TODO: API commands to change the configuration are missing
509        match input {
510            ChannelData::AskData(data) => {
511                let (sx, command) = data.get();
512                let response = match command {
513                    APICommands::GetSubjects(data) => self.inner_api.get_all_subjects(data),
514                    APICommands::GetGovernances(data) => {
515                        self.inner_api.get_all_governances(data).await
516                    }
517                    APICommands::GetEvents(data) => {
518                        self.inner_api.get_events_of_subject(data).await
519                    }
520                    APICommands::GetSubject(data) => self.inner_api.get_single_subject(data).await,
521                    APICommands::GetRequest(request_id) => {
522                        self.inner_api.get_request(request_id).await
523                    }
524                    APICommands::GetEvent(subject_id, sn) => {
525                        self.inner_api.get_event(subject_id, sn)
526                    }
527                    #[cfg(feature = "approval")]
528                    APICommands::VoteResolve(acceptance, id) => {
529                        self.inner_api.emit_vote(id, acceptance).await?
530                    }
531                    #[cfg(feature = "approval")]
532                    APICommands::GetPendingRequests => self.inner_api.get_pending_request().await,
533                    #[cfg(feature = "approval")]
534                    APICommands::GetSingleRequest(data) => {
535                        self.inner_api.get_single_request(data).await
536                    }
537                    APICommands::ExternalRequest(event_request) => {
538                        let response = self.inner_api.handle_external_request(event_request).await;
539                        response?
540                    }
541                    APICommands::SetPreauthorizedSubject(subject_id, providers) => {
542                        self.inner_api
543                            .set_preauthorized_subject(subject_id, providers)
544                            .await?
545                    }
546                    APICommands::AddKeys(derivator) => {
547                        self.inner_api.generate_keys(derivator).await?
548                    }
549                    APICommands::GetValidationProof(subject_id) => {
550                        self.inner_api.get_validation_proof(subject_id).await
551                    }
552                    APICommands::GetGovernanceSubjects(data) => {
553                        self.inner_api.get_governance_subjects(data).await
554                    }
555                    #[cfg(feature = "approval")]
556                    APICommands::GetApproval(request_id) => {
557                        self.inner_api.get_approval(request_id).await
558                    }
559                    #[cfg(feature = "approval")]
560                    APICommands::GetApprovals(get_approvals) => {
561                        self.inner_api
562                            .get_approvals(
563                                get_approvals.state,
564                                get_approvals.from,
565                                get_approvals.quantity,
566                            )
567                            .await
568                    }
569                    APICommands::GetAllPreauthorizedSubjects(data) => {
570                        self.inner_api
571                            .get_all_preauthorized_subjects_and_providers(data)
572                            .await?
573                    }
574                    APICommands::GetSubjectByGovernance(params, gov_id) => {
575                        self.inner_api.get_subjects_by_governance(params, gov_id)
576                    }
577                };
578                sx.send(response)
579                    .map_err(|_| APIInternalError::OneshotUnavailable)?;
580            }
581            ChannelData::TellData(_data) => {
582                panic!("Tell in API")
583            }
584        }
585        Ok(())
586    }
587}