signalrs_client/arguments.rs
1//! Arguments to an invocations
2//!
3//! In other words this module contains everyting that is necessary to
4//! abstract over arguments in [`InvocationBuilder`](crate::invocation::InvocationBuilder)
5
6use futures::{Stream, StreamExt};
7use serde::Serialize;
8
9/// Represents all possible arguments to a client invocation.
10///
11/// There are two kinds of arguments that client understands:
12/// - simple arguments that will be serialized and passed to a server in one message
13/// - [`Stream`](futures::stream::Stream)-compatible arguments that will be sent to server as soon as new items are available
14/// # Example
15/// ```rust, no_run
16/// use signalrs_client::{SignalRClient, arguments::InvocationStream};
17/// # use futures::StreamExt;
18/// # #[tokio::main]
19/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
20/// # let client = SignalRClient::builder("localhost")
21/// # .build()
22/// # .await?;
23/// let stream = futures::stream::repeat("item").take(5);
24///
25/// client
26/// .method("StreamEcho")
27/// // server will receive it as a literal value
28/// .arg("execute order 66")?
29/// // server will receive it as a stream / async enumerable that it will be able to `.await`
30/// .arg(InvocationStream::new(stream))?
31/// .invoke_stream::<String>()
32/// .await?;
33/// # Ok(())
34/// }
35/// ```
36pub enum InvocationArgs<T> {
37 /// Simple argument that will be passed to server after serialization
38 ///
39 /// It is not intended to be created by hand. Client should be able to perform conversion for all availably types automatically.
40 Argument(T),
41
42 /// Stream argument that will be streamed asynchronously to a server
43 ///
44 /// *Needs to be constructed manually for the client.*
45 Stream(InvocationStream<T>),
46}
47
48/// [`Stream`](futures::stream::Stream) wrapper for [`InvocationArgs`]
49pub struct InvocationStream<T>(Box<dyn Stream<Item = T> + Unpin + Send>);
50
51impl<T> InvocationStream<T> {
52 pub fn new(inner: impl Stream<Item = T> + Unpin + Send + 'static) -> Self {
53 InvocationStream(Box::new(inner))
54 }
55}
56
57impl<T> Stream for InvocationStream<T> {
58 type Item = T;
59
60 fn poll_next(
61 mut self: std::pin::Pin<&mut Self>,
62 cx: &mut std::task::Context<'_>,
63 ) -> std::task::Poll<Option<Self::Item>> {
64 self.0.poll_next_unpin(cx)
65 }
66}
67
68impl<T> From<T> for InvocationArgs<T>
69where
70 T: Serialize,
71{
72 fn from(object: T) -> Self {
73 InvocationArgs::Argument(object)
74 }
75}
76
77impl<T> From<InvocationStream<T>> for InvocationArgs<T>
78where
79 T: Serialize,
80{
81 fn from(stream: InvocationStream<T>) -> Self {
82 InvocationArgs::Stream(stream)
83 }
84}