signalrs_client/
invocation.rs

1//! SignalR invocation builder
2
3use super::{arguments::InvocationArgs, error::ClientError, ResponseStream, SignalRClient};
4use crate::{
5    arguments::InvocationStream,
6    messages::{self, ClientMessage, MessageEncoding},
7    protocol::{Invocation, StreamInvocation, StreamItem},
8    stream_ext::SignalRStreamExt,
9};
10use futures::{Stream, StreamExt};
11use serde::{de::DeserializeOwned, Serialize};
12use uuid::Uuid;
13
14/// Request builder for the [`SignalRClient`]
15///
16/// Allows adding streams and arguments to the invocation.
17/// Contains set of method that drive behavior of response reception.
18pub struct InvocationBuilder<'a> {
19    client: &'a SignalRClient,
20    method: String,
21    encoding: MessageEncoding,
22    arguments: Vec<serde_json::Value>,
23    streams: Vec<ClientStream>,
24}
25
26struct ClientStream {
27    stream_id: String,
28    items: Box<dyn Stream<Item = ClientMessage> + Unpin + Send>,
29}
30
31impl<'a> InvocationBuilder<'a> {
32    pub(crate) fn new(client: &'a SignalRClient, method: impl ToString) -> Self {
33        InvocationBuilder {
34            client,
35            method: method.to_string(),
36            encoding: MessageEncoding::Json,
37            arguments: Default::default(),
38            streams: Default::default(),
39        }
40    }
41
42    /// Adds ordered argument to invocation
43    ///
44    /// Argument can either be:
45    /// - a value
46    /// - a [stream](futures::stream::Stream)
47    ///
48    /// Order of arguments matters, they need to be passed in exactly the same order server expects them.
49    ///
50    /// # Example
51    ///
52    /// Assuming server has a hub method defined as:
53    /// ```rust,no_run
54    /// use futures::stream::Stream;
55    ///
56    /// fn calculate(name: String, a: usize, b: usize, items: impl Stream<Item = usize>) -> usize {
57    ///     println!("{}", name);
58    ///     a + b
59    /// }
60    /// ```
61    ///
62    /// Innvocation would have to be built in a following way to call this method
63    /// ```rust,no_run
64    /// use signalrs_client::{SignalRClient, arguments::InvocationStream};
65    /// use futures::StreamExt;
66    ///
67    /// # async fn function() -> anyhow::Result<()> {
68    /// let client: SignalRClient = get_client();
69    /// let result = client.method("calculate")
70    ///     .arg("Johnny")?
71    ///     .arg(1)?
72    ///     .arg(2)?
73    ///     .arg(InvocationStream::new(futures::stream::repeat(1usize).take(5)))?   
74    ///     .invoke::<usize>()
75    ///     .await?;
76    /// # Ok(())
77    /// # }
78    /// # fn get_client() -> SignalRClient { panic!() }
79    ///
80    /// ```
81    pub fn arg<A, B>(mut self, arg: A) -> Result<Self, ClientError>
82    where
83        A: Into<InvocationArgs<B>> + Send + 'static,
84        B: Serialize + Send + 'static,
85    {
86        match arg.into() {
87            InvocationArgs::Argument(arg) => self.arguments.push(
88                messages::to_json_value(&arg).map_err(|e| ClientError::malformed_request(e))?,
89            ),
90            InvocationArgs::Stream(stream) => {
91                let stream_id = Uuid::new_v4().to_string();
92                let client_stream = into_client_stream::<B>(stream_id, stream, self.encoding);
93                self.streams.push(client_stream);
94            }
95        };
96
97        return Ok(self);
98
99        fn into_client_stream<A: Serialize + Send + 'static>(
100            stream_id: String,
101            input: InvocationStream<A>,
102            encoding: MessageEncoding,
103        ) -> ClientStream {
104            let items = input
105                .zip(futures::stream::repeat(stream_id.clone()))
106                .map(|(i, id)| StreamItem::new(id, i))
107                .map(move |i| encoding.serialize(i))
108                .append_completion(stream_id.clone(), encoding);
109
110            ClientStream {
111                stream_id,
112                items: Box::new(items),
113            }
114        }
115    }
116
117    /// Sends an invocation to the server and does not expect any response
118    ///
119    /// This method follows 'fire and forget' semantics.
120    /// As soon as the message is sent from the client it returns to the caller.
121    /// Server then processes the request asynchronously.
122    pub async fn send(self) -> Result<(), ClientError> {
123        let arguments = args_as_option(self.arguments);
124
125        let mut invocation = Invocation::non_blocking(self.method, arguments);
126        invocation.with_streams(get_stream_ids(&self.streams));
127
128        let serialized = self
129            .encoding
130            .serialize(&invocation)
131            .map_err(|error| ClientError::malformed_request(error))?;
132
133        self.client.send_message(serialized).await?;
134        SignalRClient::send_streams(
135            self.client.get_transport_handle(),
136            into_actual_streams(self.streams),
137        )
138        .await
139    }
140
141    /// Sends an ivocation to the server and awaits unit response
142    ///
143    /// It does not expect any meaingful reponse except from empty response from the server.
144    /// It follows semantics such as `void` methods or functions returning `()`.
145    pub async fn invoke_unit(self) -> Result<(), ClientError> {
146        let invocation_id = Uuid::new_v4().to_string();
147        let arguments = args_as_option(self.arguments);
148
149        let mut invocation = Invocation::non_blocking(self.method, arguments);
150        invocation.with_invocation_id(invocation_id.clone());
151        invocation.with_streams(get_stream_ids(&self.streams));
152
153        let serialized = self
154            .encoding
155            .serialize(&invocation)
156            .map_err(|e| ClientError::malformed_request(e))?;
157
158        let result = self
159            .client
160            .invoke_option::<()>(invocation_id, serialized, into_actual_streams(self.streams))
161            .await;
162
163        if let Err(error) = result {
164            return Err(error);
165        }
166
167        Ok(())
168    }
169
170    /// Sends an ivocation to the server and awaits meaningful, single response
171    ///
172    /// It expects the response to be well-structured object in an encoding format used for communication.
173    /// For instance this method can return `usize` or `MyCustomStruct` as long as this type implements [`Deserialize`](serde::Deserialize) and is not generic over lifetime.
174    ///
175    /// # Important
176    ///
177    /// This function will cause errors if called with `()`. Use [`invoke_unit`](InvocationBuilder::invoke_unit) to do this.
178    pub async fn invoke<T: DeserializeOwned>(self) -> Result<T, ClientError> {
179        let invocation_id = Uuid::new_v4().to_string();
180        let arguments = args_as_option(self.arguments);
181
182        let mut invocation = Invocation::non_blocking(self.method, arguments);
183        invocation.with_invocation_id(invocation_id.clone());
184        invocation.with_streams(get_stream_ids(&self.streams));
185
186        let serialized = self
187            .encoding
188            .serialize(&invocation)
189            .map_err(|e| ClientError::malformed_request(e))?;
190
191        self.client
192            .invoke_option::<T>(invocation_id, serialized, into_actual_streams(self.streams))
193            .await?
194            .ok_or_else(|| ClientError::result("expected some result, received empty"))
195    }
196
197    /// Sends an ivocation to the server and awaits meaningful stream of responses
198    ///
199    /// It expects the responses to be well-structured objects in an encoding format used for communication.
200    /// For instance this method can return a [`Stream`] of `usize` or `MyCustomStruct` as long as this type implements [`Deserialize`](serde::Deserialize) and is not generic over lifetime.
201    ///
202    /// # Example
203    /// Assuming server has a hub method defined as:
204    /// ```rust,no_run
205    /// use futures::stream::Stream;
206    /// use futures::stream::StreamExt;
207    /// fn answers() -> impl Stream<Item = String> {
208    ///     // not really important what happens here
209    ///     # futures::stream::repeat("congratulation for looking in the source code".to_string()).take(5)
210    /// }
211    /// ```
212    ///
213    /// Innvocation would have to be built in a following way to call this method
214    /// ```rust,no_run
215    /// use signalrs_client::{SignalRClient, arguments::InvocationStream};
216    /// use futures::StreamExt;
217    ///
218    /// # async fn function() -> anyhow::Result<()> {
219    /// let client: SignalRClient = get_client();
220    /// let mut result = client.method("answers")
221    ///     .invoke_stream::<String>().await?;
222    ///
223    /// while let Some(answer) = result.next().await {
224    ///     println!("next answer: {}", answer?);
225    /// }
226    /// # Ok(())
227    /// # }
228    /// # fn get_client() -> SignalRClient { panic!() }
229    ///
230    /// ```
231    pub async fn invoke_stream<T: DeserializeOwned>(
232        self,
233    ) -> Result<ResponseStream<'a, T>, ClientError> {
234        let invocation_id = Uuid::new_v4().to_string();
235
236        let mut invocation =
237            StreamInvocation::new(invocation_id.clone(), self.method, Some(self.arguments));
238        invocation.with_streams(get_stream_ids(&self.streams));
239
240        let serialized = self
241            .encoding
242            .serialize(&invocation)
243            .map_err(|e| ClientError::malformed_request(e))?;
244
245        let response_stream = self
246            .client
247            .invoke_stream::<T>(invocation_id, serialized, into_actual_streams(self.streams))
248            .await?;
249
250        Ok(response_stream)
251    }
252}
253
254fn args_as_option(arguments: Vec<serde_json::Value>) -> Option<Vec<serde_json::Value>> {
255    if arguments.is_empty() {
256        None
257    } else {
258        Some(arguments)
259    }
260}
261
262fn get_stream_ids(streams: &[ClientStream]) -> Vec<String> {
263    streams.iter().map(|s| s.get_stream_id()).collect()
264}
265
266fn into_actual_streams(
267    streams: Vec<ClientStream>,
268) -> Vec<Box<dyn Stream<Item = ClientMessage> + Unpin + Send>> {
269    streams.into_iter().map(|s| s.items).collect()
270}
271
272impl ClientStream {
273    pub fn get_stream_id(&self) -> String {
274        self.stream_id.clone()
275    }
276}