Skip to main content

plexus_core/plexus/
streaming.rs

1//! Streaming helpers for the caller-wraps architecture
2//!
3//! These functions are used by the DynamicHub routing layer to wrap activation
4//! responses with metadata. Activations return typed domain events, and
5//! the caller uses these helpers to create PlexusStreamItems.
6
7use futures::stream::{self, Stream, StreamExt};
8use serde::Serialize;
9use std::pin::Pin;
10
11use super::context::PlexusContext;
12use super::types::{PlexusStreamItem, StreamMetadata};
13
14/// Type alias for boxed stream of PlexusStreamItem
15pub type PlexusStream = Pin<Box<dyn Stream<Item = PlexusStreamItem> + Send>>;
16
17/// Wrap a typed stream into PlexusStream with automatic Done event
18///
19/// This is the core helper for the caller-wraps architecture.
20/// Activations return typed domain events (e.g., HealthEvent),
21/// and the caller wraps them with metadata. A Done event is
22/// automatically appended when the stream completes.
23///
24/// # Example
25///
26/// ```ignore
27/// let stream = health.check();  // Returns Stream<Item = HealthEvent>
28/// let wrapped = wrap_stream(stream, "health.status", vec!["health".into()]);
29/// // Stream will emit: Data, Data, ..., Done
30/// ```
31pub fn wrap_stream<T: Serialize + Send + 'static>(
32    stream: impl Stream<Item = T> + Send + 'static,
33    content_type: &'static str,
34    provenance: Vec<String>,
35) -> PlexusStream {
36    let plexus_hash = PlexusContext::hash();
37    let metadata = StreamMetadata::new(provenance.clone(), plexus_hash.clone());
38    let done_metadata = StreamMetadata::new(provenance, plexus_hash);
39
40    let data_stream = stream.map(move |item| PlexusStreamItem::Data {
41        metadata: metadata.clone(),
42        content_type: content_type.to_string(),
43        content: serde_json::to_value(item).expect("serialization failed"),
44    });
45
46    let done_stream = stream::once(async move { PlexusStreamItem::Done {
47        metadata: done_metadata,
48    }});
49
50    Box::pin(data_stream.chain(done_stream))
51}
52
53
54/// Create an error stream
55///
56/// Returns a single-item stream containing an error event.
57pub fn error_stream(
58    message: String,
59    provenance: Vec<String>,
60    recoverable: bool,
61) -> PlexusStream {
62    let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
63
64    Box::pin(stream::once(async move {
65        PlexusStreamItem::Error {
66            metadata,
67            message,
68            code: None,
69            recoverable,
70        }
71    }))
72}
73
74/// Create an error stream with error code
75///
76/// Returns a single-item stream containing an error event with a code.
77pub fn error_stream_with_code(
78    message: String,
79    code: String,
80    provenance: Vec<String>,
81    recoverable: bool,
82) -> PlexusStream {
83    let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
84
85    Box::pin(stream::once(async move {
86        PlexusStreamItem::Error {
87            metadata,
88            message,
89            code: Some(code),
90            recoverable,
91        }
92    }))
93}
94
95/// Create a done stream
96///
97/// Returns a single-item stream containing a done event.
98pub fn done_stream(provenance: Vec<String>) -> PlexusStream {
99    let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
100
101    Box::pin(stream::once(async move {
102        PlexusStreamItem::Done { metadata }
103    }))
104}
105
106/// Create a progress stream
107///
108/// Returns a single-item stream containing a progress event.
109pub fn progress_stream(
110    message: String,
111    percentage: Option<f32>,
112    provenance: Vec<String>,
113) -> PlexusStream {
114    let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
115
116    Box::pin(stream::once(async move {
117        PlexusStreamItem::Progress {
118            metadata,
119            message,
120            percentage,
121        }
122    }))
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128    use futures::StreamExt;
129    use serde::{Deserialize, Serialize};
130
131    #[derive(Debug, Clone, Serialize, Deserialize)]
132    struct TestEvent {
133        value: i32,
134    }
135
136    #[tokio::test]
137    async fn test_wrap_stream() {
138        let events = vec![TestEvent { value: 1 }, TestEvent { value: 2 }];
139        let input_stream = stream::iter(events);
140
141        let wrapped = wrap_stream(input_stream, "test.event", vec!["test".into()]);
142        let items: Vec<_> = wrapped.collect().await;
143
144        // 2 data items + 1 done
145        assert_eq!(items.len(), 3);
146
147        // Check first item
148        match &items[0] {
149            PlexusStreamItem::Data {
150                content_type,
151                content,
152                metadata,
153            } => {
154                assert_eq!(content_type, "test.event");
155                assert_eq!(content["value"], 1);
156                assert_eq!(metadata.provenance, vec!["test"]);
157            }
158            _ => panic!("Expected Data item"),
159        }
160
161        // Check done at end
162        assert!(matches!(items[2], PlexusStreamItem::Done { .. }));
163    }
164
165
166    #[tokio::test]
167    async fn test_error_stream() {
168        let stream = error_stream("Something failed".into(), vec!["test".into()], false);
169        let items: Vec<_> = stream.collect().await;
170
171        assert_eq!(items.len(), 1);
172        match &items[0] {
173            PlexusStreamItem::Error {
174                message,
175                recoverable,
176                code,
177                ..
178            } => {
179                assert_eq!(message, "Something failed");
180                assert!(!recoverable);
181                assert!(code.is_none());
182            }
183            _ => panic!("Expected Error item"),
184        }
185    }
186
187    #[tokio::test]
188    async fn test_error_stream_with_code() {
189        let stream = error_stream_with_code(
190            "Not found".into(),
191            "NOT_FOUND".into(),
192            vec!["test".into()],
193            true,
194        );
195        let items: Vec<_> = stream.collect().await;
196
197        assert_eq!(items.len(), 1);
198        match &items[0] {
199            PlexusStreamItem::Error {
200                message,
201                code,
202                recoverable,
203                ..
204            } => {
205                assert_eq!(message, "Not found");
206                assert_eq!(code.as_deref(), Some("NOT_FOUND"));
207                assert!(recoverable);
208            }
209            _ => panic!("Expected Error item"),
210        }
211    }
212}