//! # battler-wamprat-schema
//! **battler-wamprat-schema** is a supplemental crate for [`battler-wamprat`](https://crates.io/crates/battler-wamprat). It provides a procedural macro for generating consumer and producer peer objects for strongly-typed procedures and pub/sub topics.
//!
//! ## What is WAMP?
//!
//! **WAMP** is an open standard, routed protocol that provides two messaging patterns: Publish &
//! Subscribe and routed Remote Procedure Calls. It is intended to connect application components in
//! distributed applications. WAMP uses WebSocket as its default transport, but it can be
//! transmitted via any other protocol that allows for ordered, reliable, bi-directional, and
//! message-oriented communications.
//!
//! ## Background
//!
//! **battler-wamprat** is a Rust library and framework for peers communicating over the **Web
//! Application Messaging Protocol** (WAMP).
//!
//! The library is built on [`battler-wamp`](https://crates.io/crates/battler-wamp) to provide more complex functionality:
//!
//! 1. Automatic reconnection and re-registration of procedures and subscriptions when a session is
//!    dropped.
//! 1. Strongly-typed procedure handling, procedure calls, event publication, and subscription event
//!    handling using built-in serialization and deserialization.
//!
//! The library uses [`tokio`](https://tokio.rs) as its asynchronous runtime, and is ready for use on top of WebSocket streams.
//!
//! ## Schemas
//!
//! The `battler-wamprat-schema` crate works by generating code around
//! [`battler_wamprat::peer::Peer`] objects based on a schema.
//!
//! A **schema** is a collection of procedures and pub/sub topics that are logically connected
//! by application logic. A schema can be consumed by a **consumer** (a.k.a., a caller and
//! subscriber) and produced by a **producer** (a.k.a., a callee and publisher).
//!
//! Both consumers and producers are peers communicating via a WAMP router. When defining a schema,
//! the code for producer and consumer peers is automatically generated around the
//! [`battler_wamprat::peer::Peer`] object. Thus, peer objects can be entirely constructed by
//! `battler_wamprat_schema`, while all underlying functionality is provided by `battler_wamprat`.
//!
//! ## Usage
//!
//! A schema is defined with an enum type using the [`WampSchema`] procedural macro. You attach
//! the relevant types (e.g., input, output, error, event, etc.) to each enum variant to generate
//! the strongly-typed peer methods.
//!
//! Note that schemas are attached to a **single realm**, so the connection logic is simplified.
//! Each peer stays connected to the router until it is manually canceled.
//!
//! After defining the schema, producers and consumers can be created from the schema enum.
//! Consumers generate a wrapper around [`battler_wamprat::peer::Peer`] directly, while producers
//! generate a wrapper around [`battler_wamprat::peer::PeerBuilder`] for registering procedure
//! handlers.
//!
//! To start a peer, you only need to provide preliminary information in the [`PeerConfig`], such as
//! which router to connect to and supported authentication methods.
//!
//! Below is a detailed example of a consumer and producer peer interacting through a router.
//!
//! ```
//! use std::time::Duration;
//!
//! use anyhow::Result;
//! use battler_wamp::{
//!     peer::{
//!         WebSocketPeer,
//!         new_web_socket_peer,
//!     },
//!     router::{
//!         EmptyPubSubPolicies,
//!         EmptyRpcPolicies,
//!         RealmAuthenticationConfig,
//!         RealmConfig,
//!         RouterConfig,
//!         RouterHandle,
//!         new_web_socket_router,
//!     },
//! };
//! use battler_wamp_uri::Uri;
//! use battler_wamp_values::{
//!     WampDictionary,
//!     WampList,
//! };
//! use battler_wamprat::{
//!     peer::{
//!         CallOptions,
//!         PublishOptions,
//!         PeerConnectionConfig,
//!         PeerConnectionType,
//!     },
//!     procedure::{
//!         Invocation,
//!         TypedProcedure,
//!     },
//!     subscription::{
//!         TypedPatternMatchedSubscription,
//!         TypedSubscription,
//!     },
//! };
//! use battler_wamprat_error::WampError;
//! use battler_wamprat_message::WampApplicationMessage;
//! use battler_wamprat_schema::{
//!     PeerConfig,
//!     WampSchema,
//!     WampSchemaError,
//! };
//! use battler_wamprat_uri::WampUriMatcher;
//! use thiserror::Error;
//! use tokio::{
//!     select,
//!     sync::broadcast::{
//!         self,
//!         error::{
//!             RecvError,
//!             TryRecvError,
//!         },
//!     },
//!     task::JoinHandle,
//! };
//!
//! #[derive(Debug, WampList)]
//! struct OneNumber(u64);
//!
//! #[derive(Debug, WampList)]
//! struct TwoNumbers(u64, u64);
//!
//! #[derive(Debug, WampApplicationMessage)]
//! struct Input(#[arguments] TwoNumbers);
//!
//! #[derive(Debug, WampApplicationMessage)]
//! struct Output(#[arguments] OneNumber);
//!
//! #[derive(Debug, Error, WampError)]
//! enum DivideError {
//!     #[error("cannot divide by 0")]
//!     #[uri("com.battler.error.divide_by_zero")]
//!     DivideByZero,
//! }
//!
//! #[derive(Debug, Clone, WampApplicationMessage)]
//! struct Ping;
//!
//! #[derive(Debug, Clone, WampDictionary)]
//! struct Message {
//!     author: String,
//!     content: String,
//! }
//!
//! #[derive(Debug, WampApplicationMessage)]
//! struct MessageEvent(#[arguments_keyword] Message);
//!
//! #[derive(Debug, WampUriMatcher)]
//! #[uri("com.battler.message.{version}.{channel}")]
//! struct MessagePattern {
//!     version: u64,
//!     channel: String,
//! }
//!
//! #[derive(WampSchema)]
//! #[realm("com.battler_wamprat_schema.realm.example")]
//! #[allow(unused)]
//! enum Example {
//!     #[rpc(uri = "com.battler.add", input = Input, output = Output)]
//!     Add,
//!     #[rpc(uri = "com.battler.divide", input = Input, output = Output, error = DivideError)]
//!     Divide,
//!     #[pubsub(uri = "com.battler.ping", event = Ping)]
//!     Ping,
//!     #[pubsub(pattern = MessagePattern, event = MessageEvent)]
//!     Message,
//! }
//!
//! async fn start_router() -> Result<(RouterHandle, JoinHandle<()>)> {
//!     let mut config = RouterConfig::default();
//!     config.realms.push(RealmConfig {
//!         name: "example".to_owned(),
//!         uri: Uri::try_from("com.battler_wamprat_schema.realm.example")?,
//!         authentication: RealmAuthenticationConfig::default(),
//!     });
//!     let router = new_web_socket_router(
//!         config,
//!         Box::new(EmptyPubSubPolicies::default()),
//!         Box::new(EmptyRpcPolicies::default()),
//!     )?;
//!     router.start().await
//! }
//!
//! fn create_peer(name: &str) -> Result<WebSocketPeer> {
//!     let mut config = battler_wamp::peer::PeerConfig::default();
//!     config.name = name.to_owned();
//!     new_web_socket_peer(config)
//! }
//!
//! async fn run_producer(
//!     router_handle: RouterHandle,
//!     producer_ready_tx: broadcast::Sender<()>,
//!     mut done_rx: broadcast::Receiver<()>,
//! ) {
//!     struct Adder;
//!     impl AddProcedure for Adder {}
//!
//!     impl TypedProcedure for Adder {
//!         type Input = Input;
//!         type Output = Output;
//!         type Error = anyhow::Error;
//!
//!         async fn invoke(
//!             &self,
//!             _: Invocation,
//!             input: Self::Input,
//!         ) -> Result<Self::Output, Self::Error> {
//!             Ok(Output(OneNumber(input.0.0 + input.0.1)))
//!         }
//!     }
//!
//!     struct Divider;
//!     impl DivideProcedure for Divider {}
//!
//!     impl TypedProcedure for Divider {
//!         type Input = Input;
//!         type Output = Output;
//!         type Error = DivideError;
//!
//!         async fn invoke(
//!             &self,
//!             _: Invocation,
//!             input: Self::Input,
//!         ) -> Result<Self::Output, Self::Error> {
//!             if input.0.1 == 0 {
//!                 Err(DivideError::DivideByZero)
//!             } else {
//!                 Ok(Output(OneNumber(input.0.0 / input.0.1)))
//!             }
//!         }
//!     }
//!
//!     let mut producer_builder = Example::producer_builder(PeerConfig {
//!         connection: PeerConnectionConfig::new(PeerConnectionType::Remote(format!(
//!             "ws://{}",
//!             router_handle.local_addr()
//!         ))),
//!         auth_methods: Vec::default(),
//!     });
//!     producer_builder.register_add(Adder).unwrap();
//!     producer_builder.register_divide(Divider).unwrap();
//!     let producer = producer_builder
//!         .start(create_peer("producer").unwrap())
//!         .unwrap();
//!     producer.wait_until_ready().await.unwrap();
//!
//!     producer_ready_tx.send(()).unwrap();
//!
//!     loop {
//!         select! {
//!             _ = done_rx.recv() => break,
//!             _ = tokio::time::sleep(Duration::from_secs(1)) => {
//!                 producer.publish_ping(Ping, PublishOptions::default()).await.unwrap();
//!                 producer.publish_message(
//!                     MessagePattern {
//!                         version: 1,
//!                         channel: "main".to_owned(),
//!                     },
//!                     MessageEvent(Message {
//!                         author: "user".to_owned(),
//!                         content: "foo".to_owned(),
//!                     }), PublishOptions::default()
//!                 )
//!                 .await.unwrap();
//!             }
//!         }
//!     }
//!
//!     producer.stop().await.unwrap();
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let (router_handle, _) = start_router().await.unwrap();
//!
//!     let (producer_ready_tx, mut producer_ready_rx) = broadcast::channel(1);
//!     let (done_tx, done_rx) = broadcast::channel(1);
//!
//!     let producer_join_handle = tokio::spawn(run_producer(
//!         router_handle.clone(),
//!         producer_ready_tx,
//!         done_rx,
//!     ));
//!
//!     // Wait for producer to be ready to serve procedure calls.
//!     producer_ready_rx.recv().await.unwrap();
//!
//!     let consumer = Example::consumer(
//!         PeerConfig {
//!             connection: PeerConnectionConfig::new(PeerConnectionType::Remote(format!(
//!                 "ws://{}",
//!                 router_handle.local_addr()
//!             ))),
//!             auth_methods: Vec::default(),
//!         },
//!         create_peer("consumer").unwrap(),
//!     )
//!     .unwrap();
//!     consumer.wait_until_ready().await.unwrap();
//!
//!     assert_matches::assert_matches!(
//!         consumer
//!             .add(Input(TwoNumbers(36345, 88818)), CallOptions::default())
//!             .await,
//!         Ok(rpc) => {
//!             assert_matches::assert_matches!(rpc.result().await, Ok(Output(OneNumber(125163))));
//!         }
//!     );
//!
//!     assert_matches::assert_matches!(consumer.divide(Input(TwoNumbers(25, 2)), CallOptions::default()).await, Ok(rpc) => {
//!         assert_matches::assert_matches!(rpc.result_observing_error().await, Ok(Output(OneNumber(12))));
//!     });
//!
//!     assert_matches::assert_matches!(consumer.divide(Input(TwoNumbers(1, 0)), CallOptions::default()).await, Ok(rpc) => {
//!         assert_matches::assert_matches!(rpc.result_observing_error().await, Err(WampSchemaError::Known(DivideError::DivideByZero)));
//!     });
//!
//!     struct PingHandler {
//!         events_tx: broadcast::Sender<Ping>,
//!     }
//!     impl PingSubscription for PingHandler {}
//!
//!     impl TypedSubscription for PingHandler {
//!         type Event = Ping;
//!
//!         async fn handle_event(&self, event: Self::Event) {
//!             self.events_tx.send(event).unwrap();
//!         }
//!     }
//!
//!     let (events_tx, mut events_rx) = broadcast::channel(16);
//!     assert_matches::assert_matches!(
//!         consumer.subscribe_ping(PingHandler { events_tx }).await,
//!         Ok(())
//!     );
//!
//!     assert_matches::assert_matches!(events_rx.recv().await, Ok(Ping));
//!     assert_matches::assert_matches!(events_rx.recv().await, Ok(Ping));
//!     assert_matches::assert_matches!(events_rx.recv().await, Ok(Ping));
//!     assert_matches::assert_matches!(events_rx.try_recv(), Err(TryRecvError::Empty));
//!
//!     assert_matches::assert_matches!(consumer.unsubscribe_ping().await, Ok(()));
//!     assert_matches::assert_matches!(events_rx.recv().await, Err(RecvError::Closed));
//!
//!     struct MessageHandler {
//!         events_tx: broadcast::Sender<(Message, u64, String)>,
//!     }
//!     impl MessageSubscription for MessageHandler {}
//!
//!     impl TypedPatternMatchedSubscription for MessageHandler {
//!         type Event = MessageEvent;
//!         type Pattern = MessagePattern;
//!
//!         async fn handle_event(&self, event: Self::Event, topic: Self::Pattern) {
//!             self.events_tx
//!                 .send((event.0, topic.version, topic.channel))
//!                 .unwrap();
//!         }
//!     }
//!
//!     let (events_tx, mut events_rx) = broadcast::channel(16);
//!     assert_matches::assert_matches!(
//!         consumer
//!             .subscribe_message(MessageHandler { events_tx })
//!             .await,
//!         Ok(())
//!     );
//!     assert_matches::assert_matches!(events_rx.recv().await, Ok((message, version, channel)) => {
//!         assert_eq!(message.author, "user");
//!         assert_eq!(message.content, "foo");
//!         assert_eq!(version, 1);
//!         assert_eq!(channel, "main");
//!     });
//!
//!     // Clean up the consumer and producer.
//!     consumer.stop().await.unwrap();
//!     done_tx.send(()).unwrap();
//!     producer_join_handle.await.unwrap();
//! }
//! ```

use std::{
    fmt::{
        Debug,
        Display,
    },
    marker::PhantomData,
};

use battler_wamp::core::error::WampError;
pub use battler_wamprat_schema_proc_macro::WampSchema;

/// An error resulting from a call to a schema object.
///
/// Procedures may define error types that are expected to be generated by callees. However,
/// procedure calls can also fail in ways not decided by the callee, in which case some other
/// error is generated instead. This type reflects both scenarios: some errors are "known" and
/// others are "unknown."
///
/// Unknown errors are reported with [`anyhow::Error`] and can be further inspected by the client.
#[derive(Debug)]
pub enum WampSchemaError<E> {
    /// A known error occurred and is parsed ahead of time for the client.
    Known(E),
    /// An unknown error occurred.
    Unknown(anyhow::Error),
}

impl<E> WampSchemaError<E>
where
    E: Into<anyhow::Error>,
{
    /// Converts the error back into the generic form.
    pub fn any_err(self) -> anyhow::Error {
        match self {
            Self::Known(err) => err.into(),
            Self::Unknown(err) => err,
        }
    }
}
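
/*
The known/unknown split above can be sketched without any dependencies. This is a minimal,
std-only illustration, not this crate's implementation: `SchemaError`, `DivideByZero`, and
`describe` are hypothetical stand-ins, and `Box<dyn Error + Send + Sync>` takes the place of
`anyhow::Error`.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for an error type declared on a procedure.
#[derive(Debug, PartialEq)]
struct DivideByZero;

impl fmt::Display for DivideByZero {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "cannot divide by 0")
    }
}

impl Error for DivideByZero {}

/// Std-only mirror of the known/unknown split (assumption: a boxed error replaces
/// `anyhow::Error`).
#[derive(Debug)]
enum SchemaError<E> {
    Known(E),
    Unknown(Box<dyn Error + Send + Sync>),
}

impl<E> SchemaError<E>
where
    E: Error + Send + Sync + 'static,
{
    /// Collapses both variants into the generic error form.
    fn any_err(self) -> Box<dyn Error + Send + Sync> {
        match self {
            Self::Known(err) => Box::new(err),
            Self::Unknown(err) => err,
        }
    }
}

/// Handles the known error explicitly and reports anything else as text.
fn describe(err: SchemaError<DivideByZero>) -> String {
    match err {
        SchemaError::Known(DivideByZero) => "recoverable: cannot divide by 0".to_owned(),
        SchemaError::Unknown(err) => format!("unexpected: {err}"),
    }
}
```

A caller that has no special handling for the known case can call `any_err` and propagate a
single generic error; a caller that does can match on the two variants as `describe` does.
*/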

/// A wrapper around [`battler_wamprat::peer::TypedSimplePendingRpc`] for strongly-typed procedure
/// calls.
#[derive(Debug)]
pub struct SimplePendingRpc<T, E> {
    rpc: battler_wamprat::peer::TypedSimplePendingRpc<T>,
    _t: PhantomData<T>,
    _e: PhantomData<E>,
}

impl<T, E> SimplePendingRpc<T, E>
where
    T: battler_wamprat_message::WampApplicationMessage,
    E: TryFrom<WampError, Error = WampError> + Debug + Display + Send + Sync + 'static,
{
    /// Waits for the result of the procedure call, observing the error and attempting to parse it
    /// to the known type.
    pub async fn result_observing_error(self) -> Result<T, WampSchemaError<E>> {
        self.rpc
            .result()
            .await
            .map_err(|err| match err.downcast::<WampError>() {
                Ok(err) => match E::try_from(err) {
                    Ok(err) => WampSchemaError::Known(err),
                    Err(err) => WampSchemaError::Unknown(err.into()),
                },
                Err(err) => WampSchemaError::Unknown(err),
            })
    }
}
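
/*
The two-step classification in `result_observing_error` (downcast the generic error, then try to
parse it into the typed error) can be sketched with the standard library alone. Everything named
here is a hypothetical stand-in: `WireError` plays the role of the protocol-level error,
`Classified` the role of the known/unknown result, and a boxed error replaces `anyhow::Error`.

```rust
use std::convert::TryFrom;
use std::error::Error;
use std::fmt;

/// Hypothetical protocol-level error identified by a URI.
#[derive(Debug)]
struct WireError {
    uri: String,
}

impl fmt::Display for WireError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.uri)
    }
}

impl Error for WireError {}

/// Hypothetical application error declared for a procedure.
#[derive(Debug, PartialEq)]
enum DivideError {
    DivideByZero,
}

impl TryFrom<WireError> for DivideError {
    // The original error is handed back when the URI is not recognized.
    type Error = WireError;

    fn try_from(err: WireError) -> Result<Self, Self::Error> {
        match err.uri.as_str() {
            "com.battler.error.divide_by_zero" => Ok(Self::DivideByZero),
            _ => Err(err),
        }
    }
}

/// Outcome of classifying a failed call.
#[derive(Debug)]
enum Classified<E> {
    Known(E),
    Unknown(Box<dyn Error + Send + Sync>),
}

fn classify<E>(err: Box<dyn Error + Send + Sync>) -> Classified<E>
where
    E: TryFrom<WireError, Error = WireError>,
{
    match err.downcast::<WireError>() {
        // A protocol error: try to parse it into the typed error.
        Ok(wire) => match E::try_from(*wire) {
            Ok(known) => Classified::Known(known),
            Err(wire) => Classified::Unknown(Box::new(wire)),
        },
        // Not a protocol error at all, so it stays generic.
        Err(other) => Classified::Unknown(other),
    }
}
```

Requiring `TryFrom<_, Error = WireError>` means a failed parse returns the original error, so
nothing is lost when the error falls through to the unknown case.
*/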

impl<T, E> SimplePendingRpc<T, E>
where
    T: battler_wamprat_message::WampApplicationMessage,
    E: Debug + Display + Send + Sync + 'static,
{
    /// Waits for the result of the procedure call.
    pub async fn result(self) -> Result<T, anyhow::Error> {
        self.rpc.result().await
    }

    /// Cancels the pending call.
    pub async fn cancel(&self) -> anyhow::Result<()> {
        self.rpc.cancel().await
    }

    /// Kills the pending call.
    ///
    /// The final result or error can still be read from [`Self::result`].
    pub async fn kill(&self) -> anyhow::Result<()> {
        self.rpc.kill().await
    }
}

impl<T, E> From<battler_wamprat::peer::TypedSimplePendingRpc<T>> for SimplePendingRpc<T, E> {
    fn from(value: battler_wamprat::peer::TypedSimplePendingRpc<T>) -> Self {
        Self {
            rpc: value,
            _t: PhantomData,
            _e: PhantomData,
        }
    }
}

/// A wrapper around [`battler_wamprat::peer::TypedProgressivePendingRpc`] for strongly-typed
/// procedure calls.
#[derive(Debug)]
pub struct ProgressivePendingRpc<T, E> {
    rpc: battler_wamprat::peer::TypedProgressivePendingRpc<T>,
    _t: PhantomData<T>,
    _e: PhantomData<E>,
}

impl<T, E> ProgressivePendingRpc<T, E>
where
    T: battler_wamprat_message::WampApplicationMessage,
    E: TryFrom<WampError, Error = WampError> + Debug + Display + Send + Sync + 'static,
{
    /// Waits for the next result of the procedure call, observing the error and attempting to
    /// parse it to the known type.
    pub async fn next_result_observing_error(&mut self) -> Result<Option<T>, WampSchemaError<E>> {
        self.rpc
            .next_result()
            .await
            .map_err(|err| match err.downcast::<WampError>() {
                Ok(err) => match E::try_from(err) {
                    Ok(err) => WampSchemaError::Known(err),
                    Err(err) => WampSchemaError::Unknown(err.into()),
                },
                Err(err) => WampSchemaError::Unknown(err),
            })
    }
}

impl<T, E> ProgressivePendingRpc<T, E>
where
    T: battler_wamprat_message::WampApplicationMessage,
    E: Debug + Display + Send + Sync + 'static,
{
    /// Waits for the next result of the procedure call.
    pub async fn next_result(&mut self) -> Result<Option<T>, anyhow::Error> {
        self.rpc.next_result().await
    }

    /// Cancels the pending call.
    pub async fn cancel(&mut self) -> anyhow::Result<()> {
        self.rpc.cancel().await
    }

    /// Kills the pending call.
    ///
    /// The final result or error can still be read from [`Self::next_result`].
    pub async fn kill(&mut self) -> anyhow::Result<()> {
        self.rpc.kill().await
    }
}

impl<T, E> From<battler_wamprat::peer::TypedProgressivePendingRpc<T>>
    for ProgressivePendingRpc<T, E>
{
    fn from(value: battler_wamprat::peer::TypedProgressivePendingRpc<T>) -> Self {
        Self {
            rpc: value,
            _t: PhantomData,
            _e: PhantomData,
        }
    }
}

/// Configuration for a peer connecting to a router.
#[derive(Debug, Clone)]
pub struct PeerConfig {
    /// Connection configuration.
    pub connection: battler_wamprat::peer::PeerConnectionConfig,
    /// Supported authentication methods.
    pub auth_methods: Vec<battler_wamp::peer::SupportedAuthMethod>,
}