mesh_portal_tcp_client/
lib.rs

1#[macro_use]
2extern crate async_trait;
3
4#[macro_use]
5extern crate anyhow;
6
7use mesh_portal_tcp_common::{PrimitiveFrameReader, PrimitiveFrameWriter, FrameWriter, FrameReader };
8use anyhow::Error;
9use mesh_portal_api_client::{Portal, ResourceCtrl, PortalSkel, InletApi, Inlet, ResourceCtrlFactory, Exchanges, PrePortalSkel};
10use std::sync::Arc;
11use dashmap::DashMap;
12use tokio::net::TcpStream;
13use tokio::sync::{broadcast, mpsc};
14use mesh_portal::version::latest::portal;
15use tokio::sync::mpsc::error::TrySendError;
16use tokio::task::yield_now;
17use mesh_portal::version;
18use tokio::time::Duration;
19use mesh_portal::version::latest::portal::{outlet, inlet, Exchanger, initin, initout};
20use mesh_portal::version::latest::portal::initin::PortalAuth;
21use mesh_portal::version::latest::portal::inlet::AssignRequest;
22
23pub struct PortalTcpClient {
24    pub host: String,
25    pub portal: Arc<Portal>,
26    pub close_tx: broadcast::Sender<i32>
27}
28
29impl PortalTcpClient {
30
31    pub async fn new( host: String, mut client: Box<dyn PortalClient> ) -> Result<Self,Error> {
32        let stream = TcpStream::connect(host.clone()).await?;
33
34        let (reader,writer) = stream.into_split();
35        let mut reader = PrimitiveFrameReader::new(reader);
36        let mut writer = PrimitiveFrameWriter::new(writer);
37
38        let mut reader : FrameReader<initout::Frame> = FrameReader::new(reader );
39        let mut writer : FrameWriter<initin::Frame>  = FrameWriter::new(writer );
40
41        writer.write(initin::Frame::Flavor(client.flavor())).await?;
42
43        if let initout::Frame::Ok = reader.read().await? {
44println!("client: Flavor negotiaion Ok");
45        } else {
46            let message = "FLAVOR NEGOTIATION FAILED".to_string();
47            (client.logger())(message.as_str());
48            return Err(anyhow!(message));
49        }
50
51        let auth = client.auth();
52        writer.write( initin::Frame::Auth(auth)).await?;
53
54        if let initout::Frame::Ok = reader.read().await? {
55println!("client: auth Ok.");
56        } else {
57            let message = "AUTH FAILED".to_string();
58            (client.logger())(message.as_str());
59            return Err(anyhow!(message));
60        }
61
62        let (inlet_tx, mut inlet_rx) = mpsc::channel(1024 );
63        let (outlet_tx, mut outlet_rx) = mpsc::channel(1024 );
64
65        let inlet = Arc::new(TcpInlet{
66            sender: inlet_tx,
67            logger: client.logger()
68        });
69
70        let skel = PrePortalSkel {
71            config: Default::default(),
72            inlet,
73            logger: client.logger(),
74            exchanges: Arc::new(DashMap::new() ),
75            assign_exchange: Arc::new(DashMap::new() ),
76        };
77
78println!("client: init client pre");
79        let factory = client.init( &mut reader, &mut writer, skel.clone() ).await?;
80
81println!("client: init client post");
82        writer.write( initin::Frame::Ready ).await?;
83println!("client: signaled ready");
84
85        let mut reader : FrameReader<outlet::Frame> = FrameReader::new(reader.done() );
86        let mut writer : FrameWriter<inlet::Frame>  = FrameWriter::new(writer.done() );
87
88println!("client: transitioned to portal frames.");
89
90        let (close_tx,_) = broadcast::channel(128 );
91
92        {
93            let logger = client.logger();
94            let close_tx = close_tx.clone();
95            tokio::spawn(async move {
96                while let Option::Some(frame) = inlet_rx.recv().await {
97                    match writer.write(frame).await {
98                        Ok(_) => {}
99                        Err(err) => {
100                            (logger)("FATAL: writer disconnected");
101                            eprintln!("client: FATAL! writer disconnected.");
102                            break;
103                        }
104                    }
105                    yield_now().await;
106                }
107println!("client: inlet_rx complete.");
108                close_tx.send(0);
109            });
110        }
111
112
113        let portal = Portal::new(skel, outlet_tx.clone(), outlet_rx, factory, client.logger()).await?;
114        {
115            let logger = client.logger();
116            let close_tx = close_tx.clone();
117            tokio::spawn(async move {
118                while let Result::Ok(frame) = reader.read().await {
119println!("client reading frame: {}",frame.to_string());
120                    match outlet_tx.send( frame ).await {
121                        Result::Ok(_) => {
122
123                        }
124                        Result::Err(err) => {
125                            (logger)("FATAL: reader disconnected");
126                            eprintln!("client: FATAL! reader disconnected.");
127                            break;
128                        }
129                    }
130                    yield_now().await;
131                }
132println!("client reader.read() complete.");
133                close_tx.send(0);
134            });
135        }
136
137        return Ok(Self {
138            host,
139            portal,
140            close_tx
141        });
142
143    }
144
145    pub async fn request_assign( &self, request: AssignRequest ) -> Result<Arc<dyn ResourceCtrl>,Error> {
146        self.portal.request_assign(request).await
147    }
148}
149
150#[async_trait]
151pub trait PortalClient: Send+Sync {
152    fn flavor(&self) -> String;
153    fn auth( &self ) -> PortalAuth;
154    fn logger(&self) -> fn(message: &str);
155    async fn init( &self, reader: & mut FrameReader<initout::Frame>, writer: & mut FrameWriter<initin::Frame>, skel: PrePortalSkel ) -> Result<Arc< dyn ResourceCtrlFactory >,Error>;
156
157}
158
159struct TcpInlet {
160    pub sender: mpsc::Sender<inlet::Frame>,
161    pub logger: fn( message: &str )
162}
163
164impl Inlet for TcpInlet {
165    fn inlet_frame(&self, frame: inlet::Frame) {
166        let sender = self.sender.clone();
167        let logger = self.logger;
168        tokio::spawn(async move {
169println!("Sending FRAME via inlet api...{}", frame.to_string());
170            match sender.send(frame).await
171            {
172                Ok(_) => {
173                    println!("SENT FRAME via inlet!");
174                }
175                Err(err) => {
176                    (logger)(format!("ERROR: frame failed to send to client inlet").as_str())
177                }
178            }
179        });
180    }
181}
182
183
184#[cfg(test)]
185mod tests {
186    #[test]
187    fn it_works() {
188        let result = 2 + 2;
189        assert_eq!(result, 4);
190    }
191}