use std::collections::{HashMap, HashSet};
use std::future::Future;
use futures::FutureExt;
use log::*;
use tokio::sync::oneshot;
use tokio::sync::{
mpsc, mpsc::UnboundedReceiver, mpsc::UnboundedSender,
};
use url::*;
pub use crate::common::*;
use crate::core::*;
use crate::error::*;
use crate::serializer::SerializerType;
pub struct ClientConfig {
agent: String,
roles: HashSet<ClientRole>,
serializers: Vec<SerializerType>,
max_msg_size: u32,
ssl_verify: bool,
websocket_headers: HashMap<String, String>,
}
impl Default for ClientConfig {
fn default() -> Self {
ClientConfig {
agent: String::from(DEFAULT_AGENT_STR),
roles: [
ClientRole::Caller,
ClientRole::Callee,
ClientRole::Publisher,
ClientRole::Subscriber,
]
.iter()
.cloned()
.collect(),
serializers: vec![SerializerType::Json, SerializerType::MsgPack],
max_msg_size: 0,
ssl_verify: true,
websocket_headers: HashMap::new(),
}
}
}
impl ClientConfig {
pub fn set_agent<T: AsRef<str>>(mut self, agent: T) -> Self {
self.agent = String::from(agent.as_ref());
self
}
pub fn get_agent(&self) -> &str {
&self.agent
}
pub fn set_max_msg_size(mut self, msg_size: u32) -> Self {
self.max_msg_size = msg_size;
self
}
pub fn get_max_msg_size(&self) -> Option<u32> {
if self.max_msg_size == 0 {
None
} else {
Some(self.max_msg_size)
}
}
pub fn set_serializers(mut self, serializers: Vec<SerializerType>) -> Self {
self.serializers = serializers;
self
}
pub fn get_serializers(&self) -> &Vec<SerializerType> {
&self.serializers
}
pub fn set_roles(mut self, roles: Vec<ClientRole>) -> Self {
self.roles.drain();
for role in roles {
self.roles.insert(role);
}
self
}
pub fn set_ssl_verify(mut self, val: bool) -> Self {
self.ssl_verify = val;
self
}
pub fn get_ssl_verify(&self) -> bool {
self.ssl_verify
}
pub fn add_websocket_header(mut self, key: String, val: String) -> Self {
self.websocket_headers.insert(key, val);
self
}
pub fn get_websocket_headers(&self) -> &HashMap<String, String> {
&self.websocket_headers
}
}
pub struct Client<'a> {
config: ClientConfig,
core_res: UnboundedReceiver<Result<(), WampError>>,
core_status: ClientState,
server_roles: HashSet<String>,
session_id: Option<WampId>,
ctl_channel: UnboundedSender<Request<'a>>,
}
pub enum ClientState {
NoEventLoop,
Running,
Disconnected(Result<(), WampError>),
}
impl<'a> Client<'a> {
pub async fn connect<T: AsRef<str>>(
uri: T,
cfg: Option<ClientConfig>,
) -> Result<
(
Client<'a>,
(
GenericFuture<'a>,
Option<UnboundedReceiver<GenericFuture<'a>>>,
),
),
WampError,
> {
let uri = match Url::parse(uri.as_ref()) {
Ok(u) => u,
Err(e) => return Err(WampError::InvalidUri(e)),
};
let config = match cfg {
Some(c) => c,
None => ClientConfig::default(),
};
let (ctl_channel, ctl_receiver) = mpsc::unbounded_channel();
let (core_res_w, core_res) = mpsc::unbounded_channel();
let ctl_sender = ctl_channel.clone();
let mut conn = Core::connect(&uri, &config, (ctl_sender, ctl_receiver), core_res_w).await?;
let rpc_evt_queue = if config.roles.contains(&ClientRole::Callee) {
conn.rpc_event_queue_r.take()
} else {
None
};
Ok((
Client {
config,
server_roles: HashSet::new(),
session_id: None,
ctl_channel,
core_res,
core_status: ClientState::NoEventLoop,
},
(Box::pin(conn.event_loop()), rpc_evt_queue),
))
}
async fn inner_join_realm(
&mut self,
realm: String,
authentication_methods: Vec<AuthenticationMethod>,
authentication_id: Option<String>,
on_challenge_handler: Option<AuthenticationChallengeHandler<'a>>,
) -> Result<(), WampError> {
if let ClientState::NoEventLoop = self.get_cur_status() {
debug!("Called join_realm() before th event loop is ready... Waiting...");
self.wait_for_status_change().await;
}
if !self.is_connected() {
return Err(From::from(
"The client is currently not connected".to_string(),
));
}
if self.session_id.is_some() {
return Err(From::from(format!(
"join_realm('{}') : Client already joined to a realm",
realm
)));
}
let (res_sender, res) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Join {
uri: realm,
roles: self.config.roles.clone(),
agent_str: if self.config.agent.is_empty() {
Some(self.config.agent.clone())
} else {
None
},
authentication_methods,
authentication_id,
on_challenge_handler,
res: res_sender,
}) {
return Err(From::from(format!(
"Core never received our request : {}",
e
)));
}
let (session_id, mut server_roles) = match res.await {
Ok(r) => r?,
Err(e) => {
return Err(From::from(format!(
"Core never returned a response : {}",
e
)))
}
};
self.server_roles.drain();
for (role, _) in server_roles.drain().take(1) {
self.server_roles.insert(role);
}
self.session_id = Some(session_id);
debug!("Connected with session_id {} !", session_id);
Ok(())
}
pub async fn join_realm<T: Into<String>>(&mut self, realm: T) -> Result<(), WampError> {
self.inner_join_realm(realm.into(), vec![], None, None)
.await
}
pub async fn join_realm_with_authentication<
Realm,
AuthenticationId,
AuthenticationChallengeHandler,
AuthenticationChallengeHandlerResponse,
>(
&mut self,
realm: Realm,
authentication_methods: Vec<AuthenticationMethod>,
authentication_id: AuthenticationId,
on_challenge_handler: AuthenticationChallengeHandler,
) -> Result<(), WampError>
where
Realm: Into<String>,
AuthenticationId: Into<String>,
AuthenticationChallengeHandler: Fn(AuthenticationMethod, WampDict) -> AuthenticationChallengeHandlerResponse
+ Send
+ Sync
+ 'a,
AuthenticationChallengeHandlerResponse: std::future::Future<Output = Result<AuthenticationChallengeResponse, WampError>>
+ Send
+ 'a,
{
self.inner_join_realm(
realm.into(),
authentication_methods,
Some(authentication_id.into()),
Some(Box::new(move |authentication_method, extra| {
Box::pin(on_challenge_handler(authentication_method, extra))
})),
)
.await
}
pub async fn leave_realm(&mut self) -> Result<(), WampError> {
if !self.is_connected() {
return Err(From::from(
"The client is currently not connected".to_string(),
));
}
if self.session_id.take().is_none() {
return Ok(());
}
let (res, result) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Leave { res }) {
return Err(From::from(format!(
"Core never received our request : {}",
e
)));
}
match result.await {
Ok(r) => r?,
Err(e) => {
return Err(From::from(format!(
"Core never returned a response : {}",
e
)))
}
};
Ok(())
}
pub async fn subscribe<T: AsRef<str>>(
&self,
topic: T,
) -> Result<(WampId, SubscriptionQueue), WampError> {
let (res, result) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Subscribe {
uri: topic.as_ref().to_string(),
res,
}) {
return Err(From::from(format!(
"Core never received our request : {}",
e
)));
}
let (sub_id, evt_queue) = match result.await {
Ok(r) => r?,
Err(e) => {
return Err(From::from(format!(
"Core never returned a response : {}",
e
)))
}
};
Ok((sub_id, evt_queue))
}
pub async fn unsubscribe(&self, sub_id: WampId) -> Result<(), WampError> {
let (res, result) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Unsubscribe { sub_id, res }) {
return Err(From::from(format!(
"Core never received our request : {}",
e
)));
}
match result.await {
Ok(r) => r?,
Err(e) => {
return Err(From::from(format!(
"Core never returned a response : {}",
e
)))
}
};
Ok(())
}
pub async fn publish<T: AsRef<str>>(
&self,
topic: T,
arguments: Option<WampArgs>,
arguments_kw: Option<WampKwArgs>,
acknowledge: bool,
) -> Result<Option<WampId>, WampError> {
let mut options = WampDict::new();
if acknowledge {
options.insert("acknowledge".to_string(), Arg::Bool(true));
}
let (res, result) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Publish {
uri: topic.as_ref().to_string(),
options,
arguments,
arguments_kw,
res,
}) {
return Err(From::from(format!(
"Core never received our request : {}",
e
)));
}
let pub_id = if acknowledge {
Some(match result.await {
Ok(Ok(r)) => r.unwrap(),
Ok(Err(e)) => return Err(From::from(format!("Failed to send publish : {}", e))),
Err(e) => {
return Err(From::from(format!(
"Core never returned a response : {}",
e
)))
}
})
} else {
None
};
Ok(pub_id)
}
pub async fn register<T, F, Fut>(&self, uri: T, func_ptr: F) -> Result<WampId, WampError>
where
T: AsRef<str>,
F: Fn(Option<WampArgs>, Option<WampKwArgs>) -> Fut + Send + Sync + 'a,
Fut: Future<Output = Result<(Option<WampArgs>, Option<WampKwArgs>), WampError>> + Send + 'a,
{
let (res, result) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Register {
uri: uri.as_ref().to_string(),
res,
func_ptr: Box::new(move |a, k| Box::pin(func_ptr(a, k))),
}) {
return Err(From::from(format!(
"Core never received our request : {}",
e
)));
}
let rpc_id = match result.await {
Ok(r) => r?,
Err(e) => {
return Err(From::from(format!(
"Core never returned a response : {}",
e
)))
}
};
Ok(rpc_id)
}
pub async fn unregister(&self, rpc_id: WampId) -> Result<(), WampError> {
let (res, result) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Unregister { rpc_id, res }) {
return Err(From::from(format!(
"Core never received our request : {}",
e
)));
}
match result.await {
Ok(r) => r?,
Err(e) => {
return Err(From::from(format!(
"Core never returned a response : {}",
e
)))
}
};
Ok(())
}
pub async fn call<T: AsRef<str>>(
&self,
uri: T,
arguments: Option<WampArgs>,
arguments_kw: Option<WampKwArgs>,
) -> Result<(Option<WampArgs>, Option<WampKwArgs>), WampError> {
let (res, result) = oneshot::channel();
if let Err(e) = self.ctl_channel.send(Request::Call {
uri: uri.as_ref().to_string(),
options: WampDict::new(),
arguments,
arguments_kw,
res,
}) {
return Err(From::from(format!(
"Core never received our request : {}",
e
)));
}
match result.await {
Ok(r) => r,
Err(e) => Err(From::from(format!(
"Core never returned a response : {}",
e
))),
}
}
pub fn get_cur_status(&mut self) -> &ClientState {
let new_status = self.core_res.recv().now_or_never();
#[allow(clippy::match_wild_err_arm)]
match new_status {
Some(Some(state)) => self.set_next_status(state),
None => &self.core_status,
Some(None) => panic!("The event loop died without sending a new status"),
}
}
pub fn is_connected(&mut self) -> bool {
match self.get_cur_status() {
ClientState::Running => true,
_ => false,
}
}
fn set_next_status(&mut self, new_status: Result<(), WampError>) -> &ClientState {
if new_status.is_err() {
self.core_status = ClientState::Disconnected(new_status);
return &self.core_status;
}
match self.core_status {
ClientState::NoEventLoop => {
self.core_status = ClientState::Running;
}
ClientState::Running => {
self.core_status = ClientState::Disconnected(new_status);
}
ClientState::Disconnected(_) => {
panic!("Got new core status after already being disconnected");
}
}
&self.core_status
}
async fn wait_for_status_change(&mut self) -> &ClientState {
if let ClientState::Disconnected(ref _r) = self.core_status {
return &self.core_status;
}
let new_status = match self.core_res.recv().await {
Some(v) => v,
None => {
panic!("The event loop died without sending a new status");
}
};
self.set_next_status(new_status)
}
pub async fn block_until_disconnect(&mut self) -> &ClientState {
let mut cur_status = self.get_cur_status();
loop {
match cur_status {
ClientState::Disconnected(_) => break,
_ => {
cur_status = self.wait_for_status_change().await;
}
}
}
&self.core_status
}
pub async fn disconnect(mut self) {
if self.is_connected() {
let _ = self.leave_realm().await;
let _ = self.ctl_channel.send(Request::Shutdown);
match self.core_res.recv().await {
Some(Err(e)) => error!("Error while shutting down : {:?}", e),
None => error!("Core never sent a status after shutting down..."),
_ => {}
}
}
}
}