streamweave_vec/consumers/
input.rs

1use super::vec_consumer::VecConsumer;
2use futures::Stream;
3use std::pin::Pin;
4use streamweave::Input;
5
6impl<T> Input for VecConsumer<T>
7where
8  T: std::fmt::Debug + Clone + Send + Sync + 'static,
9{
10  type Input = T;
11  type InputStream = Pin<Box<dyn Stream<Item = T> + Send>>;
12}
13
#[cfg(test)]
mod tests {
  //! Tests for the `Input` implementation on `VecConsumer`.
  //!
  //! Most checks here are compile-time: the nested helper functions carry
  //! `where` clauses that only type-check when the impl has the expected
  //! shape. Those checks take no generated data, so they run once as plain
  //! `#[test]` functions. Only the stream tests consume generated input and
  //! therefore live inside `proptest!`.

  use super::*;
  use futures::{StreamExt, stream};
  use proptest::prelude::*;
  use proptest::proptest;
  use std::pin::Pin;

  /// The stream type that the impl under test declares for `VecConsumer<i32>`.
  ///
  /// Using the projection (instead of spelling the boxed type by hand) ties the
  /// stream tests to the actual `InputStream` associated type.
  type I32InputStream = <VecConsumer<i32> as Input>::InputStream;

  /// Runs `future` to completion on a fresh current-thread Tokio runtime.
  fn block_on<F: std::future::Future>(future: F) -> F::Output {
    let runtime = tokio::runtime::Builder::new_current_thread()
      .enable_all()
      .build()
      .expect("failed to build current-thread Tokio runtime");
    runtime.block_on(future)
  }

  /// `VecConsumer<T>` implements `Input` with `Input = T`.
  #[test]
  fn test_vec_consumer_input_trait_implementation() {
    // Compiles only if `VecConsumer<T>: Input<Input = T>` holds.
    fn assert_input_trait<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
      _consumer: VecConsumer<T>,
    ) where
      VecConsumer<T>: Input<Input = T>,
    {
    }

    assert_input_trait(VecConsumer::<i32>::new());
  }

  /// The `Input` associated type is exactly `T`.
  #[test]
  fn test_vec_consumer_input_type_constraints() {
    // The return type names `T`, so inference must agree with `Input = T`.
    fn get_input_type<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
      _consumer: VecConsumer<T>,
    ) -> std::marker::PhantomData<T>
    where
      VecConsumer<T>: Input<Input = T>,
    {
      std::marker::PhantomData
    }

    let _phantom = get_input_type(VecConsumer::<i32>::new());
  }

  /// The `InputStream` associated type is the boxed, pinned, `Send` stream.
  #[test]
  fn test_vec_consumer_input_stream_type() {
    // Compiles only if `InputStream` equals the spelled-out boxed stream type.
    fn create_input_stream<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
      _consumer: VecConsumer<T>,
    ) -> Pin<Box<dyn Stream<Item = T> + Send>>
    where
      VecConsumer<T>: Input<Input = T, InputStream = Pin<Box<dyn Stream<Item = T> + Send>>>,
    {
      Box::pin(stream::empty())
    }

    let _stream = create_input_stream(VecConsumer::<i32>::new());
  }

  /// The impl applies to several distinct item types.
  #[test]
  fn test_vec_consumer_input_trait_bounds() {
    fn test_trait_bounds<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
      consumer: VecConsumer<T>,
    ) where
      VecConsumer<T>: Input,
    {
      let _consumer = consumer;
    }

    // Three genuinely different item types: a primitive, a string, a collection.
    test_trait_bounds(VecConsumer::<i32>::new());
    test_trait_bounds(VecConsumer::<String>::new());
    test_trait_bounds(VecConsumer::<Vec<u8>>::new());
  }

  /// Owned (`'static`) item types satisfy the impl's lifetime bound.
  #[test]
  fn test_vec_consumer_input_static_lifetime() {
    // The `'static` bound on `T` restricts callers to types without borrows.
    fn test_static_lifetime<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
      consumer: VecConsumer<T>,
    ) where
      VecConsumer<T>: Input,
    {
      let _consumer = consumer;
    }

    test_static_lifetime(VecConsumer::<i32>::new());
    test_static_lifetime(VecConsumer::<String>::new());
  }

  /// `VecConsumer` can be passed wherever a generic `I: Input` is expected.
  ///
  /// Despite the historical name, this exercises a generic bound (static
  /// dispatch); it does not construct or use a `dyn Input` trait object.
  #[test]
  fn test_vec_consumer_input_trait_object_safety() {
    fn process_input<I: Input>(_input: I)
    where
      I::Input: std::fmt::Debug,
    {
    }

    process_input(VecConsumer::<i32>::new());
    process_input(VecConsumer::<String>::new());
  }

  proptest! {
    // A stream of the declared `InputStream` type can be moved into a spawned task.
    #[test]
    fn test_vec_consumer_input_stream_send_bound(data in prop::collection::vec(any::<i32>(), 0..20)) {
      block_on(test_vec_consumer_input_stream_send_bound_async(data));
    }

    // A stream of the declared `InputStream` type yields its items unchanged.
    #[test]
    fn test_vec_consumer_input_stream_compatibility(data in prop::collection::vec(any::<i32>(), 0..20)) {
      block_on(test_vec_consumer_input_stream_compatibility_async(data));
    }
  }

  /// Builds a stream typed as `VecConsumer<i32>`'s `InputStream`, moves it into
  /// a spawned task (which requires `Send`), and checks every item arrives in
  /// order.
  async fn test_vec_consumer_input_stream_send_bound_async(data: Vec<i32>) {
    let input_stream: I32InputStream = Box::pin(stream::iter(data.clone()));

    // `tokio::spawn` requires the future, and so the captured stream, to be `Send`.
    let handle = tokio::spawn(async move {
      let result: Vec<i32> = input_stream.collect().await;
      assert_eq!(result, data);
    });

    handle.await.unwrap();
  }

  /// Builds a stream typed as `VecConsumer<i32>`'s `InputStream` and checks that
  /// collecting it returns the original data.
  async fn test_vec_consumer_input_stream_compatibility_async(data: Vec<i32>) {
    let input_stream: I32InputStream = Box::pin(stream::iter(data.clone()));

    let result: Vec<i32> = input_stream.collect().await;
    assert_eq!(result, data);
  }
}