Skip to main content

plexus_core/plexus/
streaming.rs

1//! Streaming helpers for the caller-wraps architecture
2//!
3//! These functions are used by the DynamicHub routing layer to wrap activation
4//! responses with metadata. Activations return typed domain events, and
5//! the caller uses these helpers to create PlexusStreamItems.
6
7use futures::stream::{self, Stream, StreamExt};
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use std::pin::Pin;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tokio_stream::wrappers::ReceiverStream;
14
15use super::bidirectional::BidirChannel;
16use super::context::PlexusContext;
17use super::types::{PlexusStreamItem, StreamMetadata};
18
19/// Type alias for boxed stream of PlexusStreamItem
20pub type PlexusStream = Pin<Box<dyn Stream<Item = PlexusStreamItem> + Send>>;
21
22/// Wrap a typed stream into PlexusStream with automatic Done event
23///
24/// This is the core helper for the caller-wraps architecture.
25/// Activations return typed domain events (e.g., HealthEvent),
26/// and the caller wraps them with metadata. A Done event is
27/// automatically appended when the stream completes.
28///
29/// # Example
30///
31/// ```ignore
32/// let stream = health.check();  // Returns Stream<Item = HealthEvent>
33/// let wrapped = wrap_stream(stream, "health.status", vec!["health".into()]);
34/// // Stream will emit: Data, Data, ..., Done
35/// ```
36pub fn wrap_stream<T: Serialize + Send + 'static>(
37    stream: impl Stream<Item = T> + Send + 'static,
38    content_type: &'static str,
39    provenance: Vec<String>,
40) -> PlexusStream {
41    let plexus_hash = PlexusContext::hash();
42    let metadata = StreamMetadata::new(provenance.clone(), plexus_hash.clone());
43    let done_metadata = StreamMetadata::new(provenance, plexus_hash);
44
45    let data_stream = stream.map(move |item| PlexusStreamItem::Data {
46        metadata: metadata.clone(),
47        content_type: content_type.to_string(),
48        content: serde_json::to_value(item).expect("serialization failed"),
49    });
50
51    let done_stream = stream::once(async move { PlexusStreamItem::Done {
52        metadata: done_metadata,
53    }});
54
55    Box::pin(data_stream.chain(done_stream))
56}
57
58
59/// Create a bidirectional channel and wrap a stream, merging Request items
60///
61/// This function:
62/// 1. Creates a BidirChannel connected to an internal mpsc channel
63/// 2. Wraps the user's typed stream into PlexusStreamItems
64/// 3. Merges in any Request items emitted by the BidirChannel
65/// 4. Returns both the channel (for the activation to use) and the merged stream
66///
67/// # Arguments
68///
69/// * `content_type` - Content type string for data items (e.g., "interactive.wizard")
70/// * `provenance` - Provenance path for metadata
71///
72/// # Returns
73///
74/// Returns a tuple of:
75/// * `Arc<BidirChannel<Req, Resp>>` - The bidirectional channel for the activation
76/// * A closure that takes the user's stream and returns the merged PlexusStream
77///
78/// # Example
79///
80/// ```ignore
81/// let (ctx, wrap_fn) = create_bidir_stream::<StandardRequest, StandardResponse>(
82///     "interactive.wizard",
83///     vec!["interactive".into()],
84/// );
85/// let user_stream = activation.wizard(&ctx).await;
86/// let merged_stream = wrap_fn(user_stream);
87/// ```
88pub fn create_bidir_stream<Req, Resp>(
89    content_type: &'static str,
90    provenance: Vec<String>,
91) -> (
92    Arc<BidirChannel<Req, Resp>>,
93    impl FnOnce(Pin<Box<dyn Stream<Item = PlexusStreamItem> + Send>>) -> PlexusStream,
94)
95where
96    Req: Serialize + DeserializeOwned + Send + Sync + 'static,
97    Resp: Serialize + DeserializeOwned + Send + Sync + 'static,
98{
99    let plexus_hash = PlexusContext::hash();
100
101    // Create channel for BidirChannel to send Request items
102    let (bidir_tx, bidir_rx) = mpsc::channel::<PlexusStreamItem>(32);
103
104    // Create the BidirChannel with:
105    // - bidirectional_supported = true (we support it)
106    // - use_global_registry = true (responses come via substrate.respond)
107    let bidir_channel = Arc::new(BidirChannel::<Req, Resp>::new(
108        bidir_tx,
109        true,  // bidirectional_supported
110        provenance.clone(),
111        plexus_hash.clone(),
112    ));
113
114    // Create the wrapper closure
115    let wrap_fn = move |user_stream: Pin<Box<dyn Stream<Item = PlexusStreamItem> + Send>>| -> PlexusStream {
116        let bidir_stream = ReceiverStream::new(bidir_rx);
117
118        // Use stream::select to interleave items from both streams
119        // This allows Request items to appear in the stream alongside Data items
120        let merged = stream::select(user_stream, bidir_stream);
121
122        Box::pin(merged)
123    };
124
125    (bidir_channel, wrap_fn)
126}
127
128/// Wrap a typed stream with bidirectional support
129///
130/// Convenience wrapper that creates a BidirChannel and wraps the stream in one call.
131/// The channel is returned for use by the activation.
132///
133/// # Type Parameters
134///
135/// * `T` - The type of items in the user's stream
136/// * `Req` - Request type for bidirectional channel
137/// * `Resp` - Response type for bidirectional channel
138///
139/// # Example
140///
141/// ```ignore
142/// use plexus_core::plexus::{StandardRequest, StandardResponse, wrap_stream_with_bidir};
143///
144/// let (ctx, stream) = wrap_stream_with_bidir::<_, StandardRequest, StandardResponse>(
145///     user_stream,
146///     "interactive.wizard",
147///     vec!["interactive".into()],
148/// );
149/// // ctx can now be used for bidirectional requests
150/// // stream includes both data items and any Request items
151/// ```
152pub fn wrap_stream_with_bidir<T, Req, Resp>(
153    stream: impl Stream<Item = T> + Send + 'static,
154    content_type: &'static str,
155    provenance: Vec<String>,
156) -> (Arc<BidirChannel<Req, Resp>>, PlexusStream)
157where
158    T: Serialize + Send + 'static,
159    Req: Serialize + DeserializeOwned + Send + Sync + 'static,
160    Resp: Serialize + DeserializeOwned + Send + Sync + 'static,
161{
162    let (ctx, wrap_fn) = create_bidir_stream::<Req, Resp>(content_type, provenance.clone());
163
164    // Wrap the user's typed stream into PlexusStreamItems
165    let wrapped_user_stream = wrap_stream(stream, content_type, provenance);
166
167    // Merge with bidir stream
168    let merged = wrap_fn(wrapped_user_stream);
169
170    (ctx, merged)
171}
172
173/// Create an error stream
174///
175/// Returns a single-item stream containing an error event.
176pub fn error_stream(
177    message: String,
178    provenance: Vec<String>,
179    recoverable: bool,
180) -> PlexusStream {
181    let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
182
183    Box::pin(stream::once(async move {
184        PlexusStreamItem::Error {
185            metadata,
186            message,
187            code: None,
188            recoverable,
189        }
190    }))
191}
192
193/// Create an error stream with error code
194///
195/// Returns a single-item stream containing an error event with a code.
196pub fn error_stream_with_code(
197    message: String,
198    code: String,
199    provenance: Vec<String>,
200    recoverable: bool,
201) -> PlexusStream {
202    let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
203
204    Box::pin(stream::once(async move {
205        PlexusStreamItem::Error {
206            metadata,
207            message,
208            code: Some(code),
209            recoverable,
210        }
211    }))
212}
213
214/// Create a done stream
215///
216/// Returns a single-item stream containing a done event.
217pub fn done_stream(provenance: Vec<String>) -> PlexusStream {
218    let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
219
220    Box::pin(stream::once(async move {
221        PlexusStreamItem::Done { metadata }
222    }))
223}
224
225/// Create a progress stream
226///
227/// Returns a single-item stream containing a progress event.
228pub fn progress_stream(
229    message: String,
230    percentage: Option<f32>,
231    provenance: Vec<String>,
232) -> PlexusStream {
233    let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
234
235    Box::pin(stream::once(async move {
236        PlexusStreamItem::Progress {
237            metadata,
238            message,
239            percentage,
240        }
241    }))
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use serde::{Deserialize, Serialize};

    /// Minimal serializable payload standing in for a typed domain event.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestEvent {
        value: i32,
    }

    /// Provenance path shared by every test in this module.
    fn test_provenance() -> Vec<String> {
        vec!["test".into()]
    }

    /// Two typed events must come out as Data, Data, Done.
    #[tokio::test]
    async fn test_wrap_stream() {
        let source = stream::iter(vec![TestEvent { value: 1 }, TestEvent { value: 2 }]);

        let items = wrap_stream(source, "test.event", test_provenance())
            .collect::<Vec<_>>()
            .await;

        // 2 data items + 1 done
        assert_eq!(items.len(), 3);

        // The first item carries the first event plus the metadata
        if let PlexusStreamItem::Data {
            content_type,
            content,
            metadata,
        } = &items[0]
        {
            assert_eq!(content_type, "test.event");
            assert_eq!(content["value"], 1);
            assert_eq!(metadata.provenance, vec!["test"]);
        } else {
            panic!("Expected Data item");
        }

        // The stream is closed by a Done item
        assert!(matches!(items[2], PlexusStreamItem::Done { .. }));
    }

    /// `error_stream` yields a single Error item with no code.
    #[tokio::test]
    async fn test_error_stream() {
        let items = error_stream("Something failed".into(), test_provenance(), false)
            .collect::<Vec<_>>()
            .await;

        assert_eq!(items.len(), 1);

        if let PlexusStreamItem::Error {
            message,
            recoverable,
            code,
            ..
        } = &items[0]
        {
            assert_eq!(message, "Something failed");
            assert!(!*recoverable);
            assert!(code.is_none());
        } else {
            panic!("Expected Error item");
        }
    }

    /// `error_stream_with_code` yields a single Error item carrying the code.
    #[tokio::test]
    async fn test_error_stream_with_code() {
        let items = error_stream_with_code(
            "Not found".into(),
            "NOT_FOUND".into(),
            test_provenance(),
            true,
        )
        .collect::<Vec<_>>()
        .await;

        assert_eq!(items.len(), 1);

        if let PlexusStreamItem::Error {
            message,
            code,
            recoverable,
            ..
        } = &items[0]
        {
            assert_eq!(message, "Not found");
            assert_eq!(code.as_deref(), Some("NOT_FOUND"));
            assert!(*recoverable);
        } else {
            panic!("Expected Error item");
        }
    }
}
331}