use std::collections::{HashMap, HashSet};
use log::*;
use tokio::select;
use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, mpsc::UnboundedReceiver, mpsc::UnboundedSender};
use crate::common::*;
use crate::error::*;
use crate::serializer::*;
use crate::transport::*;
mod recv;
mod send;
use crate::client;
use crate::message::*;
pub use send::Request;
pub enum Status {
Shutdown,
Ok,
}
pub type JoinResult = Sender<
Result<
(
WampId, // Session ID
HashMap<WampString, Arg>, // Server roles
),
WampError,
>,
>;
pub type SubscriptionQueue = UnboundedReceiver<(
WampId, // Publish event ID
Option<WampArgs>, // Publish args
Option<WampKwArgs>,
)>; pub type PendingSubResult = Sender<
Result<
(
WampId, //Subcription ID
SubscriptionQueue, // Queue for incoming events
),
WampError,
>,
>;
pub type PendingRegisterResult = Sender<
Result<
WampId, WampError,
>,
>;
pub type PendingCallResult = Sender<
Result<
(
Option<WampArgs>, // Return args
Option<WampKwArgs>, // Return kwargs
),
WampError,
>,
>;
pub struct Core<'a> {
sock: Box<dyn Transport + Send>,
valid_session: bool,
core_res: UnboundedSender<Result<(), WampError>>,
serializer: Box<dyn SerializerImpl + Send>,
ctl_sender: UnboundedSender<Request<'a>>,
ctl_channel: Option<UnboundedReceiver<Request<'a>>>,
pending_requests: HashSet<WampId>,
pending_transactions: HashMap<WampId, Sender<Result<Option<WampId>, WampError>>>,
pending_sub: HashMap<WampId, PendingSubResult>,
subscriptions: HashMap<WampId, UnboundedSender<(WampId, Option<WampArgs>, Option<WampKwArgs>)>>,
pending_register: HashMap<WampId, (RpcFunc<'a>, PendingRegisterResult)>,
rpc_endpoints: HashMap<WampId, RpcFunc<'a>>,
pub rpc_event_queue_r: Option<UnboundedReceiver<GenericFuture<'a>>>,
rpc_event_queue_w: UnboundedSender<GenericFuture<'a>>,
pending_call: HashMap<WampId, PendingCallResult>,
}
impl<'a> Core<'a> {
pub async fn connect(
uri: &url::Url,
cfg: &client::ClientConfig,
ctl_channel: (UnboundedSender<Request<'a>>, UnboundedReceiver<Request<'a>>),
core_res: UnboundedSender<Result<(), WampError>>,
) -> Result<Core<'a>, WampError> {
let (sock, serializer_type) = match uri.scheme() {
"ws" | "wss" => ws::connect(uri, &cfg).await?,
"tcp" | "tcps" => {
let host_port = match uri.port() {
Some(p) => p,
None => {
return Err(From::from("No port specified for tcp host".to_string()));
}
};
tcp::connect(
uri.host_str().unwrap(),
host_port,
uri.scheme() != "tcp",
&cfg,
)
.await?
}
s => return Err(From::from(format!("Unknown uri scheme : {}", s))),
};
debug!("Connected with serializer : {:?}", serializer_type);
let serializer: Box<dyn SerializerImpl + Send> = match serializer_type {
SerializerType::Json => Box::new(json::JsonSerializer {}),
SerializerType::MsgPack => Box::new(msgpack::MsgPackSerializer {}),
};
let (rpc_event_queue_w, rpc_event_queue_r) = mpsc::unbounded_channel();
Ok(Core {
sock,
core_res,
valid_session: false,
serializer,
ctl_sender: ctl_channel.0,
ctl_channel: Some(ctl_channel.1),
pending_requests: HashSet::new(),
pending_transactions: HashMap::new(),
pending_sub: HashMap::new(),
subscriptions: HashMap::new(),
pending_register: HashMap::new(),
rpc_endpoints: HashMap::new(),
rpc_event_queue_r: Some(rpc_event_queue_r),
rpc_event_queue_w,
pending_call: HashMap::new(),
})
}
pub async fn event_loop(mut self) -> Result<(), WampError> {
let mut ctl_channel = self.ctl_channel.take().unwrap();
let _ = self.core_res.send(Ok(()));
loop {
match select! {
msg = self.recv() => {
match msg {
Err(e) => {
if self.valid_session {
error!("Failed to recv : {:?}", e);
let _ = self.core_res.send(Err(e));
}
break;
},
Ok(m) => self.handle_peer_msg(m).await,
}
},
req = ctl_channel.recv() => {
let req = match req {
Some(r) => r,
None => {
let _ = self.core_res.send(Err(WampError::ClientDied));
break;
}
};
self.handle_local_request(req).await
}
} {
Status::Shutdown => {
let _ = self.core_res.send(Ok(()));
break;
}
Status::Ok => {}
}
}
debug!("Event loop shutting down !");
self.shutdown().await;
Ok(())
}
async fn handle_peer_msg<'b>(&'b mut self, msg: Msg) -> Status
where
'a: 'b,
{
if let Some(ref request) = msg.request_id() {
if !self.pending_requests.remove(request) {
warn!("Peer sent a response to an unknown request : {}", request);
return Status::Ok;
}
}
match msg {
Msg::Subscribed {
request,
subscription,
} => recv::subscribed(self, request, subscription).await,
Msg::Unsubscribed { request } => recv::unsubscribed(self, request).await,
Msg::Published {
request,
publication,
} => recv::published(self, request, publication).await,
Msg::Event {
subscription,
publication,
details,
arguments,
arguments_kw,
} => {
recv::event(
self,
subscription,
publication,
details,
arguments,
arguments_kw,
)
.await
}
Msg::Registered {
request,
registration,
} => recv::registered(self, request, registration).await,
Msg::Unregistered { request } => recv::unregisterd(self, request).await,
Msg::Invocation {
request,
registration,
details,
arguments,
arguments_kw,
} => {
recv::invocation(
self,
request,
registration,
details,
arguments,
arguments_kw,
)
.await
}
Msg::Result {
request,
details,
arguments,
arguments_kw,
} => recv::call_result(self, request, details, arguments, arguments_kw).await,
Msg::Goodbye { details, reason } => recv::goodbye(self, details, reason).await,
Msg::Abort { details, reason } => recv::abort(self, details, reason).await,
Msg::Error {
typ,
request,
details,
error,
arguments,
arguments_kw,
} => recv::error(self, typ, request, details, error, arguments, arguments_kw).await,
_ => {
warn!("Recevied unhandled message {:?}", msg);
Status::Ok
}
}
}
async fn handle_local_request(&mut self, req: Request<'a>) -> Status {
match req {
Request::Shutdown => Status::Shutdown,
Request::Join {
uri,
roles,
agent_str,
authentication_methods,
authentication_id,
on_challenge_handler,
res,
} => {
send::join_realm(
self,
uri,
roles,
agent_str,
authentication_methods,
authentication_id,
on_challenge_handler,
res,
)
.await
}
Request::Leave { res } => send::leave_realm(self, res).await,
Request::Subscribe { uri, res } => send::subscribe(self, uri, res).await,
Request::Unsubscribe { sub_id, res } => send::unsubscribe(self, sub_id, res).await,
Request::Publish {
uri,
options,
arguments,
arguments_kw,
res,
} => send::publish(self, uri, options, arguments, arguments_kw, res).await,
Request::Register { uri, res, func_ptr } => {
send::register(self, uri, res, func_ptr).await
}
Request::Unregister { rpc_id, res } => send::unregister(self, rpc_id, res).await,
Request::InvocationResult { request, res } => {
send::invoke_yield(self, request, res).await
}
Request::Call {
uri,
options,
arguments,
arguments_kw,
res,
} => send::call(self, uri, options, arguments, arguments_kw, res).await,
}
}
pub async fn send(&mut self, msg: &Msg) -> Result<(), WampError> {
let payload = self.serializer.pack(msg)?;
match std::str::from_utf8(&payload) {
Ok(v) => debug!("Send : {}", v),
Err(_) => debug!("Send : {:?}", msg),
};
self.sock.send(&payload).await?;
Ok(())
}
pub async fn recv<'b>(&'b mut self) -> Result<Msg, WampError>
where
'a: 'b,
{
let payload = self.sock.recv().await?;
let msg = self.serializer.unpack(&payload);
match std::str::from_utf8(&payload) {
Ok(v) => debug!("Recv : {}", v),
Err(_) => debug!("Recv : {:?}", msg),
};
Ok(msg?)
}
pub async fn shutdown(mut self) {
self.sock.close().await;
}
fn create_request(&mut self) -> WampId {
let mut request = WampId::generate();
while !self.pending_requests.insert(request) {
request = WampId::generate();
}
request
}
}