// noir_compute/operator/source/iterator.rs

1use std::fmt::Display;
2
3use crate::block::{BlockStructure, OperatorKind, OperatorStructure, Replication};
4use crate::operator::source::Source;
5use crate::operator::{Operator, StreamElement};
6use crate::scheduler::ExecutionMetadata;
7use crate::Stream;
8
9/// Source that consumes an iterator and emits all its elements into the stream.
10///
11/// The iterator will be consumed **only from one replica**, therefore this source is not parallel.
12#[derive(Derivative)]
13#[derivative(Debug)]
14pub struct IteratorSource<It>
15where
16    It: Iterator + Send + 'static,
17    It::Item: Send,
18{
19    #[derivative(Debug = "ignore")]
20    inner: It,
21    terminated: bool,
22}
23
24impl<It> Display for IteratorSource<It>
25where
26    It: Iterator + Send + 'static,
27    It::Item: Send,
28{
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        write!(f, "IteratorSource<{}>", std::any::type_name::<It::Item>())
31    }
32}
33
34impl<It> IteratorSource<It>
35where
36    It: Iterator + Send + 'static,
37    It::Item: Send,
38{
39    /// Create a new source that reads the items from the iterator provided as input.
40    ///
41    /// **Note**: this source is **not parallel**, the iterator will be consumed only on a single
42    /// replica, on all the others no item will be read from the iterator. If you want to achieve
43    /// parallelism you need to add an operator that shuffles the data (e.g.
44    /// [`Stream::shuffle`](crate::Stream::shuffle)).
45    ///
46    /// ## Example
47    ///
48    /// ```
49    /// # use noir_compute::{StreamContext, RuntimeConfig};
50    /// # use noir_compute::operator::source::IteratorSource;
51    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
52    /// let source = IteratorSource::new((0..5));
53    /// let s = env.stream(source);
54    /// ```
55    pub fn new(inner: It) -> Self {
56        Self {
57            inner,
58            terminated: false,
59        }
60    }
61}
62
63impl<It> Source for IteratorSource<It>
64where
65    It: Iterator + Send + 'static,
66    It::Item: Send,
67{
68    fn replication(&self) -> Replication {
69        Replication::One
70    }
71}
72
73impl<It> Operator for IteratorSource<It>
74where
75    It: Iterator + Send + 'static,
76    It::Item: Send,
77{
78    type Out = It::Item;
79
80    fn setup(&mut self, _metadata: &mut ExecutionMetadata) {}
81
82    fn next(&mut self) -> StreamElement<Self::Out> {
83        if self.terminated {
84            return StreamElement::Terminate;
85        }
86        // TODO: with adaptive batching this does not work since it never emits FlushBatch messages
87        match self.inner.next() {
88            Some(t) => StreamElement::Item(t),
89            None => {
90                self.terminated = true;
91                StreamElement::FlushAndRestart
92            }
93        }
94    }
95
96    fn structure(&self) -> BlockStructure {
97        let mut operator = OperatorStructure::new::<Self::Out, _>("IteratorSource");
98        operator.kind = OperatorKind::Source;
99        BlockStructure::default().add_operator(operator)
100    }
101}
102
103impl<It> Clone for IteratorSource<It>
104where
105    It: Iterator + Send + 'static,
106    It::Item: Send,
107{
108    fn clone(&self) -> Self {
109        // Since this is a non-parallel source, we don't want the other replicas to emit any value
110        panic!("IteratorSource cannot be cloned, replication should be 1");
111    }
112}
113
114impl crate::StreamContext {
115    /// Convenience method, creates a `IteratorSource` and makes a stream using `StreamContext::stream`
116    pub fn stream_iter<It>(&self, iterator: It) -> Stream<IteratorSource<It>>
117    where
118        It: Iterator + Send + 'static,
119        It::Item: Send,
120    {
121        let source = IteratorSource::new(iterator);
122        self.stream(source)
123    }
124}