1#[macro_use]
2extern crate async_trait;
3
4#[macro_use]
5extern crate anyhow;
6
7#[macro_use]
8extern crate strum_macros;
9
10use std::convert::{TryFrom, TryInto};
11use std::sync::Arc;
12use std::thread;
13use std::time::Duration;
14
15use anyhow::Error;
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
18use tokio::net::{TcpListener, TcpStream};
19use tokio::sync::mpsc::error::SendTimeoutError;
20use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
21
22use mesh_portal_api_server::{Portal, PortalEvent, PortalInfo, PortalRequestHandler};
23use mesh_portal::version::latest::config::PortalConfig;
24use mesh_portal::version::latest::frame::CloseReason;
25use mesh_portal::version::latest::messaging::Request;
26use mesh_portal::version::latest::messaging::{Message, Response};
27use mesh_portal::version::latest::portal::initin::PortalAuth;
28use mesh_portal::version::latest::portal::inlet::{Frame, Log};
29use mesh_portal::version::latest::portal::{initin, initout, inlet, outlet, Exchanger};
30use mesh_portal::version::latest::resource::Code;
31use mesh_portal::version::latest::resource::{ResourceStub, Status};
32use mesh_portal_tcp_common::{
33 FrameReader, FrameWriter, PrimitiveFrameReader, PrimitiveFrameWriter,
34};
35use serde::{Deserialize, Serialize};
36use std::future::Future;
37use std::sync::atomic::{AtomicU64, Ordering};
38use tokio::sync::broadcast::Receiver;
39use tokio::sync::oneshot::error::RecvError;
40use tokio::task::yield_now;
41
42#[derive(Clone, strum_macros::Display)]
43pub enum PortalServerEvent {
44 Status(Status),
45 ClientConnected,
46 FlavorNegotiation(EventResult<String>),
47 Authorization(EventResult<String>),
48 Shutdown,
49}
50
51#[derive(Clone)]
52pub enum EventResult<E> {
53 Ok(E),
54 Err(String),
55}
56
57pub enum TcpServerCall {
58 GetServerEvents(oneshot::Sender<broadcast::Receiver<PortalServerEvent>>),
59 GetPortalEvents(oneshot::Sender<broadcast::Receiver<PortalEvent>>),
60 Shutdown,
61}
62
63struct Alive {
64 pub alive: bool,
65}
66
67impl Alive {
68 pub fn new() -> Self {
69 Self { alive: true }
70 }
71}
72
73pub struct PortalTcpServer {
74 portal_config: PortalConfig,
75 port: usize,
76 server: Arc<dyn PortalServer>,
77 server_event_broadcaster_tx: broadcast::Sender<PortalServerEvent>,
78 portal_broadcast_tx: broadcast::Sender<PortalEvent>,
79 call_tx: mpsc::Sender<TcpServerCall>,
80 alive: Arc<Mutex<Alive>>,
81 request_handler: Arc<dyn PortalRequestHandler>,
82}
83
84impl PortalTcpServer {
85 pub fn new(port: usize, server: Box<dyn PortalServer>) -> mpsc::Sender<TcpServerCall> {
86 let (call_tx, mut call_rx) = mpsc::channel(1024);
87 {
88 let call_tx = call_tx.clone();
89 tokio::task::spawn_blocking(move || {
90 let server: Arc<dyn PortalServer> = server.into();
91 let (server_event_broadcaster_tx, _) = broadcast::channel(32);
92 let (portal_broadcast_tx, _) = broadcast::channel(1024);
93
94 let server = Self {
95 request_handler: server.portal_request_handler(),
96 portal_config: Default::default(),
97 port,
98 server,
99 server_event_broadcaster_tx,
100 portal_broadcast_tx,
101 call_tx: call_tx.clone(),
102 alive: Arc::new(Mutex::new(Alive::new())),
103 };
104
105 tokio::spawn(async move {
106 server
107 .server_event_broadcaster_tx
108 .send(PortalServerEvent::Status(Status::Unknown))
109 .unwrap_or_default();
110 {
111 let port = server.port.clone();
112 let server_event_broadcaster_tx = server.server_event_broadcaster_tx.clone();
113 let portal_broadcast_tx = server.portal_broadcast_tx.clone();
114 let alive = server.alive.clone();
115 tokio::spawn(async move {
116 yield_now().await;
117 while let Option::Some(call) = call_rx.recv().await {
118 match call {
119 TcpServerCall::GetServerEvents(tx) => {
120 tx.send(server_event_broadcaster_tx.subscribe());
121 }
122 TcpServerCall::GetPortalEvents(tx) => {
123 tx.send(portal_broadcast_tx.subscribe());
124 }
125 TcpServerCall::Shutdown => {
126 server_event_broadcaster_tx
127 .send(PortalServerEvent::Shutdown)
128 .unwrap_or_default();
129 alive.lock().await.alive = false;
130 match std::net::TcpStream::connect(format!(
131 "localhost:{}",
132 port
133 )) {
134 Ok(_) => {}
135 Err(_) => {}
136 }
137 return;
138 }
139 }
140 }
141 });
142 }
143
144 server.start().await;
145 });
146 });
147 }
148 call_tx
149 }
150
151 async fn start(self) {
152 let addr = format!("localhost:{}", self.port);
153 match std::net::TcpListener::bind(addr.clone()) {
154 Ok(std_listener) => {
155 tokio::time::sleep(Duration::from_secs(0)).await;
156 let listener = TcpListener::from_std(std_listener).unwrap();
157 self.server_event_broadcaster_tx
158 .send(PortalServerEvent::Status(Status::Ready))
159 .unwrap_or_default();
160 tokio::time::sleep(Duration::from_secs(0)).await;
161 while let Ok((stream, _)) = listener.accept().await {
162 {
163 if !self.alive.lock().await.alive.clone() {
164 (self.server.logger())("server reached final shutdown");
165 break;
166 }
167 }
168 self.server_event_broadcaster_tx
169 .send(PortalServerEvent::ClientConnected)
170 .unwrap_or_default();
171 (&self).handle(stream).await;
172 }
173 self.server_event_broadcaster_tx
174 .send(PortalServerEvent::Status(Status::Done(Code::Ok)))
175 .unwrap_or_default();
176 }
177 Err(error) => {
178 let message = format!("FATAL: could not setup TcpListener {}", error);
179 (self.server.logger())(message.as_str());
180 self.server_event_broadcaster_tx
181 .send(PortalServerEvent::Status(Status::Panic(message)))
182 .unwrap_or_default();
183 }
184 }
185 }
186
187 async fn handle(&self, stream: TcpStream) -> Result<(), Error> {
188 let (reader, writer) = stream.into_split();
189 let mut reader = PrimitiveFrameReader::new(reader);
190 let mut writer = PrimitiveFrameWriter::new(writer);
191
192 let mut reader: FrameReader<initin::Frame> = FrameReader::new(reader);
193 let mut writer: FrameWriter<initout::Frame> = FrameWriter::new(writer);
194
195 if let initin::Frame::Flavor(flavor) = reader.read().await? {
196 if flavor != self.server.flavor() {
198 let message = format!(
199 "ERROR: flavor does not match. expected '{}'",
200 self.server.flavor()
201 );
202 println!("{}", message);
203
204 tokio::time::sleep(Duration::from_secs(0)).await;
205
206 self.server_event_broadcaster_tx
207 .send(PortalServerEvent::FlavorNegotiation(EventResult::Err(
208 message.clone(),
209 )))
210 .unwrap_or_default();
211 return Err(anyhow!(message));
212 } else {
213 self.server_event_broadcaster_tx
214 .send(PortalServerEvent::FlavorNegotiation(EventResult::Ok(
215 self.server.flavor(),
216 )))
217 .unwrap_or_default();
218 }
219 } else {
220 let message = format!(
221 "ERROR: unexpected frame. expected flavor '{}'",
222 self.server.flavor()
223 );
224 self.server_event_broadcaster_tx
225 .send(PortalServerEvent::FlavorNegotiation(EventResult::Err(
226 message.clone(),
227 )))
228 .unwrap_or_default();
229 return Err(anyhow!(message));
230 }
231
232 writer.write(initout::Frame::Ok).await?;
233 yield_now().await;
234
235 if let initin::Frame::Auth(portal_auth) = reader.read().await? {
236 self.server_event_broadcaster_tx
237 .send(PortalServerEvent::Authorization(EventResult::Ok(
238 portal_auth.user.clone(),
239 )))
240 .unwrap_or_default();
241 tokio::time::sleep(Duration::from_secs(0)).await;
242 writer.write(initout::Frame::Ok).await?;
243
244 loop {
245 match reader.read().await? {
246 initin::Frame::Artifact(request) => {
247 let response = self.server.portal_request_handler().handle_artifact_request(request.item.clone()).await?;
248 let response = request.with(response);
249 writer.write(initout::Frame::Artifact(response)).await?;
250println!("portal server: wrote initout::Frame::Artifact");
251 }
252 initin::Frame::Ready => {
253 break;
254 }
255 _ => {
256 return Err(anyhow!("portal server: illegal initin::Frame encountered during client init process") )
257 }
258 }
259 }
260
261 let mut reader: FrameReader<inlet::Frame> = FrameReader::new(reader.done());
262 let mut writer: FrameWriter<outlet::Frame> = FrameWriter::new(writer.done());
263
264 let (outlet_tx, mut outlet_rx) = mpsc::channel(1024);
265
266 fn logger(log: Log) {
267 println!("{}", log.to_string());
268 }
269
270 let portal_key = match portal_auth.portal_key {
271 None => uuid::Uuid::new_v4().to_string(),
272 Some(portal_key) => portal_key,
273 };
274
275 let info = PortalInfo { portal_key };
276
277 let (portal, inlet_tx) = Portal::new(
278 info,
279 self.portal_config.clone(),
280 outlet_tx,
281 self.request_handler.clone(),
282 self.portal_broadcast_tx.clone(),
283 logger,
284 );
285
286
287 let portal_api = portal.api();
288 self.server.add_portal(portal);
289 self.portal_broadcast_tx.send( PortalEvent::PortalAdded(portal_api));
290
291 {
292 let logger = self.server.logger();
293 tokio::spawn(async move {
294 loop {
295 match reader.read().await {
296 Ok(frame) => {
297 println!("server TCP READ FRAME: {}", frame.to_string());
298 let result = inlet_tx.send(frame).await;
299 yield_now().await;
300 if result.is_err() {
301 (logger)("FATAL: cannot send frame to portal inlet_tx");
302 return;
303 }
304 }
305 Err(err) => {
306 eprintln!("portal server: TCP Reader end... {}",err.to_string());
307 break;
308 }
309 }
310 }
311 });
312 }
313
314 {
315 let logger = self.server.logger();
316 let task = tokio::task::spawn_blocking(move || {
317 tokio::spawn(async move {
318 while let Option::Some(frame) = outlet_rx.recv().await {
319 println!(
320 "server... SENDING from outlet_rx frame :==:> {}",
321 frame.to_string()
322 );
323 writer.write(frame).await;
324 }
325 println!("server: outlet_rx complete.");
326 });
327 });
328 task.await?;
329 }
330 }
331 Ok(())
332 }
333}
334
335pub struct RouterProxy {
336 pub server: Arc<dyn PortalServer>,
337}
338
339#[async_trait]
340pub trait PortalServer: Sync + Send {
341 fn flavor(&self) -> String;
342
343 async fn auth(
344 &self,
345 reader: &mut PrimitiveFrameReader,
346 writer: &mut PrimitiveFrameWriter,
347 ) -> Result<PortalAuth, anyhow::Error> {
348 let frame = reader.read().await?;
349 let client_ident: PortalAuth = bincode::deserialize(frame.data.as_slice())?;
350 tokio::time::sleep(Duration::from_secs(0)).await;
351 Ok(client_ident)
352 }
353
354 fn logger(&self) -> fn(message: &str);
355 fn portal_request_handler(&self) -> Arc<dyn PortalRequestHandler>;
356 fn add_portal(&self, portal: Portal);
357}