1#[cfg(feature = "approval")]
2use crate::approval::manager::{ApprovalAPI, ApprovalManager};
3#[cfg(feature = "approval")]
4use crate::approval::{ApprovalMessages, ApprovalResponses};
5use crate::authorized_subjecs::manager::{AuthorizedSubjectsAPI, AuthorizedSubjectsManager};
6use crate::authorized_subjecs::{AuthorizedSubjectsCommand, AuthorizedSubjectsResponse};
7use crate::commons::channel::MpscChannel;
8use crate::commons::crypto::{KeyMaterial, KeyPair};
9use crate::commons::identifier::derive::KeyDerivator;
10use crate::commons::identifier::{Derivable, KeyIdentifier};
11use crate::commons::models::notification::Notification;
12use crate::commons::self_signature_manager::{SelfSignatureInterface, SelfSignatureManager};
13use crate::commons::settings::Settings;
14use crate::database::{DatabaseCollection, DatabaseManager, DB};
15use crate::distribution::error::DistributionErrorResponses;
16use crate::distribution::manager::DistributionManager;
17use crate::distribution::DistributionMessagesNew;
18#[cfg(feature = "evaluation")]
19use crate::evaluator::{EvaluatorManager, EvaluatorMessage, EvaluatorResponse};
20use crate::event::manager::{EventAPI, EventManager};
21use crate::event::{EventCommand, EventResponse};
22use crate::governance::GovernanceAPI;
23use crate::governance::{governance::Governance, GovernanceMessage, GovernanceResponse};
24use crate::ledger::manager::EventManagerAPI;
25use crate::ledger::{manager::LedgerManager, LedgerCommand, LedgerResponse};
26use crate::message::{
27 MessageContent, MessageReceiver, MessageSender, MessageTaskCommand, MessageTaskManager,
28 NetworkEvent,
29};
30use crate::network::network::NetworkProcessor;
31use crate::protocol::protocol_message_manager::{ProtocolManager, TapleMessages};
32use crate::signature::Signed;
33#[cfg(feature = "validation")]
34use crate::validation::manager::ValidationManager;
35#[cfg(feature = "validation")]
36use crate::validation::{ValidationCommand, ValidationResponse};
37use ::futures::Future;
38use libp2p::{Multiaddr, PeerId};
39use log::{error, info};
40use std::marker::PhantomData;
41use std::sync::Arc;
42use tokio::sync::*;
43use tokio_util::sync::CancellationToken;
44
45use crate::api::{Api, ApiManager};
46use crate::error::Error;
47
/// Capacity passed to every channel constructor used by `Node::build`
/// (`MpscChannel::new`, `mpsc::channel` and `broadcast::channel`).
const BUFFER_SIZE: usize = 1000;
49
50#[derive(Debug)]
56pub struct Node<M: DatabaseManager<C>, C: DatabaseCollection> {
57 notification_rx: mpsc::Receiver<Notification>,
58 token: CancellationToken,
59 _m: PhantomData<M>,
60 _c: PhantomData<C>,
61}
62
63impl<M: DatabaseManager<C> + 'static, C: DatabaseCollection + 'static> Node<M, C> {
64 pub fn build(settings: Settings, database: M) -> Result<(Self, Api), Error> {
72 let (api_rx, api_tx) = MpscChannel::new(BUFFER_SIZE);
73
74 let (notification_tx, notification_rx) = mpsc::channel(BUFFER_SIZE);
75
76 let (network_tx, network_rx): (mpsc::Sender<NetworkEvent>, mpsc::Receiver<NetworkEvent>) =
77 mpsc::channel(BUFFER_SIZE);
78
79 let (event_rx, event_tx) = MpscChannel::<EventCommand, EventResponse>::new(BUFFER_SIZE);
80
81 let (ledger_rx, ledger_tx) = MpscChannel::<LedgerCommand, LedgerResponse>::new(BUFFER_SIZE);
82
83 let (as_rx, as_tx) =
84 MpscChannel::<AuthorizedSubjectsCommand, AuthorizedSubjectsResponse>::new(BUFFER_SIZE);
85
86 let (governance_rx, governance_tx) =
87 MpscChannel::<GovernanceMessage, GovernanceResponse>::new(BUFFER_SIZE);
88
89 let (governance_update_sx, governance_update_rx) = broadcast::channel(BUFFER_SIZE);
91
92 let (task_rx, task_tx) =
93 MpscChannel::<MessageTaskCommand<TapleMessages>, ()>::new(BUFFER_SIZE);
94
95 let (protocol_rx, protocol_tx) =
96 MpscChannel::<Signed<MessageContent<TapleMessages>>, ()>::new(BUFFER_SIZE);
97
98 let (distribution_rx, distribution_tx) = MpscChannel::<
99 DistributionMessagesNew,
100 Result<(), DistributionErrorResponses>,
101 >::new(BUFFER_SIZE);
102
103 #[cfg(feature = "approval")]
104 let (approval_rx, approval_tx) =
105 MpscChannel::<ApprovalMessages, ApprovalResponses>::new(BUFFER_SIZE);
106
107 #[cfg(feature = "evaluation")]
108 let (evaluation_rx, evaluation_tx) =
109 MpscChannel::<EvaluatorMessage, EvaluatorResponse>::new(BUFFER_SIZE);
110
111 #[cfg(feature = "validation")]
112 let (validation_rx, validation_tx) =
113 MpscChannel::<ValidationCommand, ValidationResponse>::new(BUFFER_SIZE);
114
115 let database = Arc::new(database);
116
117 let kp = Self::register_node_key(
118 &settings.node.key_derivator,
119 &settings.node.secret_key,
120 DB::new(database.clone()),
121 )?;
122
123 let controller_id = KeyIdentifier::new(kp.get_key_derivator(), &kp.public_key_bytes());
124 info!("Controller ID: {}", &controller_id);
125
126 let token = CancellationToken::new();
127
128 let network_manager = NetworkProcessor::new(
129 settings.network.listen_addr.clone(),
130 network_access_points(&settings.network.known_nodes)?,
131 network_tx,
132 kp.clone(),
133 token.clone(),
134 notification_tx.clone(),
135 external_addresses(&settings.network.external_address)?,
136 )
137 .expect("Network created");
138
139 let signature_manager = SelfSignatureManager::new(kp.clone(), &settings);
141
142 let network_rx = MessageReceiver::new(
144 network_rx,
145 protocol_tx,
146 token.clone(),
147 notification_tx.clone(),
148 signature_manager.get_own_identifier(),
149 );
150
151 let network_tx = MessageSender::new(
152 network_manager.client(),
153 controller_id.clone(),
154 signature_manager.clone(),
155 settings.node.digest_derivator
156 );
157
158 let task_manager =
159 MessageTaskManager::new(network_tx, task_rx, token.clone(), notification_tx.clone());
160
161 let protocol_manager = ProtocolManager::new(
162 protocol_rx,
163 distribution_tx.clone(),
164 #[cfg(feature = "evaluation")]
165 evaluation_tx,
166 #[cfg(feature = "validation")]
167 validation_tx,
168 event_tx.clone(),
169 #[cfg(feature = "approval")]
170 approval_tx.clone(),
171 ledger_tx.clone(),
172 token.clone(),
173 notification_tx.clone(),
174 );
175
176 let mut governance_manager = Governance::<M, C>::new(
177 governance_rx,
178 token.clone(),
179 notification_tx.clone(),
180 DB::new(database.clone()),
181 governance_update_sx.clone(),
182 );
183
184 let event_manager = EventManager::new(
185 event_rx,
186 governance_update_rx,
187 GovernanceAPI::new(governance_tx.clone()),
188 DB::new(database.clone()),
189 token.clone(),
190 task_tx.clone(),
191 notification_tx.clone(),
192 ledger_tx.clone(),
193 signature_manager.get_own_identifier(),
194 signature_manager.clone(),
195 settings.node.digest_derivator
196 );
197
198 let ledger_manager = LedgerManager::new(
199 ledger_rx,
200 token.clone(),
201 notification_tx.clone(),
202 GovernanceAPI::new(governance_tx.clone()),
203 DB::new(database.clone()),
204 task_tx.clone(),
205 distribution_tx,
206 controller_id.clone(),
207 settings.node.digest_derivator
208 );
209
210 let as_manager = AuthorizedSubjectsManager::new(
211 as_rx,
212 DB::new(database.clone()),
213 task_tx.clone(),
214 controller_id.clone(),
215 token.clone(),
216 notification_tx.clone(),
217 );
218
219 let api_manager = ApiManager::new(
220 api_rx,
221 EventAPI::new(event_tx),
222 #[cfg(feature = "approval")]
223 ApprovalAPI::new(approval_tx),
224 AuthorizedSubjectsAPI::new(as_tx),
225 EventManagerAPI::new(ledger_tx),
226 token.clone(),
227 notification_tx.clone(),
228 DB::new(database.clone()),
229 );
230
231 #[cfg(feature = "evaluation")]
232 let evaluator_manager = EvaluatorManager::new(
233 evaluation_rx,
234 database.clone(),
235 signature_manager.clone(),
236 governance_update_sx.subscribe(),
237 token.clone(),
238 notification_tx.clone(),
239 GovernanceAPI::new(governance_tx.clone()),
240 settings.node.smartcontracts_directory.clone(),
241 task_tx.clone(),
242 settings.node.digest_derivator
243 );
244
245 #[cfg(feature = "approval")]
246 let approval_manager = ApprovalManager::new(
247 GovernanceAPI::new(governance_tx.clone()),
248 approval_rx,
249 token.clone(),
250 task_tx.clone(),
251 governance_update_sx.subscribe(),
252 signature_manager.clone(),
253 notification_tx.clone(),
254 settings.clone(),
255 DB::new(database.clone()),
256 settings.node.digest_derivator
257 );
258
259 let distribution_manager = DistributionManager::new(
260 distribution_rx,
261 governance_update_sx.subscribe(),
262 token.clone(),
263 notification_tx.clone(),
264 task_tx.clone(),
265 GovernanceAPI::new(governance_tx.clone()),
266 signature_manager.clone(),
267 settings.clone(),
268 DB::new(database.clone()),
269 settings.node.digest_derivator
270 );
271
272 #[cfg(feature = "validation")]
273 let validation_manager = ValidationManager::new(
274 validation_rx,
275 GovernanceAPI::new(governance_tx),
276 DB::new(database),
277 signature_manager,
278 token.clone(),
279 notification_tx,
280 task_tx,
281 settings.node.digest_derivator
282 );
283
284 let taple = Node {
285 notification_rx,
286 token,
287 _m: PhantomData::default(),
288 _c: PhantomData::default(),
289 };
290
291 let api = Api::new(
292 network_manager.local_peer_id().to_owned(),
293 controller_id.to_str(),
294 kp.public_key_bytes(),
295 api_tx,
296 );
297
298 tokio::spawn(async move {
299 governance_manager.run().await;
300 });
301
302 tokio::spawn(async move {
303 ledger_manager.run().await;
304 });
305
306 tokio::spawn(async move {
307 event_manager.run().await;
308 });
309
310 tokio::spawn(async move {
311 task_manager.run().await;
312 });
313
314 tokio::spawn(async move {
315 protocol_manager.run().await;
316 });
317
318 tokio::spawn(async move {
319 network_rx.run().await;
320 });
321
322 #[cfg(feature = "evaluation")]
323 tokio::spawn(async move {
324 evaluator_manager.run().await;
325 });
326
327 #[cfg(feature = "validation")]
328 tokio::spawn(async move {
329 validation_manager.run().await;
330 });
331
332 tokio::spawn(async move {
333 distribution_manager.run().await;
334 });
335
336 #[cfg(feature = "approval")]
337 tokio::spawn(async move {
338 approval_manager.run().await;
339 });
340
341 tokio::spawn(async move {
342 as_manager.run().await;
343 });
344
345 tokio::spawn(async move {
346 network_manager.run().await;
347 });
348
349 tokio::spawn(async move {
350 api_manager.run().await;
351 });
352
353 Ok((taple, api))
354 }
355
356 pub async fn recv_notification(&mut self) -> Option<Notification> {
365 self.notification_rx.recv().await
366 }
367
368 pub async fn handle_notifications<H>(mut self, handler: H)
378 where
379 H: Fn(Notification),
380 {
381 while let Some(notification) = self.recv_notification().await {
382 handler(notification);
383 }
384 }
385
386 pub async fn drop_notifications(self) {
394 self.handle_notifications(|_| {}).await;
395 }
396
397 pub fn bind_with_shutdown(&self, signal: impl Future<Output = ()> + Send + 'static) {
402 let token = self.token.clone();
403 tokio::spawn(async move {
404 signal.await;
405 token.cancel();
406 });
407 }
408
409 pub async fn shutdown_gracefully(self) {
414 self.token.cancel();
415 self.drop_notifications().await;
416 }
417
418 fn register_node_key(
419 key_derivator: &KeyDerivator,
420 secret_key: &str,
421 db: DB<C>,
422 ) -> Result<KeyPair, Error> {
423 let key = KeyPair::from_hex(key_derivator, secret_key)
424 .map_err(|_| Error::InvalidHexString)
425 .unwrap();
426 let identifier =
427 KeyIdentifier::new(key.get_key_derivator(), &key.public_key_bytes()).to_str();
428 let stored_identifier = db.get_controller_id().ok();
429 if let Some(stored_identifier) = stored_identifier {
430 if identifier != stored_identifier {
431 error!("Invalid key. There is a differente key stored");
432 return Err(Error::InvalidKeyPairSpecified(stored_identifier));
433 }
434 } else {
435 db.set_controller_id(identifier)
436 .map_err(|e| Error::DatabaseError(e.to_string()))?;
437 }
438 Ok(key)
439 }
440}
441
442fn network_access_points(points: &[String]) -> Result<Vec<(PeerId, Multiaddr)>, Error> {
444 let mut access_points: Vec<(PeerId, Multiaddr)> = Vec::new();
445 for point in points {
446 let data: Vec<&str> = point.split("/p2p/").collect();
447 if data.len() != 2 {
448 return Err(Error::AcessPointError(point.to_string()));
449 }
450 if let Some(value) = multiaddr(point) {
451 if let Ok(id) = data[1].parse::<PeerId>() {
452 access_points.push((id, value));
453 } else {
454 return Err(Error::AcessPointError(format!(
455 "Invalid PeerId conversion: {}",
456 point
457 )));
458 }
459 } else {
460 return Err(Error::AcessPointError(format!(
461 "Invalid MultiAddress conversion: {}",
462 point
463 )));
464 }
465 }
466 Ok(access_points)
467}
468
469fn external_addresses(addresses: &[String]) -> Result<Vec<Multiaddr>, Error> {
471 let mut external_addresses: Vec<Multiaddr> = Vec::new();
472 for address in addresses {
473 if let Some(value) = multiaddr(address) {
474 external_addresses.push(value);
475 } else {
476 return Err(Error::AcessPointError(format!(
477 "Invalid MultiAddress conversion in External Address: {}",
478 address
479 )));
480 }
481 }
482 Ok(external_addresses)
483}
484
485fn multiaddr(addr: &str) -> Option<Multiaddr> {
487 match addr.parse::<Multiaddr>() {
488 Ok(a) => Some(a),
489 Err(_) => None,
490 }
491}