1#[macro_use]
4extern crate tracing;
5
6mod error;
7mod io;
8mod peer;
9mod proto;
10mod ptr;
11mod routes;
12mod server;
13
14pub use error::{Error, Result};
15
16pub(crate) use io::IoPair;
17pub(crate) use peer::{DstAddr, Peer, PeerState, SourceAddr};
18pub(crate) use proto::{Packet, PacketBuilder};
19pub(crate) use ptr::AtomPtr;
20pub(crate) use routes::Routes;
21pub(crate) use server::{LockedStream, Server};
22
23use async_std::sync::Arc;
24use async_trait::async_trait;
25use netmod::{self, Endpoint as EndpointExt, Frame, Target};
26use serde::{Deserialize, Serialize};
27
28#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
33pub enum Mode {
34 Static,
35 Dynamic,
36}
37
38#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
50pub enum LinkType {
51 Bidirect,
53 Limited,
55}
56
57impl Default for LinkType {
58 fn default() -> Self {
59 Self::Bidirect
60 }
61}
62
63#[derive(Clone)]
64pub struct Endpoint {
65 server: Arc<Server>,
66 routes: Arc<Routes>,
67}
68
69impl Endpoint {
70 #[tracing::instrument(level = "info")]
72 pub async fn new(addr: &str, port: u16, name: &str, mode: Mode) -> Result<Arc<Self>> {
73 info!("Initialising Tcp backend");
74
75 let routes = Routes::new(port);
76 let server = Server::new(Arc::clone(&routes), addr, port, mode).await?;
77
78 server.run();
79 Ok(Arc::new(Self { server, routes }))
80 }
81
82 pub fn mode(&self) -> Mode {
84 self.server.mode()
85 }
86
87 pub async fn stop(&self) {
88 self.server.stop();
89 self.routes.stop_all().await;
90 }
91
92 pub async fn add_peers(&self, peers: Vec<String>) -> Result<()> {
99 for p in peers.into_iter() {
100 if &p == "" && continue {}
101
102 let mut parts: Vec<_> = p.split(|x| x == ' ').collect();
103 let _type = parts.get(1);
104 let peer = match parts[0].parse().ok() {
105 Some(s) => s,
106 None => {
107 error!("Failed to parse peer info `{}`", parts[0]);
108 continue;
109 }
110 };
111
112 let t = match _type {
113 Some(&"limited") => LinkType::Limited,
114 _ => LinkType::Bidirect,
115 };
116
117 trace!(
118 "Adding peer: {} ({})",
119 peer,
120 match t {
121 LinkType::Limited => "limited",
122 LinkType::Bidirect => "",
123 }
124 );
125
126 self.routes.add_via_dst(peer, t).await;
127 }
128
129 Ok(())
130 }
131}
132
133#[async_trait]
134impl EndpointExt for Endpoint {
135 fn size_hint(&self) -> usize {
136 0
137 }
138
139 async fn send(&self, frame: Frame, target: Target) -> netmod::Result<()> {
140 match target {
141 Target::Flood => {
142 let dsts = self.routes.all_dst().await;
143 for peer in dsts {
144 peer.send(Packet::Frame(frame.clone())).await;
145 }
146 }
147 Target::Single(id) => {
148 let peer = match self.routes.get_peer(id as usize).await {
149 Some(p) => Ok(p),
150 None => Err(netmod::Error::ConnectionLost),
151 }?;
152 peer.send(Packet::Frame(frame)).await;
153 }
154 }
155
156 Ok(())
157 }
158
159 async fn next(&self) -> netmod::Result<(Frame, Target)> {
160 Ok(self.server.next().await)
161 }
162}