rapace_core/
streaming.rs

1//! Streaming types for server-streaming and client-streaming RPCs.
2//!
3//! # Server-Streaming Pattern
4//!
5//! For server-streaming RPCs, the server method returns a `Streaming<T>`:
6//!
7//! ```ignore
8//! use rapace_core::Streaming;
9//!
10//! #[rapace::service]
11//! trait RangeService {
12//!     async fn range(&self, n: u32) -> Streaming<u32>;
13//! }
14//! ```
15//!
16//! The macro generates:
17//! - Server: Calls the method, iterates the stream, sends DATA frames, then EOS
18//! - Client: An `async fn` that returns `Result<Streaming<T>, RpcError>`
19//!
20//! # Client Usage
21//!
22//! ```ignore
23//! use futures::StreamExt;
24//!
25//! let mut stream = client.range(5).await?;
26//! while let Some(item) = stream.next().await {
27//!     let value = item?;
28//!     println!("{}", value);
29//! }
30//! ```
31//!
32//! # Server Implementation
33//!
34//! ```ignore
35//! use rapace_core::Streaming;
36//!
37//! impl RangeService for MyImpl {
38//!     async fn range(&self, n: u32) -> Streaming<u32> {
39//!         let (tx, rx) = tokio::sync::mpsc::channel(16);
40//!         tokio::spawn(async move {
41//!             for i in 0..n {
42//!                 if tx.send(Ok(i)).await.is_err() {
43//!                     break;
44//!                 }
45//!             }
46//!         });
47//!         Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))
48//!     }
49//! }
50//! ```
51
52use std::future::Future;
53use std::pin::Pin;
54
55use crate::RpcError;
56
57/// A pinned, boxed, `Send` stream of `Result<T, RpcError>` items, used for streaming RPC results.
58///
59/// Service traits name this alias as the return type of a streaming method:
60/// ```ignore
61/// async fn range(&self, n: u32) -> Streaming<u32>;
62/// ```
63///
64/// Awaiting the `async fn` yields the stream itself; each item the stream then produces
65/// is a `Result<T, RpcError>`, i.e. either a value of type `T` or an error.
66pub type Streaming<T> = Pin<Box<dyn futures::Stream<Item = Result<T, RpcError>> + Send>>;
67
68/// Push-side handle for delivering stream items from the server to the client.
69///
70/// This is an internal building block, and implementors must be `Send`. Service trait
71/// definitions should use `Streaming<T>` as the return type instead of this trait.
72pub trait StreamSink<T>: Send {
73    /// Sends one `item` to the client.
74    ///
75    /// The returned `Send` future borrows `self`; it resolves to `Err` if the channel was cancelled or an error occurred.
76    fn send(&mut self, item: T) -> Pin<Box<dyn Future<Output = Result<(), RpcError>> + Send + '_>>;
77
78    /// Reports whether the client has cancelled the stream.
79    fn is_cancelled(&self) -> bool;
80}
81
82/// Pull-side handle for receiving stream items (used in client-streaming).
83///
84/// Internal building block for future client-streaming support. NOTE(review): unlike `StreamSink`, this trait has no `Send` supertrait — confirm that is intended.
85pub trait StreamSource<T> {
86    /// Awaits the next item: `Some(Ok(_))` or `Some(Err(_))` per item, or `None` if the stream is complete.
87    #[allow(clippy::type_complexity)] // the boxed-future return type below is unavoidably nested
88    fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<Result<T, RpcError>>> + Send + '_>>;
89}
90
91/// Marker trait for item types that can be streamed; it declares no methods of its own.
92///
93/// Types must implement `Facet<'static>` for serialization and be both `Send` and `'static`.
94pub trait Streamable: facet::Facet<'static> + Send + 'static {}
95
96// Blanket implementation: every `T` that meets the `Streamable` bounds implements it automatically.
97impl<T: facet::Facet<'static> + Send + 'static> Streamable for T {}
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102
103    // Compile-time checks: a `&dyn Trait` parameter only type-checks if `StreamSink` and `StreamSource` are object-safe.
104    fn _assert_sink_object_safe(_: &dyn StreamSink<i32>) {}
105    fn _assert_source_object_safe(_: &dyn StreamSource<i32>) {}
106
107    #[test]
108    fn test_streamable_impl() {
109        fn _is_streamable<T: Streamable>() {} // instantiating this compiles only if `T: Streamable`
110        _is_streamable::<i32>();
111        _is_streamable::<String>();
112        _is_streamable::<Vec<u8>>();
113    }
114}