signalrs_client/
arguments.rs

1//! Arguments to an invocations
2//!
3//! In other words this module contains everyting that is necessary to
4//! abstract over arguments in [`InvocationBuilder`](crate::invocation::InvocationBuilder)
5
6use futures::{Stream, StreamExt};
7use serde::Serialize;
8
9/// Represents all possible arguments to a client invocation.
10///
11/// There are two kinds of arguments that client understands:
12/// - simple arguments that will be serialized and passed to a server in one message
13/// - [`Stream`](futures::stream::Stream)-compatible arguments that will be sent to server as soon as new items are available
14/// # Example
15/// ```rust, no_run
16/// use signalrs_client::{SignalRClient, arguments::InvocationStream};
17/// # use futures::StreamExt;
18/// # #[tokio::main]
19/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
20/// #    let client = SignalRClient::builder("localhost")
21/// #        .build()
22/// #        .await?;
23/// let stream = futures::stream::repeat("item").take(5);
24///
25/// client
26///     .method("StreamEcho")
27///     // server will receive it as a literal value
28///     .arg("execute order 66")?
29///     // server will receive it as a stream / async enumerable that it will be able to `.await`
30///     .arg(InvocationStream::new(stream))?
31///     .invoke_stream::<String>()
32///     .await?;
33/// # Ok(())
34/// }
35/// ```
36pub enum InvocationArgs<T> {
37    /// Simple argument that will be passed to server after serialization
38    ///
39    /// It is not intended to be created by hand. Client should be able to perform conversion for all availably types automatically.
40    Argument(T),
41
42    /// Stream argument that will be streamed asynchronously to a server
43    ///
44    /// *Needs to be constructed manually for the client.*
45    Stream(InvocationStream<T>),
46}
47
48/// [`Stream`](futures::stream::Stream) wrapper for [`InvocationArgs`]
49pub struct InvocationStream<T>(Box<dyn Stream<Item = T> + Unpin + Send>);
50
51impl<T> InvocationStream<T> {
52    pub fn new(inner: impl Stream<Item = T> + Unpin + Send + 'static) -> Self {
53        InvocationStream(Box::new(inner))
54    }
55}
56
57impl<T> Stream for InvocationStream<T> {
58    type Item = T;
59
60    fn poll_next(
61        mut self: std::pin::Pin<&mut Self>,
62        cx: &mut std::task::Context<'_>,
63    ) -> std::task::Poll<Option<Self::Item>> {
64        self.0.poll_next_unpin(cx)
65    }
66}
67
68impl<T> From<T> for InvocationArgs<T>
69where
70    T: Serialize,
71{
72    fn from(object: T) -> Self {
73        InvocationArgs::Argument(object)
74    }
75}
76
77impl<T> From<InvocationStream<T>> for InvocationArgs<T>
78where
79    T: Serialize,
80{
81    fn from(stream: InvocationStream<T>) -> Self {
82        InvocationArgs::Stream(stream)
83    }
84}