1use std::{collections::HashSet, str::FromStr};
2
3pub use ave_common::Namespace;
4pub use ave_common::response::MonitorNetworkState;
5use ave_common::{
6 bridge::request::{
7 AbortsQuery, ApprovalState, ApprovalStateRes, BridgeSignedEventRequest,
8 EventRequestType, EventsQuery, SinkEventsQuery, UpdateSubjectQuery,
9 },
10 identity::{DigestIdentifier, PublicKey, Signature, Signed},
11 request::EventRequest,
12 response::{
13 ApprovalEntry, GovsData, LedgerDB, PaginatorAborts, PaginatorEvents,
14 RequestData as RequestDataRes, RequestInfo, RequestInfoExtend,
15 RequestsInManager, RequestsInManagerSubject, SinkEventsPage, SubjectDB,
16 SubjsData, TransferSubject,
17 },
18};
19pub use ave_core::config::{MachineSpec, resolve_spec};
20pub use ave_core::{
21 Api as AveApi,
22 auth::AuthWitness,
23 config::Config as AveConfig,
24 config::{
25 AveExternalDBConfig, AveInternalDBConfig, LoggingConfig, LoggingOutput,
26 LoggingRotation, SinkConfig, SinkQueuePolicy, SinkRoutingStrategy,
27 SinkServer,
28 },
29 error::Error,
30};
31use ave_core::{config::SinkAuth, helpers::sink::obtain_token};
32pub use ave_network::{
33 Config as NetworkConfig, ControlListConfig, MemoryLimitsConfig,
34 RoutingConfig, RoutingNode,
35};
36use config::Config;
37use prometheus_client::registry::Registry;
38use tokio::{
39 signal::unix::{SignalKind, signal},
40 task::JoinHandle,
41};
42use tokio_util::sync::CancellationToken;
43use utils::key_pair;
44
45pub mod config;
46pub use http::{CorsConfig, HttpConfig, ProxyConfig, SelfSignedCertConfig};
47pub mod conversions;
48pub mod error;
49pub mod http;
50pub mod settings;
51pub mod utils;
52pub use clap;
53pub mod auth;
54
55pub use error::BridgeError;
56
57pub use ave_common;
58
59#[cfg(feature = "prometheus")]
60pub mod prometheus;
61
62use crate::conversions::{
63 core_approval_req_to_common, core_tranfer_subject_to_common,
64};
65
66#[cfg(all(feature = "sqlite", feature = "rocksdb"))]
67compile_error!("Select only one: 'sqlite' or 'rocksdb'");
68
69#[cfg(not(any(feature = "sqlite", feature = "rocksdb")))]
70compile_error!("You must enable 'sqlite' or 'rocksdb'");
71
72#[cfg(not(feature = "ext-sqlite"))]
73compile_error!("You must enable 'ext-sqlite'");
74
75#[derive(Clone)]
76pub struct Bridge {
77 api: AveApi,
78 config: Config,
79 graceful_token: CancellationToken,
80 crash_token: CancellationToken,
81 #[cfg(feature = "prometheus")]
82 registry: std::sync::Arc<
83 tokio::sync::Mutex<prometheus_client::registry::Registry>,
84 >,
85}
86
87impl Bridge {
88 pub async fn build(
89 settings: &Config,
90 password: &str,
91 password_sink: &str,
92 sink_api_key: &str,
93 graceful_token: Option<CancellationToken>,
94 crash_token: Option<CancellationToken>,
95 ) -> Result<(Self, Vec<JoinHandle<()>>), BridgeError> {
96 let keys = key_pair(settings, password)?;
97
98 let auth_token =
100 if sink_api_key.is_empty() && !settings.sink.auth.is_empty() {
101 Some(
102 obtain_token(
103 &settings.sink.auth,
104 &settings.sink.username,
105 password_sink,
106 )
107 .await?,
108 )
109 } else {
110 None
111 };
112
113 let mut registry = <Registry>::default();
114
115 let graceful_token = graceful_token.unwrap_or_default();
116 let crash_token = crash_token.unwrap_or_default();
117
118 let (api, runners) = AveApi::build(
119 keys,
120 settings.node.clone(),
121 SinkAuth {
122 sink: settings.sink.clone(),
123 token: auth_token,
124 password: password_sink.to_owned(),
125 api_key: sink_api_key.to_owned(),
126 },
127 &mut registry,
128 password,
129 graceful_token.clone(),
130 crash_token.clone(),
131 )
132 .await?;
133
134 Self::bind_with_shutdown(graceful_token.clone());
135
136 #[cfg(feature = "prometheus")]
137 let registry = std::sync::Arc::new(tokio::sync::Mutex::new(registry));
138
139 Ok((
140 Self {
141 api,
142 config: settings.clone(),
143 graceful_token,
144 crash_token,
145 #[cfg(feature = "prometheus")]
146 registry,
147 },
148 runners,
149 ))
150 }
151
152 pub const fn graceful_token(&self) -> &CancellationToken {
153 &self.graceful_token
154 }
155
156 pub const fn crash_token(&self) -> &CancellationToken {
157 &self.crash_token
158 }
159
160 #[cfg(feature = "prometheus")]
161 pub fn registry(
162 &self,
163 ) -> std::sync::Arc<tokio::sync::Mutex<prometheus_client::registry::Registry>>
164 {
165 self.registry.clone()
166 }
167
168 fn bind_with_shutdown(token: CancellationToken) {
169 let cancellation_token = token;
170 let mut sigterm = signal(SignalKind::terminate())
171 .expect("It could not be registered SIGTERM");
172
173 tokio::spawn(async move {
174 tokio::select! {
175 _ = tokio::signal::ctrl_c() => {},
176 _ = sigterm.recv() => {},
177 }
178
179 cancellation_token.cancel();
180 });
181 }
182
183 pub fn get_peer_id(&self) -> &str {
186 self.api.peer_id()
187 }
188
189 pub fn get_public_key(&self) -> &str {
190 self.api.public_key()
191 }
192
193 pub fn get_config(&self) -> Config {
194 self.config.clone()
195 }
196
197 pub async fn get_network_state(
200 &self,
201 ) -> Result<MonitorNetworkState, BridgeError> {
202 Ok(self.api.get_network_state().await?)
203 }
204
205 pub async fn get_requests_in_manager(
208 &self,
209 ) -> Result<RequestsInManager, BridgeError> {
210 Ok(self.api.get_requests_in_manager().await?)
211 }
212
213 pub async fn get_requests_in_manager_subject_id(
214 &self,
215 subject_id: String,
216 ) -> Result<RequestsInManagerSubject, BridgeError> {
217 let subject_id = DigestIdentifier::from_str(&subject_id)
218 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
219
220 Ok(self
221 .api
222 .get_requests_in_manager_subject_id(subject_id)
223 .await?)
224 }
225
226 pub async fn post_event_request(
227 &self,
228 request: BridgeSignedEventRequest,
229 ) -> Result<RequestDataRes, BridgeError> {
230 let event: EventRequest =
231 conversions::bridge_to_event_request(request.request)?;
232 let result = if let Some(signature) = request.signature {
233 let signature = Signature::try_from(signature).map_err(|e| {
234 BridgeError::InvalidSignature(format!("{:?}", e))
235 })?;
236
237 let signed_request = Signed::from_parts(event, signature);
238
239 self.api.external_request(signed_request).await?
240 } else {
241 self.api.own_request(event).await?
242 };
243 Ok(conversions::core_request_to_common(result))
244 }
245
246 pub async fn get_approval(
247 &self,
248 subject_id: String,
249 state: Option<ApprovalState>,
250 ) -> Result<Option<ApprovalEntry>, BridgeError> {
251 let subject_id = DigestIdentifier::from_str(&subject_id)
252 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
253
254 Ok(self.api.get_approval(subject_id, state).await?.map(|x| {
255 ApprovalEntry {
256 request: core_approval_req_to_common(x.0),
257 state: x.1,
258 }
259 }))
260 }
261
262 pub async fn get_approvals(
263 &self,
264 state: Option<ApprovalState>,
265 ) -> Result<Vec<ApprovalEntry>, BridgeError> {
266 let res = self.api.get_approvals(state).await?;
267
268 Ok(res
269 .iter()
270 .map(|x| ApprovalEntry {
271 request: core_approval_req_to_common(x.0.clone()),
272 state: x.1.clone(),
273 })
274 .collect())
275 }
276
277 pub async fn patch_approve(
278 &self,
279 subject_id: String,
280 state: ApprovalStateRes,
281 ) -> Result<String, BridgeError> {
282 let subject_id = DigestIdentifier::from_str(&subject_id)
283 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
284
285 Ok(self.api.approve(subject_id, state).await?)
286 }
287
288 pub async fn post_manual_request_abort(
289 &self,
290 subject_id: String,
291 ) -> Result<String, BridgeError> {
292 let subject_id = DigestIdentifier::from_str(&subject_id)
293 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
294
295 Ok(self.api.manual_request_abort(subject_id).await?)
296 }
297
298 pub async fn get_request_state(
301 &self,
302 request_id: String,
303 ) -> Result<RequestInfo, BridgeError> {
304 let request_id = DigestIdentifier::from_str(&request_id)
305 .map_err(|e| BridgeError::InvalidRequestId(e.to_string()))?;
306
307 Ok(self.api.get_request_state(request_id).await?)
308 }
309
310 pub async fn get_all_request_state(
311 &self,
312 ) -> Result<Vec<RequestInfoExtend>, BridgeError> {
313 Ok(self.api.all_request_state().await?)
314 }
315
316 pub async fn get_pending_transfers(
319 &self,
320 ) -> Result<Vec<TransferSubject>, BridgeError> {
321 let res = self.api.get_pending_transfers().await?;
322 Ok(res
323 .iter()
324 .map(|x| core_tranfer_subject_to_common(x.clone()))
325 .collect())
326 }
327
328 pub async fn put_auth_subject(
331 &self,
332 subject_id: String,
333 witnesses: Vec<String>,
334 ) -> Result<String, BridgeError> {
335 let subject_id = DigestIdentifier::from_str(&subject_id)
336 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
337
338 let mut witnesses_key = vec![];
339
340 for witness in witnesses {
341 witnesses_key.push(
342 PublicKey::from_str(&witness).map_err(|e| {
343 BridgeError::InvalidPublicKey(e.to_string())
344 })?,
345 );
346 }
347
348 let auh_witness = if witnesses_key.is_empty() {
349 AuthWitness::None
350 } else if witnesses_key.len() == 1 {
351 AuthWitness::One(witnesses_key[0].clone())
352 } else {
353 AuthWitness::Many(witnesses_key)
354 };
355
356 Ok(self.api.auth_subject(subject_id, auh_witness).await?)
357 }
358
359 pub async fn get_all_auth_subjects(
360 &self,
361 ) -> Result<Vec<String>, BridgeError> {
362 let res = self.api.all_auth_subjects().await?;
363
364 Ok(res.iter().map(|x| x.to_string()).collect())
365 }
366
367 pub async fn get_witnesses_subject(
368 &self,
369 subject_id: String,
370 ) -> Result<HashSet<String>, BridgeError> {
371 let subject_id = DigestIdentifier::from_str(&subject_id)
372 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
373
374 let res = self.api.witnesses_subject(subject_id).await?;
375
376 Ok(res.iter().map(|x| x.to_string()).collect())
377 }
378
379 pub async fn delete_auth_subject(
380 &self,
381 subject_id: String,
382 ) -> Result<String, BridgeError> {
383 let subject_id = DigestIdentifier::from_str(&subject_id)
384 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
385
386 Ok(self.api.delete_auth_subject(subject_id).await?)
387 }
388
389 pub async fn post_update_subject(
390 &self,
391 subject_id: String,
392 query: UpdateSubjectQuery,
393 ) -> Result<String, BridgeError> {
394 let subject_id = DigestIdentifier::from_str(&subject_id)
395 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
396
397 Ok(self
398 .api
399 .update_subject_with_options(
400 subject_id,
401 query.strict.unwrap_or(false),
402 )
403 .await?)
404 }
405
406 pub async fn delete_subject(
407 &self,
408 subject_id: String,
409 ) -> Result<String, BridgeError> {
410 let subject_id = DigestIdentifier::from_str(&subject_id)
411 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
412
413 Ok(self.api.delete_subject(subject_id).await?)
414 }
415
416 pub async fn post_manual_distribution(
419 &self,
420 subject_id: String,
421 ) -> Result<String, BridgeError> {
422 let subject_id = DigestIdentifier::from_str(&subject_id)
423 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
424
425 Ok(self.api.manual_distribution(subject_id).await?)
426 }
427
428 pub async fn get_all_govs(
431 &self,
432 active: Option<bool>,
433 ) -> Result<Vec<GovsData>, BridgeError> {
434 Ok(self.api.all_govs(active).await?)
435 }
436
437 pub async fn get_all_subjs(
438 &self,
439 governance_id: String,
440 active: Option<bool>,
441 schema_id: Option<String>,
442 ) -> Result<Vec<SubjsData>, BridgeError> {
443 let governance_id = DigestIdentifier::from_str(&governance_id)
444 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
445
446 Ok(self.api.all_subjs(governance_id, active, schema_id).await?)
447 }
448
449 pub async fn get_events(
452 &self,
453 subject_id: String,
454 query: EventsQuery,
455 ) -> Result<PaginatorEvents, BridgeError> {
456 let subject_id = DigestIdentifier::from_str(&subject_id)
457 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
458
459 Ok(self.api.get_events(subject_id, query).await?)
460 }
461
462 pub async fn get_sink_events(
463 &self,
464 subject_id: String,
465 query: SinkEventsQuery,
466 ) -> Result<SinkEventsPage, BridgeError> {
467 let subject_id = DigestIdentifier::from_str(&subject_id)
468 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
469
470 Ok(self.api.get_sink_events(subject_id, query).await?)
471 }
472
473 pub async fn get_aborts(
474 &self,
475 subject_id: String,
476 query: AbortsQuery,
477 ) -> Result<PaginatorAborts, BridgeError> {
478 let subject_id = DigestIdentifier::from_str(&subject_id)
479 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
480
481 Ok(self.api.get_aborts(subject_id, query).await?)
482 }
483
484 pub async fn get_event_sn(
485 &self,
486 subject_id: String,
487 sn: u64,
488 ) -> Result<LedgerDB, BridgeError> {
489 let subject_id = DigestIdentifier::from_str(&subject_id)
490 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
491
492 Ok(self.api.get_event_sn(subject_id, sn).await?)
493 }
494
495 pub async fn get_first_or_end_events(
496 &self,
497 subject_id: String,
498 quantity: Option<u64>,
499 reverse: Option<bool>,
500 event_type: Option<EventRequestType>,
501 ) -> Result<Vec<LedgerDB>, BridgeError> {
502 let subject_id = DigestIdentifier::from_str(&subject_id)
503 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
504
505 Ok(self
506 .api
507 .get_first_or_end_events(subject_id, quantity, reverse, event_type)
508 .await?)
509 }
510
511 pub async fn get_subject_state(
512 &self,
513 subject_id: String,
514 ) -> Result<SubjectDB, BridgeError> {
515 let subject_id = DigestIdentifier::from_str(&subject_id)
516 .map_err(|e| BridgeError::InvalidSubjectId(e.to_string()))?;
517
518 Ok(self.api.get_subject_state(subject_id).await?)
519 }
520}