streamweave/
output.rs

1use futures::Stream;
2
3/// Trait for components that can produce output streams.
4///
5/// This trait defines the interface for components that generate data streams.
6/// It is implemented by producers and transformers that output data.
7pub trait Output {
8  /// The type of items produced by this output stream.
9  type Output;
10  /// The output stream type that yields items of type `Self::Output`.
11  type OutputStream: Stream<Item = Self::Output> + Send;
12}
13
#[cfg(test)]
mod tests {
  use super::*;
  use futures::{StreamExt, stream};
  use std::pin::Pin;

  /// Test fixture whose output items are `i32` values.
  struct NumberOutput {
    data: Vec<i32>,
  }

  impl Output for NumberOutput {
    type Output = i32;
    type OutputStream = Pin<Box<dyn Stream<Item = i32> + Send>>;
  }

  /// Test fixture whose output items are `String` values.
  struct TextOutput {
    data: String,
  }

  impl Output for TextOutput {
    type Output = String;
    type OutputStream = Pin<Box<dyn Stream<Item = String> + Send>>;
  }

  /// Streams the fixture's numbers.
  ///
  /// The return type is spelled as the associated `OutputStream` rather than
  /// a hand-written boxed type, so the compiler checks the value against what
  /// the `Output` impl actually declares.
  fn number_stream(output: NumberOutput) -> <NumberOutput as Output>::OutputStream {
    Box::pin(stream::iter(output.data))
  }

  /// Streams the fixture's text one character at a time, each as a `String`.
  ///
  /// The characters are collected into owned strings first: a stream built
  /// directly on `output.data.chars()` would borrow from `output`, and the
  /// trait object in `TextOutput::OutputStream` defaults to `'static`.
  fn text_stream(output: TextOutput) -> <TextOutput as Output>::OutputStream {
    let pieces: Vec<String> = output.data.chars().map(|c| c.to_string()).collect();
    Box::pin(stream::iter(pieces))
  }

  #[tokio::test]
  async fn test_number_output_stream() {
    let output = NumberOutput {
      data: vec![1, 2, 3],
    };

    // Building the stream through the helper verifies the associated type.
    let stream = number_stream(output);

    // Test stream functionality
    let sum = stream.fold(0, |acc, x| async move { acc + x }).await;
    assert_eq!(sum, 6);
  }

  #[tokio::test]
  async fn test_text_output_stream() {
    let output = TextOutput {
      data: "hello".to_string(),
    };

    // Building the stream through the helper verifies the associated type.
    let stream = text_stream(output);

    // Concatenating the per-character strings must give back the input.
    let result = stream.collect::<String>().await;
    assert_eq!(result, "hello");
  }

  #[tokio::test]
  async fn test_output_stream_send() {
    let output = NumberOutput {
      data: vec![1, 2, 3],
    };

    let stream = number_stream(output);

    // Moving the stream into a spawned task only compiles if it is `Send`.
    let handle = tokio::spawn(async move {
      let sum = stream.fold(0, |acc, x| async move { acc + x }).await;
      assert_eq!(sum, 6);
    });

    handle.await.unwrap();
  }

  #[test]
  fn test_output_trait_bounds() {
    // Compile-time check: both fixtures satisfy `T: Output`.
    fn takes_output<T: Output>(_: T) {}

    takes_output(NumberOutput { data: vec![] });
    takes_output(TextOutput {
      data: String::new(),
    });
  }

  #[test]
  fn test_output_static_bounds() {
    // Compile-time check: an implementor can be returned as `impl Output`.
    fn static_output_fn() -> impl Output {
      NumberOutput { data: vec![] }
    }

    let _output = static_output_fn();
  }

  #[tokio::test]
  async fn test_output_stream_composition() {
    let output1 = NumberOutput {
      data: vec![1, 2, 3],
    };
    let output2 = NumberOutput {
      data: vec![4, 5, 6],
    };

    // Two streams of the same associated type can be merged into one.
    let stream1 = number_stream(output1);
    let stream2 = number_stream(output2);

    let combined = Box::pin(stream::select(stream1, stream2));
    let sum = combined.fold(0, |acc, x| async move { acc + x }).await;
    assert_eq!(sum, 21); // 1 + 2 + 3 + 4 + 5 + 6
  }
}