1use std::collections::{HashMap, HashSet};
2use std::future::Future;
3use futures::FutureExt;
4
5use log::*;
6use tokio::sync::oneshot;
7use tokio::sync::{
8 mpsc, mpsc::UnboundedReceiver, mpsc::UnboundedSender,
9};
10use url::*;
11
12pub use crate::common::*;
13use crate::core::*;
14use crate::error::*;
15use crate::serializer::SerializerType;
16
17pub struct ClientConfig {
19 agent: String,
21 roles: HashSet<ClientRole>,
23 serializers: Vec<SerializerType>,
25 max_msg_size: u32,
27 ssl_verify: bool,
29 websocket_headers: HashMap<String, String>,
31}
32
33impl Default for ClientConfig {
34 fn default() -> Self {
46 ClientConfig {
48 agent: String::from(DEFAULT_AGENT_STR),
49 roles: [
50 ClientRole::Caller,
51 ClientRole::Callee,
52 ClientRole::Publisher,
53 ClientRole::Subscriber,
54 ]
55 .iter()
56 .cloned()
57 .collect(),
58 serializers: vec![SerializerType::Json, SerializerType::MsgPack],
59 max_msg_size: 0,
60 ssl_verify: true,
61 websocket_headers: HashMap::new(),
62 }
63 }
64}
65
66impl ClientConfig {
67 pub fn set_agent<T: AsRef<str>>(mut self, agent: T) -> Self {
69 self.agent = String::from(agent.as_ref());
70 self
71 }
72 pub fn get_agent(&self) -> &str {
74 &self.agent
75 }
76
77 pub fn set_max_msg_size(mut self, msg_size: u32) -> Self {
80 self.max_msg_size = msg_size;
81 self
82 }
83 pub fn get_max_msg_size(&self) -> Option<u32> {
85 if self.max_msg_size == 0 {
86 None
87 } else {
88 Some(self.max_msg_size)
89 }
90 }
91
92 pub fn set_serializers(mut self, serializers: Vec<SerializerType>) -> Self {
94 self.serializers = serializers;
95 self
96 }
97 pub fn get_serializers(&self) -> &Vec<SerializerType> {
99 &self.serializers
100 }
101
102 pub fn set_roles(mut self, roles: Vec<ClientRole>) -> Self {
104 self.roles.drain();
105 for role in roles {
106 self.roles.insert(role);
107 }
108 self
109 }
110
111 pub fn set_ssl_verify(mut self, val: bool) -> Self {
113 self.ssl_verify = val;
114 self
115 }
116 pub fn get_ssl_verify(&self) -> bool {
118 self.ssl_verify
119 }
120
121 pub fn add_websocket_header(mut self, key: String, val: String) -> Self {
122 self.websocket_headers.insert(key, val);
123 self
124 }
125 pub fn get_websocket_headers(&self) -> &HashMap<String, String> {
126 &self.websocket_headers
127 }
128}
129
130pub struct Client<'a> {
132 config: ClientConfig,
134 core_res: UnboundedReceiver<Result<(), WampError>>,
136 core_status: ClientState,
137 server_roles: HashSet<String>,
139 session_id: Option<WampId>,
141 ctl_channel: UnboundedSender<Request<'a>>,
143}
144
145pub enum ClientState {
147 NoEventLoop,
149 Running,
151 Disconnected(Result<(), WampError>),
153}
154
155impl<'a> Client<'a> {
156 pub async fn connect<T: AsRef<str>>(
167 uri: T,
168 cfg: Option<ClientConfig>,
169 ) -> Result<
170 (
171 Client<'a>,
172 (
173 GenericFuture<'a>,
174 Option<UnboundedReceiver<GenericFuture<'a>>>,
175 ),
176 ),
177 WampError,
178 > {
179 let uri = match Url::parse(uri.as_ref()) {
180 Ok(u) => u,
181 Err(e) => return Err(WampError::InvalidUri(e)),
182 };
183
184 let config = match cfg {
185 Some(c) => c,
186 None => ClientConfig::default(),
188 };
189
190 let (ctl_channel, ctl_receiver) = mpsc::unbounded_channel();
191 let (core_res_w, core_res) = mpsc::unbounded_channel();
192
193 let ctl_sender = ctl_channel.clone();
194 let mut conn = Core::connect(&uri, &config, (ctl_sender, ctl_receiver), core_res_w).await?;
196
197 let rpc_evt_queue = if config.roles.contains(&ClientRole::Callee) {
198 conn.rpc_event_queue_r.take()
199 } else {
200 None
201 };
202
203 Ok((
204 Client {
205 config,
206 server_roles: HashSet::new(),
207 session_id: None,
208 ctl_channel,
209 core_res,
210 core_status: ClientState::NoEventLoop,
211 },
212 (Box::pin(conn.event_loop()), rpc_evt_queue),
213 ))
214 }
215
216 async fn inner_join_realm(
220 &mut self,
221 realm: String,
222 authentication_methods: Vec<AuthenticationMethod>,
223 authentication_id: Option<String>,
224 on_challenge_handler: Option<AuthenticationChallengeHandler<'a>>,
225 ) -> Result<(), WampError> {
226 if let ClientState::NoEventLoop = self.get_cur_status() {
228 debug!("Called join_realm() before th event loop is ready... Waiting...");
229 self.wait_for_status_change().await;
230 }
231
232 if !self.is_connected() {
234 return Err(From::from(
235 "The client is currently not connected".to_string(),
236 ));
237 }
238
239 if self.session_id.is_some() {
241 return Err(From::from(format!(
242 "join_realm('{}') : Client already joined to a realm",
243 realm
244 )));
245 }
246
247 let (res_sender, res) = oneshot::channel();
249 if let Err(e) = self.ctl_channel.send(Request::Join {
250 uri: realm,
251 roles: self.config.roles.clone(),
252 agent_str: if self.config.agent.is_empty() {
253 Some(self.config.agent.clone())
254 } else {
255 None
256 },
257 authentication_methods,
258 authentication_id,
259 on_challenge_handler,
260 res: res_sender,
261 }) {
262 return Err(From::from(format!(
263 "Core never received our request : {}",
264 e
265 )));
266 }
267
268 let (session_id, mut server_roles) = match res.await {
270 Ok(r) => r?,
271 Err(e) => {
272 return Err(From::from(format!(
273 "Core never returned a response : {}",
274 e
275 )))
276 }
277 };
278
279 self.server_roles.drain();
281 for (role, _) in server_roles.drain().take(1) {
282 self.server_roles.insert(role);
283 }
284
285 self.session_id = Some(session_id);
287 debug!("Connected with session_id {} !", session_id);
288
289 Ok(())
290 }
291
292 pub async fn join_realm<T: Into<String>>(&mut self, realm: T) -> Result<(), WampError> {
296 self.inner_join_realm(realm.into(), vec![], None, None)
297 .await
298 }
299
300 pub async fn join_realm_with_authentication<
323 Realm,
324 AuthenticationId,
325 AuthenticationChallengeHandler,
326 AuthenticationChallengeHandlerResponse,
327 >(
328 &mut self,
329 realm: Realm,
330 authentication_methods: Vec<AuthenticationMethod>,
331 authentication_id: AuthenticationId,
332 on_challenge_handler: AuthenticationChallengeHandler,
333 ) -> Result<(), WampError>
334 where
335 Realm: Into<String>,
336 AuthenticationId: Into<String>,
337 AuthenticationChallengeHandler: Fn(AuthenticationMethod, WampDict) -> AuthenticationChallengeHandlerResponse
338 + Send
339 + Sync
340 + 'a,
341 AuthenticationChallengeHandlerResponse: std::future::Future<Output = Result<AuthenticationChallengeResponse, WampError>>
342 + Send
343 + 'a,
344 {
345 self.inner_join_realm(
346 realm.into(),
347 authentication_methods,
348 Some(authentication_id.into()),
349 Some(Box::new(move |authentication_method, extra| {
350 Box::pin(on_challenge_handler(authentication_method, extra))
351 })),
352 )
353 .await
354 }
355
356 pub async fn leave_realm(&mut self) -> Result<(), WampError> {
358 if !self.is_connected() {
360 return Err(From::from(
361 "The client is currently not connected".to_string(),
362 ));
363 }
364
365 if self.session_id.take().is_none() {
367 return Ok(());
368 }
369
370 let (res, result) = oneshot::channel();
372 if let Err(e) = self.ctl_channel.send(Request::Leave { res }) {
373 return Err(From::from(format!(
374 "Core never received our request : {}",
375 e
376 )));
377 }
378
379 match result.await {
381 Ok(r) => r?,
382 Err(e) => {
383 return Err(From::from(format!(
384 "Core never returned a response : {}",
385 e
386 )))
387 }
388 };
389
390 Ok(())
391 }
392
393 pub async fn subscribe<T: AsRef<str>>(
398 &self,
399 topic: T,
400 ) -> Result<(WampId, SubscriptionQueue), WampError> {
401 let (res, result) = oneshot::channel();
403 if let Err(e) = self.ctl_channel.send(Request::Subscribe {
404 uri: topic.as_ref().to_string(),
405 res,
406 }) {
407 return Err(From::from(format!(
408 "Core never received our request : {}",
409 e
410 )));
411 }
412
413 let (sub_id, evt_queue) = match result.await {
415 Ok(r) => r?,
416 Err(e) => {
417 return Err(From::from(format!(
418 "Core never returned a response : {}",
419 e
420 )))
421 }
422 };
423
424 Ok((sub_id, evt_queue))
425 }
426
427 pub async fn unsubscribe(&self, sub_id: WampId) -> Result<(), WampError> {
429 let (res, result) = oneshot::channel();
431 if let Err(e) = self.ctl_channel.send(Request::Unsubscribe { sub_id, res }) {
432 return Err(From::from(format!(
433 "Core never received our request : {}",
434 e
435 )));
436 }
437
438 match result.await {
440 Ok(r) => r?,
441 Err(e) => {
442 return Err(From::from(format!(
443 "Core never returned a response : {}",
444 e
445 )))
446 }
447 };
448
449 Ok(())
450 }
451
452 pub async fn publish<T: AsRef<str>>(
457 &self,
458 topic: T,
459 arguments: Option<WampArgs>,
460 arguments_kw: Option<WampKwArgs>,
461 acknowledge: bool,
462 ) -> Result<Option<WampId>, WampError> {
463 let mut options = WampDict::new();
464
465 if acknowledge {
466 options.insert("acknowledge".to_string(), Arg::Bool(true));
467 }
468 let (res, result) = oneshot::channel();
470 if let Err(e) = self.ctl_channel.send(Request::Publish {
471 uri: topic.as_ref().to_string(),
472 options,
473 arguments,
474 arguments_kw,
475 res,
476 }) {
477 return Err(From::from(format!(
478 "Core never received our request : {}",
479 e
480 )));
481 }
482
483 let pub_id = if acknowledge {
484 Some(match result.await {
486 Ok(Ok(r)) => r.unwrap(),
487 Ok(Err(e)) => return Err(From::from(format!("Failed to send publish : {}", e))),
488 Err(e) => {
489 return Err(From::from(format!(
490 "Core never returned a response : {}",
491 e
492 )))
493 }
494 })
495 } else {
496 None
497 };
498 Ok(pub_id)
499 }
500
501 pub async fn register<T, F, Fut>(&self, uri: T, func_ptr: F) -> Result<WampId, WampError>
505 where
506 T: AsRef<str>,
507 F: Fn(Option<WampArgs>, Option<WampKwArgs>) -> Fut + Send + Sync + 'a,
508 Fut: Future<Output = Result<(Option<WampArgs>, Option<WampKwArgs>), WampError>> + Send + 'a,
509 {
510 let (res, result) = oneshot::channel();
512 if let Err(e) = self.ctl_channel.send(Request::Register {
513 uri: uri.as_ref().to_string(),
514 res,
515 func_ptr: Box::new(move |a, k| Box::pin(func_ptr(a, k))),
516 }) {
517 return Err(From::from(format!(
518 "Core never received our request : {}",
519 e
520 )));
521 }
522
523 let rpc_id = match result.await {
525 Ok(r) => r?,
526 Err(e) => {
527 return Err(From::from(format!(
528 "Core never returned a response : {}",
529 e
530 )))
531 }
532 };
533
534 Ok(rpc_id)
535 }
536
537 pub async fn unregister(&self, rpc_id: WampId) -> Result<(), WampError> {
539 let (res, result) = oneshot::channel();
541 if let Err(e) = self.ctl_channel.send(Request::Unregister { rpc_id, res }) {
542 return Err(From::from(format!(
543 "Core never received our request : {}",
544 e
545 )));
546 }
547
548 match result.await {
550 Ok(r) => r?,
551 Err(e) => {
552 return Err(From::from(format!(
553 "Core never returned a response : {}",
554 e
555 )))
556 }
557 };
558
559 Ok(())
560 }
561
562 pub async fn call<T: AsRef<str>>(
564 &self,
565 uri: T,
566 arguments: Option<WampArgs>,
567 arguments_kw: Option<WampKwArgs>,
568 ) -> Result<(Option<WampArgs>, Option<WampKwArgs>), WampError> {
569 let (res, result) = oneshot::channel();
571 if let Err(e) = self.ctl_channel.send(Request::Call {
572 uri: uri.as_ref().to_string(),
573 options: WampDict::new(),
574 arguments,
575 arguments_kw,
576 res,
577 }) {
578 return Err(From::from(format!(
579 "Core never received our request : {}",
580 e
581 )));
582 }
583
584 match result.await {
586 Ok(r) => r,
587 Err(e) => Err(From::from(format!(
588 "Core never returned a response : {}",
589 e
590 ))),
591 }
592 }
593
594 pub fn get_cur_status(&mut self) -> &ClientState {
596 let new_status = self.core_res.recv().now_or_never();
598 #[allow(clippy::match_wild_err_arm)]
599 match new_status {
600 Some(Some(state)) => self.set_next_status(state),
601 None => &self.core_status,
602 Some(None) => panic!("The event loop died without sending a new status"),
603 }
604 }
605
606 pub fn is_connected(&mut self) -> bool {
608 match self.get_cur_status() {
609 ClientState::Running => true,
610 _ => false,
611 }
612 }
613
614 fn set_next_status(&mut self, new_status: Result<(), WampError>) -> &ClientState {
615 if new_status.is_err() {
617 self.core_status = ClientState::Disconnected(new_status);
618 return &self.core_status;
619 }
620
621 match self.core_status {
623 ClientState::NoEventLoop => {
624 self.core_status = ClientState::Running;
625 }
626 ClientState::Running => {
627 self.core_status = ClientState::Disconnected(new_status);
628 }
629 ClientState::Disconnected(_) => {
630 panic!("Got new core status after already being disconnected");
631 }
632 }
633
634 &self.core_status
635 }
636
637 async fn wait_for_status_change(&mut self) -> &ClientState {
640 if let ClientState::Disconnected(ref _r) = self.core_status {
642 return &self.core_status;
643 }
644
645 let new_status = match self.core_res.recv().await {
647 Some(v) => v,
648 None => {
649 panic!("The event loop died without sending a new status");
650 }
651 };
652
653 self.set_next_status(new_status)
655 }
656
657 pub async fn block_until_disconnect(&mut self) -> &ClientState {
659 let mut cur_status = self.get_cur_status();
660 loop {
661 match cur_status {
662 ClientState::Disconnected(_) => break,
663 _ => {
664 cur_status = self.wait_for_status_change().await;
666 }
667 }
668 }
669
670 &self.core_status
671 }
672
673 pub async fn disconnect(mut self) {
675 if self.is_connected() {
676 let _ = self.leave_realm().await;
678 let _ = self.ctl_channel.send(Request::Shutdown);
680
681 match self.core_res.recv().await {
683 Some(Err(e)) => error!("Error while shutting down : {:?}", e),
684 None => error!("Core never sent a status after shutting down..."),
685 _ => {}
686 }
687 }
688 }
689}