signalrs_client/
invocation.rs1use super::{arguments::InvocationArgs, error::ClientError, ResponseStream, SignalRClient};
4use crate::{
5 arguments::InvocationStream,
6 messages::{self, ClientMessage, MessageEncoding},
7 protocol::{Invocation, StreamInvocation, StreamItem},
8 stream_ext::SignalRStreamExt,
9};
10use futures::{Stream, StreamExt};
11use serde::{de::DeserializeOwned, Serialize};
12use uuid::Uuid;
13
14pub struct InvocationBuilder<'a> {
19 client: &'a SignalRClient,
20 method: String,
21 encoding: MessageEncoding,
22 arguments: Vec<serde_json::Value>,
23 streams: Vec<ClientStream>,
24}
25
26struct ClientStream {
27 stream_id: String,
28 items: Box<dyn Stream<Item = ClientMessage> + Unpin + Send>,
29}
30
31impl<'a> InvocationBuilder<'a> {
32 pub(crate) fn new(client: &'a SignalRClient, method: impl ToString) -> Self {
33 InvocationBuilder {
34 client,
35 method: method.to_string(),
36 encoding: MessageEncoding::Json,
37 arguments: Default::default(),
38 streams: Default::default(),
39 }
40 }
41
42 pub fn arg<A, B>(mut self, arg: A) -> Result<Self, ClientError>
82 where
83 A: Into<InvocationArgs<B>> + Send + 'static,
84 B: Serialize + Send + 'static,
85 {
86 match arg.into() {
87 InvocationArgs::Argument(arg) => self.arguments.push(
88 messages::to_json_value(&arg).map_err(|e| ClientError::malformed_request(e))?,
89 ),
90 InvocationArgs::Stream(stream) => {
91 let stream_id = Uuid::new_v4().to_string();
92 let client_stream = into_client_stream::<B>(stream_id, stream, self.encoding);
93 self.streams.push(client_stream);
94 }
95 };
96
97 return Ok(self);
98
99 fn into_client_stream<A: Serialize + Send + 'static>(
100 stream_id: String,
101 input: InvocationStream<A>,
102 encoding: MessageEncoding,
103 ) -> ClientStream {
104 let items = input
105 .zip(futures::stream::repeat(stream_id.clone()))
106 .map(|(i, id)| StreamItem::new(id, i))
107 .map(move |i| encoding.serialize(i))
108 .append_completion(stream_id.clone(), encoding);
109
110 ClientStream {
111 stream_id,
112 items: Box::new(items),
113 }
114 }
115 }
116
117 pub async fn send(self) -> Result<(), ClientError> {
123 let arguments = args_as_option(self.arguments);
124
125 let mut invocation = Invocation::non_blocking(self.method, arguments);
126 invocation.with_streams(get_stream_ids(&self.streams));
127
128 let serialized = self
129 .encoding
130 .serialize(&invocation)
131 .map_err(|error| ClientError::malformed_request(error))?;
132
133 self.client.send_message(serialized).await?;
134 SignalRClient::send_streams(
135 self.client.get_transport_handle(),
136 into_actual_streams(self.streams),
137 )
138 .await
139 }
140
141 pub async fn invoke_unit(self) -> Result<(), ClientError> {
146 let invocation_id = Uuid::new_v4().to_string();
147 let arguments = args_as_option(self.arguments);
148
149 let mut invocation = Invocation::non_blocking(self.method, arguments);
150 invocation.with_invocation_id(invocation_id.clone());
151 invocation.with_streams(get_stream_ids(&self.streams));
152
153 let serialized = self
154 .encoding
155 .serialize(&invocation)
156 .map_err(|e| ClientError::malformed_request(e))?;
157
158 let result = self
159 .client
160 .invoke_option::<()>(invocation_id, serialized, into_actual_streams(self.streams))
161 .await;
162
163 if let Err(error) = result {
164 return Err(error);
165 }
166
167 Ok(())
168 }
169
170 pub async fn invoke<T: DeserializeOwned>(self) -> Result<T, ClientError> {
179 let invocation_id = Uuid::new_v4().to_string();
180 let arguments = args_as_option(self.arguments);
181
182 let mut invocation = Invocation::non_blocking(self.method, arguments);
183 invocation.with_invocation_id(invocation_id.clone());
184 invocation.with_streams(get_stream_ids(&self.streams));
185
186 let serialized = self
187 .encoding
188 .serialize(&invocation)
189 .map_err(|e| ClientError::malformed_request(e))?;
190
191 self.client
192 .invoke_option::<T>(invocation_id, serialized, into_actual_streams(self.streams))
193 .await?
194 .ok_or_else(|| ClientError::result("expected some result, received empty"))
195 }
196
197 pub async fn invoke_stream<T: DeserializeOwned>(
232 self,
233 ) -> Result<ResponseStream<'a, T>, ClientError> {
234 let invocation_id = Uuid::new_v4().to_string();
235
236 let mut invocation =
237 StreamInvocation::new(invocation_id.clone(), self.method, Some(self.arguments));
238 invocation.with_streams(get_stream_ids(&self.streams));
239
240 let serialized = self
241 .encoding
242 .serialize(&invocation)
243 .map_err(|e| ClientError::malformed_request(e))?;
244
245 let response_stream = self
246 .client
247 .invoke_stream::<T>(invocation_id, serialized, into_actual_streams(self.streams))
248 .await?;
249
250 Ok(response_stream)
251 }
252}
253
254fn args_as_option(arguments: Vec<serde_json::Value>) -> Option<Vec<serde_json::Value>> {
255 if arguments.is_empty() {
256 None
257 } else {
258 Some(arguments)
259 }
260}
261
262fn get_stream_ids(streams: &[ClientStream]) -> Vec<String> {
263 streams.iter().map(|s| s.get_stream_id()).collect()
264}
265
266fn into_actual_streams(
267 streams: Vec<ClientStream>,
268) -> Vec<Box<dyn Stream<Item = ClientMessage> + Unpin + Send>> {
269 streams.into_iter().map(|s| s.items).collect()
270}
271
272impl ClientStream {
273 pub fn get_stream_id(&self) -> String {
274 self.stream_id.clone()
275 }
276}