wiremocket/responder.rs
1//! Determine how a given mock responds to the client.
2use async_stream::stream;
3use futures::{
4 stream::{self, BoxStream},
5 Stream, StreamExt,
6};
7use std::sync::Arc;
8use tokio::sync::broadcast;
9use tokio_stream::wrappers::BroadcastStream;
10use tracing::warn;
11use tungstenite::Message;
12
13// Design thoughts I want:
14//
15// 1. Ability to send a message (always good)
16// 2. Send messages in response to a message from the client
17// 3. Send messages without receiving anything from the client
18// 4. Interleave these potential different sources of messages
19//
// Now maybe the easiest way to do this is just creating a stream that outputs messages; people can
// use the `futures` crate to make streams from iterators etc. But `Stream` isn't super
// ergonomic so maybe there's a better way. (Internally for this I'd use the fact that the socket
// implements `Sink` to forward the `Stream` to the `Sink`.)
24//
// Single messages being sent out could be solved by putting the websocket (more likely something
// that sends to it) into the `Match` trait so people can send responses on matches. Or I make the
// responder take the last client message as an input.
28
29/// Every [`Mock`](crate::mock::Mock) must have a valid responder. By default the `pending` responder is provided which
30/// never outputs a message.
31pub trait ResponseStream {
32 /// Given a broadcast of messages from the client returns a stream of responses.
33 ///
34 /// # Implementing
35 ///
36 /// The most complex implementation pattern is when the output messages rely on the input ones
37 /// and there is not a 1-to-1 relationship between the two. In this case the easiest thing is
38 /// likely to make use of the [`async_stream`](https://crates.io/crates/async_stream) and
39 /// implement something as follows:
40 ///
41 /// ```rust
42 /// use async_stream::stream;
43 /// use futures::{stream::BoxStream, StreamExt};
44 /// use tokio::sync::broadcast;
45 /// use tokio_stream::wrappers::BroadcastStream;
46 /// use tungstenite::Message;
47 /// use wiremocket::prelude::ResponseStream;
48 ///
49 /// pub struct ExampleResponder;
50 ///
51 /// impl ResponseStream for ExampleResponder {
52 ///
53 /// fn handle(&self, input: broadcast::Receiver<Message>) -> BoxStream<'static, Message> {
54 ///
55 /// let mut input = BroadcastStream::new(input);
56 ///
57 /// let stream = stream! {
58 /// for await value in input {
59 /// match value {
60 /// Ok(v) => {
61 /// // Echoes the stream and if the message len is >10 sends back a
62 /// // ping
63 /// if v.len() > 10 {
64 /// yield Message::Ping(Default::default());
65 /// }
66 /// yield v.clone();
67 /// },
68 /// Err(e) => {
69 /// panic!("ohno");
70 /// }
71 /// }
72 /// }
73 /// };
74 /// stream.boxed()
75 /// }
76 /// }
77 /// ```
78 fn handle(&self, input: broadcast::Receiver<Message>) -> BoxStream<'static, Message>;
79}
80
81/// Type to hold streaming responses where a stream of server messages independent of the client
82/// input is returned.
83pub struct StreamResponse {
84 stream_ctor: Arc<dyn Fn() -> BoxStream<'static, Message> + Send + Sync + 'static>,
85}
86
87impl StreamResponse {
88 /// Creates a new `StreamResponse`. Because the stream needs to be constructed per session
89 /// which matches we need a function that creates the stream rather than the stream itself.
90 pub fn new<F, S>(ctor: F) -> Self
91 where
92 F: Fn() -> S + Send + Sync + 'static,
93 S: Stream<Item = Message> + Send + Sync + 'static,
94 {
95 let stream_ctor = Arc::new(move || ctor().boxed());
96 Self { stream_ctor }
97 }
98}
99
100impl ResponseStream for StreamResponse {
101 fn handle(&self, _: broadcast::Receiver<Message>) -> BoxStream<'static, Message> {
102 (self.stream_ctor)()
103 }
104}
105
106/// Create a output stream that never emits any messages but stays open.
107pub fn pending() -> StreamResponse {
108 StreamResponse::new(stream::pending)
109}
110
111/// Responds with a 1-to-1 mapping of `Message->Message`. The server will return as many messages
112/// as the client sends.
113pub struct MapResponder {
114 map: Arc<dyn Fn(Message) -> Message + Send + Sync + 'static>,
115}
116
117impl MapResponder {
118 /// Create a new `MapResponder`.
119 pub fn new<F: Fn(Message) -> Message + Send + Sync + 'static>(f: F) -> Self {
120 Self { map: Arc::new(f) }
121 }
122}
123
124impl ResponseStream for MapResponder {
125 fn handle(&self, input: broadcast::Receiver<Message>) -> BoxStream<'static, Message> {
126 let map_fn = Arc::clone(&self.map);
127
128 let input = BroadcastStream::new(input);
129
130 let stream = stream! {
131 for await value in input {
132 match value {
133 Ok(v) => yield map_fn(v),
134 Err(e) => {
135 warn!("Broadcast error: {}", e);
136 }
137 }
138 }
139 };
140 stream.boxed()
141 }
142}
143
144/// Has the server respond with the client's message.
145pub fn echo_response() -> MapResponder {
146 MapResponder::new(|msg| msg)
147}