Skip to main content

ave_bridge/
lib.rs

1use std::{collections::HashSet, str::FromStr};
2
3pub use ave_common::Namespace;
4pub use ave_common::response::MonitorNetworkState;
5use ave_common::{
6    bridge::request::{
7        AbortsQuery, ApprovalState, ApprovalStateRes, BridgeSignedEventRequest,
8        EventRequestType, EventsQuery, SinkEventsQuery, UpdateSubjectQuery,
9    },
10    identity::{DigestIdentifier, PublicKey, Signature, Signed},
11    request::EventRequest,
12    response::{
13        ApprovalEntry, GovsData, LedgerDB, PaginatorAborts, PaginatorEvents,
14        RequestData as RequestDataRes, RequestInfo, RequestInfoExtend,
15        RequestsInManager, RequestsInManagerSubject, SinkEventsPage, SubjectDB,
16        SubjsData, TransferSubject,
17    },
18};
19pub use ave_core::config::{MachineSpec, resolve_spec};
20pub use ave_core::{
21    Api as AveApi,
22    auth::AuthWitness,
23    config::Config as AveConfig,
24    config::{
25        AveExternalDBConfig, AveInternalDBConfig, LoggingConfig, LoggingOutput,
26        LoggingRotation, SinkConfig, SinkQueuePolicy, SinkRoutingStrategy,
27        SinkServer,
28    },
29    error::Error,
30};
31use ave_core::{config::SinkAuth, helpers::sink::obtain_token};
32pub use ave_network::{
33    Config as NetworkConfig, ControlListConfig, MemoryLimitsConfig,
34    RoutingConfig, RoutingNode,
35};
36use config::Config;
37use prometheus_client::registry::Registry;
38use tokio::{
39    signal::unix::{SignalKind, signal},
40    task::JoinHandle,
41};
42use tokio_util::sync::CancellationToken;
43use utils::key_pair;
44
45pub mod config;
46pub use http::{CorsConfig, HttpConfig, ProxyConfig, SelfSignedCertConfig};
47pub mod conversions;
48pub mod error;
49pub mod http;
50pub mod settings;
51pub mod utils;
52pub use clap;
53pub mod auth;
54
55pub use error::BridgeError;
56
57pub use ave_common;
58
59#[cfg(feature = "prometheus")]
60pub mod prometheus;
61
62use crate::conversions::{
63    core_approval_req_to_common, core_tranfer_subject_to_common,
64};
65
66#[cfg(all(feature = "sqlite", feature = "rocksdb"))]
67compile_error!("Select only one: 'sqlite' or 'rocksdb'");
68
69#[cfg(not(any(feature = "sqlite", feature = "rocksdb")))]
70compile_error!("You must enable 'sqlite' or 'rocksdb'");
71
72#[cfg(not(feature = "ext-sqlite"))]
73compile_error!("You must enable 'ext-sqlite'");
74
75#[derive(Clone)]
76pub struct Bridge {
77    api: AveApi,
78    config: Config,
79    graceful_token: CancellationToken,
80    crash_token: CancellationToken,
81    #[cfg(feature = "prometheus")]
82    registry: std::sync::Arc<
83        tokio::sync::Mutex<prometheus_client::registry::Registry>,
84    >,
85}
86
87impl Bridge {
88    pub async fn build(
89        settings: &Config,
90        password: &str,
91        password_sink: &str,
92        sink_api_key: &str,
93        graceful_token: Option<CancellationToken>,
94        crash_token: Option<CancellationToken>,
95    ) -> Result<(Self, Vec<JoinHandle<()>>), BridgeError> {
96        let keys = key_pair(settings, password)?;
97
98        // Skip bearer token acquisition when using api_key mode
99        let auth_token =
100            if sink_api_key.is_empty() && !settings.sink.auth.is_empty() {
101                Some(
102                    obtain_token(
103                        &settings.sink.auth,
104                        &settings.sink.username,
105                        password_sink,
106                    )
107                    .await?,
108                )
109            } else {
110                None
111            };
112
113        let mut registry = <Registry>::default();
114
115        let graceful_token = graceful_token.unwrap_or_default();
116        let crash_token = crash_token.unwrap_or_default();
117
118        let (api, runners) = AveApi::build(
119            keys,
120            settings.node.clone(),
121            SinkAuth {
122                sink: settings.sink.clone(),
123                token: auth_token,
124                password: password_sink.to_owned(),
125                api_key: sink_api_key.to_owned(),
126            },
127            &mut registry,
128            password,
129            graceful_token.clone(),
130            crash_token.clone(),
131        )
132        .await?;
133
134        Self::bind_with_shutdown(graceful_token.clone());
135
136        #[cfg(feature = "prometheus")]
137        let registry = std::sync::Arc::new(tokio::sync::Mutex::new(registry));
138
139        Ok((
140            Self {
141                api,
142                config: settings.clone(),
143                graceful_token,
144                crash_token,
145                #[cfg(feature = "prometheus")]
146                registry,
147            },
148            runners,
149        ))
150    }
151
152    pub const fn graceful_token(&self) -> &CancellationToken {
153        &self.graceful_token
154    }
155
156    pub const fn crash_token(&self) -> &CancellationToken {
157        &self.crash_token
158    }
159
160    #[cfg(feature = "prometheus")]
161    pub fn registry(
162        &self,
163    ) -> std::sync::Arc<tokio::sync::Mutex<prometheus_client::registry::Registry>>
164    {
165        self.registry.clone()
166    }
167
168    fn bind_with_shutdown(token: CancellationToken) {
169        let cancellation_token = token;
170        let mut sigterm = signal(SignalKind::terminate())
171            .expect("It could not be registered SIGTERM");
172
173        tokio::spawn(async move {
174            tokio::select! {
175                _ = tokio::signal::ctrl_c() => {},
176                _ = sigterm.recv() => {},
177            }
178
179            cancellation_token.cancel();
180        });
181    }
182
183    ///////// General
184    ////////////////////////////
185    pub fn get_peer_id(&self) -> &str {
186        self.api.peer_id()
187    }
188
189    pub fn get_public_key(&self) -> &str {
190        self.api.public_key()
191    }
192
193    pub fn get_config(&self) -> Config {
194        self.config.clone()
195    }
196
197    ///////// Network
198    ////////////////////////////
199    pub async fn get_network_state(
200        &self,
201    ) -> Result<MonitorNetworkState, BridgeError> {
202        Ok(self.api.get_network_state().await?)
203    }
204
205    ///////// Request
206    ////////////////////////////
207    pub async fn get_requests_in_manager(
208        &self,
209    ) -> Result<RequestsInManager, BridgeError> {
210        Ok(self.api.get_requests_in_manager().await?)
211    }
212
213    pub async fn get_requests_in_manager_subject_id(
214        &self,
215        subject_id: String,
216    ) -> Result<RequestsInManagerSubject, BridgeError> {
217        let subject_id = DigestIdentifier::from_str(&subject_id)
218            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
219
220        Ok(self
221            .api
222            .get_requests_in_manager_subject_id(subject_id)
223            .await?)
224    }
225
226    pub async fn post_event_request(
227        &self,
228        request: BridgeSignedEventRequest,
229    ) -> Result<RequestDataRes, BridgeError> {
230        let event: EventRequest =
231            conversions::bridge_to_event_request(request.request)?;
232        let result = if let Some(signature) = request.signature {
233            let signature = Signature::try_from(signature).map_err(|e| {
234                BridgeError::InvalidSignature(format!("{:?}", e))
235            })?;
236
237            let signed_request = Signed::from_parts(event, signature);
238
239            self.api.external_request(signed_request).await?
240        } else {
241            self.api.own_request(event).await?
242        };
243        Ok(conversions::core_request_to_common(result))
244    }
245
246    pub async fn get_approval(
247        &self,
248        subject_id: String,
249        state: Option<ApprovalState>,
250    ) -> Result<Option<ApprovalEntry>, BridgeError> {
251        let subject_id = DigestIdentifier::from_str(&subject_id)
252            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
253
254        Ok(self.api.get_approval(subject_id, state).await?.map(|x| {
255            ApprovalEntry {
256                request: core_approval_req_to_common(x.0),
257                state: x.1,
258            }
259        }))
260    }
261
262    pub async fn get_approvals(
263        &self,
264        state: Option<ApprovalState>,
265    ) -> Result<Vec<ApprovalEntry>, BridgeError> {
266        let res = self.api.get_approvals(state).await?;
267
268        Ok(res
269            .iter()
270            .map(|x| ApprovalEntry {
271                request: core_approval_req_to_common(x.0.clone()),
272                state: x.1.clone(),
273            })
274            .collect())
275    }
276
277    pub async fn patch_approve(
278        &self,
279        subject_id: String,
280        state: ApprovalStateRes,
281    ) -> Result<String, BridgeError> {
282        let subject_id = DigestIdentifier::from_str(&subject_id)
283            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
284
285        Ok(self.api.approve(subject_id, state).await?)
286    }
287
288    pub async fn post_manual_request_abort(
289        &self,
290        subject_id: String,
291    ) -> Result<String, BridgeError> {
292        let subject_id = DigestIdentifier::from_str(&subject_id)
293            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
294
295        Ok(self.api.manual_request_abort(subject_id).await?)
296    }
297
298    ///////// Tracking
299    ////////////////////////////
300    pub async fn get_request_state(
301        &self,
302        request_id: String,
303    ) -> Result<RequestInfo, BridgeError> {
304        let request_id = DigestIdentifier::from_str(&request_id)
305            .map_err(|e| BridgeError::InvalidRequestId(e.to_string()))?;
306
307        Ok(self.api.get_request_state(request_id).await?)
308    }
309
310    pub async fn get_all_request_state(
311        &self,
312    ) -> Result<Vec<RequestInfoExtend>, BridgeError> {
313        Ok(self.api.all_request_state().await?)
314    }
315
316    ///////// Node
317    ////////////////////////////
318    pub async fn get_pending_transfers(
319        &self,
320    ) -> Result<Vec<TransferSubject>, BridgeError> {
321        let res = self.api.get_pending_transfers().await?;
322        Ok(res
323            .iter()
324            .map(|x| core_tranfer_subject_to_common(x.clone()))
325            .collect())
326    }
327
328    ///////// Auth
329    ////////////////////////////
330    pub async fn put_auth_subject(
331        &self,
332        subject_id: String,
333        witnesses: Vec<String>,
334    ) -> Result<String, BridgeError> {
335        let subject_id = DigestIdentifier::from_str(&subject_id)
336            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
337
338        let mut witnesses_key = vec![];
339
340        for witness in witnesses {
341            witnesses_key.push(
342                PublicKey::from_str(&witness).map_err(|e| {
343                    BridgeError::InvalidPublicKey(e.to_string())
344                })?,
345            );
346        }
347
348        let auh_witness = if witnesses_key.is_empty() {
349            AuthWitness::None
350        } else if witnesses_key.len() == 1 {
351            AuthWitness::One(witnesses_key[0].clone())
352        } else {
353            AuthWitness::Many(witnesses_key)
354        };
355
356        Ok(self.api.auth_subject(subject_id, auh_witness).await?)
357    }
358
359    pub async fn get_all_auth_subjects(
360        &self,
361    ) -> Result<Vec<String>, BridgeError> {
362        let res = self.api.all_auth_subjects().await?;
363
364        Ok(res.iter().map(|x| x.to_string()).collect())
365    }
366
367    pub async fn get_witnesses_subject(
368        &self,
369        subject_id: String,
370    ) -> Result<HashSet<String>, BridgeError> {
371        let subject_id = DigestIdentifier::from_str(&subject_id)
372            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
373
374        let res = self.api.witnesses_subject(subject_id).await?;
375
376        Ok(res.iter().map(|x| x.to_string()).collect())
377    }
378
379    pub async fn delete_auth_subject(
380        &self,
381        subject_id: String,
382    ) -> Result<String, BridgeError> {
383        let subject_id = DigestIdentifier::from_str(&subject_id)
384            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
385
386        Ok(self.api.delete_auth_subject(subject_id).await?)
387    }
388
389    pub async fn post_update_subject(
390        &self,
391        subject_id: String,
392        query: UpdateSubjectQuery,
393    ) -> Result<String, BridgeError> {
394        let subject_id = DigestIdentifier::from_str(&subject_id)
395            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
396
397        Ok(self
398            .api
399            .update_subject_with_options(
400                subject_id,
401                query.strict.unwrap_or(false),
402            )
403            .await?)
404    }
405
406    pub async fn delete_subject(
407        &self,
408        subject_id: String,
409    ) -> Result<String, BridgeError> {
410        let subject_id = DigestIdentifier::from_str(&subject_id)
411            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
412
413        Ok(self.api.delete_subject(subject_id).await?)
414    }
415
416    ///////// manual distribution
417    ////////////////////////////
418    pub async fn post_manual_distribution(
419        &self,
420        subject_id: String,
421    ) -> Result<String, BridgeError> {
422        let subject_id = DigestIdentifier::from_str(&subject_id)
423            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
424
425        Ok(self.api.manual_distribution(subject_id).await?)
426    }
427
428    ///////// Register
429    ////////////////////////////
430    pub async fn get_all_govs(
431        &self,
432        active: Option<bool>,
433    ) -> Result<Vec<GovsData>, BridgeError> {
434        Ok(self.api.all_govs(active).await?)
435    }
436
437    pub async fn get_all_subjs(
438        &self,
439        governance_id: String,
440        active: Option<bool>,
441        schema_id: Option<String>,
442    ) -> Result<Vec<SubjsData>, BridgeError> {
443        let governance_id = DigestIdentifier::from_str(&governance_id)
444            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
445
446        Ok(self.api.all_subjs(governance_id, active, schema_id).await?)
447    }
448
449    ///////// Query
450    ////////////////////////////
451    pub async fn get_events(
452        &self,
453        subject_id: String,
454        query: EventsQuery,
455    ) -> Result<PaginatorEvents, BridgeError> {
456        let subject_id = DigestIdentifier::from_str(&subject_id)
457            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
458
459        Ok(self.api.get_events(subject_id, query).await?)
460    }
461
462    pub async fn get_sink_events(
463        &self,
464        subject_id: String,
465        query: SinkEventsQuery,
466    ) -> Result<SinkEventsPage, BridgeError> {
467        let subject_id = DigestIdentifier::from_str(&subject_id)
468            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
469
470        Ok(self.api.get_sink_events(subject_id, query).await?)
471    }
472
473    pub async fn get_aborts(
474        &self,
475        subject_id: String,
476        query: AbortsQuery,
477    ) -> Result<PaginatorAborts, BridgeError> {
478        let subject_id = DigestIdentifier::from_str(&subject_id)
479            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
480
481        Ok(self.api.get_aborts(subject_id, query).await?)
482    }
483
484    pub async fn get_event_sn(
485        &self,
486        subject_id: String,
487        sn: u64,
488    ) -> Result<LedgerDB, BridgeError> {
489        let subject_id = DigestIdentifier::from_str(&subject_id)
490            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
491
492        Ok(self.api.get_event_sn(subject_id, sn).await?)
493    }
494
495    pub async fn get_first_or_end_events(
496        &self,
497        subject_id: String,
498        quantity: Option<u64>,
499        reverse: Option<bool>,
500        event_type: Option<EventRequestType>,
501    ) -> Result<Vec<LedgerDB>, BridgeError> {
502        let subject_id = DigestIdentifier::from_str(&subject_id)
503            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
504
505        Ok(self
506            .api
507            .get_first_or_end_events(subject_id, quantity, reverse, event_type)
508            .await?)
509    }
510
511    pub async fn get_subject_state(
512        &self,
513        subject_id: String,
514    ) -> Result<SubjectDB, BridgeError> {
515        let subject_id = DigestIdentifier::from_str(&subject_id)
516            .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
517
518        Ok(self.api.get_subject_state(subject_id).await?)
519    }
520}