mesh_portal_api_client/
lib.rs

1#[macro_use]
2extern crate async_trait;
3
4#[macro_use]
5extern crate anyhow;
6
7
8
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11
12use anyhow::Error;
13use dashmap::DashMap;
14use tokio::sync::{oneshot, mpsc};
15use uuid::Uuid;
16
17
18use std::prelude::rust_2021::TryFrom;
19use std::ops::Deref;
20use std::collections::HashMap;
21use tokio::sync::watch::Receiver;
22use mesh_portal::version::latest::http::{HttpRequest, HttpResponse};
23use mesh_portal::version::latest::portal::{Exchanger, inlet, outlet};
24use mesh_portal::version::latest::resource::{ResourceStub, Status};
25use mesh_portal::version::latest::{portal, entity};
26use std::convert::TryInto;
27use dashmap::mapref::one::Ref;
28use tokio::sync::oneshot::Sender;
29use tokio::task::yield_now;
30use mesh_portal::version::latest::config::{Assign, Config, PortalConfig, ResourceConfigBody};
31use mesh_portal::version::latest::entity::response::ResponseCore;
32use mesh_portal::version::latest::id::Address;
33use mesh_portal::version::latest::messaging::{Request, Response};
34use mesh_portal::version::latest::portal::inlet::{AssignRequest, Log};
35use mesh_portal::version::latest::portal::outlet::Frame;
36use mesh_portal::version::latest::util::unique_id;
37
38pub fn std_logger( log: Log ) {
39    println!("{}", log.to_string())
40}
41
42#[derive(Clone)]
43pub struct ResourceSkel {
44  pub assign: Assign,
45  pub portal: PortalSkel,
46  pub logger: fn(message: &str),
47}
48
49pub trait ResourceCtrlFactory: Sync+Send {
50    fn matches(&self,config:Config<ResourceConfigBody>) -> bool;
51    fn create(&self, skel: ResourceSkel ) -> Result<Arc<dyn ResourceCtrl>, Error>;
52}
53
54#[async_trait]
55pub trait ResourceCtrl: Sync+Send {
56    async fn init(&self) -> Result<(), Error>
57    {
58        Ok(())
59    }
60
61    async fn handle_request( &self, request: Request ) -> ResponseCore {
62        request.core.not_found()
63    }
64
65}
66
/// Plain-text logger: prints `message` to standard output followed by a newline.
pub fn log(message: &str) {
    let text = message;
    println!("{}", text);
}
70
71pub trait Inlet: Sync+Send {
72    fn inlet_frame(&self, frame: inlet::Frame);
73}
74
75pub trait Outlet: Sync+Send {
76    fn receive(&mut self, frame: outlet::Frame);
77}
78
79pub struct StatusChamber {
80    pub status: Status
81}
82
83impl StatusChamber{
84    pub fn new( status: Status ) -> Self {
85        Self {
86            status
87        }
88    }
89}
90
91pub type Exchanges = Arc<DashMap<String, oneshot::Sender<Response>>>;
92
93#[derive(Clone)]
94pub struct PrePortalSkel {
95    pub config: PortalConfig,
96    pub inlet: Arc<dyn Inlet>,
97    pub logger: fn(message: &str),
98    pub exchanges: Exchanges,
99    pub assign_exchange: Arc<DashMap<String, oneshot::Sender<Arc<dyn ResourceCtrl>>>>,
100}
101impl PrePortalSkel {
102
103    pub fn api(&self) -> InletApi {
104        InletApi::new( self.config.clone(), self.inlet.clone(), self.exchanges.clone(), std_logger )
105    }
106
107}
108#[derive(Clone)]
109pub struct PortalSkel {
110    pub pre: PrePortalSkel,
111    pub tx: mpsc::Sender<outlet::Frame>,
112    pub ctrl_factory: Arc<dyn ResourceCtrlFactory>,
113}
114
115impl Deref for PortalSkel {
116    type Target = PrePortalSkel;
117
118    fn deref(&self) -> &Self::Target {
119        &self.pre
120    }
121}
122
123pub enum ResourceCommand {
124    Add{address: Address, resource: Arc<dyn ResourceCtrl> },
125    Remove(Address),
126    None
127}
128
129
130pub struct Portal {
131    pub skel: PortalSkel,
132}
133
134impl Portal {
135    pub async fn new(
136        pre: PrePortalSkel,
137        outlet_tx: mpsc::Sender<outlet::Frame>,
138        mut outlet_rx: mpsc::Receiver<outlet::Frame>,
139        ctrl_factory: Arc<dyn ResourceCtrlFactory>,
140        logger: fn(message: &str)
141    ) -> Result<Arc<Portal>, Error> {
142
143        let skel =  PortalSkel {
144            pre,
145            tx: outlet_tx,
146            ctrl_factory,
147        };
148
149        {
150println!("NEW PORTAL!" );
151            let skel = skel.clone();
152            tokio::spawn(async move {
153                let mut resources = HashMap::new();
154println!("portal listening...");
155                while let Option::Some(frame) = outlet_rx.recv().await {
156println!("Portal received frame: {}", frame.to_string());
157                    if let Frame::Close(_) = frame {
158                        println!("XXX>>> Client exiting outlet_rx loop");
159                        break;
160                    }
161                    process(skel.clone(), &mut resources, frame).await;
162
163                        async fn process( skel: PortalSkel,  resources: &mut HashMap<Address,Arc<dyn ResourceCtrl>>, frame: outlet::Frame ) -> Result<(),Error> {
164println!("CLIENT PROCESS");
165
166                            match &frame {
167                                outlet::Frame::Init => {
168
169                                }
170                                outlet::Frame::Assign(assign) => {
171                                    let resource_skel = ResourceSkel {
172                                        assign: assign.item.clone(),
173                                        portal: skel.clone(),
174                                        logger: skel.logger,
175                                    };
176                                    let resource = skel.ctrl_factory.create(resource_skel)?;
177                                    resources.insert( assign.stub.address.clone(), resource.clone() );
178                                    let assign = assign.clone();
179                                    let skel = skel.clone();
180                                    let frame = frame.clone();
181                                    tokio::spawn( async move {
182                                        println!("CLIENT INIT");
183                                        resource.init().await;
184                                        match skel.assign_exchange.remove( &assign.id ) {
185                                            None => {
186                                                println!("could not find exchange for {}",assign.id);
187                                            }
188                                            Some((_,tx)) => {
189                                                tx.send( resource );
190                                            }
191                                        }
192                                        println!("CLIENT INIT COMPLETE");
193                                    });
194
195                                }
196                                outlet::Frame::Request(request) => {
197                                    let request = request.clone();
198                                    let resource = resources.get(&request.to ).ok_or(anyhow!("expected to find resource for address '{}'", request.to.to_string()))?;
199                                    let response = resource.handle_request(request.clone()).await;
200                                    let response = Response {
201                                        id: unique_id(),
202                                        from: request.to,
203                                        to: request.from,
204                                        core: response,
205                                        response_to: request.id
206                                    };
207                                    skel.inlet.inlet_frame(inlet::Frame::Response(response));
208                                }
209                                outlet::Frame::Response(response) => {
210                                    if let Some((_,tx)) = skel.exchanges.remove(&response.response_to)
211                                    {
212                                        tx.send( response.clone() );
213                                    }
214                                }
215                                outlet::Frame::Artifact(_) => {
216                                    unimplemented!()
217                                }
218                                outlet::Frame::Close(_) => {
219                                }
220                            }
221
222                            Ok(())
223                        }
224
225
226                }
227
228
229
230            });
231        }
232
233        let portal = Self {
234            skel: skel.clone(),
235        };
236
237        Ok(Arc::new(portal))
238    }
239
240    pub fn log( &self, log: Log ) {
241        self.skel.inlet.inlet_frame(inlet::Frame::Log(log));
242    }
243
244    pub async fn request_assign( &self, request: AssignRequest) -> Result<Arc<dyn ResourceCtrl>,Error> {
245       let (tx,rx) = oneshot::channel();
246       let request = Exchanger::new(request);
247       self.skel.assign_exchange.insert( request.id.clone(), tx );
248       self.skel.inlet.inlet_frame(inlet::Frame::AssignRequest(request) );
249       Ok(rx.await?)
250    }
251
252    pub async fn request( &self, request: Request ) -> Response {
253        self.skel.api().exchange(request).await
254    }
255}
256
257#[async_trait]
258impl Outlet for Portal {
259    fn receive(&mut self, frame: outlet::Frame) {
260        self.skel.tx.send( frame );
261    }
262}
263
264
265pub struct InletApi {
266    config: PortalConfig,
267    inlet: Arc<dyn Inlet>,
268    exchanges: Exchanges,
269    logger: fn( log: Log )
270}
271
272impl InletApi {
273    pub fn new(config: PortalConfig, inlet: Arc<dyn Inlet>, exchanges: Exchanges, logger: fn( log: Log ) ) -> Self {
274        Self {
275            config,
276            inlet,
277            exchanges,
278            logger
279        }
280    }
281
282
283    pub fn notify(&self, request: Request) {
284        self.inlet.inlet_frame(inlet::Frame::Request(request));
285    }
286
287    pub async fn exchange(
288        &mut self,
289        request: Request
290    ) -> Response {
291
292        let (tx,rx) = oneshot::channel();
293        self.exchanges.insert(request.id.clone(), tx);
294        self.inlet.inlet_frame(inlet::Frame::Request(request.clone()));
295
296        let result = tokio::time::timeout(Duration::from_secs(self.config.response_timeout.clone()),rx).await;
297        match result {
298            Ok(Ok(response)) => response,
299            Ok(Err(error)) => request.fail(error.to_string().as_str()),
300            Err(error) => request.fail(error.to_string().as_str())
301        }
302    }
303
304    pub fn send_response(&self, response: Response ) {
305        self.inlet.inlet_frame( inlet::Frame::Response(response) );
306    }
307}
308
309pub mod client {
310    use std::ops::Deref;
311    use anyhow::Error;
312    use mesh_portal::version::latest::portal::outlet;
313    use mesh_portal::version::latest::id::{Address};
314    use mesh_portal::version::latest::http::HttpRequest;
315
316    /*
317    #[derive(Clone)]
318    pub struct RequestContext {
319        pub portal_info: Info,
320        pub logger: fn(message: &str),
321    }
322
323    impl RequestContext {
324        pub fn new(portal_info: Info, logger: fn(message: &str)) -> Self {
325            Self {
326                portal_info,
327                logger
328            }
329        }
330    }
331
332    pub struct Request<REQUEST> {
333        pub context: RequestContext,
334        pub from: Address,
335        pub request: REQUEST
336    }
337
338    impl<REQUEST> Deref for Request<REQUEST> {
339        type Target = REQUEST;
340
341        fn deref(&self) -> &Self::Target {
342            &self.request
343        }
344    }
345
346     */
347}
348
349
350pub mod example {
351    use std::sync::Arc;
352
353    use anyhow::Error;
354
355
356    use crate::{InletApi, ResourceCtrl, PortalSkel, inlet};
357    use std::collections::HashMap;
358    use mesh_portal::version::latest::payload::{Payload, Primitive};
359    use mesh_portal::version::latest::entity;
360
361    pub struct HelloCtrl {
362        pub skel: Arc<PortalSkel>,
363        pub inlet_api: InletApi
364    }
365
366    impl HelloCtrl {
367        #[allow(dead_code)]
368        fn new(skel: Arc<PortalSkel>, inlet_api: InletApi) -> Box<Self> {
369            Box::new(Self { skel, inlet_api } )
370        }
371    }
372
373    #[async_trait]
374    impl ResourceCtrl for HelloCtrl {
375
376        async fn init(&self) -> Result<(), Error> {
377            unimplemented!();
378            /*
379            let mut request =
380                inlet::Request::new(entity::request::ReqEntity::Msg( Msg {
381                    action: "HelloWorld".to_string(),
382                    payload: Payload::Empty,
383
384                    path: "/".to_string()
385                }));
386
387            request.to.push(self.inlet_api.info.address.clone());
388
389            let response = self.inlet_api.exchange(request).await?;
390
391            if let entity::response::RespEntity::Ok(Payload::Primitive(Primitive::Text(text))) = response.entity {
392                println!("{}",text);
393            } else {
394                return Err(anyhow!("unexpected signal"));
395            }
396
397
398             */
399            Ok(())
400        }
401
402
403    }
404
405
406}
407
408
409
410
411/*                        match request.entity.clone() {
412                            ExtOperation::Http(_) => {
413                                if let Exchange::RequestResponse(exchange_id) = &kind
414                                {
415                                    let result = Request::try_from_http(request, context);
416                                    match result {
417                                        Ok(request) => {
418                                            let path = request.path.clone();
419                                            let result = ctrl.http_request(request).await;
420                                            match result {
421                                                Ok(response) => {
422                                                    let response = inlet::Response {
423                                                        to: from,
424                                                        exchange:exchange_id.clone(),
425                                                        entity: ResponseEntity::Ok(Entity::HttpResponse(response))
426                                                    };
427                                                    inlet_api.respond( response );
428                                                }
429                                                Err(err) => {
430                                                    (skel.logger)(format!("ERROR: HttpRequest.path: '{}' error: '{}' ",  path, err.to_string()).as_str());
431                                                    let response = inlet::Response {
432                                                        to: from,
433                                                        exchange:exchange_id.clone(),
434                                                        entity: ResponseEntity::Ok(Entity::HttpResponse(HttpResponse::server_side_error()))
435                                                    };
436                                                    inlet_api.respond( response );
437                                                }
438                                            }
439                                        }
440                                        Err(err) => {
441                                            (skel.logger)(format!("FATAL: could not modify HttpRequest into Request<HttpRequest>: {}", err.to_string()).as_str());
442                                        }
443                                    }
444                                } else {
445                                    (skel.logger)("FATAL: http request MUST be of ExchangeKind::RequestResponse");
446                                }
447                            }
448                            ExtOperation::Port(port_request) => {
449                                match ports.get(&port_request.port ) {
450                                    Some(port) => {
451                                        let result = Request::try_from_port(request, context );
452                                        match result {
453                                            Ok(request) => {
454                                                let request_from = request.from.clone();
455                                                let result = port.request(request).await;
456                                                match result {
457                                                    Ok(response) => {
458                                                        match response {
459                                                            Some(signal) => {
460                                                                if let Exchange::RequestResponse(exchange_id) = &kind
461                                                                {
462                                                                   let response = inlet::Response {
463                                                                       to: request_from,
464                                                                       exchange: exchange_id.clone(),
465                                                                       entity: signal
466                                                                   };
467
468                                                                   inlet_api.respond(response);
469                                                                } else {
470                                                                    let message = format!("WARN: PortOperation.port '{}' generated a response to a ExchangeKind::Notification", port_request.port);
471                                                                    (skel.logger)(message.as_str());
472                                                                }
473                                                            }
474                                                            None => {
475                                                                let message = format!("ERROR: PortOperation.port '{}' generated no response", port_request.port);
476                                                                (skel.logger)(message.as_str());
477                                                                if let Exchange::RequestResponse(exchange_id) = &kind
478                                                                {
479                                                                    let response = inlet::Response {
480                                                                        to: request_from,
481                                                                        exchange: exchange_id.clone(),
482                                                                        entity: ResponseEntity::Error(message)
483                                                                    };
484                                                                    inlet_api.respond(response);
485                                                                }
486                                                            }
487                                                        }
488                                                    }
489                                                    Err(err) => {
490                                                        let message = format!("ERROR: PortOperation.port '{}' message: '{}'", port_request.port, err.to_string());
491                                                        (skel.logger)(message.as_str());
492                                                        if let Exchange::RequestResponse(exchange_id) = &kind
493                                                        {
494                                                            let response = inlet::Response {
495                                                                to: request_from,
496                                                                exchange: exchange_id.clone(),
497                                                                entity: ResponseEntity::Error(message)
498                                                            };
499                                                            inlet_api.respond(response);
500                                                        }
501                                                    }
502                                                }
503
504                                            }
505                                            Err(err) => {
506                                                let message = format!("FATAL: could not modify PortOperation into Request<PortOperation>: {}", err.to_string());
507                                                (skel.logger)(message.as_str());
508                                                if let Exchange::RequestResponse(exchange_id) = &kind
509                                                {
510                                                    let response = inlet::Response {
511                                                        to: from,
512                                                        exchange: exchange_id.clone(),
513                                                        entity: ResponseEntity::Error(message)
514                                                    };
515                                                    inlet_api.respond(response);
516                                                }
517                                            }
518                                        }
519
520                                    }
521                                    None => {
522                                        let message =format!("ERROR: message port: '{}' not defined ", port_request.port );
523                                        (skel.logger)(message.as_str());
524                                        if let Exchange::RequestResponse(exchange_id) = &kind
525                                        {
526                                            let response = inlet::Response {
527                                                to: from,
528                                                exchange: exchange_id.clone(),
529                                                entity: ResponseEntity::Error(message)
530                                            };
531                                            inlet_api.respond(response);
532                                        }
533                                    }
534                                }
535                            }
536                        }*/