1#[macro_use]
2extern crate anyhow;
3
4#[macro_use]
5extern crate async_trait;
6
7use std::collections::HashMap;
8use std::future::Future;
9use std::prelude::rust_2021::TryInto;
10use std::sync::Arc;
11use std::time::Duration;
12
13use anyhow::Error;
14use futures::future::select_all;
15use futures::{FutureExt, SinkExt};
16use tokio::sync::mpsc::error::{SendError, SendTimeoutError, TryRecvError};
17use tokio::sync::{broadcast, mpsc, oneshot};
18
19use mesh_portal::version::latest;
20use mesh_portal::version::latest::entity::response;
21use mesh_portal::version::latest::fail;
22use mesh_portal::version::latest::frame::CloseReason;
23use mesh_portal::version::latest::id::Address;
24use mesh_portal::version::latest::messaging::{Message, Request, Response};
25use mesh_portal::version::latest::pattern::AddressKindPattern;
26use mesh_portal::version::latest::portal::{Exchanger, inlet, outlet};
27use mesh_portal::version::latest::resource::ResourceStub;
28use mesh_portal::version::latest::resource::Status;
29use std::fmt::Debug;
30use dashmap::DashMap;
31use tokio::task::yield_now;
32use mesh_portal::version::latest::artifact::{Artifact, ArtifactRequest, ArtifactResponse};
33use mesh_portal::version::latest::config::{Assign, Config, ConfigBody, PortalConfig};
34use mesh_portal::version::latest::portal::inlet::{AssignRequest, Log};
35use mesh_portal::version::latest::portal::outlet::Frame;
36
37#[derive(Debug,Clone)]
38pub struct PortalApi {
39 pub info: PortalInfo,
40 tx: mpsc::Sender<PortalCall>
41}
42
43impl PortalApi {
44 pub async fn handle_request( &self, request: Request ) -> Response {
45 let (tx,rx) = oneshot::channel();
46 self.tx.send( PortalCall::Request { request: request.clone(), tx }).await;
47 match tokio::time::timeout(Duration::from_secs(60), rx ).await {
48 Ok(Ok(response)) => {
49 response
50 }
51 _ => {
52 request.fail("timeout".to_string().as_str() )
53 }
54 }
55 }
56
57 pub fn assign(&self, assign: Assign) {
58 let tx = self.tx.clone();
59 tokio::spawn(async move {
60 tx.send( PortalCall::Assign(assign)).await;
61 });
62 }
63}
64
65#[derive(Clone)]
66pub enum PortalEvent {
67 PortalAdded(PortalApi),
68 PortalRemoved(String),
69 ResourceAdded(PortalResourceApi),
70 ResourceRemoved(Address)
71}
72
73enum PortalCall {
74 Request{ request: Request, tx: oneshot::Sender<Response>},
75 Assign(Assign),
76}
77
/// Lifecycle status of a portal.
///
/// `Debug` is derived (in addition to `Clone`, `Eq`, `PartialEq` and `Hash`)
/// so the status can be logged and used in `assert_eq!`-style diagnostics.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum PortalStatus {
    /// No status.
    None,
    /// The portal is initializing.
    Initializing,
    /// The portal is ready.
    Ready,
    /// The portal panicked; the payload is a free-form `String`.
    // NOTE(review): presumably a human-readable panic message - confirm with producers.
    Panic(String),
}
85
/// Identifying information about a portal.
#[derive(Clone, Debug)]
pub struct PortalInfo {
    /// Key of the portal, stored as a plain `String`.
    pub portal_key: String,
}
90
91#[derive(Debug)]
92pub struct Portal {
93 pub info: PortalInfo,
94 pub config: PortalConfig,
95 outlet_tx: mpsc::Sender<outlet::Frame>,
96 exchanges: Arc<DashMap<String,oneshot::Sender<Response>>>,
97 pub log: fn(log: Log),
98 tx: mpsc::Sender<PortalCall>,
99 broadcast_tx: broadcast::Sender<PortalEvent>
100}
101
102impl Portal {
103 pub fn new(
104 info: PortalInfo,
105 config: PortalConfig,
106 outlet_tx: mpsc::Sender<outlet::Frame>,
107 request_handler: Arc<dyn PortalRequestHandler>,
108 broadcast_tx: broadcast::Sender<PortalEvent>,
109 logger: fn(log: Log),
110 ) -> (Self,mpsc::Sender<inlet::Frame>) {
111 let (inlet_tx,mut inlet_rx) = mpsc::channel(1024);
112 let exchanges: Arc<DashMap<String,oneshot::Sender<Response>>> = Arc::new( DashMap::new() );
113 let (tx,mut rx) = mpsc::channel(1024);
114 let portal_api = PortalApi {
115 tx: tx.clone(),
116 info: info.clone()
117 };
118 {
119 let config = config.clone();
120 let exchanges = exchanges.clone();
121 let outlet_tx = outlet_tx.clone();
122 let portal_api = portal_api.clone();
123 let broadcast_tx = broadcast_tx.clone();
124 tokio::spawn(async move {
125 while let Some(call) = rx.recv().await {
126 match call {
127 PortalCall::Request { request, tx } => {
128 let exchanges = exchanges.clone();
129 let outlet_tx = outlet_tx.clone();
130 tokio::spawn( async move {
131 exchanges.insert( request.id.clone(), tx );
132 outlet_tx.send( outlet::Frame::Request(request.clone()) ).await;
133 });
134 }
135 PortalCall::Assign(assign) => {
136 let portal_api = portal_api.clone();
137 let assign = Exchanger::new(assign);
138 let stub = assign.stub.clone();
139 outlet_tx.send(outlet::Frame::Assign(assign)).await;
140 let resource_api = PortalResourceApi {
141 stub,
142 portal_api: portal_api
143 };
144 broadcast_tx.send( PortalEvent::ResourceAdded(resource_api));
145 }
146 }
147 }
148 });
149 }
150
151 {
152 let request_handler = request_handler.clone();
153 let info = info.clone();
154 let portal_config = config.clone();
155 let outlet_tx = outlet_tx.clone();
156 let exchanges = exchanges.clone();
157 tokio::spawn(async move {
158 loop {
159 match inlet_rx.recv().await {
160 Some(frame) => {
161 let frame:inlet::Frame = frame;
162 match frame {
163 inlet::Frame::Log(log) => {
164 (logger)(log);
165 }
166 inlet::Frame::AssignRequest(request) => {
167 let result = request_handler.handle_assign_request(request.item.clone() ).await;
168 match result {
169 Ok(assignment) => {
170 let assign = request.with(assignment);
171 outlet_tx.send( outlet::Frame::Assign(assign) ).await;
172 }
173 Err(error) => {
174 println!("{}",error.to_string());
175 }
176 }
177 }
178 inlet::Frame::Request(request) => {
179 let request_handler = request_handler.clone();
180 let outlet_tx = outlet_tx.clone();
181 tokio::spawn(async move {
182 match tokio::time::timeout(Duration::from_secs(portal_config.response_timeout ), request_handler.route_to_mesh(request.clone()) ).await {
183 Ok(response) => {
184 outlet_tx.send( outlet::Frame::Response(response)).await;
185 }
186 _ => {
187 let response = request.fail("timeout".to_string().as_str());
188 outlet_tx.send( outlet::Frame::Response(response)).await;
189 }
190 }
191 });
192 }
193 inlet::Frame::Response(response) => {
194 if let Option::Some((_,mut tx)) = exchanges.remove(&response.response_to) {
195 tx.send(response);
196 } else {
197 (logger)(Log::new( "Portal", "response had no listening request" ));
198 }
199 }
200 inlet::Frame::Artifact(request) => {
201 let request_handler = request_handler.clone();
202 let outlet_tx = outlet_tx.clone();
203 tokio::spawn( async move {
204 match request_handler.handle_artifact_request(request.item.clone()).await {
205 Ok(response ) => {
206 let response = request.with(response);
207 outlet_tx.send( outlet::Frame::Artifact(response)).await;
208 }
209 Err(err) => {
210 (logger)(Log::new( "Portal", err.to_string().as_str() ));
211 }
212 }
213 });
214 }
215 inlet::Frame::Status(_) => {
216 }
218 inlet::Frame::Close(_) => {
219 }
221 }
222 }
223 None => {
224 break;
225 }
226 }
227 }
228 });
229 }
230
231
232 (Self {
233 info,
234 config,
235 log: logger,
236 exchanges,
237 outlet_tx,
238 broadcast_tx,
239 tx
240 },inlet_tx)
241 }
242
243
244 pub fn api(&self) -> PortalApi {
245 PortalApi {
246 info: self.info.clone(),
247 tx: self.tx.clone()
248 }
249 }
250
251 pub async fn handle_request(&self, request: Request ) -> Response {
252 let (tx,rx) = oneshot::channel();
253 self.exchanges.insert( request.id.clone(), tx );
254 self.outlet_tx.send( outlet::Frame::Request(request.clone()) ).await;
255 match tokio::time::timeout(Duration::from_secs(self.config.response_timeout ), rx ).await {
256 Ok(Ok(response)) => {
257 response
258 }
259 _ => {
260 let response = request.fail("timeout".to_string().as_str() );
261 response
262 }
263 }
264 }
265
266 pub fn assign(&self, assign: Assign ) {
267 let outlet_tx = self.outlet_tx.clone();
268 tokio::spawn(async move {
269 let assign = Exchanger::new(assign);
270 outlet_tx.send(outlet::Frame::Assign(assign)).await;
271 });
272 }
273
274 pub fn shutdown(&mut self) {
275 self.outlet_tx
276 .try_send(outlet::Frame::Close(CloseReason::Done))
277 .unwrap_or(());
278 }
279}
280
281#[async_trait]
282pub trait PortalRequestHandler: Send + Sync {
283
284 async fn route_to_mesh(&self, request: Request ) -> Response;
285
286 async fn default_assign(&self) -> Result<Assign, Error> {
287 Err(anyhow!("request handler does not have a default assign"))
288 }
289
290 async fn handle_assign_request(&self, request: AssignRequest ) -> Result<Assign, Error> {
291 Err(anyhow!("request handler does not assign"))
292 }
293
294 async fn handle_artifact_request(
295 &self,
296 request: ArtifactRequest,
297 ) -> Result<ArtifactResponse, Error> {
298 Err(anyhow!("request handler does not handle artifacts"))
299 }
300
301 async fn handle_config_request(
302 &self,
303 request: ArtifactRequest,
304 ) -> Result<ArtifactResponse, Error> {
305 Err(anyhow!("request handler does not handle configs"))
306 }
307}
308
309
310#[derive(Debug,Clone)]
311pub struct PortalResourceApi {
312 portal_api: PortalApi,
313 pub stub: ResourceStub
314}
315
316impl PortalResourceApi {
317 pub async fn handle_request( &self, request: Request ) -> Response {
318 self.portal_api.handle_request(request).await
319 }
320}