mesh_portal_tcp_server/
lib.rs

1#[macro_use]
2extern crate async_trait;
3
4#[macro_use]
5extern crate anyhow;
6
7#[macro_use]
8extern crate strum_macros;
9
10use std::convert::{TryFrom, TryInto};
11use std::sync::Arc;
12use std::thread;
13use std::time::Duration;
14
15use anyhow::Error;
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
18use tokio::net::{TcpListener, TcpStream};
19use tokio::sync::mpsc::error::SendTimeoutError;
20use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
21
22use mesh_portal_api_server::{Portal, PortalEvent, PortalInfo, PortalRequestHandler};
23use mesh_portal::version::latest::config::PortalConfig;
24use mesh_portal::version::latest::frame::CloseReason;
25use mesh_portal::version::latest::messaging::Request;
26use mesh_portal::version::latest::messaging::{Message, Response};
27use mesh_portal::version::latest::portal::initin::PortalAuth;
28use mesh_portal::version::latest::portal::inlet::{Frame, Log};
29use mesh_portal::version::latest::portal::{initin, initout, inlet, outlet, Exchanger};
30use mesh_portal::version::latest::resource::Code;
31use mesh_portal::version::latest::resource::{ResourceStub, Status};
32use mesh_portal_tcp_common::{
33    FrameReader, FrameWriter, PrimitiveFrameReader, PrimitiveFrameWriter,
34};
35use serde::{Deserialize, Serialize};
36use std::future::Future;
37use std::sync::atomic::{AtomicU64, Ordering};
38use tokio::sync::broadcast::Receiver;
39use tokio::sync::oneshot::error::RecvError;
40use tokio::task::yield_now;
41
42#[derive(Clone, strum_macros::Display)]
43pub enum PortalServerEvent {
44    Status(Status),
45    ClientConnected,
46    FlavorNegotiation(EventResult<String>),
47    Authorization(EventResult<String>),
48    Shutdown,
49}
50
51#[derive(Clone)]
52pub enum EventResult<E> {
53    Ok(E),
54    Err(String),
55}
56
57pub enum TcpServerCall {
58    GetServerEvents(oneshot::Sender<broadcast::Receiver<PortalServerEvent>>),
59    GetPortalEvents(oneshot::Sender<broadcast::Receiver<PortalEvent>>),
60    Shutdown,
61}
62
63struct Alive {
64    pub alive: bool,
65}
66
67impl Alive {
68    pub fn new() -> Self {
69        Self { alive: true }
70    }
71}
72
73pub struct PortalTcpServer {
74    portal_config: PortalConfig,
75    port: usize,
76    server: Arc<dyn PortalServer>,
77    server_event_broadcaster_tx: broadcast::Sender<PortalServerEvent>,
78    portal_broadcast_tx: broadcast::Sender<PortalEvent>,
79    call_tx: mpsc::Sender<TcpServerCall>,
80    alive: Arc<Mutex<Alive>>,
81    request_handler: Arc<dyn PortalRequestHandler>,
82}
83
84impl PortalTcpServer {
85    pub fn new(port: usize, server: Box<dyn PortalServer>) -> mpsc::Sender<TcpServerCall> {
86        let (call_tx, mut call_rx) = mpsc::channel(1024);
87        {
88            let call_tx = call_tx.clone();
89            tokio::task::spawn_blocking(move || {
90                let server: Arc<dyn PortalServer> = server.into();
91                let (server_event_broadcaster_tx, _) = broadcast::channel(32);
92                let (portal_broadcast_tx, _) = broadcast::channel(1024);
93
94                let server = Self {
95                    request_handler: server.portal_request_handler(),
96                    portal_config: Default::default(),
97                    port,
98                    server,
99                    server_event_broadcaster_tx,
100                    portal_broadcast_tx,
101                    call_tx: call_tx.clone(),
102                    alive: Arc::new(Mutex::new(Alive::new())),
103                };
104
105                tokio::spawn(async move {
106                    server
107                        .server_event_broadcaster_tx
108                        .send(PortalServerEvent::Status(Status::Unknown))
109                        .unwrap_or_default();
110                    {
111                        let port = server.port.clone();
112                        let server_event_broadcaster_tx = server.server_event_broadcaster_tx.clone();
113                        let portal_broadcast_tx = server.portal_broadcast_tx.clone();
114                        let alive = server.alive.clone();
115                        tokio::spawn(async move {
116                            yield_now().await;
117                            while let Option::Some(call) = call_rx.recv().await {
118                                match call {
119                                    TcpServerCall::GetServerEvents(tx) => {
120                                        tx.send(server_event_broadcaster_tx.subscribe());
121                                    }
122                                    TcpServerCall::GetPortalEvents(tx) => {
123                                        tx.send(portal_broadcast_tx.subscribe());
124                                    }
125                                    TcpServerCall::Shutdown => {
126                                        server_event_broadcaster_tx
127                                            .send(PortalServerEvent::Shutdown)
128                                            .unwrap_or_default();
129                                        alive.lock().await.alive = false;
130                                        match std::net::TcpStream::connect(format!(
131                                            "localhost:{}",
132                                            port
133                                        )) {
134                                            Ok(_) => {}
135                                            Err(_) => {}
136                                        }
137                                        return;
138                                    }
139                                }
140                            }
141                        });
142                    }
143
144                    server.start().await;
145                });
146            });
147        }
148        call_tx
149    }
150
151    async fn start(self) {
152        let addr = format!("localhost:{}", self.port);
153        match std::net::TcpListener::bind(addr.clone()) {
154            Ok(std_listener) => {
155                tokio::time::sleep(Duration::from_secs(0)).await;
156                let listener = TcpListener::from_std(std_listener).unwrap();
157                self.server_event_broadcaster_tx
158                    .send(PortalServerEvent::Status(Status::Ready))
159                    .unwrap_or_default();
160                tokio::time::sleep(Duration::from_secs(0)).await;
161                while let Ok((stream, _)) = listener.accept().await {
162                    {
163                        if !self.alive.lock().await.alive.clone() {
164                            (self.server.logger())("server reached final shutdown");
165                            break;
166                        }
167                    }
168                    self.server_event_broadcaster_tx
169                        .send(PortalServerEvent::ClientConnected)
170                        .unwrap_or_default();
171                    (&self).handle(stream).await;
172                }
173                self.server_event_broadcaster_tx
174                    .send(PortalServerEvent::Status(Status::Done(Code::Ok)))
175                    .unwrap_or_default();
176            }
177            Err(error) => {
178                let message = format!("FATAL: could not setup TcpListener {}", error);
179                (self.server.logger())(message.as_str());
180                self.server_event_broadcaster_tx
181                    .send(PortalServerEvent::Status(Status::Panic(message)))
182                    .unwrap_or_default();
183            }
184        }
185    }
186
187    async fn handle(&self, stream: TcpStream) -> Result<(), Error> {
188        let (reader, writer) = stream.into_split();
189        let mut reader = PrimitiveFrameReader::new(reader);
190        let mut writer = PrimitiveFrameWriter::new(writer);
191
192        let mut reader: FrameReader<initin::Frame> = FrameReader::new(reader);
193        let mut writer: FrameWriter<initout::Frame> = FrameWriter::new(writer);
194
195        if let initin::Frame::Flavor(flavor) = reader.read().await? {
196            // first verify flavor matches
197            if flavor != self.server.flavor() {
198                let message = format!(
199                    "ERROR: flavor does not match.  expected '{}'",
200                    self.server.flavor()
201                );
202                println!("{}", message);
203
204                tokio::time::sleep(Duration::from_secs(0)).await;
205
206                self.server_event_broadcaster_tx
207                    .send(PortalServerEvent::FlavorNegotiation(EventResult::Err(
208                        message.clone(),
209                    )))
210                    .unwrap_or_default();
211                return Err(anyhow!(message));
212            } else {
213                self.server_event_broadcaster_tx
214                    .send(PortalServerEvent::FlavorNegotiation(EventResult::Ok(
215                        self.server.flavor(),
216                    )))
217                    .unwrap_or_default();
218            }
219        } else {
220            let message = format!(
221                "ERROR: unexpected frame.  expected flavor '{}'",
222                self.server.flavor()
223            );
224            self.server_event_broadcaster_tx
225                .send(PortalServerEvent::FlavorNegotiation(EventResult::Err(
226                    message.clone(),
227                )))
228                .unwrap_or_default();
229            return Err(anyhow!(message));
230        }
231
232        writer.write(initout::Frame::Ok).await?;
233        yield_now().await;
234
235        if let initin::Frame::Auth(portal_auth) = reader.read().await? {
236            self.server_event_broadcaster_tx
237                .send(PortalServerEvent::Authorization(EventResult::Ok(
238                    portal_auth.user.clone(),
239                )))
240                .unwrap_or_default();
241            tokio::time::sleep(Duration::from_secs(0)).await;
242            writer.write(initout::Frame::Ok).await?;
243
244            loop {
245                match reader.read().await? {
246                    initin::Frame::Artifact(request) => {
247                        let response = self.server.portal_request_handler().handle_artifact_request(request.item.clone()).await?;
248                        let response = request.with(response);
249                        writer.write(initout::Frame::Artifact(response)).await?;
250println!("portal server: wrote initout::Frame::Artifact");
251                    }
252                    initin::Frame::Ready => {
253                        break;
254                    }
255                    _ => {
256                        return Err(anyhow!("portal server: illegal initin::Frame encountered during client init process") )
257                    }
258                }
259            }
260
261            let mut reader: FrameReader<inlet::Frame> = FrameReader::new(reader.done());
262            let mut writer: FrameWriter<outlet::Frame> = FrameWriter::new(writer.done());
263
264            let (outlet_tx, mut outlet_rx) = mpsc::channel(1024);
265
266            fn logger(log: Log) {
267                println!("{}", log.to_string());
268            }
269
270            let portal_key = match portal_auth.portal_key {
271                None => uuid::Uuid::new_v4().to_string(),
272                Some(portal_key) => portal_key,
273            };
274
275            let info = PortalInfo { portal_key };
276
277            let (portal, inlet_tx) = Portal::new(
278                info,
279                self.portal_config.clone(),
280                outlet_tx,
281                self.request_handler.clone(),
282                self.portal_broadcast_tx.clone(),
283                logger,
284            );
285
286
287            let portal_api = portal.api();
288            self.server.add_portal(portal);
289            self.portal_broadcast_tx.send( PortalEvent::PortalAdded(portal_api));
290
291            {
292                let logger = self.server.logger();
293                tokio::spawn(async move {
294                    loop {
295                        match  reader.read().await {
296                            Ok(frame) => {
297                                println!("server TCP READ FRAME: {}", frame.to_string());
298                                let result = inlet_tx.send(frame).await;
299                                yield_now().await;
300                                if result.is_err() {
301                                    (logger)("FATAL: cannot send frame to portal inlet_tx");
302                                    return;
303                                }
304                            }
305                            Err(err) => {
306                                eprintln!("portal server: TCP Reader end... {}",err.to_string());
307                                break;
308                            }
309                        }
310                    }
311                });
312            }
313
314            {
315                let logger = self.server.logger();
316                let task = tokio::task::spawn_blocking(move || {
317                    tokio::spawn(async move {
318                        while let Option::Some(frame) = outlet_rx.recv().await {
319                            println!(
320                                "server... SENDING from outlet_rx frame :==:> {}",
321                                frame.to_string()
322                            );
323                            writer.write(frame).await;
324                        }
325                        println!("server: outlet_rx complete.");
326                    });
327                });
328                task.await?;
329            }
330        }
331        Ok(())
332    }
333}
334
335pub struct RouterProxy {
336    pub server: Arc<dyn PortalServer>,
337}
338
339#[async_trait]
340pub trait PortalServer: Sync + Send {
341    fn flavor(&self) -> String;
342
343    async fn auth(
344        &self,
345        reader: &mut PrimitiveFrameReader,
346        writer: &mut PrimitiveFrameWriter,
347    ) -> Result<PortalAuth, anyhow::Error> {
348        let frame = reader.read().await?;
349        let client_ident: PortalAuth = bincode::deserialize(frame.data.as_slice())?;
350        tokio::time::sleep(Duration::from_secs(0)).await;
351        Ok(client_ident)
352    }
353
354    fn logger(&self) -> fn(message: &str);
355    fn portal_request_handler(&self) -> Arc<dyn PortalRequestHandler>;
356    fn add_portal(&self, portal: Portal);
357}