streamweave_vec/consumers/
input.rs1use super::vec_consumer::VecConsumer;
2use futures::Stream;
3use std::pin::Pin;
4use streamweave::Input;
5
6impl<T> Input for VecConsumer<T>
7where
8 T: std::fmt::Debug + Clone + Send + Sync + 'static,
9{
10 type Input = T;
11 type InputStream = Pin<Box<dyn Stream<Item = T> + Send>>;
12}
13
14#[cfg(test)]
15mod tests {
16 use super::*;
17 use futures::{StreamExt, stream};
18 use proptest::prelude::*;
19 use proptest::proptest;
20 use std::pin::Pin;
21
22 proptest! {
23 #[test]
24 fn test_vec_consumer_input_trait_implementation(_value in any::<i32>()) {
25 fn assert_input_trait<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
27 _consumer: VecConsumer<T>,
28 ) where
29 VecConsumer<T>: Input<Input = T>,
30 {
31 }
33
34 let consumer = VecConsumer::<i32>::new();
35 assert_input_trait(consumer);
36 }
37
38 #[test]
39 fn test_vec_consumer_input_type_constraints(_value in any::<i32>()) {
40 fn get_input_type<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
42 _consumer: VecConsumer<T>,
43 ) -> std::marker::PhantomData<T>
44 where
45 VecConsumer<T>: Input<Input = T>,
46 {
47 std::marker::PhantomData
48 }
49
50 let consumer = VecConsumer::<i32>::new();
51 let _phantom = get_input_type(consumer);
52 }
54
55 #[test]
56 fn test_vec_consumer_input_stream_type(_value in any::<i32>()) {
57 fn create_input_stream<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
59 _consumer: VecConsumer<T>,
60 ) -> Pin<Box<dyn Stream<Item = T> + Send>>
61 where
62 VecConsumer<T>: Input<Input = T, InputStream = Pin<Box<dyn Stream<Item = T> + Send>>>,
63 {
64 Box::pin(stream::empty())
65 }
66
67 let consumer = VecConsumer::<i32>::new();
68 let _stream = create_input_stream(consumer);
69 }
71
72 #[test]
73 fn test_vec_consumer_input_stream_send_bound(data in prop::collection::vec(any::<i32>(), 0..20)) {
74 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
75 rt.block_on(test_vec_consumer_input_stream_send_bound_async(data));
76 }
77
78 #[test]
79 fn test_vec_consumer_input_trait_bounds(_ in prop::num::u8::ANY) {
80 fn test_trait_bounds<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
82 consumer: VecConsumer<T>,
83 ) where
84 VecConsumer<T>: Input,
85 {
86 let _consumer = consumer;
88 }
89
90 test_trait_bounds(VecConsumer::<i32>::new());
92 test_trait_bounds(VecConsumer::<String>::new());
93 test_trait_bounds(VecConsumer::<i32>::new());
94 }
95
96 #[test]
97 fn test_vec_consumer_input_static_lifetime(_ in prop::num::u8::ANY) {
98 fn test_static_lifetime<T: std::fmt::Debug + Clone + Send + Sync + 'static>(
100 consumer: VecConsumer<T>,
101 ) where
102 VecConsumer<T>: Input,
103 {
104 let _consumer = consumer;
106 }
107
108 test_static_lifetime(VecConsumer::<i32>::new());
110 test_static_lifetime(VecConsumer::<String>::new());
111 }
112
113 #[test]
114 fn test_vec_consumer_input_stream_compatibility(data in prop::collection::vec(any::<i32>(), 0..20)) {
115 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
116 rt.block_on(test_vec_consumer_input_stream_compatibility_async(data));
117 }
118
119 #[test]
120 fn test_vec_consumer_input_trait_object_safety(_ in prop::num::u8::ANY) {
121 fn process_input<I: Input>(_input: I)
123 where
124 I::Input: std::fmt::Debug,
125 {
126 }
128
129 process_input(VecConsumer::<i32>::new());
131 process_input(VecConsumer::<String>::new());
132 }
133 }
134
135 async fn test_vec_consumer_input_stream_send_bound_async(data: Vec<i32>) {
136 let _consumer = VecConsumer::<i32>::new();
138
139 let stream: Pin<Box<dyn Stream<Item = i32> + Send>> = Box::pin(stream::iter(data.clone()));
141
142 let handle = tokio::spawn(async move {
144 let result: Vec<i32> = stream.collect().await;
145 assert_eq!(result, data);
146 });
147
148 handle.await.unwrap();
149 }
150
151 async fn test_vec_consumer_input_stream_compatibility_async(data: Vec<i32>) {
152 let _consumer = VecConsumer::<i32>::new();
154
155 let stream: Pin<Box<dyn Stream<Item = i32> + Send>> = Box::pin(stream::iter(data.clone()));
157
158 let result: Vec<i32> = stream.collect().await;
160 assert_eq!(result, data);
161 }
162}