plexus_core/plexus/
streaming.rs1use futures::stream::{self, Stream, StreamExt};
8use serde::Serialize;
9use std::pin::Pin;
10
11use super::context::PlexusContext;
12use super::types::{PlexusStreamItem, StreamMetadata};
13
14pub type PlexusStream = Pin<Box<dyn Stream<Item = PlexusStreamItem> + Send>>;
16
17pub fn wrap_stream<T: Serialize + Send + 'static>(
32 stream: impl Stream<Item = T> + Send + 'static,
33 content_type: &'static str,
34 provenance: Vec<String>,
35) -> PlexusStream {
36 let plexus_hash = PlexusContext::hash();
37 let metadata = StreamMetadata::new(provenance.clone(), plexus_hash.clone());
38 let done_metadata = StreamMetadata::new(provenance, plexus_hash);
39
40 let data_stream = stream.map(move |item| PlexusStreamItem::Data {
41 metadata: metadata.clone(),
42 content_type: content_type.to_string(),
43 content: serde_json::to_value(item).expect("serialization failed"),
44 });
45
46 let done_stream = stream::once(async move { PlexusStreamItem::Done {
47 metadata: done_metadata,
48 }});
49
50 Box::pin(data_stream.chain(done_stream))
51}
52
53
54pub fn error_stream(
58 message: String,
59 provenance: Vec<String>,
60 recoverable: bool,
61) -> PlexusStream {
62 let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
63
64 Box::pin(stream::once(async move {
65 PlexusStreamItem::Error {
66 metadata,
67 message,
68 code: None,
69 recoverable,
70 }
71 }))
72}
73
74pub fn error_stream_with_code(
78 message: String,
79 code: String,
80 provenance: Vec<String>,
81 recoverable: bool,
82) -> PlexusStream {
83 let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
84
85 Box::pin(stream::once(async move {
86 PlexusStreamItem::Error {
87 metadata,
88 message,
89 code: Some(code),
90 recoverable,
91 }
92 }))
93}
94
95pub fn done_stream(provenance: Vec<String>) -> PlexusStream {
99 let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
100
101 Box::pin(stream::once(async move {
102 PlexusStreamItem::Done { metadata }
103 }))
104}
105
106pub fn progress_stream(
110 message: String,
111 percentage: Option<f32>,
112 provenance: Vec<String>,
113) -> PlexusStream {
114 let metadata = StreamMetadata::new(provenance, PlexusContext::hash());
115
116 Box::pin(stream::once(async move {
117 PlexusStreamItem::Progress {
118 metadata,
119 message,
120 percentage,
121 }
122 }))
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use serde::{Deserialize, Serialize};

    /// Minimal serializable payload used to drive `wrap_stream`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestEvent {
        value: i32,
    }

    /// `wrap_stream` emits one `Data` item per input, in order, then `Done`.
    #[tokio::test]
    async fn test_wrap_stream() {
        let events = vec![TestEvent { value: 1 }, TestEvent { value: 2 }];
        let input_stream = stream::iter(events);

        let wrapped = wrap_stream(input_stream, "test.event", vec!["test".into()]);
        let items: Vec<_> = wrapped.collect().await;

        // Two data items plus the trailing Done marker.
        assert_eq!(items.len(), 3);

        match &items[0] {
            PlexusStreamItem::Data {
                content_type,
                content,
                metadata,
            } => {
                assert_eq!(content_type, "test.event");
                assert_eq!(content["value"], 1);
                assert_eq!(metadata.provenance, vec!["test"]);
            }
            _ => panic!("Expected Data item"),
        }

        // The second input must come through as well, preserving order.
        match &items[1] {
            PlexusStreamItem::Data {
                content_type,
                content,
                ..
            } => {
                assert_eq!(content_type, "test.event");
                assert_eq!(content["value"], 2);
            }
            _ => panic!("Expected Data item"),
        }

        assert!(matches!(items[2], PlexusStreamItem::Done { .. }));
    }

    /// An empty input still produces the terminating `Done` item.
    #[tokio::test]
    async fn test_wrap_stream_empty_input() {
        let input_stream = stream::iter(Vec::<TestEvent>::new());

        let wrapped = wrap_stream(input_stream, "test.event", vec!["test".into()]);
        let items: Vec<_> = wrapped.collect().await;

        assert_eq!(items.len(), 1);
        assert!(matches!(items[0], PlexusStreamItem::Done { .. }));
    }

    /// `error_stream` yields a single error item without a code.
    #[tokio::test]
    async fn test_error_stream() {
        let stream = error_stream("Something failed".into(), vec!["test".into()], false);
        let items: Vec<_> = stream.collect().await;

        assert_eq!(items.len(), 1);
        match &items[0] {
            PlexusStreamItem::Error {
                message,
                recoverable,
                code,
                ..
            } => {
                assert_eq!(message, "Something failed");
                assert!(!recoverable);
                assert!(code.is_none());
            }
            _ => panic!("Expected Error item"),
        }
    }

    /// `error_stream_with_code` yields a single error item carrying the code.
    #[tokio::test]
    async fn test_error_stream_with_code() {
        let stream = error_stream_with_code(
            "Not found".into(),
            "NOT_FOUND".into(),
            vec!["test".into()],
            true,
        );
        let items: Vec<_> = stream.collect().await;

        assert_eq!(items.len(), 1);
        match &items[0] {
            PlexusStreamItem::Error {
                message,
                code,
                recoverable,
                ..
            } => {
                assert_eq!(message, "Not found");
                assert_eq!(code.as_deref(), Some("NOT_FOUND"));
                assert!(recoverable);
            }
            _ => panic!("Expected Error item"),
        }
    }

    /// `done_stream` yields exactly one `Done` item with the given provenance.
    #[tokio::test]
    async fn test_done_stream() {
        let stream = done_stream(vec!["test".into()]);
        let items: Vec<_> = stream.collect().await;

        assert_eq!(items.len(), 1);
        match &items[0] {
            PlexusStreamItem::Done { metadata } => {
                assert_eq!(metadata.provenance, vec!["test"]);
            }
            _ => panic!("Expected Done item"),
        }
    }

    /// `progress_stream` yields exactly one `Progress` item with its inputs.
    #[tokio::test]
    async fn test_progress_stream() {
        let stream = progress_stream("Halfway".into(), Some(0.5), vec!["test".into()]);
        let items: Vec<_> = stream.collect().await;

        assert_eq!(items.len(), 1);
        match &items[0] {
            PlexusStreamItem::Progress {
                message,
                percentage,
                ..
            } => {
                assert_eq!(message, "Halfway");
                assert_eq!(*percentage, Some(0.5));
            }
            _ => panic!("Expected Progress item"),
        }
    }
}