wamp_async/
client.rs

1use std::collections::{HashMap, HashSet};
2use std::future::Future;
3use futures::FutureExt;
4
5use log::*;
6use tokio::sync::oneshot;
7use tokio::sync::{
8    mpsc, mpsc::UnboundedReceiver, mpsc::UnboundedSender,
9};
10use url::*;
11
12pub use crate::common::*;
13use crate::core::*;
14use crate::error::*;
15use crate::serializer::SerializerType;
16
17/// Options one can set when connecting to a WAMP server
18pub struct ClientConfig {
19    /// Replaces the default user agent string
20    agent: String,
21    /// A Set of all the roles the client will support
22    roles: HashSet<ClientRole>,
23    /// A priority list of which serializer to use when talking to the server
24    serializers: Vec<SerializerType>,
25    /// Sets the maximum message to be sent over the transport
26    max_msg_size: u32,
27    /// When using a secure transport, this option disables certificate validation
28    ssl_verify: bool,
29    /// Additional WebSocket headers on establish connection
30    websocket_headers: HashMap<String, String>,
31}
32
33impl Default for ClientConfig {
34    /// Creates a client config with reasonnable defaults
35    ///
36    /// Roles :
37    /// - [ClientRole::Caller](enum.ClientRole.html#variant.Caller)
38    /// - [ClientRole::Callee](enum.ClientRole.html#variant.Callee)
39    /// - [ClientRole::Publisher](enum.ClientRole.html#variant.Publisher)
40    /// - [ClientRole::Subscriber](enum.ClientRole.html#variant.Subscriber)
41    ///
42    /// Serializers :
43    /// 1. [SerializerType::Json](enum.SerializerType.html#variant.Json)
44    /// 2. [SerializerType::MsgPack](enum.SerializerType.html#variant.MsgPack)
45    fn default() -> Self {
46        // Config with default values
47        ClientConfig {
48            agent: String::from(DEFAULT_AGENT_STR),
49            roles: [
50                ClientRole::Caller,
51                ClientRole::Callee,
52                ClientRole::Publisher,
53                ClientRole::Subscriber,
54            ]
55            .iter()
56            .cloned()
57            .collect(),
58            serializers: vec![SerializerType::Json, SerializerType::MsgPack],
59            max_msg_size: 0,
60            ssl_verify: true,
61            websocket_headers: HashMap::new(),
62        }
63    }
64}
65
66impl ClientConfig {
67    /// Replaces the default user agent string. Set to a zero length string to disable
68    pub fn set_agent<T: AsRef<str>>(mut self, agent: T) -> Self {
69        self.agent = String::from(agent.as_ref());
70        self
71    }
72    /// Returns the currently set agent string
73    pub fn get_agent(&self) -> &str {
74        &self.agent
75    }
76
77    /// Sets the maximum payload size which can be sent over the transport
78    /// Set to 0 to use default
79    pub fn set_max_msg_size(mut self, msg_size: u32) -> Self {
80        self.max_msg_size = msg_size;
81        self
82    }
83    /// Returns the maximum message size for the transport
84    pub fn get_max_msg_size(&self) -> Option<u32> {
85        if self.max_msg_size == 0 {
86            None
87        } else {
88            Some(self.max_msg_size)
89        }
90    }
91
92    /// Sets the serializers that will be used in order of preference (serializers[0] will be attempted first)
93    pub fn set_serializers(mut self, serializers: Vec<SerializerType>) -> Self {
94        self.serializers = serializers;
95        self
96    }
97    /// Returns the priority list of serializers
98    pub fn get_serializers(&self) -> &Vec<SerializerType> {
99        &self.serializers
100    }
101
102    /// Sets the roles that are intended to be used by the client
103    pub fn set_roles(mut self, roles: Vec<ClientRole>) -> Self {
104        self.roles.drain();
105        for role in roles {
106            self.roles.insert(role);
107        }
108        self
109    }
110
111    /// Enables (default) or disables TLS certificate validation
112    pub fn set_ssl_verify(mut self, val: bool) -> Self {
113        self.ssl_verify = val;
114        self
115    }
116    /// Returns whether certificate validation is enabled
117    pub fn get_ssl_verify(&self) -> bool {
118        self.ssl_verify
119    }
120
121    pub fn add_websocket_header(mut self, key: String, val: String) -> Self {
122        self.websocket_headers.insert(key, val);
123        self
124    }
125    pub fn get_websocket_headers(&self) -> &HashMap<String, String> {
126        &self.websocket_headers
127    }
128}
129
130/// Allows interaction as a client with a WAMP server
131pub struct Client<'a> {
132    /// Configuration struct used to customize the client
133    config: ClientConfig,
134    /// Generic transport
135    core_res: UnboundedReceiver<Result<(), WampError>>,
136    core_status: ClientState,
137    /// Roles supported by the server
138    server_roles: HashSet<String>,
139    /// Current Session ID
140    session_id: Option<WampId>,
141    /// Channel to send requests to the event loop
142    ctl_channel: UnboundedSender<Request<'a>>,
143}
144
145/// All the states a client can be in
146pub enum ClientState {
147    /// The event loop hasnt been spawned yet
148    NoEventLoop,
149    /// Currently running and connected to a server
150    Running,
151    /// Disconnected from a server
152    Disconnected(Result<(), WampError>),
153}
154
155impl<'a> Client<'a> {
156    /// Connects to a WAMP server using the specified protocol
157    ///
158    /// __Note__
159    ///
160    /// On success, this function returns :
161    /// -  Client : Used to interact with the server
162    /// -  Main event loop Future : __This MUST be spawned by the caller__ (e.g using tokio::spawn())
163    /// -  RPC event queue : If you register RPC endpoints, you MUST spawn a seperate task to also handle these events
164    ///
165    /// To customize parmeters used for the connection, see the [ClientConfig](struct.ClientConfig.html) struct
166    pub async fn connect<T: AsRef<str>>(
167        uri: T,
168        cfg: Option<ClientConfig>,
169    ) -> Result<
170        (
171            Client<'a>,
172            (
173                GenericFuture<'a>,
174                Option<UnboundedReceiver<GenericFuture<'a>>>,
175            ),
176        ),
177        WampError,
178    > {
179        let uri = match Url::parse(uri.as_ref()) {
180            Ok(u) => u,
181            Err(e) => return Err(WampError::InvalidUri(e)),
182        };
183
184        let config = match cfg {
185            Some(c) => c,
186            // Set defaults
187            None => ClientConfig::default(),
188        };
189
190        let (ctl_channel, ctl_receiver) = mpsc::unbounded_channel();
191        let (core_res_w, core_res) = mpsc::unbounded_channel();
192
193        let ctl_sender = ctl_channel.clone();
194        // Establish a connection
195        let mut conn = Core::connect(&uri, &config, (ctl_sender, ctl_receiver), core_res_w).await?;
196
197        let rpc_evt_queue = if config.roles.contains(&ClientRole::Callee) {
198            conn.rpc_event_queue_r.take()
199        } else {
200            None
201        };
202
203        Ok((
204            Client {
205                config,
206                server_roles: HashSet::new(),
207                session_id: None,
208                ctl_channel,
209                core_res,
210                core_status: ClientState::NoEventLoop,
211            },
212            (Box::pin(conn.event_loop()), rpc_evt_queue),
213        ))
214    }
215
216    /// Attempts to join a realm and start a session with the server.
217    ///
218    /// See [`join_realm_with_authentication`] method for more details.
219    async fn inner_join_realm(
220        &mut self,
221        realm: String,
222        authentication_methods: Vec<AuthenticationMethod>,
223        authentication_id: Option<String>,
224        on_challenge_handler: Option<AuthenticationChallengeHandler<'a>>,
225    ) -> Result<(), WampError> {
226        // Make sure the event loop is ready to process requests
227        if let ClientState::NoEventLoop = self.get_cur_status() {
228            debug!("Called join_realm() before th event loop is ready... Waiting...");
229            self.wait_for_status_change().await;
230        }
231
232        // Make sure we are still connected to a server
233        if !self.is_connected() {
234            return Err(From::from(
235                "The client is currently not connected".to_string(),
236            ));
237        }
238
239        // Make sure we arent already part of a realm
240        if self.session_id.is_some() {
241            return Err(From::from(format!(
242                "join_realm('{}') : Client already joined to a realm",
243                realm
244            )));
245        }
246
247        // Send a request for the core to perform the action
248        let (res_sender, res) = oneshot::channel();
249        if let Err(e) = self.ctl_channel.send(Request::Join {
250            uri: realm,
251            roles: self.config.roles.clone(),
252            agent_str: if self.config.agent.is_empty() {
253                Some(self.config.agent.clone())
254            } else {
255                None
256            },
257            authentication_methods,
258            authentication_id,
259            on_challenge_handler,
260            res: res_sender,
261        }) {
262            return Err(From::from(format!(
263                "Core never received our request : {}",
264                e
265            )));
266        }
267
268        // Wait for the request results
269        let (session_id, mut server_roles) = match res.await {
270            Ok(r) => r?,
271            Err(e) => {
272                return Err(From::from(format!(
273                    "Core never returned a response : {}",
274                    e
275                )))
276            }
277        };
278
279        // Add the server roles
280        self.server_roles.drain();
281        for (role, _) in server_roles.drain().take(1) {
282            self.server_roles.insert(role);
283        }
284
285        // Set the current session
286        self.session_id = Some(session_id);
287        debug!("Connected with session_id {} !", session_id);
288
289        Ok(())
290    }
291
292    /// Attempts to join a realm and start a session with the server.
293    ///
294    /// * `realm` - A name of the WAMP realm
295    pub async fn join_realm<T: Into<String>>(&mut self, realm: T) -> Result<(), WampError> {
296        self.inner_join_realm(realm.into(), vec![], None, None)
297            .await
298    }
299
300    /// Attempts to join a realm and start a session with the server.
301    ///
302    /// * `realm` - A name of the WAMP realm
303    /// * `authentication_methods` - A set of all the authentication methods the client will support
304    /// * `authentication_id` - An authentication ID (e.g. username) the client wishes to authenticate as.
305    ///   It is required for non-anynomous authentication methods.
306    /// * `on_challenge_handler` - An authentication handler function
307    ///
308    /// ```ignore
309    /// client
310    ///     .join_realm_with_authentication(
311    ///         "realm1",
312    ///         vec![wamp_async::AuthenticationMethod::Ticket],
313    ///         "username",
314    ///         |_authentication_method, _extra| async {
315    ///             Ok(wamp_async::AuthenticationChallengeResponse::with_signature(
316    ///                 "password".into(),
317    ///             ))
318    ///         },
319    ///     )
320    ///     .await?;
321    /// ```
322    pub async fn join_realm_with_authentication<
323        Realm,
324        AuthenticationId,
325        AuthenticationChallengeHandler,
326        AuthenticationChallengeHandlerResponse,
327    >(
328        &mut self,
329        realm: Realm,
330        authentication_methods: Vec<AuthenticationMethod>,
331        authentication_id: AuthenticationId,
332        on_challenge_handler: AuthenticationChallengeHandler,
333    ) -> Result<(), WampError>
334    where
335        Realm: Into<String>,
336        AuthenticationId: Into<String>,
337        AuthenticationChallengeHandler: Fn(AuthenticationMethod, WampDict) -> AuthenticationChallengeHandlerResponse
338            + Send
339            + Sync
340            + 'a,
341        AuthenticationChallengeHandlerResponse: std::future::Future<Output = Result<AuthenticationChallengeResponse, WampError>>
342            + Send
343            + 'a,
344    {
345        self.inner_join_realm(
346            realm.into(),
347            authentication_methods,
348            Some(authentication_id.into()),
349            Some(Box::new(move |authentication_method, extra| {
350                Box::pin(on_challenge_handler(authentication_method, extra))
351            })),
352        )
353        .await
354    }
355
356    /// Leaves the current realm and terminates the session with the server
357    pub async fn leave_realm(&mut self) -> Result<(), WampError> {
358        // Make sure we are still connected to a server
359        if !self.is_connected() {
360            return Err(From::from(
361                "The client is currently not connected".to_string(),
362            ));
363        }
364
365        // Nothing to do if not currently in a session
366        if self.session_id.take().is_none() {
367            return Ok(());
368        }
369
370        // Send the request
371        let (res, result) = oneshot::channel();
372        if let Err(e) = self.ctl_channel.send(Request::Leave { res }) {
373            return Err(From::from(format!(
374                "Core never received our request : {}",
375                e
376            )));
377        }
378
379        // Wait for the result
380        match result.await {
381            Ok(r) => r?,
382            Err(e) => {
383                return Err(From::from(format!(
384                    "Core never returned a response : {}",
385                    e
386                )))
387            }
388        };
389
390        Ok(())
391    }
392
393    /// Subscribes to events for the specifiec topic
394    ///
395    /// This function returns a subscription ID (required to unsubscribe) and
396    /// the receive end of a channel for events published on the topic.
397    pub async fn subscribe<T: AsRef<str>>(
398        &self,
399        topic: T,
400    ) -> Result<(WampId, SubscriptionQueue), WampError> {
401        // Send the request
402        let (res, result) = oneshot::channel();
403        if let Err(e) = self.ctl_channel.send(Request::Subscribe {
404            uri: topic.as_ref().to_string(),
405            res,
406        }) {
407            return Err(From::from(format!(
408                "Core never received our request : {}",
409                e
410            )));
411        }
412
413        // Wait for the result
414        let (sub_id, evt_queue) = match result.await {
415            Ok(r) => r?,
416            Err(e) => {
417                return Err(From::from(format!(
418                    "Core never returned a response : {}",
419                    e
420                )))
421            }
422        };
423
424        Ok((sub_id, evt_queue))
425    }
426
427    /// Unsubscribes to a previously subscribed topic
428    pub async fn unsubscribe(&self, sub_id: WampId) -> Result<(), WampError> {
429        // Send the request
430        let (res, result) = oneshot::channel();
431        if let Err(e) = self.ctl_channel.send(Request::Unsubscribe { sub_id, res }) {
432            return Err(From::from(format!(
433                "Core never received our request : {}",
434                e
435            )));
436        }
437
438        // Wait for the result
439        match result.await {
440            Ok(r) => r?,
441            Err(e) => {
442                return Err(From::from(format!(
443                    "Core never returned a response : {}",
444                    e
445                )))
446            }
447        };
448
449        Ok(())
450    }
451
452    /// Publishes an event on a specific topic
453    ///
454    /// The caller can set `acknowledge` to true to receive unique IDs from the server
455    /// for each published event.
456    pub async fn publish<T: AsRef<str>>(
457        &self,
458        topic: T,
459        arguments: Option<WampArgs>,
460        arguments_kw: Option<WampKwArgs>,
461        acknowledge: bool,
462    ) -> Result<Option<WampId>, WampError> {
463        let mut options = WampDict::new();
464
465        if acknowledge {
466            options.insert("acknowledge".to_string(), Arg::Bool(true));
467        }
468        // Send the request
469        let (res, result) = oneshot::channel();
470        if let Err(e) = self.ctl_channel.send(Request::Publish {
471            uri: topic.as_ref().to_string(),
472            options,
473            arguments,
474            arguments_kw,
475            res,
476        }) {
477            return Err(From::from(format!(
478                "Core never received our request : {}",
479                e
480            )));
481        }
482
483        let pub_id = if acknowledge {
484            // Wait for the acknowledgement
485            Some(match result.await {
486                Ok(Ok(r)) => r.unwrap(),
487                Ok(Err(e)) => return Err(From::from(format!("Failed to send publish : {}", e))),
488                Err(e) => {
489                    return Err(From::from(format!(
490                        "Core never returned a response : {}",
491                        e
492                    )))
493                }
494            })
495        } else {
496            None
497        };
498        Ok(pub_id)
499    }
500
501    /// Register an RPC endpoint. Upon succesful registration, a registration ID is returned (used to unregister)
502    /// and calls received from the server will generate a future which will be sent on the rpc event channel
503    /// returned by the call to [event_loop()](struct.Client.html#method.event_loop)
504    pub async fn register<T, F, Fut>(&self, uri: T, func_ptr: F) -> Result<WampId, WampError>
505    where
506        T: AsRef<str>,
507        F: Fn(Option<WampArgs>, Option<WampKwArgs>) -> Fut + Send + Sync + 'a,
508        Fut: Future<Output = Result<(Option<WampArgs>, Option<WampKwArgs>), WampError>> + Send + 'a,
509    {
510        // Send the request
511        let (res, result) = oneshot::channel();
512        if let Err(e) = self.ctl_channel.send(Request::Register {
513            uri: uri.as_ref().to_string(),
514            res,
515            func_ptr: Box::new(move |a, k| Box::pin(func_ptr(a, k))),
516        }) {
517            return Err(From::from(format!(
518                "Core never received our request : {}",
519                e
520            )));
521        }
522
523        // Wait for the result
524        let rpc_id = match result.await {
525            Ok(r) => r?,
526            Err(e) => {
527                return Err(From::from(format!(
528                    "Core never returned a response : {}",
529                    e
530                )))
531            }
532        };
533
534        Ok(rpc_id)
535    }
536
537    /// Unregisters an RPC endpoint
538    pub async fn unregister(&self, rpc_id: WampId) -> Result<(), WampError> {
539        // Send the request
540        let (res, result) = oneshot::channel();
541        if let Err(e) = self.ctl_channel.send(Request::Unregister { rpc_id, res }) {
542            return Err(From::from(format!(
543                "Core never received our request : {}",
544                e
545            )));
546        }
547
548        // Wait for the result
549        match result.await {
550            Ok(r) => r?,
551            Err(e) => {
552                return Err(From::from(format!(
553                    "Core never returned a response : {}",
554                    e
555                )))
556            }
557        };
558
559        Ok(())
560    }
561
562    /// Calls a registered RPC endpoint on the server
563    pub async fn call<T: AsRef<str>>(
564        &self,
565        uri: T,
566        arguments: Option<WampArgs>,
567        arguments_kw: Option<WampKwArgs>,
568    ) -> Result<(Option<WampArgs>, Option<WampKwArgs>), WampError> {
569        // Send the request
570        let (res, result) = oneshot::channel();
571        if let Err(e) = self.ctl_channel.send(Request::Call {
572            uri: uri.as_ref().to_string(),
573            options: WampDict::new(),
574            arguments,
575            arguments_kw,
576            res,
577        }) {
578            return Err(From::from(format!(
579                "Core never received our request : {}",
580                e
581            )));
582        }
583
584        // Wait for the result
585        match result.await {
586            Ok(r) => r,
587            Err(e) => Err(From::from(format!(
588                "Core never returned a response : {}",
589                e
590            ))),
591        }
592    }
593
594    /// Returns the current client status
595    pub fn get_cur_status(&mut self) -> &ClientState {
596        // Check to see if the status changed
597        let new_status = self.core_res.recv().now_or_never();
598        #[allow(clippy::match_wild_err_arm)]
599        match new_status {
600            Some(Some(state)) => self.set_next_status(state),
601            None => &self.core_status,
602            Some(None) => panic!("The event loop died without sending a new status"),
603        }
604    }
605
606    /// Returns whether we are connected to the server or not
607    pub fn is_connected(&mut self) -> bool {
608        match self.get_cur_status() {
609            ClientState::Running => true,
610            _ => false,
611        }
612    }
613
614    fn set_next_status(&mut self, new_status: Result<(), WampError>) -> &ClientState {
615        // Error means disconnection
616        if new_status.is_err() {
617            self.core_status = ClientState::Disconnected(new_status);
618            return &self.core_status;
619        }
620
621        // Progress to next state
622        match self.core_status {
623            ClientState::NoEventLoop => {
624                self.core_status = ClientState::Running;
625            }
626            ClientState::Running => {
627                self.core_status = ClientState::Disconnected(new_status);
628            }
629            ClientState::Disconnected(_) => {
630                panic!("Got new core status after already being disconnected");
631            }
632        }
633
634        &self.core_status
635    }
636
637    // Waits until the event loop sends a status change event
638    // This will update the current core_status field
639    async fn wait_for_status_change(&mut self) -> &ClientState {
640        // State cant change if disconnected
641        if let ClientState::Disconnected(ref _r) = self.core_status {
642            return &self.core_status;
643        }
644
645        // Yield until we receive something
646        let new_status = match self.core_res.recv().await {
647            Some(v) => v,
648            None => {
649                panic!("The event loop died without sending a new status");
650            }
651        };
652
653        // Save the new status
654        self.set_next_status(new_status)
655    }
656
657    /// Blocks the caller until the connection with the server is terminated
658    pub async fn block_until_disconnect(&mut self) -> &ClientState {
659        let mut cur_status = self.get_cur_status();
660        loop {
661            match cur_status {
662                ClientState::Disconnected(_) => break,
663                _ => {
664                    // Wait until status changes
665                    cur_status = self.wait_for_status_change().await;
666                }
667            }
668        }
669
670        &self.core_status
671    }
672
673    /// Cleanly closes a connection with the server
674    pub async fn disconnect(mut self) {
675        if self.is_connected() {
676            // Cleanly leave realm
677            let _ = self.leave_realm().await;
678            // Stop the eventloop and disconnect from server
679            let _ = self.ctl_channel.send(Request::Shutdown);
680
681            // Wait for return status from core
682            match self.core_res.recv().await {
683                Some(Err(e)) => error!("Error while shutting down : {:?}", e),
684                None => error!("Core never sent a status after shutting down..."),
685                _ => {}
686            }
687        }
688    }
689}