taple_core/
node.rs

1#[cfg(feature = "approval")]
2use crate::approval::manager::{ApprovalAPI, ApprovalManager};
3#[cfg(feature = "approval")]
4use crate::approval::{ApprovalMessages, ApprovalResponses};
5use crate::authorized_subjecs::manager::{AuthorizedSubjectsAPI, AuthorizedSubjectsManager};
6use crate::authorized_subjecs::{AuthorizedSubjectsCommand, AuthorizedSubjectsResponse};
7use crate::commons::channel::MpscChannel;
8use crate::commons::crypto::{KeyMaterial, KeyPair};
9use crate::commons::identifier::derive::KeyDerivator;
10use crate::commons::identifier::{Derivable, KeyIdentifier};
11use crate::commons::models::notification::Notification;
12use crate::commons::self_signature_manager::{SelfSignatureInterface, SelfSignatureManager};
13use crate::commons::settings::Settings;
14use crate::database::{DatabaseCollection, DatabaseManager, DB};
15use crate::distribution::error::DistributionErrorResponses;
16use crate::distribution::manager::DistributionManager;
17use crate::distribution::DistributionMessagesNew;
18#[cfg(feature = "evaluation")]
19use crate::evaluator::{EvaluatorManager, EvaluatorMessage, EvaluatorResponse};
20use crate::event::manager::{EventAPI, EventManager};
21use crate::event::{EventCommand, EventResponse};
22use crate::governance::GovernanceAPI;
23use crate::governance::{governance::Governance, GovernanceMessage, GovernanceResponse};
24use crate::ledger::manager::EventManagerAPI;
25use crate::ledger::{manager::LedgerManager, LedgerCommand, LedgerResponse};
26use crate::message::{
27    MessageContent, MessageReceiver, MessageSender, MessageTaskCommand, MessageTaskManager,
28    NetworkEvent,
29};
30use crate::network::network::NetworkProcessor;
31use crate::protocol::protocol_message_manager::{ProtocolManager, TapleMessages};
32use crate::signature::Signed;
33#[cfg(feature = "validation")]
34use crate::validation::manager::ValidationManager;
35#[cfg(feature = "validation")]
36use crate::validation::{ValidationCommand, ValidationResponse};
37use ::futures::Future;
38use libp2p::{Multiaddr, PeerId};
39use log::{error, info};
40use std::marker::PhantomData;
41use std::sync::Arc;
42use tokio::sync::*;
43use tokio_util::sync::CancellationToken;
44
45use crate::api::{Api, ApiManager};
46use crate::error::Error;
47
/// Capacity, in messages, used for every channel created by `Node::build`.
const BUFFER_SIZE: usize = 1000;
49
/// Structure representing a TAPLE node
///
/// A node must be instantiated using the [`Node::build`] method, which requires a set
/// of [configuration](Settings) parameters in order to be properly initialized.
///
/// The handle only keeps what is needed to consume notifications and to trigger the
/// shutdown; the database type parameters `M` and `C` are recorded as markers.
#[derive(Debug)]
pub struct Node<M: DatabaseManager<C>, C: DatabaseCollection> {
    // Receiving end of the notification channel created in `build`.
    notification_rx: mpsc::Receiver<Notification>,
    // Cancellation token cloned into the components created in `build`.
    token: CancellationToken,
    // Zero-sized markers tying the node to its database manager and collection types.
    _m: PhantomData<M>,
    _c: PhantomData<C>,
}
62
63impl<M: DatabaseManager<C> + 'static, C: DatabaseCollection + 'static> Node<M, C> {
64    /// This method creates and initializes a TAPLE node.
65    /// # Possible results
66    /// If the process is successful, the method will return `Ok(())`.
67    /// An error will be returned only if it has not been possible to generate the necessary data
68    /// for the initialization of the components, mainly due to problems in the initial [configuration](Settings).
69    /// # Panics
70    /// This method panics if it has not been possible to generate the network layer.
71    pub fn build(settings: Settings, database: M) -> Result<(Self, Api), Error> {
72        let (api_rx, api_tx) = MpscChannel::new(BUFFER_SIZE);
73
74        let (notification_tx, notification_rx) = mpsc::channel(BUFFER_SIZE);
75
76        let (network_tx, network_rx): (mpsc::Sender<NetworkEvent>, mpsc::Receiver<NetworkEvent>) =
77            mpsc::channel(BUFFER_SIZE);
78
79        let (event_rx, event_tx) = MpscChannel::<EventCommand, EventResponse>::new(BUFFER_SIZE);
80
81        let (ledger_rx, ledger_tx) = MpscChannel::<LedgerCommand, LedgerResponse>::new(BUFFER_SIZE);
82
83        let (as_rx, as_tx) =
84            MpscChannel::<AuthorizedSubjectsCommand, AuthorizedSubjectsResponse>::new(BUFFER_SIZE);
85
86        let (governance_rx, governance_tx) =
87            MpscChannel::<GovernanceMessage, GovernanceResponse>::new(BUFFER_SIZE);
88
89        // TODO: broadcast channel. Is a lag corretly managed?
90        let (governance_update_sx, governance_update_rx) = broadcast::channel(BUFFER_SIZE);
91
92        let (task_rx, task_tx) =
93            MpscChannel::<MessageTaskCommand<TapleMessages>, ()>::new(BUFFER_SIZE);
94
95        let (protocol_rx, protocol_tx) =
96            MpscChannel::<Signed<MessageContent<TapleMessages>>, ()>::new(BUFFER_SIZE);
97
98        let (distribution_rx, distribution_tx) = MpscChannel::<
99            DistributionMessagesNew,
100            Result<(), DistributionErrorResponses>,
101        >::new(BUFFER_SIZE);
102
103        #[cfg(feature = "approval")]
104        let (approval_rx, approval_tx) =
105            MpscChannel::<ApprovalMessages, ApprovalResponses>::new(BUFFER_SIZE);
106
107        #[cfg(feature = "evaluation")]
108        let (evaluation_rx, evaluation_tx) =
109            MpscChannel::<EvaluatorMessage, EvaluatorResponse>::new(BUFFER_SIZE);
110
111        #[cfg(feature = "validation")]
112        let (validation_rx, validation_tx) =
113            MpscChannel::<ValidationCommand, ValidationResponse>::new(BUFFER_SIZE);
114
115        let database = Arc::new(database);
116
117        let kp = Self::register_node_key(
118            &settings.node.key_derivator,
119            &settings.node.secret_key,
120            DB::new(database.clone()),
121        )?;
122
123        let controller_id = KeyIdentifier::new(kp.get_key_derivator(), &kp.public_key_bytes());
124        info!("Controller ID: {}", &controller_id);
125
126        let token = CancellationToken::new();
127
128        let network_manager = NetworkProcessor::new(
129            settings.network.listen_addr.clone(),
130            network_access_points(&settings.network.known_nodes)?,
131            network_tx,
132            kp.clone(),
133            token.clone(),
134            notification_tx.clone(),
135            external_addresses(&settings.network.external_address)?,
136        )
137        .expect("Network created");
138
139        //TODO: change name. It's not a task
140        let signature_manager = SelfSignatureManager::new(kp.clone(), &settings);
141
142        //TODO: change name. It's a task
143        let network_rx = MessageReceiver::new(
144            network_rx,
145            protocol_tx,
146            token.clone(),
147            notification_tx.clone(),
148            signature_manager.get_own_identifier(),
149        );
150
151        let network_tx = MessageSender::new(
152            network_manager.client(),
153            controller_id.clone(),
154            signature_manager.clone(),
155            settings.node.digest_derivator
156        );
157
158        let task_manager =
159            MessageTaskManager::new(network_tx, task_rx, token.clone(), notification_tx.clone());
160
161        let protocol_manager = ProtocolManager::new(
162            protocol_rx,
163            distribution_tx.clone(),
164            #[cfg(feature = "evaluation")]
165            evaluation_tx,
166            #[cfg(feature = "validation")]
167            validation_tx,
168            event_tx.clone(),
169            #[cfg(feature = "approval")]
170            approval_tx.clone(),
171            ledger_tx.clone(),
172            token.clone(),
173            notification_tx.clone(),
174        );
175
176        let mut governance_manager = Governance::<M, C>::new(
177            governance_rx,
178            token.clone(),
179            notification_tx.clone(),
180            DB::new(database.clone()),
181            governance_update_sx.clone(),
182        );
183
184        let event_manager = EventManager::new(
185            event_rx,
186            governance_update_rx,
187            GovernanceAPI::new(governance_tx.clone()),
188            DB::new(database.clone()),
189            token.clone(),
190            task_tx.clone(),
191            notification_tx.clone(),
192            ledger_tx.clone(),
193            signature_manager.get_own_identifier(),
194            signature_manager.clone(),
195            settings.node.digest_derivator
196        );
197
198        let ledger_manager = LedgerManager::new(
199            ledger_rx,
200            token.clone(),
201            notification_tx.clone(),
202            GovernanceAPI::new(governance_tx.clone()),
203            DB::new(database.clone()),
204            task_tx.clone(),
205            distribution_tx,
206            controller_id.clone(),
207            settings.node.digest_derivator
208        );
209
210        let as_manager = AuthorizedSubjectsManager::new(
211            as_rx,
212            DB::new(database.clone()),
213            task_tx.clone(),
214            controller_id.clone(),
215            token.clone(),
216            notification_tx.clone(),
217        );
218
219        let api_manager = ApiManager::new(
220            api_rx,
221            EventAPI::new(event_tx),
222            #[cfg(feature = "approval")]
223            ApprovalAPI::new(approval_tx),
224            AuthorizedSubjectsAPI::new(as_tx),
225            EventManagerAPI::new(ledger_tx),
226            token.clone(),
227            notification_tx.clone(),
228            DB::new(database.clone()),
229        );
230
231        #[cfg(feature = "evaluation")]
232        let evaluator_manager = EvaluatorManager::new(
233            evaluation_rx,
234            database.clone(),
235            signature_manager.clone(),
236            governance_update_sx.subscribe(),
237            token.clone(),
238            notification_tx.clone(),
239            GovernanceAPI::new(governance_tx.clone()),
240            settings.node.smartcontracts_directory.clone(),
241            task_tx.clone(),
242            settings.node.digest_derivator
243        );
244
245        #[cfg(feature = "approval")]
246        let approval_manager = ApprovalManager::new(
247            GovernanceAPI::new(governance_tx.clone()),
248            approval_rx,
249            token.clone(),
250            task_tx.clone(),
251            governance_update_sx.subscribe(),
252            signature_manager.clone(),
253            notification_tx.clone(),
254            settings.clone(),
255            DB::new(database.clone()),
256            settings.node.digest_derivator
257        );
258
259        let distribution_manager = DistributionManager::new(
260            distribution_rx,
261            governance_update_sx.subscribe(),
262            token.clone(),
263            notification_tx.clone(),
264            task_tx.clone(),
265            GovernanceAPI::new(governance_tx.clone()),
266            signature_manager.clone(),
267            settings.clone(),
268            DB::new(database.clone()),
269            settings.node.digest_derivator
270        );
271
272        #[cfg(feature = "validation")]
273        let validation_manager = ValidationManager::new(
274            validation_rx,
275            GovernanceAPI::new(governance_tx),
276            DB::new(database),
277            signature_manager,
278            token.clone(),
279            notification_tx,
280            task_tx,
281            settings.node.digest_derivator
282        );
283
284        let taple = Node {
285            notification_rx,
286            token,
287            _m: PhantomData::default(),
288            _c: PhantomData::default(),
289        };
290
291        let api = Api::new(
292            network_manager.local_peer_id().to_owned(),
293            controller_id.to_str(),
294            kp.public_key_bytes(),
295            api_tx,
296        );
297
298        tokio::spawn(async move {
299            governance_manager.run().await;
300        });
301
302        tokio::spawn(async move {
303            ledger_manager.run().await;
304        });
305
306        tokio::spawn(async move {
307            event_manager.run().await;
308        });
309
310        tokio::spawn(async move {
311            task_manager.run().await;
312        });
313
314        tokio::spawn(async move {
315            protocol_manager.run().await;
316        });
317
318        tokio::spawn(async move {
319            network_rx.run().await;
320        });
321
322        #[cfg(feature = "evaluation")]
323        tokio::spawn(async move {
324            evaluator_manager.run().await;
325        });
326
327        #[cfg(feature = "validation")]
328        tokio::spawn(async move {
329            validation_manager.run().await;
330        });
331
332        tokio::spawn(async move {
333            distribution_manager.run().await;
334        });
335
336        #[cfg(feature = "approval")]
337        tokio::spawn(async move {
338            approval_manager.run().await;
339        });
340
341        tokio::spawn(async move {
342            as_manager.run().await;
343        });
344
345        tokio::spawn(async move {
346            network_manager.run().await;
347        });
348
349        tokio::spawn(async move {
350            api_manager.run().await;
351        });
352
353        Ok((taple, api))
354    }
355
356    /// Receive a single notification
357    ///
358    /// All notifications must be consumed. If the notification buffer is full the node
359    /// will be blocked until there is space in the buffer. Notifications can be consumed
360    /// in different ways.
361    ///
362    /// `recv_notification` allows to consume the notifications one by one and keep control
363    /// of the execution flow.  
364    pub async fn recv_notification(&mut self) -> Option<Notification> {
365        self.notification_rx.recv().await
366    }
367
368    /// Handle all notifications
369    ///
370    /// All notifications must be consumed. If the notification buffer is full the node
371    /// will be blocked until there is space in the buffer. Notifications can be consumed
372    /// in different ways.
373    ///
374    /// `handle_notifications` processes all notifications from the node. For this purpose,
375    /// the function in charge of processing the notifications is passed as input.  This
376    /// function blocks the task where it is invoked until the shutdown signal is produced.
377    pub async fn handle_notifications<H>(mut self, handler: H)
378    where
379        H: Fn(Notification),
380    {
381        while let Some(notification) = self.recv_notification().await {
382            handler(notification);
383        }
384    }
385
386    /// Drop all notifications
387    ///
388    /// All notifications must be consumed. If the notification buffer is full the node
389    /// will be blocked until there is space in the buffer. Notifications can be consumed
390    /// in different ways.
391    ///
392    /// `drop_notifications` discards all notifications from the node.
393    pub async fn drop_notifications(self) {
394        self.handle_notifications(|_| {}).await;
395    }
396
397    /// Bind the node with a shutdown signal.
398    ///
399    /// When the signal completes, the server will start the graceful shutdown
400    /// process. The node can be bind to multiple signals.
401    pub fn bind_with_shutdown(&self, signal: impl Future<Output = ()> + Send + 'static) {
402        let token = self.token.clone();
403        tokio::spawn(async move {
404            signal.await;
405            token.cancel();
406        });
407    }
408
409    /// Shutdown gracefully the node
410    ///
411    /// This function triggers the shutdown signal and waits until the node is safely terminated.
412    /// This function can only be used if Y or Z has not been used to process the notifications.
413    pub async fn shutdown_gracefully(self) {
414        self.token.cancel();
415        self.drop_notifications().await;
416    }
417
418    fn register_node_key(
419        key_derivator: &KeyDerivator,
420        secret_key: &str,
421        db: DB<C>,
422    ) -> Result<KeyPair, Error> {
423        let key = KeyPair::from_hex(key_derivator, secret_key)
424            .map_err(|_| Error::InvalidHexString)
425            .unwrap();
426        let identifier =
427            KeyIdentifier::new(key.get_key_derivator(), &key.public_key_bytes()).to_str();
428        let stored_identifier = db.get_controller_id().ok();
429        if let Some(stored_identifier) = stored_identifier {
430            if identifier != stored_identifier {
431                error!("Invalid key. There is a differente key stored");
432                return Err(Error::InvalidKeyPairSpecified(stored_identifier));
433            }
434        } else {
435            db.set_controller_id(identifier)
436                .map_err(|e| Error::DatabaseError(e.to_string()))?;
437        }
438        Ok(key)
439    }
440}
441
442// TODO: move to better place, maybe settings
443fn network_access_points(points: &[String]) -> Result<Vec<(PeerId, Multiaddr)>, Error> {
444    let mut access_points: Vec<(PeerId, Multiaddr)> = Vec::new();
445    for point in points {
446        let data: Vec<&str> = point.split("/p2p/").collect();
447        if data.len() != 2 {
448            return Err(Error::AcessPointError(point.to_string()));
449        }
450        if let Some(value) = multiaddr(point) {
451            if let Ok(id) = data[1].parse::<PeerId>() {
452                access_points.push((id, value));
453            } else {
454                return Err(Error::AcessPointError(format!(
455                    "Invalid PeerId conversion: {}",
456                    point
457                )));
458            }
459        } else {
460            return Err(Error::AcessPointError(format!(
461                "Invalid MultiAddress conversion: {}",
462                point
463            )));
464        }
465    }
466    Ok(access_points)
467}
468
469// TODO: move to better place, maybe settings
470fn external_addresses(addresses: &[String]) -> Result<Vec<Multiaddr>, Error> {
471    let mut external_addresses: Vec<Multiaddr> = Vec::new();
472    for address in addresses {
473        if let Some(value) = multiaddr(address) {
474            external_addresses.push(value);
475        } else {
476            return Err(Error::AcessPointError(format!(
477                "Invalid MultiAddress conversion in External Address: {}",
478                address
479            )));
480        }
481    }
482    Ok(external_addresses)
483}
484
485// TODO: move to better place, maybe settings
486fn multiaddr(addr: &str) -> Option<Multiaddr> {
487    match addr.parse::<Multiaddr>() {
488        Ok(a) => Some(a),
489        Err(_) => None,
490    }
491}