battler_wamprat_schema/lib.rs
1//! # battler-wamprat-schema
2//! **battler-wamprat-schema** is a supplemental crate for [`battler-wamprat`](https://crates.io/crates/battler-wamprat). It provides a procedural macro for generating consumer and producer peer objects for strongly-typed procedures and pub/sub topics.
3//!
4//! ## What is WAMP?
5//!
6//! **WAMP** is an open standard, routed protocol that provides two messaging patterns: Publish &
7//! Subscribe and routed Remote Procedure Calls. It is intended to connect application components in
8//! distributed applications. WAMP uses WebSocket as its default transport, but it can be
9//! transmitted via any other protocol that allows for ordered, reliable, bi-directional, and
10//! message-oriented communications.
11//!
12//! ## Background
13//!
14//! **battler-wamprat** is a Rust library and framework for peers communicating over the **Web
15//! Application Message Protocol** (WAMP).
16//!
17//! The library is built on [`battler-wamp`](https://crates.io/crates/battler-wamp) to provide more complex functionality:
18//!
19//! 1. Automatic reconnection and re-registration of procedures and subscriptions when a session is
20//! dropped.
21//! 1. Strongly-typed procedure handling, procedure calls, event publication, and subscription event
22//! handling using built-in serialization and deserialization.
23//!
24//! The library uses [`tokio`](https://tokio.rs) as its asynchronous runtime, and is ready for use on top of WebSocket streams.
25//!
26//! ## Schemas
27//!
28//! The `battler-wamprat-schema` crate works by generating code around
29//! [`battler_wamprat::peer::Peer`] objects based on a schema.
30//!
31//! A **schema** is a collection of procedures and pub/sub topics that are logically connected
32//! by application logic. A schema can be consumed by a **consumer** (a.k.a., a caller and
33//! subscriber) and produced by a **producer** (a.k.a., a callee and publisher).
34//!
35//! Both consumers and producers are peers communicating via a WAMP router. When defining a schema,
36//! the code for producer and consumer peers are automatically generated around the
37//! [`battler_wamprat::peer::Peer`] object. Thus, peer objects can be entirely constructed by
38//! `battler_wamprat_schema`, while all underlying functionality is provided by `battler_wamprat`.
39//!
40//! ## Usage
41//!
42//! A schema is defined with an enum type using the [`WampSchema`] procedural macro. You simply
43//! attach different types (e.g., input, output, error, event, etc.) to each enum variant to
44//! generate the strongly-typed peer methods.
45//!
46//! Note that schemas are attached to a **single realm**, so the connection logic is simplified.
47//! Each peer will stay connected to the peer until is is manually canceled.
48//!
49//! After defining the schema, producers and consumers can be created from the schema enum.
50//! Consumers generate a wrapper around [`battler_wamprat::peer::Peer`] directly, while producers
51//! generate a wrapper around [`battler_wamprat::peer::PeerBuilder`] for registering procedure
52//! handlers.
53//!
54//! To start a peer, you only need to provide preliminary information in the [`PeerConfig`], such as
55//! which router to connect to and supported authentication methods.
56//!
57//! Below is a detailed example of a consumer and producer peer interacting through a router.
58//!
59//! ```
60//! use std::time::Duration;
61//!
62//! use anyhow::Result;
63//! use battler_wamp::{
64//! peer::{
65//! WebSocketPeer,
66//! new_web_socket_peer,
67//! },
68//! router::{
69//! EmptyPubSubPolicies,
70//! EmptyRpcPolicies,
71//! RealmAuthenticationConfig,
72//! RealmConfig,
73//! RouterConfig,
74//! RouterHandle,
75//! new_web_socket_router,
76//! },
77//! };
78//! use battler_wamp_uri::Uri;
79//! use battler_wamp_values::{
80//! WampDictionary,
81//! WampList,
82//! };
83//! use battler_wamprat::{
84//! peer::{
85//! CallOptions,
86//! PublishOptions,
87//! PeerConnectionConfig,
88//! PeerConnectionType,
89//! },
90//! procedure::{
91//! Invocation,
92//! TypedProcedure,
93//! },
94//! subscription::{
95//! TypedPatternMatchedSubscription,
96//! TypedSubscription,
97//! },
98//! };
99//! use battler_wamprat_error::WampError;
100//! use battler_wamprat_message::WampApplicationMessage;
101//! use battler_wamprat_schema::{
102//! PeerConfig,
103//! WampSchema,
104//! WampSchemaError,
105//! };
106//! use battler_wamprat_uri::WampUriMatcher;
107//! use thiserror::Error;
108//! use tokio::{
109//! select,
110//! sync::broadcast::{
111//! self,
112//! error::{
113//! RecvError,
114//! TryRecvError,
115//! },
116//! },
117//! task::JoinHandle,
118//! };
119//!
120//! #[derive(Debug, WampList)]
121//! struct OneNumber(u64);
122//!
123//! #[derive(Debug, WampList)]
124//! struct TwoNumbers(u64, u64);
125//!
126//! #[derive(Debug, WampApplicationMessage)]
127//! struct Input(#[arguments] TwoNumbers);
128//!
129//! #[derive(Debug, WampApplicationMessage)]
130//! struct Output(#[arguments] OneNumber);
131//!
132//! #[derive(Debug, Error, WampError)]
133//! enum DivideError {
134//! #[error("cannot divide by 0")]
135//! #[uri("com.battler.error.divide_by_zero")]
136//! DivideByZero,
137//! }
138//!
139//! #[derive(Debug, Clone, WampApplicationMessage)]
140//! struct Ping;
141//!
142//! #[derive(Debug, Clone, WampDictionary)]
143//! struct Message {
144//! author: String,
145//! content: String,
146//! }
147//!
148//! #[derive(Debug, WampApplicationMessage)]
149//! struct MessageEvent(#[arguments_keyword] Message);
150//!
151//! #[derive(Debug, WampUriMatcher)]
152//! #[uri("com.battler.message.{version}.{channel}")]
153//! struct MessagePattern {
154//! version: u64,
155//! channel: String,
156//! }
157//!
158//! #[derive(WampSchema)]
159//! #[realm("com.battler_wamprat_schema.realm.example")]
160//! #[allow(unused)]
161//! enum Example {
162//! #[rpc(uri = "com.battler.add", input = Input, output = Output)]
163//! Add,
164//! #[rpc(uri = "com.battler.divide", input = Input, output = Output, error = DivideError)]
165//! Divide,
166//! #[pubsub(uri = "com.battler.ping", event = Ping)]
167//! Ping,
168//! #[pubsub(pattern = MessagePattern, event = MessageEvent)]
169//! Message,
170//! }
171//!
172//! async fn start_router() -> Result<(RouterHandle, JoinHandle<()>)> {
173//! let mut config = RouterConfig::default();
174//! config.realms.push(RealmConfig {
175//! name: "example".to_owned(),
176//! uri: Uri::try_from("com.battler_wamprat_schema.realm.example")?,
177//! authentication: RealmAuthenticationConfig::default(),
178//! });
179//! let router = new_web_socket_router(
180//! config,
181//! Box::new(EmptyPubSubPolicies::default()),
182//! Box::new(EmptyRpcPolicies::default()),
183//! )?;
184//! router.start().await
185//! }
186//!
187//! fn create_peer(name: &str) -> Result<WebSocketPeer> {
188//! let mut config = battler_wamp::peer::PeerConfig::default();
189//! config.name = name.to_owned();
190//! new_web_socket_peer(config)
191//! }
192//!
193//! async fn run_producer(
194//! router_handle: RouterHandle,
195//! producer_ready_tx: broadcast::Sender<()>,
196//! mut done_rx: broadcast::Receiver<()>,
197//! ) {
198//! struct Adder;
199//! impl AddProcedure for Adder {}
200//!
201//! impl TypedProcedure for Adder {
202//! type Input = Input;
203//! type Output = Output;
204//! type Error = anyhow::Error;
205//!
206//! async fn invoke(
207//! &self,
208//! _: Invocation,
209//! input: Self::Input,
210//! ) -> Result<Self::Output, Self::Error> {
211//! Ok(Output(OneNumber(input.0.0 + input.0.1)))
212//! }
213//! }
214//!
215//! struct Divider;
216//! impl DivideProcedure for Divider {}
217//!
218//! impl TypedProcedure for Divider {
219//! type Input = Input;
220//! type Output = Output;
221//! type Error = DivideError;
222//!
223//! async fn invoke(
224//! &self,
225//! _: Invocation,
226//! input: Self::Input,
227//! ) -> Result<Self::Output, Self::Error> {
228//! if input.0.1 == 0 {
229//! Err(DivideError::DivideByZero)
230//! } else {
231//! Ok(Output(OneNumber(input.0.0 / input.0.1)))
232//! }
233//! }
234//! }
235//!
236//! let mut producer_builder = Example::producer_builder(PeerConfig {
237//! connection: PeerConnectionConfig::new(PeerConnectionType::Remote(format!(
238//! "ws://{}",
239//! router_handle.local_addr()
240//! ))),
241//! auth_methods: Vec::default(),
242//! });
243//! producer_builder.register_add(Adder).unwrap();
244//! producer_builder.register_divide(Divider).unwrap();
245//! let producer = producer_builder
246//! .start(create_peer("producer").unwrap())
247//! .unwrap();
248//! producer.wait_until_ready().await.unwrap();
249//!
250//! producer_ready_tx.send(()).unwrap();
251//!
252//! loop {
253//! select! {
254//! _ = done_rx.recv() => break,
255//! _ = tokio::time::sleep(Duration::from_secs(1)) => {
256//! producer.publish_ping(Ping, PublishOptions::default()).await.unwrap();
257//! producer.publish_message(
258//! MessagePattern {
259//! version: 1,
260//! channel: "main".to_owned(),
261//! },
262//! MessageEvent(Message {
263//! author: "user".to_owned(),
264//! content: "foo".to_owned(),
265//! }), PublishOptions::default()
266//! )
267//! .await.unwrap();
268//! }
269//! }
270//! }
271//!
272//! producer.stop().await.unwrap();
273//! }
274//!
275//! #[tokio::main]
276//! async fn main() {
277//! let (router_handle, _) = start_router().await.unwrap();
278//!
279//! let (producer_ready_tx, mut producer_ready_rx) = broadcast::channel(1);
280//! let (done_tx, done_rx) = broadcast::channel(1);
281//!
282//! let producer_join_handle = tokio::spawn(run_producer(
283//! router_handle.clone(),
284//! producer_ready_tx,
285//! done_rx,
286//! ));
287//!
288//! // Wait for producer to be ready to serve procedure calls.
289//! producer_ready_rx.recv().await.unwrap();
290//!
291//! let consumer = Example::consumer(
292//! PeerConfig {
293//! connection: PeerConnectionConfig::new(PeerConnectionType::Remote(format!(
294//! "ws://{}",
295//! router_handle.local_addr()
296//! ))),
297//! auth_methods: Vec::default(),
298//! },
299//! create_peer("consumer").unwrap(),
300//! )
301//! .unwrap();
302//! consumer.wait_until_ready().await.unwrap();
303//!
304//! assert_matches::assert_matches!(
305//! consumer
306//! .add(Input(TwoNumbers(36345, 88818)), CallOptions::default())
307//! .await,
308//! Ok(rpc) => {
309//! assert_matches::assert_matches!(rpc.result().await, Ok(Output(OneNumber(125163))));
310//! }
311//! );
312//!
313//! assert_matches::assert_matches!(consumer.divide(Input(TwoNumbers(25, 2)), CallOptions::default()).await, Ok(rpc) => {
314//! assert_matches::assert_matches!(rpc.result_observing_error().await, Ok(Output(OneNumber(12))));
315//! });
316//!
317//! assert_matches::assert_matches!(consumer.divide(Input(TwoNumbers(1, 0)), CallOptions::default()).await, Ok(rpc) => {
318//! assert_matches::assert_matches!(rpc.result_observing_error().await, Err(WampSchemaError::Known(DivideError::DivideByZero)));
319//! });
320//!
321//! struct PingHandler {
322//! events_tx: broadcast::Sender<Ping>,
323//! }
324//! impl PingSubscription for PingHandler {}
325//!
326//! impl TypedSubscription for PingHandler {
327//! type Event = Ping;
328//!
329//! async fn handle_event(&self, event: Self::Event) {
330//! self.events_tx.send(event).unwrap();
331//! }
332//! }
333//!
334//! let (events_tx, mut events_rx) = broadcast::channel(16);
335//! assert_matches::assert_matches!(
336//! consumer.subscribe_ping(PingHandler { events_tx }).await,
337//! Ok(())
338//! );
339//!
340//! assert_matches::assert_matches!(events_rx.recv().await, Ok(Ping));
341//! assert_matches::assert_matches!(events_rx.recv().await, Ok(Ping));
342//! assert_matches::assert_matches!(events_rx.recv().await, Ok(Ping));
343//! assert_matches::assert_matches!(events_rx.try_recv(), Err(TryRecvError::Empty));
344//!
345//! assert_matches::assert_matches!(consumer.unsubscribe_ping().await, Ok(()));
346//! assert_matches::assert_matches!(events_rx.recv().await, Err(RecvError::Closed));
347//!
348//! struct MessageHandler {
349//! events_tx: broadcast::Sender<(Message, u64, String)>,
350//! }
351//! impl MessageSubscription for MessageHandler {}
352//!
353//! impl TypedPatternMatchedSubscription for MessageHandler {
354//! type Event = MessageEvent;
355//! type Pattern = MessagePattern;
356//!
357//! async fn handle_event(&self, event: Self::Event, topic: Self::Pattern) {
358//! self.events_tx
359//! .send((event.0, topic.version, topic.channel))
360//! .unwrap();
361//! }
362//! }
363//!
364//! let (events_tx, mut events_rx) = broadcast::channel(16);
365//! assert_matches::assert_matches!(
366//! consumer
367//! .subscribe_message(MessageHandler { events_tx })
368//! .await,
369//! Ok(())
370//! );
371//! assert_matches::assert_matches!(events_rx.recv().await, Ok((message, version, channel)) => {
372//! assert_eq!(message.author, "user");
373//! assert_eq!(message.content, "foo");
374//! assert_eq!(version, 1);
375//! assert_eq!(channel, "main");
376//! });
377//!
378//! // Clean up the consumer and producer.
379//! consumer.stop().await.unwrap();
380//! done_tx.send(()).unwrap();
381//! producer_join_handle.await.unwrap();
382//! }
383//! ```
384
385use std::{
386 fmt::{
387 Debug,
388 Display,
389 },
390 marker::PhantomData,
391};
392
393use battler_wamp::core::error::WampError;
394pub use battler_wamprat_schema_proc_macro::WampSchema;
395
396/// An error resulting from a call to a schema object.
397///
398/// Procedures may define error types that are expected to be generated by callees. However,
399/// procedure calls can fail in many other ways not decided by the callee. In this case, several
400/// other error types may be generated instead. This type reflects this error handling scenario;
401/// some errors may be "known" and others may be "unknown."
402///
403/// Unknown errors are reported with [`anyhow::Error`] and can be further inspected by the client.
404#[derive(Debug)]
405pub enum WampSchemaError<E> {
406 /// A known error occurred and is parsed ahead of time for the client.
407 Known(E),
408 /// An unknown error occurred.
409 Unknown(anyhow::Error),
410}
411
412impl<E> WampSchemaError<E>
413where
414 E: Into<anyhow::Error>,
415{
416 /// Converts the error back into the generic form.
417 pub fn any_err(self) -> anyhow::Error {
418 match self {
419 Self::Known(err) => err.into(),
420 Self::Unknown(err) => err,
421 }
422 }
423}
424
425/// A wrapper around [`battler_wamprat::peer::TypedSimplePendingRpc`] for strongly-typed procedure
426/// calls.
427#[derive(Debug)]
428pub struct SimplePendingRpc<T, E> {
429 rpc: battler_wamprat::peer::TypedSimplePendingRpc<T>,
430 _t: PhantomData<T>,
431 _e: PhantomData<E>,
432}
433
434impl<T, E> SimplePendingRpc<T, E>
435where
436 T: battler_wamprat_message::WampApplicationMessage,
437 E: TryFrom<WampError, Error = WampError> + Debug + Display + Send + Sync + 'static,
438{
439 /// Waits for the result of the procedure call, observing the error and attempting to parse it
440 /// to the known type.
441 pub async fn result_observing_error(self) -> Result<T, WampSchemaError<E>> {
442 self.rpc
443 .result()
444 .await
445 .map_err(|err| match err.downcast::<WampError>() {
446 Ok(err) => match E::try_from(err) {
447 Ok(err) => WampSchemaError::Known(err),
448 Err(err) => WampSchemaError::Unknown(err.into()),
449 },
450 Err(err) => WampSchemaError::Unknown(err),
451 })
452 }
453}
454
455impl<T, E> SimplePendingRpc<T, E>
456where
457 T: battler_wamprat_message::WampApplicationMessage,
458 E: Debug + Display + Send + Sync + 'static,
459{
460 /// Waits for the result of the procedure call.
461 pub async fn result(self) -> Result<T, anyhow::Error> {
462 self.rpc.result().await
463 }
464
465 /// Cancels the pending call.
466 pub async fn cancel(&self) -> anyhow::Result<()> {
467 self.rpc.cancel().await
468 }
469
470 /// Kills the pending call.
471 ///
472 /// The end error, or result, can still be read from [`Self::result`].
473 pub async fn kill(&self) -> anyhow::Result<()> {
474 self.rpc.kill().await
475 }
476}
477
478impl<T, E> From<battler_wamprat::peer::TypedSimplePendingRpc<T>> for SimplePendingRpc<T, E> {
479 fn from(value: battler_wamprat::peer::TypedSimplePendingRpc<T>) -> Self {
480 Self {
481 rpc: value,
482 _t: PhantomData,
483 _e: PhantomData,
484 }
485 }
486}
487
488/// A wrapper around [`battler_wamprat::peer::TypedProgressivePendingRpc`] for strongly-typed
489/// procedure calls.
490#[derive(Debug)]
491pub struct ProgressivePendingRpc<T, E> {
492 rpc: battler_wamprat::peer::TypedProgressivePendingRpc<T>,
493 _t: PhantomData<T>,
494 _e: PhantomData<E>,
495}
496
497impl<T, E> ProgressivePendingRpc<T, E>
498where
499 T: battler_wamprat_message::WampApplicationMessage,
500 E: TryFrom<WampError, Error = WampError> + Debug + Display + Send + Sync + 'static,
501{
502 /// Waits for the result of the procedure call, observing the error and attempting to parse it
503 /// to the known type.
504 pub async fn next_result_observing_error(&mut self) -> Result<Option<T>, WampSchemaError<E>> {
505 self.rpc
506 .next_result()
507 .await
508 .map_err(|err| match err.downcast::<WampError>() {
509 Ok(err) => match E::try_from(err) {
510 Ok(err) => WampSchemaError::Known(err),
511 Err(err) => WampSchemaError::Unknown(err.into()),
512 },
513 Err(err) => WampSchemaError::Unknown(err),
514 })
515 }
516}
517
518impl<T, E> ProgressivePendingRpc<T, E>
519where
520 T: battler_wamprat_message::WampApplicationMessage,
521 E: Debug + Display + Send + Sync + 'static,
522{
523 /// Waits for the result of the procedure call.
524 pub async fn next_result(&mut self) -> Result<Option<T>, anyhow::Error> {
525 self.rpc.next_result().await
526 }
527
528 /// Cancels the pending call.
529 pub async fn cancel(&mut self) -> anyhow::Result<()> {
530 self.rpc.cancel().await
531 }
532
533 /// Kills the pending call.
534 ///
535 /// The end error, or result, can still be read from [`Self::next_result`].
536 pub async fn kill(&mut self) -> anyhow::Result<()> {
537 self.rpc.kill().await
538 }
539}
540
541impl<T, E> From<battler_wamprat::peer::TypedProgressivePendingRpc<T>>
542 for ProgressivePendingRpc<T, E>
543{
544 fn from(value: battler_wamprat::peer::TypedProgressivePendingRpc<T>) -> Self {
545 Self {
546 rpc: value,
547 _t: PhantomData,
548 _e: PhantomData,
549 }
550 }
551}
552
553/// Configuration for a peer connecting to a router.
554#[derive(Debug, Clone)]
555pub struct PeerConfig {
556 /// Connection configuration.
557 pub connection: battler_wamprat::peer::PeerConnectionConfig,
558 /// Supported authentication methods.
559 pub auth_methods: Vec<battler_wamp::peer::SupportedAuthMethod>,
560}