wiremocket/
responder.rs

1//! Determine how a given mock responds to the client.
2use async_stream::stream;
3use futures::{
4    stream::{self, BoxStream},
5    Stream, StreamExt,
6};
7use std::sync::Arc;
8use tokio::sync::broadcast;
9use tokio_stream::wrappers::BroadcastStream;
10use tracing::warn;
11use tungstenite::Message;
12
// Design thoughts. I want:
//
// 1. Ability to send a message (always good)
// 2. Send messages in response to a message from the client
// 3. Send messages without receiving anything from the client
// 4. Interleave these potentially different sources of messages
19//
// Now maybe the easiest way to do this is just creating a stream that outputs messages; people can
// use `futures` crate utilities to make streams from iterators, etc. But `Stream` isn't super
// ergonomic so maybe there's a better way. (Internally for this I'd use the fact that the socket
// implements `Sink` to forward the `Stream` to the `Sink`.)
24//
// Single messages being sent out could be solved by putting the websocket (more likely something
// that sends to it) into the `Match` trait so people can send responses on matches. Or I make the
// responder take the last client message as an input.
28
29/// Every [`Mock`](crate::mock::Mock) must have a valid responder. By default the `pending` responder is provided which
30/// never outputs a message.
31pub trait ResponseStream {
32    /// Given a broadcast of messages from the client returns a stream of responses.
33    ///
34    /// # Implementing
35    ///
36    /// The most complex implementation pattern is when the output messages rely on the input ones
37    /// and there is not a 1-to-1 relationship between the two. In this case the easiest thing is
38    /// likely to make use of the [`async_stream`](https://crates.io/crates/async_stream) and
39    /// implement something as follows:
40    ///
41    /// ```rust
42    /// use async_stream::stream;
43    /// use futures::{stream::BoxStream, StreamExt};
44    /// use tokio::sync::broadcast;
45    /// use tokio_stream::wrappers::BroadcastStream;
46    /// use tungstenite::Message;
47    /// use wiremocket::prelude::ResponseStream;
48    ///
49    /// pub struct ExampleResponder;
50    ///
51    /// impl ResponseStream for ExampleResponder {
52    ///
53    ///     fn handle(&self, input: broadcast::Receiver<Message>) -> BoxStream<'static, Message> {
54    ///
55    ///         let mut input = BroadcastStream::new(input);
56    ///
57    ///         let stream = stream! {
58    ///             for await value in input {
59    ///                 match value {
60    ///                     Ok(v) => {
61    ///                         // Echoes the stream and if the message len is >10 sends back a
62    ///                         // ping
63    ///                         if v.len() > 10 {
64    ///                             yield Message::Ping(Default::default());
65    ///                         }
66    ///                         yield v.clone();
67    ///                     },
68    ///                     Err(e) => {
69    ///                         panic!("ohno");
70    ///                     }
71    ///                 }
72    ///             }
73    ///         };
74    ///         stream.boxed()
75    ///     }
76    /// }
77    /// ```
78    fn handle(&self, input: broadcast::Receiver<Message>) -> BoxStream<'static, Message>;
79}
80
81/// Type to hold streaming responses where a stream of server messages independent of the client
82/// input is returned.
83pub struct StreamResponse {
84    stream_ctor: Arc<dyn Fn() -> BoxStream<'static, Message> + Send + Sync + 'static>,
85}
86
87impl StreamResponse {
88    /// Creates a new `StreamResponse`. Because the stream needs to be constructed per session
89    /// which matches we need a function that creates the stream rather than the stream itself.
90    pub fn new<F, S>(ctor: F) -> Self
91    where
92        F: Fn() -> S + Send + Sync + 'static,
93        S: Stream<Item = Message> + Send + Sync + 'static,
94    {
95        let stream_ctor = Arc::new(move || ctor().boxed());
96        Self { stream_ctor }
97    }
98}
99
100impl ResponseStream for StreamResponse {
101    fn handle(&self, _: broadcast::Receiver<Message>) -> BoxStream<'static, Message> {
102        (self.stream_ctor)()
103    }
104}
105
106/// Create a output stream that never emits any messages but stays open.
107pub fn pending() -> StreamResponse {
108    StreamResponse::new(stream::pending)
109}
110
111/// Responds with a 1-to-1 mapping of `Message->Message`. The server will return as many messages
112/// as the client sends.
113pub struct MapResponder {
114    map: Arc<dyn Fn(Message) -> Message + Send + Sync + 'static>,
115}
116
117impl MapResponder {
118    /// Create a new `MapResponder`.
119    pub fn new<F: Fn(Message) -> Message + Send + Sync + 'static>(f: F) -> Self {
120        Self { map: Arc::new(f) }
121    }
122}
123
124impl ResponseStream for MapResponder {
125    fn handle(&self, input: broadcast::Receiver<Message>) -> BoxStream<'static, Message> {
126        let map_fn = Arc::clone(&self.map);
127
128        let input = BroadcastStream::new(input);
129
130        let stream = stream! {
131            for await value in input {
132                match value {
133                    Ok(v) => yield map_fn(v),
134                    Err(e) => {
135                        warn!("Broadcast error: {}", e);
136                    }
137                }
138            }
139        };
140        stream.boxed()
141    }
142}
143
144/// Has the server respond with the client's message.
145pub fn echo_response() -> MapResponder {
146    MapResponder::new(|msg| msg)
147}