mesh_portal_api_server/
lib.rs

1#[macro_use]
2extern crate anyhow;
3
4#[macro_use]
5extern crate async_trait;
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::prelude::rust_2021::TryInto;
10use std::sync::Arc;
11use std::time::Duration;
12
13use anyhow::Error;
14use futures::future::select_all;
15use futures::{FutureExt, SinkExt};
16use tokio::sync::mpsc::error::{SendError, SendTimeoutError, TryRecvError};
17use tokio::sync::{broadcast, mpsc, oneshot};
18
19use mesh_portal::version::latest;
20use mesh_portal::version::latest::entity::response;
21use mesh_portal::version::latest::fail;
22use mesh_portal::version::latest::frame::CloseReason;
23use mesh_portal::version::latest::id::Address;
24use mesh_portal::version::latest::messaging::{Message, Request,  Response};
25use mesh_portal::version::latest::pattern::AddressKindPattern;
26use mesh_portal::version::latest::portal::{Exchanger, inlet, outlet};
27use mesh_portal::version::latest::resource::ResourceStub;
28use mesh_portal::version::latest::resource::Status;
29use std::fmt::Debug;
30use dashmap::DashMap;
31use tokio::task::yield_now;
32use mesh_portal::version::latest::artifact::{Artifact, ArtifactRequest, ArtifactResponse};
33use mesh_portal::version::latest::config::{Assign, Config, ConfigBody, PortalConfig};
34use mesh_portal::version::latest::portal::inlet::{AssignRequest, Log};
35use mesh_portal::version::latest::portal::outlet::Frame;
36
37#[derive(Debug,Clone)]
38pub struct PortalApi {
39    pub info: PortalInfo,
40    tx: mpsc::Sender<PortalCall>
41}
42
43impl PortalApi {
44    pub async fn handle_request( &self, request: Request ) -> Response {
45        let (tx,rx) = oneshot::channel();
46        self.tx.send( PortalCall::Request { request: request.clone(), tx }).await;
47        match tokio::time::timeout(Duration::from_secs(60), rx ).await {
48            Ok(Ok(response)) => {
49                response
50            }
51            _ => {
52                request.fail("timeout".to_string().as_str() )
53            }
54        }
55    }
56
57    pub fn assign(&self, assign: Assign)  {
58        let tx = self.tx.clone();
59        tokio::spawn(async move  {
60            tx.send( PortalCall::Assign(assign)).await;
61        });
62    }
63}
64
65#[derive(Clone)]
66pub enum PortalEvent {
67    PortalAdded(PortalApi),
68    PortalRemoved(String),
69    ResourceAdded(PortalResourceApi),
70    ResourceRemoved(Address)
71}
72
73enum PortalCall {
74  Request{ request: Request, tx: oneshot::Sender<Response>},
75  Assign(Assign),
76}
77
78#[derive(Clone, Eq, PartialEq, Hash)]
79pub enum PortalStatus {
80    None,
81    Initializing,
82    Ready,
83    Panic(String),
84}
85
86#[derive(Debug,Clone)]
87pub struct PortalInfo {
88    pub portal_key: String
89}
90
91#[derive(Debug)]
92pub struct Portal {
93    pub info: PortalInfo,
94    pub config: PortalConfig,
95    outlet_tx: mpsc::Sender<outlet::Frame>,
96    exchanges: Arc<DashMap<String,oneshot::Sender<Response>>>,
97    pub log: fn(log: Log),
98    tx: mpsc::Sender<PortalCall>,
99    broadcast_tx: broadcast::Sender<PortalEvent>
100}
101
102impl Portal {
103    pub fn new(
104        info: PortalInfo,
105        config: PortalConfig,
106        outlet_tx: mpsc::Sender<outlet::Frame>,
107        request_handler: Arc<dyn PortalRequestHandler>,
108        broadcast_tx: broadcast::Sender<PortalEvent>,
109        logger: fn(log: Log),
110    ) -> (Self,mpsc::Sender<inlet::Frame>) {
111        let (inlet_tx,mut inlet_rx) = mpsc::channel(1024);
112        let exchanges: Arc<DashMap<String,oneshot::Sender<Response>>> = Arc::new( DashMap::new() );
113        let (tx,mut rx) = mpsc::channel(1024);
114        let portal_api = PortalApi {
115          tx: tx.clone(),
116          info: info.clone()
117        };
118        {
119            let config = config.clone();
120            let exchanges = exchanges.clone();
121            let outlet_tx = outlet_tx.clone();
122            let portal_api = portal_api.clone();
123            let broadcast_tx = broadcast_tx.clone();
124            tokio::spawn(async move {
125               while let Some(call) = rx.recv().await {
126                   match call {
127                       PortalCall::Request { request, tx } => {
128                           let exchanges = exchanges.clone();
129                           let outlet_tx = outlet_tx.clone();
130                           tokio::spawn( async move {
131                               exchanges.insert( request.id.clone(), tx );
132                               outlet_tx.send( outlet::Frame::Request(request.clone()) ).await;
133                           });
134                       }
135                       PortalCall::Assign(assign) => {
136                           let portal_api = portal_api.clone();
137                           let assign = Exchanger::new(assign);
138                           let stub = assign.stub.clone();
139                           outlet_tx.send(outlet::Frame::Assign(assign)).await;
140                           let resource_api = PortalResourceApi {
141                               stub,
142                               portal_api: portal_api
143                           };
144                           broadcast_tx.send( PortalEvent::ResourceAdded(resource_api));
145                       }
146                   }
147               }
148            });
149        }
150
151        {
152            let request_handler = request_handler.clone();
153            let info = info.clone();
154            let portal_config = config.clone();
155            let outlet_tx = outlet_tx.clone();
156            let exchanges = exchanges.clone();
157            tokio::spawn(async move {
158                loop {
159                    match inlet_rx.recv().await {
160                        Some(frame) => {
161                            let frame:inlet::Frame = frame;
162                            match frame {
163                                inlet::Frame::Log(log) => {
164                                    (logger)(log);
165                                }
166                                inlet::Frame::AssignRequest(request) => {
167                                    let result = request_handler.handle_assign_request(request.item.clone() ).await;
168                                    match result {
169                                        Ok(assignment) => {
170                                            let assign = request.with(assignment);
171                                            outlet_tx.send( outlet::Frame::Assign(assign) ).await;
172                                        }
173                                        Err(error) => {
174                                            println!("{}",error.to_string());
175                                        }
176                                    }
177                                }
178                                inlet::Frame::Request(request) => {
179                                    let request_handler = request_handler.clone();
180                                    let outlet_tx = outlet_tx.clone();
181                                    tokio::spawn(async move {
182                                        match tokio::time::timeout(Duration::from_secs(portal_config.response_timeout ), request_handler.route_to_mesh(request.clone()) ).await {
183                                            Ok(response) => {
184                                                outlet_tx.send( outlet::Frame::Response(response)).await;
185                                            }
186                                            _ => {
187                                                let response = request.fail("timeout".to_string().as_str());
188                                                outlet_tx.send( outlet::Frame::Response(response)).await;
189                                            }
190                                        }
191                                    });
192                                }
193                                inlet::Frame::Response(response) => {
194                                    if let Option::Some((_,mut tx)) = exchanges.remove(&response.response_to) {
195                                        tx.send(response);
196                                    } else {
197                                        (logger)(Log::new( "Portal", "response had no listening request" ));
198                                    }
199                                }
200                                inlet::Frame::Artifact(request) => {
201                                    let request_handler = request_handler.clone();
202                                    let outlet_tx = outlet_tx.clone();
203                                    tokio::spawn( async move {
204                                        match request_handler.handle_artifact_request(request.item.clone()).await {
205                                            Ok(response ) => {
206                                                let response = request.with(response);
207                                                outlet_tx.send( outlet::Frame::Artifact(response)).await;
208                                            }
209                                            Err(err) => {
210                                                (logger)(Log::new( "Portal", err.to_string().as_str() ));
211                                            }
212                                        }
213                                    });
214                                }
215                                inlet::Frame::Status(_) => {
216                                    // not implemented
217                                }
218                                inlet::Frame::Close(_) => {
219                                    // not implemented
220                                }
221                            }
222                        }
223                        None => {
224                            break;
225                        }
226                    }
227                }
228            });
229        }
230
231
232        (Self {
233            info,
234            config,
235            log: logger,
236            exchanges,
237            outlet_tx,
238            broadcast_tx,
239            tx
240        },inlet_tx)
241    }
242
243
244    pub fn api(&self) -> PortalApi {
245        PortalApi {
246            info: self.info.clone(),
247            tx: self.tx.clone()
248        }
249    }
250
251    pub async fn handle_request(&self, request: Request ) -> Response {
252        let (tx,rx) = oneshot::channel();
253        self.exchanges.insert( request.id.clone(), tx );
254        self.outlet_tx.send( outlet::Frame::Request(request.clone()) ).await;
255        match tokio::time::timeout(Duration::from_secs(self.config.response_timeout ), rx ).await {
256            Ok(Ok(response)) => {
257                response
258            }
259            _ => {
260                let response = request.fail("timeout".to_string().as_str() );
261                response
262            }
263        }
264    }
265
266    pub fn assign(&self, assign: Assign ) {
267        let outlet_tx = self.outlet_tx.clone();
268        tokio::spawn(async move {
269            let assign = Exchanger::new(assign);
270            outlet_tx.send(outlet::Frame::Assign(assign)).await;
271        });
272    }
273
274    pub fn shutdown(&mut self) {
275        self.outlet_tx
276            .try_send(outlet::Frame::Close(CloseReason::Done))
277            .unwrap_or(());
278    }
279}
280
281#[async_trait]
282pub trait PortalRequestHandler: Send + Sync {
283
284    async fn route_to_mesh(&self, request: Request ) -> Response;
285
286    async fn default_assign(&self) -> Result<Assign, Error> {
287        Err(anyhow!("request handler does not have a default assign"))
288    }
289
290    async fn handle_assign_request(&self, request: AssignRequest ) -> Result<Assign, Error> {
291        Err(anyhow!("request handler does not assign"))
292    }
293
294    async fn handle_artifact_request(
295        &self,
296        request: ArtifactRequest,
297    ) -> Result<ArtifactResponse, Error> {
298        Err(anyhow!("request handler does not handle artifacts"))
299    }
300
301    async fn handle_config_request(
302        &self,
303        request: ArtifactRequest,
304    ) -> Result<ArtifactResponse, Error> {
305        Err(anyhow!("request handler does not handle configs"))
306    }
307}
308
309
310#[derive(Debug,Clone)]
311pub struct PortalResourceApi {
312   portal_api: PortalApi,
313   pub stub: ResourceStub
314}
315
316impl PortalResourceApi {
317   pub async fn handle_request( &self, request: Request ) -> Response {
318       self.portal_api.handle_request(request).await
319   }
320}