exocore_core/cell/
node.rs

1use std::{
2    collections::HashSet,
3    fmt::{Debug, Display},
4    ops::Deref,
5    path::Path,
6    str::FromStr,
7    sync::{Arc, RwLock},
8};
9
10use exocore_protos::{
11    core::{node_cell_config, NodeAddresses},
12    generated::exocore_core::{LocalNodeConfig, NodeConfig},
13};
14use libp2p::{core::Multiaddr, PeerId};
15use url::Url;
16
17use super::{error::Error, Cell, CellId};
18use crate::{
19    cell::LocalNodeConfigExt,
20    dir::{ram::RamDirectory, DynDirectory},
21    sec::{
22        keys::{Keypair, PublicKey},
23        signature::Signature,
24    },
25};
26
/// Name of the file, relative to the node's directory, in which the local
/// node configuration is stored as YAML.
const NODE_CONFIG_FILE: &str = "node.yaml";
28
29/// Represents a machine / process on which Exocore runs. A node can host
30/// multiple `Cell`.
31#[derive(Clone)]
32pub struct Node {
33    identity: Arc<NodeIdentity>,
34    addresses: Arc<RwLock<Addresses>>,
35}
36
37struct NodeIdentity {
38    node_id: NodeId,
39    peer_id: PeerId,
40    consistent_clock_id: u16,
41    public_key: PublicKey,
42    name: String,
43}
44
45impl Node {
46    pub fn from_public_key(public_key: PublicKey) -> Node {
47        Self::build(public_key, None)
48    }
49
50    pub fn from_config(config: NodeConfig) -> Result<Node, Error> {
51        let public_key = PublicKey::decode_base58_string(&config.public_key)
52            .map_err(|err| Error::Cell(anyhow!("Couldn't decode node public key: {}", err)))?;
53
54        let name = if !config.name.is_empty() {
55            Some(config.name)
56        } else {
57            None
58        };
59
60        let node = Self::build(public_key, name);
61
62        {
63            let mut addresses = node.addresses.write().unwrap();
64            *addresses = Addresses::parse(&config.addresses.unwrap_or_default())?;
65        }
66
67        Ok(node)
68    }
69
70    #[cfg(any(test, feature = "tests-utils"))]
71    pub fn generate_temporary() -> Node {
72        let keypair = Keypair::generate_ed25519();
73        Self::build(keypair.public(), None)
74    }
75
76    fn build(public_key: PublicKey, name: Option<String>) -> Node {
77        let node_id = NodeId::from_public_key(&public_key);
78        let peer_id = *node_id.to_peer_id();
79
80        let node_id_bytes = node_id.0.to_bytes();
81        let node_id_bytes_len = node_id_bytes.len();
82        let consistent_clock_id = u16::from_le_bytes([
83            node_id_bytes[node_id_bytes_len - 1],
84            node_id_bytes[node_id_bytes_len - 2],
85        ]);
86
87        let name = name.unwrap_or_else(|| public_key.generate_name());
88
89        Node {
90            identity: Arc::new(NodeIdentity {
91                node_id,
92                peer_id,
93                consistent_clock_id,
94                public_key,
95                name,
96            }),
97            addresses: Arc::new(RwLock::new(Addresses {
98                p2p: HashSet::new(),
99                http: HashSet::new(),
100            })),
101        }
102    }
103
104    pub fn id(&self) -> &NodeId {
105        &self.identity.node_id
106    }
107
108    pub fn public_key(&self) -> &PublicKey {
109        &self.identity.public_key
110    }
111
112    pub fn peer_id(&self) -> &PeerId {
113        &self.identity.peer_id
114    }
115
116    pub fn name(&self) -> &str {
117        &self.identity.name
118    }
119
120    pub fn consistent_clock_id(&self) -> u16 {
121        self.identity.consistent_clock_id
122    }
123
124    pub fn p2p_addresses(&self) -> Vec<Multiaddr> {
125        let addresses = self.addresses.read().expect("Couldn't get addresses lock");
126        addresses.p2p.iter().cloned().collect()
127    }
128
129    pub fn add_p2p_address(&self, address: Multiaddr) {
130        let mut addresses = self.addresses.write().expect("Couldn't get addresses lock");
131        addresses.p2p.insert(address);
132    }
133
134    pub fn http_addresses(&self) -> Vec<Url> {
135        let addresses = self.addresses.read().expect("Couldn't get addresses lock");
136        addresses.http.iter().cloned().collect()
137    }
138
139    pub fn add_http_address(&self, address: Url) {
140        let mut addresses = self.addresses.write().expect("Couldn't get addresses lock");
141        addresses.http.insert(address);
142    }
143}
144
145impl PartialEq for Node {
146    fn eq(&self, other: &Self) -> bool {
147        self.identity.node_id.eq(&other.identity.node_id)
148    }
149}
150
151impl Eq for Node {}
152
153impl Debug for Node {
154    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
155        let addresses = self.addresses.read().expect("Couldn't get addresses lock");
156        f.debug_struct("Node")
157            .field("name", &self.identity.name)
158            .field("node_id", &self.identity.node_id)
159            .field(
160                "public_key",
161                &self.identity.public_key.encode_base58_string(),
162            )
163            .field("p2p_addresses", &addresses.p2p)
164            .field("http_addresses", &addresses.http)
165            .finish()
166    }
167}
168
169impl Display for Node {
170    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
171        f.write_str("Node{")?;
172        f.write_str(&self.identity.name)?;
173        f.write_str("}")
174    }
175}
176
177/// Represents the local `Node` being run in the current process. Contrarily to
178/// other nodes, we have a full private+public keypair that we can sign messages
179/// with.
180#[derive(Clone)]
181pub struct LocalNode {
182    node: Node,
183    ident: Arc<LocalNodeIdentity>,
184    dir: DynDirectory,
185}
186
187struct LocalNodeIdentity {
188    keypair: Keypair,
189    config: LocalNodeConfig,
190    addresses: Addresses,
191}
192
193impl LocalNode {
194    pub fn from_config(
195        dir: impl Into<DynDirectory>,
196        config: LocalNodeConfig,
197    ) -> Result<LocalNode, Error> {
198        let keypair = Keypair::decode_base58_string(&config.keypair)
199            .map_err(|err| Error::Cell(anyhow!("Couldn't decode local node keypair: {}", err)))?;
200
201        let listen_addresses =
202            Addresses::parse(&config.listen_addresses.clone().unwrap_or_default())?;
203        let local_node = LocalNode {
204            node: Node::from_config(NodeConfig {
205                public_key: config.public_key.clone(),
206                name: config.name.clone(),
207                id: config.id.clone(),
208                addresses: config.addresses.clone(),
209            })?,
210            ident: Arc::new(LocalNodeIdentity {
211                keypair,
212                config,
213                addresses: listen_addresses,
214            }),
215            dir: dir.into(),
216        };
217
218        Ok(local_node)
219    }
220
221    pub fn generate() -> LocalNode {
222        Self::generate_in_directory(RamDirectory::default())
223            .expect("Couldn't generate a in-memory node")
224    }
225
226    pub fn generate_in_directory(dir: impl Into<DynDirectory>) -> Result<LocalNode, Error> {
227        let dir = dir.into();
228
229        let keypair = Keypair::generate_ed25519();
230        let node = Node::from_public_key(keypair.public());
231        let node_name = node.name().to_string();
232
233        let config = LocalNodeConfig {
234            keypair: keypair.encode_base58_string(),
235            public_key: keypair.public().encode_base58_string(),
236            id: node.id().to_string(),
237            name: node_name,
238            ..Default::default()
239        };
240
241        let node = Self::from_config(dir, config.clone())
242            .expect("Couldn't create node config generated config");
243
244        node.save_config(&config)?;
245
246        Ok(node)
247    }
248
249    pub fn from_directory(dir: impl Into<DynDirectory>) -> Result<LocalNode, Error> {
250        let dir = dir.into();
251
252        let config = {
253            let config_file = dir.open_read(Path::new(NODE_CONFIG_FILE))?;
254            LocalNodeConfig::read_yaml(config_file)?
255        };
256
257        let node = LocalNode::from_config(dir, config)?;
258
259        Ok(node)
260    }
261
262    pub fn directory(&self) -> &DynDirectory {
263        &self.dir
264    }
265
266    pub fn cell_directory(&self, cell_id: &CellId) -> DynDirectory {
267        let cell_path = Path::new("cells").join(cell_id.to_string());
268        self.dir.scope(cell_path)
269    }
270
271    pub fn node(&self) -> &Node {
272        &self.node
273    }
274
275    pub fn keypair(&self) -> &Keypair {
276        &self.ident.keypair
277    }
278
279    pub fn sign_message(&self, _message: &[u8]) -> Signature {
280        // TODO: Signature ticket: https://github.com/appaquet/exocore/issues/46
281        //       Make sure we're local and we have access to private key
282        Signature::empty()
283    }
284
285    pub fn config(&self) -> &LocalNodeConfig {
286        &self.ident.config
287    }
288
289    pub fn inlined_config(&self) -> Result<LocalNodeConfig, Error> {
290        let mut inlined = self.ident.config.clone();
291        for cell_config in &mut inlined.cells {
292            if cell_config.id.is_empty() {
293                continue;
294            }
295
296            let cell_id = CellId::from_str(&cell_config.id).unwrap();
297            let cell_dir = self.cell_directory(&cell_id);
298            let cell = Cell::from_directory(cell_dir, self.clone())?;
299            cell_config.location = Some(node_cell_config::Location::Inline(
300                cell.cell().config().clone(),
301            ));
302        }
303
304        Ok(inlined)
305    }
306
307    pub fn save_config(&self, config: &LocalNodeConfig) -> Result<(), Error> {
308        let config_file = self.dir.open_create(Path::new(NODE_CONFIG_FILE))?;
309        config.write_yaml(config_file)?;
310        Ok(())
311    }
312
313    pub fn p2p_listen_addresses(&self) -> Vec<Multiaddr> {
314        if !self.ident.addresses.p2p.is_empty() {
315            self.ident.addresses.p2p.iter().cloned().collect()
316        } else {
317            self.p2p_addresses()
318        }
319    }
320
321    pub fn http_listen_addresses(&self) -> Vec<Url> {
322        if !self.ident.addresses.http.is_empty() {
323            self.ident.addresses.http.iter().cloned().collect()
324        } else {
325            self.http_addresses()
326        }
327    }
328
329    pub fn config_exists(dir: impl Into<DynDirectory>) -> bool {
330        dir.into().exists(Path::new(NODE_CONFIG_FILE))
331    }
332}
333
334impl Deref for LocalNode {
335    type Target = Node;
336
337    fn deref(&self) -> &Self::Target {
338        &self.node
339    }
340}
341
342impl PartialEq for LocalNode {
343    fn eq(&self, other: &Self) -> bool {
344        self.node.eq(other)
345    }
346}
347
348impl Eq for LocalNode {}
349
350impl Debug for LocalNode {
351    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
352        f.debug_struct("LocalNode")
353            .field("node", &self.node)
354            .field("p2p_listen_addresses", &self.ident.addresses.p2p)
355            .field("http_listen_addresses", &self.ident.addresses.http)
356            .finish()
357    }
358}
359
360impl Display for LocalNode {
361    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
362        f.write_str("LocalNode{")?;
363        f.write_str(&self.ident.config.name)?;
364        f.write_str("}")
365    }
366}
367
368/// Unique identifier of a node, which is built by hashing the public key of the
369/// node.
370///
371/// For now, it has a one to one correspondence with libp2p's PeerId, which is a
372/// base58 encoded version of the public key of the node encoded in protobuf.
373#[derive(Clone, PartialEq, Eq, Debug, Hash)]
374pub struct NodeId(PeerId);
375
376impl NodeId {
377    /// Create a Node ID from a public key by using libp2p method to support
378    /// compatibility with PeerId
379    pub fn from_public_key(public_key: &PublicKey) -> NodeId {
380        let peer_id = PeerId::from_public_key(public_key.to_libp2p());
381        NodeId(peer_id)
382    }
383
384    pub fn from_peer_id(peer_id: PeerId) -> NodeId {
385        NodeId(peer_id)
386    }
387
388    pub fn to_peer_id(&self) -> &PeerId {
389        &self.0
390    }
391
392    pub fn from_bytes(id: Vec<u8>) -> Result<NodeId, Error> {
393        let peer_id = PeerId::from_bytes(id.as_ref())
394            .map_err(|_| Error::Node(anyhow!("Couldn't convert bytes to peer id")))?;
395        Ok(NodeId(peer_id))
396    }
397
398    pub fn to_bytes(&self) -> Vec<u8> {
399        self.0.to_bytes()
400    }
401}
402
403impl std::fmt::Display for NodeId {
404    #[inline]
405    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
406        std::fmt::Display::fmt(&self.0, f)
407    }
408}
409
410impl FromStr for NodeId {
411    type Err = ();
412
413    fn from_str(s: &str) -> Result<Self, Self::Err> {
414        let peer_id = PeerId::from_str(s).map_err(|_| ())?;
415        Ok(NodeId(peer_id))
416    }
417}
418
419/// Addresses of a node.
420struct Addresses {
421    p2p: HashSet<Multiaddr>,
422    http: HashSet<url::Url>,
423}
424
425impl Addresses {
426    fn parse(config: &NodeAddresses) -> Result<Addresses, Error> {
427        let mut addresses = Addresses {
428            p2p: HashSet::new(),
429            http: HashSet::new(),
430        };
431        for maddr_str in &config.p2p {
432            let maddr = maddr_str
433                .parse()
434                .map_err(|err| Error::Cell(anyhow!("Couldn't parse p2p multi-address: {}", err)))?;
435            addresses.p2p.insert(maddr);
436        }
437
438        for url_str in &config.http {
439            let url = url_str
440                .parse()
441                .map_err(|err| Error::Cell(anyhow!("Couldn't parse http url: {}", err)))?;
442            addresses.http.insert(url);
443        }
444
445        Ok(addresses)
446    }
447}
448
#[cfg(test)]
mod tests {
    use exocore_protos::core::NodeCellConfig;

    use super::*;
    use crate::{
        cell::FullCell,
        dir::{ram::RamDirectory, Directory},
    };

    #[test]
    fn node_equality() {
        #![allow(clippy::eq_op)]
        let first = LocalNode::generate();
        let second = LocalNode::generate();

        assert_eq!(first, first);
        assert_eq!(first, first.clone());
        assert_ne!(first, second);

        // Both debug representations must produce some output.
        let local_debug = format!("{:?}", first);
        assert!(!local_debug.is_empty());
        let node_debug = format!("{:?}", first.node());
        assert!(!node_debug.is_empty());
    }

    #[test]
    fn node_id_bytes() {
        let first = LocalNode::generate();
        let second = LocalNode::generate();

        assert_ne!(first.id().to_bytes(), second.id().to_bytes());
        assert_eq!(first.id().to_bytes(), first.id().to_bytes());

        // Bytes must round-trip back to the same id.
        let decoded = NodeId::from_bytes(first.id().to_bytes()).unwrap();
        assert_eq!(decoded, *first.id());
    }

    #[test]
    fn node_deterministic_random_name() {
        let public_key =
            PublicKey::decode_base58_string("pe2AgPyBmJNztntK9n4vhLuEYN8P2kRfFXnaZFsiXqWacQ")
                .unwrap();

        let node = Node::from_public_key(public_key);

        assert_eq!("wholly-proud-gannet", node.identity.name);
        assert_eq!("Node{wholly-proud-gannet}", node.to_string());
    }

    #[test]
    fn local_node_from_generated_config() {
        let generated = LocalNode::generate();
        let config = generated.config().clone();

        let rebuilt = LocalNode::from_config(RamDirectory::default(), config).unwrap();

        assert_eq!(generated.keypair().public(), rebuilt.keypair().public());
        assert_eq!(generated.config(), rebuilt.config());
    }

    #[test]
    fn local_node_from_directory() {
        let dir = RamDirectory::new();

        let generated = LocalNode::generate_in_directory(dir.clone()).unwrap();
        assert!(LocalNode::config_exists(dir.clone()));

        // Reload the node from the same directory.
        let reloaded = LocalNode::from_directory(dir).unwrap();
        assert_eq!(generated.id(), reloaded.id());
    }

    #[test]
    fn node_cell_config() {
        // Create a node and a cell.
        let node = LocalNode::generate();
        let cell = FullCell::generate(node.clone()).unwrap();

        // Register the cell in the node's config and persist it.
        let mut updated_config = node.config().clone();
        updated_config.add_cell(NodeCellConfig {
            id: cell.cell().id().to_string(),
            ..Default::default()
        });
        node.save_config(&updated_config).unwrap();

        // Reload the node, which now knows about the cell.
        let reloaded = LocalNode::from_directory(node.directory().clone()).unwrap();
        assert_eq!(reloaded.config().cells.len(), 1);

        // Inline the config and rebuild a node from it in a fresh directory.
        let inline_config = reloaded.inlined_config().unwrap();
        let inlined_node = LocalNode::from_config(RamDirectory::new(), inline_config).unwrap();
        assert_eq!(inlined_node.config().cells.len(), 1);

        let cells = Cell::from_local_node(inlined_node).unwrap();
        assert_eq!(cells.len(), 1);
    }
}
545}