mesh_portal_api_client/lib.rs
1#[macro_use]
2extern crate async_trait;
3
4#[macro_use]
5extern crate anyhow;
6
7
8
9use std::sync::{Arc, RwLock};
10use std::time::Duration;
11
12use anyhow::Error;
13use dashmap::DashMap;
14use tokio::sync::{oneshot, mpsc};
15use uuid::Uuid;
16
17
18use std::prelude::rust_2021::TryFrom;
19use std::ops::Deref;
20use std::collections::HashMap;
21use tokio::sync::watch::Receiver;
22use mesh_portal::version::latest::http::{HttpRequest, HttpResponse};
23use mesh_portal::version::latest::portal::{Exchanger, inlet, outlet};
24use mesh_portal::version::latest::resource::{ResourceStub, Status};
25use mesh_portal::version::latest::{portal, entity};
26use std::convert::TryInto;
27use dashmap::mapref::one::Ref;
28use tokio::sync::oneshot::Sender;
29use tokio::task::yield_now;
30use mesh_portal::version::latest::config::{Assign, Config, PortalConfig, ResourceConfigBody};
31use mesh_portal::version::latest::entity::response::ResponseCore;
32use mesh_portal::version::latest::id::Address;
33use mesh_portal::version::latest::messaging::{Request, Response};
34use mesh_portal::version::latest::portal::inlet::{AssignRequest, Log};
35use mesh_portal::version::latest::portal::outlet::Frame;
36use mesh_portal::version::latest::util::unique_id;
37
38pub fn std_logger( log: Log ) {
39 println!("{}", log.to_string())
40}
41
42#[derive(Clone)]
43pub struct ResourceSkel {
44 pub assign: Assign,
45 pub portal: PortalSkel,
46 pub logger: fn(message: &str),
47}
48
49pub trait ResourceCtrlFactory: Sync+Send {
50 fn matches(&self,config:Config<ResourceConfigBody>) -> bool;
51 fn create(&self, skel: ResourceSkel ) -> Result<Arc<dyn ResourceCtrl>, Error>;
52}
53
54#[async_trait]
55pub trait ResourceCtrl: Sync+Send {
56 async fn init(&self) -> Result<(), Error>
57 {
58 Ok(())
59 }
60
61 async fn handle_request( &self, request: Request ) -> ResponseCore {
62 request.core.not_found()
63 }
64
65}
66
67pub fn log(message: &str) {
68 println!("{}",message);
69}
70
71pub trait Inlet: Sync+Send {
72 fn inlet_frame(&self, frame: inlet::Frame);
73}
74
75pub trait Outlet: Sync+Send {
76 fn receive(&mut self, frame: outlet::Frame);
77}
78
79pub struct StatusChamber {
80 pub status: Status
81}
82
83impl StatusChamber{
84 pub fn new( status: Status ) -> Self {
85 Self {
86 status
87 }
88 }
89}
90
91pub type Exchanges = Arc<DashMap<String, oneshot::Sender<Response>>>;
92
93#[derive(Clone)]
94pub struct PrePortalSkel {
95 pub config: PortalConfig,
96 pub inlet: Arc<dyn Inlet>,
97 pub logger: fn(message: &str),
98 pub exchanges: Exchanges,
99 pub assign_exchange: Arc<DashMap<String, oneshot::Sender<Arc<dyn ResourceCtrl>>>>,
100}
101impl PrePortalSkel {
102
103 pub fn api(&self) -> InletApi {
104 InletApi::new( self.config.clone(), self.inlet.clone(), self.exchanges.clone(), std_logger )
105 }
106
107}
108#[derive(Clone)]
109pub struct PortalSkel {
110 pub pre: PrePortalSkel,
111 pub tx: mpsc::Sender<outlet::Frame>,
112 pub ctrl_factory: Arc<dyn ResourceCtrlFactory>,
113}
114
115impl Deref for PortalSkel {
116 type Target = PrePortalSkel;
117
118 fn deref(&self) -> &Self::Target {
119 &self.pre
120 }
121}
122
123pub enum ResourceCommand {
124 Add{address: Address, resource: Arc<dyn ResourceCtrl> },
125 Remove(Address),
126 None
127}
128
129
130pub struct Portal {
131 pub skel: PortalSkel,
132}
133
134impl Portal {
135 pub async fn new(
136 pre: PrePortalSkel,
137 outlet_tx: mpsc::Sender<outlet::Frame>,
138 mut outlet_rx: mpsc::Receiver<outlet::Frame>,
139 ctrl_factory: Arc<dyn ResourceCtrlFactory>,
140 logger: fn(message: &str)
141 ) -> Result<Arc<Portal>, Error> {
142
143 let skel = PortalSkel {
144 pre,
145 tx: outlet_tx,
146 ctrl_factory,
147 };
148
149 {
150println!("NEW PORTAL!" );
151 let skel = skel.clone();
152 tokio::spawn(async move {
153 let mut resources = HashMap::new();
154println!("portal listening...");
155 while let Option::Some(frame) = outlet_rx.recv().await {
156println!("Portal received frame: {}", frame.to_string());
157 if let Frame::Close(_) = frame {
158 println!("XXX>>> Client exiting outlet_rx loop");
159 break;
160 }
161 process(skel.clone(), &mut resources, frame).await;
162
163 async fn process( skel: PortalSkel, resources: &mut HashMap<Address,Arc<dyn ResourceCtrl>>, frame: outlet::Frame ) -> Result<(),Error> {
164println!("CLIENT PROCESS");
165
166 match &frame {
167 outlet::Frame::Init => {
168
169 }
170 outlet::Frame::Assign(assign) => {
171 let resource_skel = ResourceSkel {
172 assign: assign.item.clone(),
173 portal: skel.clone(),
174 logger: skel.logger,
175 };
176 let resource = skel.ctrl_factory.create(resource_skel)?;
177 resources.insert( assign.stub.address.clone(), resource.clone() );
178 let assign = assign.clone();
179 let skel = skel.clone();
180 let frame = frame.clone();
181 tokio::spawn( async move {
182 println!("CLIENT INIT");
183 resource.init().await;
184 match skel.assign_exchange.remove( &assign.id ) {
185 None => {
186 println!("could not find exchange for {}",assign.id);
187 }
188 Some((_,tx)) => {
189 tx.send( resource );
190 }
191 }
192 println!("CLIENT INIT COMPLETE");
193 });
194
195 }
196 outlet::Frame::Request(request) => {
197 let request = request.clone();
198 let resource = resources.get(&request.to ).ok_or(anyhow!("expected to find resource for address '{}'", request.to.to_string()))?;
199 let response = resource.handle_request(request.clone()).await;
200 let response = Response {
201 id: unique_id(),
202 from: request.to,
203 to: request.from,
204 core: response,
205 response_to: request.id
206 };
207 skel.inlet.inlet_frame(inlet::Frame::Response(response));
208 }
209 outlet::Frame::Response(response) => {
210 if let Some((_,tx)) = skel.exchanges.remove(&response.response_to)
211 {
212 tx.send( response.clone() );
213 }
214 }
215 outlet::Frame::Artifact(_) => {
216 unimplemented!()
217 }
218 outlet::Frame::Close(_) => {
219 }
220 }
221
222 Ok(())
223 }
224
225
226 }
227
228
229
230 });
231 }
232
233 let portal = Self {
234 skel: skel.clone(),
235 };
236
237 Ok(Arc::new(portal))
238 }
239
240 pub fn log( &self, log: Log ) {
241 self.skel.inlet.inlet_frame(inlet::Frame::Log(log));
242 }
243
244 pub async fn request_assign( &self, request: AssignRequest) -> Result<Arc<dyn ResourceCtrl>,Error> {
245 let (tx,rx) = oneshot::channel();
246 let request = Exchanger::new(request);
247 self.skel.assign_exchange.insert( request.id.clone(), tx );
248 self.skel.inlet.inlet_frame(inlet::Frame::AssignRequest(request) );
249 Ok(rx.await?)
250 }
251
252 pub async fn request( &self, request: Request ) -> Response {
253 self.skel.api().exchange(request).await
254 }
255}
256
257#[async_trait]
258impl Outlet for Portal {
259 fn receive(&mut self, frame: outlet::Frame) {
260 self.skel.tx.send( frame );
261 }
262}
263
264
265pub struct InletApi {
266 config: PortalConfig,
267 inlet: Arc<dyn Inlet>,
268 exchanges: Exchanges,
269 logger: fn( log: Log )
270}
271
272impl InletApi {
273 pub fn new(config: PortalConfig, inlet: Arc<dyn Inlet>, exchanges: Exchanges, logger: fn( log: Log ) ) -> Self {
274 Self {
275 config,
276 inlet,
277 exchanges,
278 logger
279 }
280 }
281
282
283 pub fn notify(&self, request: Request) {
284 self.inlet.inlet_frame(inlet::Frame::Request(request));
285 }
286
287 pub async fn exchange(
288 &mut self,
289 request: Request
290 ) -> Response {
291
292 let (tx,rx) = oneshot::channel();
293 self.exchanges.insert(request.id.clone(), tx);
294 self.inlet.inlet_frame(inlet::Frame::Request(request.clone()));
295
296 let result = tokio::time::timeout(Duration::from_secs(self.config.response_timeout.clone()),rx).await;
297 match result {
298 Ok(Ok(response)) => response,
299 Ok(Err(error)) => request.fail(error.to_string().as_str()),
300 Err(error) => request.fail(error.to_string().as_str())
301 }
302 }
303
304 pub fn send_response(&self, response: Response ) {
305 self.inlet.inlet_frame( inlet::Frame::Response(response) );
306 }
307}
308
309pub mod client {
310 use std::ops::Deref;
311 use anyhow::Error;
312 use mesh_portal::version::latest::portal::outlet;
313 use mesh_portal::version::latest::id::{Address};
314 use mesh_portal::version::latest::http::HttpRequest;
315
316 /*
317 #[derive(Clone)]
318 pub struct RequestContext {
319 pub portal_info: Info,
320 pub logger: fn(message: &str),
321 }
322
323 impl RequestContext {
324 pub fn new(portal_info: Info, logger: fn(message: &str)) -> Self {
325 Self {
326 portal_info,
327 logger
328 }
329 }
330 }
331
332 pub struct Request<REQUEST> {
333 pub context: RequestContext,
334 pub from: Address,
335 pub request: REQUEST
336 }
337
338 impl<REQUEST> Deref for Request<REQUEST> {
339 type Target = REQUEST;
340
341 fn deref(&self) -> &Self::Target {
342 &self.request
343 }
344 }
345
346 */
347}
348
349
350pub mod example {
351 use std::sync::Arc;
352
353 use anyhow::Error;
354
355
356 use crate::{InletApi, ResourceCtrl, PortalSkel, inlet};
357 use std::collections::HashMap;
358 use mesh_portal::version::latest::payload::{Payload, Primitive};
359 use mesh_portal::version::latest::entity;
360
361 pub struct HelloCtrl {
362 pub skel: Arc<PortalSkel>,
363 pub inlet_api: InletApi
364 }
365
366 impl HelloCtrl {
367 #[allow(dead_code)]
368 fn new(skel: Arc<PortalSkel>, inlet_api: InletApi) -> Box<Self> {
369 Box::new(Self { skel, inlet_api } )
370 }
371 }
372
373 #[async_trait]
374 impl ResourceCtrl for HelloCtrl {
375
376 async fn init(&self) -> Result<(), Error> {
377 unimplemented!();
378 /*
379 let mut request =
380 inlet::Request::new(entity::request::ReqEntity::Msg( Msg {
381 action: "HelloWorld".to_string(),
382 payload: Payload::Empty,
383
384 path: "/".to_string()
385 }));
386
387 request.to.push(self.inlet_api.info.address.clone());
388
389 let response = self.inlet_api.exchange(request).await?;
390
391 if let entity::response::RespEntity::Ok(Payload::Primitive(Primitive::Text(text))) = response.entity {
392 println!("{}",text);
393 } else {
394 return Err(anyhow!("unexpected signal"));
395 }
396
397
398 */
399 Ok(())
400 }
401
402
403 }
404
405
406}
407
408
409
410
411/* match request.entity.clone() {
412 ExtOperation::Http(_) => {
413 if let Exchange::RequestResponse(exchange_id) = &kind
414 {
415 let result = Request::try_from_http(request, context);
416 match result {
417 Ok(request) => {
418 let path = request.path.clone();
419 let result = ctrl.http_request(request).await;
420 match result {
421 Ok(response) => {
422 let response = inlet::Response {
423 to: from,
424 exchange:exchange_id.clone(),
425 entity: ResponseEntity::Ok(Entity::HttpResponse(response))
426 };
427 inlet_api.respond( response );
428 }
429 Err(err) => {
430 (skel.logger)(format!("ERROR: HttpRequest.path: '{}' error: '{}' ", path, err.to_string()).as_str());
431 let response = inlet::Response {
432 to: from,
433 exchange:exchange_id.clone(),
434 entity: ResponseEntity::Ok(Entity::HttpResponse(HttpResponse::server_side_error()))
435 };
436 inlet_api.respond( response );
437 }
438 }
439 }
440 Err(err) => {
441 (skel.logger)(format!("FATAL: could not modify HttpRequest into Request<HttpRequest>: {}", err.to_string()).as_str());
442 }
443 }
444 } else {
445 (skel.logger)("FATAL: http request MUST be of ExchangeKind::RequestResponse");
446 }
447 }
448 ExtOperation::Port(port_request) => {
449 match ports.get(&port_request.port ) {
450 Some(port) => {
451 let result = Request::try_from_port(request, context );
452 match result {
453 Ok(request) => {
454 let request_from = request.from.clone();
455 let result = port.request(request).await;
456 match result {
457 Ok(response) => {
458 match response {
459 Some(signal) => {
460 if let Exchange::RequestResponse(exchange_id) = &kind
461 {
462 let response = inlet::Response {
463 to: request_from,
464 exchange: exchange_id.clone(),
465 entity: signal
466 };
467
468 inlet_api.respond(response);
469 } else {
470 let message = format!("WARN: PortOperation.port '{}' generated a response to a ExchangeKind::Notification", port_request.port);
471 (skel.logger)(message.as_str());
472 }
473 }
474 None => {
475 let message = format!("ERROR: PortOperation.port '{}' generated no response", port_request.port);
476 (skel.logger)(message.as_str());
477 if let Exchange::RequestResponse(exchange_id) = &kind
478 {
479 let response = inlet::Response {
480 to: request_from,
481 exchange: exchange_id.clone(),
482 entity: ResponseEntity::Error(message)
483 };
484 inlet_api.respond(response);
485 }
486 }
487 }
488 }
489 Err(err) => {
490 let message = format!("ERROR: PortOperation.port '{}' message: '{}'", port_request.port, err.to_string());
491 (skel.logger)(message.as_str());
492 if let Exchange::RequestResponse(exchange_id) = &kind
493 {
494 let response = inlet::Response {
495 to: request_from,
496 exchange: exchange_id.clone(),
497 entity: ResponseEntity::Error(message)
498 };
499 inlet_api.respond(response);
500 }
501 }
502 }
503
504 }
505 Err(err) => {
506 let message = format!("FATAL: could not modify PortOperation into Request<PortOperation>: {}", err.to_string());
507 (skel.logger)(message.as_str());
508 if let Exchange::RequestResponse(exchange_id) = &kind
509 {
510 let response = inlet::Response {
511 to: from,
512 exchange: exchange_id.clone(),
513 entity: ResponseEntity::Error(message)
514 };
515 inlet_api.respond(response);
516 }
517 }
518 }
519
520 }
521 None => {
522 let message =format!("ERROR: message port: '{}' not defined ", port_request.port );
523 (skel.logger)(message.as_str());
524 if let Exchange::RequestResponse(exchange_id) = &kind
525 {
526 let response = inlet::Response {
527 to: from,
528 exchange: exchange_id.clone(),
529 entity: ResponseEntity::Error(message)
530 };
531 inlet_api.respond(response);
532 }
533 }
534 }
535 }
536 }*/