noir_compute/operator/source/
iterator.rs1use std::fmt::Display;
2
3use crate::block::{BlockStructure, OperatorKind, OperatorStructure, Replication};
4use crate::operator::source::Source;
5use crate::operator::{Operator, StreamElement};
6use crate::scheduler::ExecutionMetadata;
7use crate::Stream;
8
9#[derive(Derivative)]
13#[derivative(Debug)]
14pub struct IteratorSource<It>
15where
16 It: Iterator + Send + 'static,
17 It::Item: Send,
18{
19 #[derivative(Debug = "ignore")]
20 inner: It,
21 terminated: bool,
22}
23
24impl<It> Display for IteratorSource<It>
25where
26 It: Iterator + Send + 'static,
27 It::Item: Send,
28{
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "IteratorSource<{}>", std::any::type_name::<It::Item>())
31 }
32}
33
34impl<It> IteratorSource<It>
35where
36 It: Iterator + Send + 'static,
37 It::Item: Send,
38{
39 pub fn new(inner: It) -> Self {
56 Self {
57 inner,
58 terminated: false,
59 }
60 }
61}
62
63impl<It> Source for IteratorSource<It>
64where
65 It: Iterator + Send + 'static,
66 It::Item: Send,
67{
68 fn replication(&self) -> Replication {
69 Replication::One
70 }
71}
72
73impl<It> Operator for IteratorSource<It>
74where
75 It: Iterator + Send + 'static,
76 It::Item: Send,
77{
78 type Out = It::Item;
79
80 fn setup(&mut self, _metadata: &mut ExecutionMetadata) {}
81
82 fn next(&mut self) -> StreamElement<Self::Out> {
83 if self.terminated {
84 return StreamElement::Terminate;
85 }
86 match self.inner.next() {
88 Some(t) => StreamElement::Item(t),
89 None => {
90 self.terminated = true;
91 StreamElement::FlushAndRestart
92 }
93 }
94 }
95
96 fn structure(&self) -> BlockStructure {
97 let mut operator = OperatorStructure::new::<Self::Out, _>("IteratorSource");
98 operator.kind = OperatorKind::Source;
99 BlockStructure::default().add_operator(operator)
100 }
101}
102
103impl<It> Clone for IteratorSource<It>
104where
105 It: Iterator + Send + 'static,
106 It::Item: Send,
107{
108 fn clone(&self) -> Self {
109 panic!("IteratorSource cannot be cloned, replication should be 1");
111 }
112}
113
114impl crate::StreamContext {
115 pub fn stream_iter<It>(&self, iterator: It) -> Stream<IteratorSource<It>>
117 where
118 It: Iterator + Send + 'static,
119 It::Item: Send,
120 {
121 let source = IteratorSource::new(iterator);
122 self.stream(source)
123 }
124}