zrx_stream/stream/combinator/set.rs
1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Stream set.
27
28use std::vec::IntoIter;
29
30use crate::stream::Stream;
31
32mod convert;
33mod ext;
34
35pub use convert::IntoStreamSet;
36pub use ext::StreamSetExt;
37
38// ----------------------------------------------------------------------------
39// Structs
40// ----------------------------------------------------------------------------
41
42/// Stream set.
43///
44/// Stream sets are homogeneous collections of streams, implementing common set
45/// operations like union, intersection, and difference. They are represented
46/// as a vector of streams, in order to preserve the ordering in which they
47/// were added, which is crucial for operators like [`Stream::coalesce`].
48///
49/// Operators implemented with stream sets include:
50///
51/// - [`Stream::union`]
52/// - [`Stream::intersection`]
53/// - [`Stream::difference`]
54/// - [`Stream::coalesce`]
55///
56/// Note that stream sets implement set operations themselves, including union,
57/// intersection, and difference, which means they can be combined with other
58/// stream sets to implement combinatorical structural operators. However, the
59/// union set operation is likely the most common use case.
60#[derive(Clone, Debug)]
61pub struct StreamSet<I, T> {
62 /// Vector of streams.
63 inner: Vec<Stream<I, T>>,
64}
65
66// ----------------------------------------------------------------------------
67// Structs
68// ----------------------------------------------------------------------------
69
70impl<I, T> StreamSet<I, T> {
71 /// Creates the union of two stream sets.
72 ///
73 /// # Examples
74 ///
75 /// ```
76 /// use zrx_stream::combinator::IntoStreamSet;
77 /// use zrx_stream::workspace::Workspace;
78 ///
79 /// // Create workspace and workflow
80 /// let workspace = Workspace::<&str>::new();
81 /// let workflow = workspace.add_workflow();
82 ///
83 /// // Create streams (homogeneous)
84 /// let a = workflow.add_source::<i32>();
85 /// let b = workflow.add_source::<i32>();
86 /// let c = workflow.add_source::<i32>();
87 ///
88 /// // Create union of stream sets
89 /// let set = [&a, &b].into_stream_set().union([&b, &c]);
90 /// assert_eq!(set.len(), 3);
91 /// ```
92 #[must_use]
93 pub fn union<S>(mut self, streams: S) -> Self
94 where
95 S: IntoStreamSet<I, T>,
96 {
97 let streams = streams.into_stream_set();
98 for stream in streams {
99 if !self.inner.contains(&stream) {
100 self.inner.push(stream);
101 }
102 }
103 self
104 }
105
106 /// Creates the intersection of two stream sets.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// use zrx_stream::combinator::IntoStreamSet;
112 /// use zrx_stream::workspace::Workspace;
113 ///
114 /// // Create workspace and workflow
115 /// let workspace = Workspace::<&str>::new();
116 /// let workflow = workspace.add_workflow();
117 ///
118 /// // Create streams (homogeneous)
119 /// let a = workflow.add_source::<i32>();
120 /// let b = workflow.add_source::<i32>();
121 /// let c = workflow.add_source::<i32>();
122 ///
123 /// // Create intersection of stream sets
124 /// let set = [&a, &b].into_stream_set().intersection([&b, &c]);
125 /// assert_eq!(set.len(), 1);
126 /// ```
127 #[must_use]
128 pub fn intersection<S>(self, streams: S) -> Self
129 where
130 S: IntoStreamSet<I, T>,
131 {
132 let streams = streams.into_stream_set();
133 self.into_iter()
134 .filter(|stream| streams.contains(stream))
135 .collect()
136 }
137
138 /// Creates the difference of two stream sets.
139 ///
140 /// # Examples
141 ///
142 /// ```
143 /// use zrx_stream::combinator::IntoStreamSet;
144 /// use zrx_stream::workspace::Workspace;
145 ///
146 /// // Create workspace and workflow
147 /// let workspace = Workspace::<&str>::new();
148 /// let workflow = workspace.add_workflow();
149 ///
150 /// // Create streams (homogeneous)
151 /// let a = workflow.add_source::<i32>();
152 /// let b = workflow.add_source::<i32>();
153 /// let c = workflow.add_source::<i32>();
154 ///
155 /// // Create difference of stream sets
156 /// let set = [&a, &b].into_stream_set().difference([&b, &c]);
157 /// assert_eq!(set.len(), 1);
158 /// ```
159 #[must_use]
160 pub fn difference<S>(self, streams: S) -> Self
161 where
162 S: IntoStreamSet<I, T>,
163 {
164 let streams = streams.into_stream_set();
165 self.into_iter()
166 .filter(|stream| !streams.contains(stream))
167 .collect()
168 }
169
170 /// Returns whether the stream set is a subset.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// use zrx_stream::combinator::IntoStreamSet;
176 /// use zrx_stream::workspace::Workspace;
177 ///
178 /// // Create workspace and workflow
179 /// let workspace = Workspace::<&str>::new();
180 /// let workflow = workspace.add_workflow();
181 ///
182 /// // Create streams (homogeneous)
183 /// let a = workflow.add_source::<i32>();
184 /// let b = workflow.add_source::<i32>();
185 ///
186 /// // Create stream set and check for subset
187 /// let set = a.into_stream_set();
188 /// assert!(set.is_subset([&a, &b]));
189 /// ```
190 pub fn is_subset<S>(&self, streams: S) -> bool
191 where
192 S: IntoStreamSet<I, T>,
193 {
194 let streams = streams.into_stream_set();
195 let mut iter = self.inner.iter();
196 iter.all(|stream| streams.contains(stream))
197 }
198
199 /// Returns whether the stream set is a superset.
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use zrx_stream::combinator::IntoStreamSet;
205 /// use zrx_stream::workspace::Workspace;
206 ///
207 /// // Create workspace and workflow
208 /// let workspace = Workspace::<&str>::new();
209 /// let workflow = workspace.add_workflow();
210 ///
211 /// // Create streams (homogeneous)
212 /// let a = workflow.add_source::<i32>();
213 /// let b = workflow.add_source::<i32>();
214 ///
215 /// // Create stream set and check for superset
216 /// let set = [&a, &b].into_stream_set();
217 /// assert!(set.is_superset(&a));
218 /// ```
219 pub fn is_superset<S>(&self, streams: S) -> bool
220 where
221 S: IntoStreamSet<I, T>,
222 {
223 let streams = streams.into_stream_set();
224 let mut iter = streams.into_iter();
225 iter.all(|stream| self.contains(&stream))
226 }
227
228 /// Returns a reference to the stream at the given index.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use zrx_stream::combinator::IntoStreamSet;
234 /// use zrx_stream::workspace::Workspace;
235 ///
236 /// // Create workspace and workflow
237 /// let workspace = Workspace::<&str>::new();
238 /// let workflow = workspace.add_workflow();
239 ///
240 /// // Create streams (homogeneous)
241 /// let a = workflow.add_source::<i32>();
242 /// let b = workflow.add_source::<i32>();
243 ///
244 /// // Create stream set and obtain stream reference
245 /// let set = [&a, &b].into_stream_set();
246 /// assert_eq!(set.get(0), Some(&a));
247 /// ```
248 #[inline]
249 #[must_use]
250 pub fn get(&self, index: usize) -> Option<&Stream<I, T>> {
251 self.inner.get(index)
252 }
253
254 /// Returns whether the stream set contains the given stream.
255 ///
256 /// # Examples
257 ///
258 /// ```
259 /// use zrx_stream::combinator::IntoStreamSet;
260 /// use zrx_stream::workspace::Workspace;
261 ///
262 /// // Create workspace and workflow
263 /// let workspace = Workspace::<&str>::new();
264 /// let workflow = workspace.add_workflow();
265 ///
266 /// // Create streams (homogeneous)
267 /// let a = workflow.add_source::<i32>();
268 /// let b = workflow.add_source::<i32>();
269 ///
270 /// // Create stream set and ensure presence of stream
271 /// let set = [&a, &b].into_stream_set();
272 /// assert!(set.contains(&a));
273 /// ```
274 #[inline]
275 #[must_use]
276 pub fn contains(&self, stream: &Stream<I, T>) -> bool {
277 self.inner.contains(stream)
278 }
279}
280
281#[allow(clippy::must_use_candidate)]
282impl<I, T> StreamSet<I, T> {
283 /// Returns the number of streams.
284 #[inline]
285 pub fn len(&self) -> usize {
286 self.inner.len()
287 }
288
289 /// Returns whether there are any streams.
290 #[inline]
291 pub fn is_empty(&self) -> bool {
292 self.inner.is_empty()
293 }
294}
295
296// ----------------------------------------------------------------------------
297// Trait implementations
298// ----------------------------------------------------------------------------
299
300impl<I, T> FromIterator<Stream<I, T>> for StreamSet<I, T> {
301 /// Creates a stream set from an iterator.
302 ///
303 /// # Examples
304 ///
305 /// ```
306 /// use zrx_stream::combinator::{IntoStreamSet, StreamSet};
307 /// use zrx_stream::workspace::Workspace;
308 ///
309 /// // Create workspace and workflow
310 /// let workspace = Workspace::<&str>::new();
311 /// let workflow = workspace.add_workflow();
312 ///
313 /// // Create streams (homogeneous)
314 /// let a = workflow.add_source::<i32>();
315 /// let b = workflow.add_source::<i32>();
316 ///
317 /// // Create stream set from iterator
318 /// let set = StreamSet::from_iter([a, b]);
319 /// ```
320 #[inline]
321 fn from_iter<S>(iter: S) -> Self
322 where
323 S: IntoIterator<Item = Stream<I, T>>,
324 {
325 Self {
326 inner: iter.into_iter().collect(),
327 }
328 }
329}
330
331impl<I, T> IntoIterator for StreamSet<I, T> {
332 type Item = Stream<I, T>;
333 type IntoIter = IntoIter<Self::Item>;
334
335 /// Creates an iterator over the stream set.
336 ///
337 /// # Examples
338 ///
339 /// ```
340 /// use zrx_stream::combinator::{IntoStreamSet, StreamSet};
341 /// use zrx_stream::workspace::Workspace;
342 ///
343 /// // Create workspace and workflow
344 /// let workspace = Workspace::<&str>::new();
345 /// let workflow = workspace.add_workflow();
346 ///
347 /// // Create streams (homogeneous)
348 /// let a = workflow.add_source::<i32>();
349 /// let b = workflow.add_source::<i32>();
350 ///
351 /// // Create and iterate over stream set
352 /// let set = StreamSet::from_iter([a, b]);
353 /// for stream in set {
354 /// println!("{stream:?}");
355 /// }
356 /// ```
357 #[inline]
358 fn into_iter(self) -> Self::IntoIter {
359 self.inner.into_iter()
360 }
361}