mesh_portal_tcp_client/
lib.rs1#[macro_use]
2extern crate async_trait;
3
4#[macro_use]
5extern crate anyhow;
6
7use mesh_portal_tcp_common::{PrimitiveFrameReader, PrimitiveFrameWriter, FrameWriter, FrameReader };
8use anyhow::Error;
9use mesh_portal_api_client::{Portal, ResourceCtrl, PortalSkel, InletApi, Inlet, ResourceCtrlFactory, Exchanges, PrePortalSkel};
10use std::sync::Arc;
11use dashmap::DashMap;
12use tokio::net::TcpStream;
13use tokio::sync::{broadcast, mpsc};
14use mesh_portal::version::latest::portal;
15use tokio::sync::mpsc::error::TrySendError;
16use tokio::task::yield_now;
17use mesh_portal::version;
18use tokio::time::Duration;
19use mesh_portal::version::latest::portal::{outlet, inlet, Exchanger, initin, initout};
20use mesh_portal::version::latest::portal::initin::PortalAuth;
21use mesh_portal::version::latest::portal::inlet::AssignRequest;
22
23pub struct PortalTcpClient {
24 pub host: String,
25 pub portal: Arc<Portal>,
26 pub close_tx: broadcast::Sender<i32>
27}
28
29impl PortalTcpClient {
30
31 pub async fn new( host: String, mut client: Box<dyn PortalClient> ) -> Result<Self,Error> {
32 let stream = TcpStream::connect(host.clone()).await?;
33
34 let (reader,writer) = stream.into_split();
35 let mut reader = PrimitiveFrameReader::new(reader);
36 let mut writer = PrimitiveFrameWriter::new(writer);
37
38 let mut reader : FrameReader<initout::Frame> = FrameReader::new(reader );
39 let mut writer : FrameWriter<initin::Frame> = FrameWriter::new(writer );
40
41 writer.write(initin::Frame::Flavor(client.flavor())).await?;
42
43 if let initout::Frame::Ok = reader.read().await? {
44println!("client: Flavor negotiaion Ok");
45 } else {
46 let message = "FLAVOR NEGOTIATION FAILED".to_string();
47 (client.logger())(message.as_str());
48 return Err(anyhow!(message));
49 }
50
51 let auth = client.auth();
52 writer.write( initin::Frame::Auth(auth)).await?;
53
54 if let initout::Frame::Ok = reader.read().await? {
55println!("client: auth Ok.");
56 } else {
57 let message = "AUTH FAILED".to_string();
58 (client.logger())(message.as_str());
59 return Err(anyhow!(message));
60 }
61
62 let (inlet_tx, mut inlet_rx) = mpsc::channel(1024 );
63 let (outlet_tx, mut outlet_rx) = mpsc::channel(1024 );
64
65 let inlet = Arc::new(TcpInlet{
66 sender: inlet_tx,
67 logger: client.logger()
68 });
69
70 let skel = PrePortalSkel {
71 config: Default::default(),
72 inlet,
73 logger: client.logger(),
74 exchanges: Arc::new(DashMap::new() ),
75 assign_exchange: Arc::new(DashMap::new() ),
76 };
77
78println!("client: init client pre");
79 let factory = client.init( &mut reader, &mut writer, skel.clone() ).await?;
80
81println!("client: init client post");
82 writer.write( initin::Frame::Ready ).await?;
83println!("client: signaled ready");
84
85 let mut reader : FrameReader<outlet::Frame> = FrameReader::new(reader.done() );
86 let mut writer : FrameWriter<inlet::Frame> = FrameWriter::new(writer.done() );
87
88println!("client: transitioned to portal frames.");
89
90 let (close_tx,_) = broadcast::channel(128 );
91
92 {
93 let logger = client.logger();
94 let close_tx = close_tx.clone();
95 tokio::spawn(async move {
96 while let Option::Some(frame) = inlet_rx.recv().await {
97 match writer.write(frame).await {
98 Ok(_) => {}
99 Err(err) => {
100 (logger)("FATAL: writer disconnected");
101 eprintln!("client: FATAL! writer disconnected.");
102 break;
103 }
104 }
105 yield_now().await;
106 }
107println!("client: inlet_rx complete.");
108 close_tx.send(0);
109 });
110 }
111
112
113 let portal = Portal::new(skel, outlet_tx.clone(), outlet_rx, factory, client.logger()).await?;
114 {
115 let logger = client.logger();
116 let close_tx = close_tx.clone();
117 tokio::spawn(async move {
118 while let Result::Ok(frame) = reader.read().await {
119println!("client reading frame: {}",frame.to_string());
120 match outlet_tx.send( frame ).await {
121 Result::Ok(_) => {
122
123 }
124 Result::Err(err) => {
125 (logger)("FATAL: reader disconnected");
126 eprintln!("client: FATAL! reader disconnected.");
127 break;
128 }
129 }
130 yield_now().await;
131 }
132println!("client reader.read() complete.");
133 close_tx.send(0);
134 });
135 }
136
137 return Ok(Self {
138 host,
139 portal,
140 close_tx
141 });
142
143 }
144
145 pub async fn request_assign( &self, request: AssignRequest ) -> Result<Arc<dyn ResourceCtrl>,Error> {
146 self.portal.request_assign(request).await
147 }
148}
149
150#[async_trait]
151pub trait PortalClient: Send+Sync {
152 fn flavor(&self) -> String;
153 fn auth( &self ) -> PortalAuth;
154 fn logger(&self) -> fn(message: &str);
155 async fn init( &self, reader: & mut FrameReader<initout::Frame>, writer: & mut FrameWriter<initin::Frame>, skel: PrePortalSkel ) -> Result<Arc< dyn ResourceCtrlFactory >,Error>;
156
157}
158
159struct TcpInlet {
160 pub sender: mpsc::Sender<inlet::Frame>,
161 pub logger: fn( message: &str )
162}
163
164impl Inlet for TcpInlet {
165 fn inlet_frame(&self, frame: inlet::Frame) {
166 let sender = self.sender.clone();
167 let logger = self.logger;
168 tokio::spawn(async move {
169println!("Sending FRAME via inlet api...{}", frame.to_string());
170 match sender.send(frame).await
171 {
172 Ok(_) => {
173 println!("SENT FRAME via inlet!");
174 }
175 Err(err) => {
176 (logger)(format!("ERROR: frame failed to send to client inlet").as_str())
177 }
178 }
179 });
180 }
181}
182
183
184#[cfg(test)]
185mod tests {
186 #[test]
187 fn it_works() {
188 let result = 2 + 2;
189 assert_eq!(result, 4);
190 }
191}