netmod_tcp/
lib.rs

1//! A tcp overlay netmod to connect router across the internet
2
3#[macro_use]
4extern crate tracing;
5
6mod error;
7mod io;
8mod peer;
9mod proto;
10mod ptr;
11mod routes;
12mod server;
13
14pub use error::{Error, Result};
15
16pub(crate) use io::IoPair;
17pub(crate) use peer::{DstAddr, Peer, PeerState, SourceAddr};
18pub(crate) use proto::{Packet, PacketBuilder};
19pub(crate) use ptr::AtomPtr;
20pub(crate) use routes::Routes;
21pub(crate) use server::{LockedStream, Server};
22
23use async_std::sync::Arc;
24use async_trait::async_trait;
25use netmod::{self, Endpoint as EndpointExt, Frame, Target};
26use serde::{Deserialize, Serialize};
27
28/// Define the runtime mode for this endpount
29///
30/// In dynamic mode any new peer can introduce itself to start a link,
31/// while in static mode only known peers will be accepted.
32#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
33pub enum Mode {
34    Static,
35    Dynamic,
36}
37
38/// Specify the conneciton types used by this node
39///
40/// By default netmod-tcp tries to establish bi-directional
41/// connections, meaning that two nodes each have a dedicated
42/// transmission (tx) and receiving (rx) channels.  However on some
43/// networks this isn't possible.  While `Bidirect` is a good default,
44/// it's possible to override this behaviour.
45///
46/// `Limited` will open connections to peers with a special flag that
47/// makes it use a different reverse-channel strategy.  The server
48/// won't try to create full reverse channels, and instead use the
49#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
50pub enum LinkType {
51    /// Default connection type
52    Bidirect,
53    /// Fallback connection type
54    Limited,
55}
56
57impl Default for LinkType {
58    fn default() -> Self {
59        Self::Bidirect
60    }
61}
62
63#[derive(Clone)]
64pub struct Endpoint {
65    server: Arc<Server>,
66    routes: Arc<Routes>,
67}
68
69impl Endpoint {
70    /// Create a new endpoint on an interface and port
71    #[tracing::instrument(level = "info")]
72    pub async fn new(addr: &str, port: u16, name: &str, mode: Mode) -> Result<Arc<Self>> {
73        info!("Initialising Tcp backend");
74
75        let routes = Routes::new(port);
76        let server = Server::new(Arc::clone(&routes), addr, port, mode).await?;
77
78        server.run();
79        Ok(Arc::new(Self { server, routes }))
80    }
81
82    /// Get the current runtime mode
83    pub fn mode(&self) -> Mode {
84        self.server.mode()
85    }
86
87    pub async fn stop(&self) {
88        self.server.stop();
89        self.routes.stop_all().await;
90    }
91
92    /// Insert a set of peers into the routing table
93    ///
94    /// Each peer will spawn a worker that periodically attempts to
95    /// connect to it.  Connections might not be recipricated if the
96    /// peer doesn't know the local IP or is rejecting unknown
97    /// connections.
98    pub async fn add_peers(&self, peers: Vec<String>) -> Result<()> {
99        for p in peers.into_iter() {
100            if &p == "" && continue {}
101
102            let mut parts: Vec<_> = p.split(|x| x == ' ').collect();
103            let _type = parts.get(1);
104            let peer = match parts[0].parse().ok() {
105                Some(s) => s,
106                None => {
107                    error!("Failed to parse peer info `{}`", parts[0]);
108                    continue;
109                }
110            };
111
112            let t = match _type {
113                Some(&"limited") => LinkType::Limited,
114                _ => LinkType::Bidirect,
115            };
116
117            trace!(
118                "Adding peer: {} ({})",
119                peer,
120                match t {
121                    LinkType::Limited => "limited",
122                    LinkType::Bidirect => "",
123                }
124            );
125
126            self.routes.add_via_dst(peer, t).await;
127        }
128
129        Ok(())
130    }
131}
132
133#[async_trait]
134impl EndpointExt for Endpoint {
135    fn size_hint(&self) -> usize {
136        0
137    }
138
139    async fn send(&self, frame: Frame, target: Target) -> netmod::Result<()> {
140        match target {
141            Target::Flood => {
142                let dsts = self.routes.all_dst().await;
143                for peer in dsts {
144                    peer.send(Packet::Frame(frame.clone())).await;
145                }
146            }
147            Target::Single(id) => {
148                let peer = match self.routes.get_peer(id as usize).await {
149                    Some(p) => Ok(p),
150                    None => Err(netmod::Error::ConnectionLost),
151                }?;
152                peer.send(Packet::Frame(frame)).await;
153            }
154        }
155
156        Ok(())
157    }
158
159    async fn next(&self) -> netmod::Result<(Frame, Target)> {
160        Ok(self.server.next().await)
161    }
162}