Skip to main content

zrx_stream/stream/combinator/
set.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Stream set.
27
28use std::vec::IntoIter;
29
30use crate::stream::Stream;
31
32mod convert;
33mod ext;
34
35pub use convert::IntoStreamSet;
36pub use ext::StreamSetExt;
37
38// ----------------------------------------------------------------------------
39// Structs
40// ----------------------------------------------------------------------------
41
42/// Stream set.
43///
44/// Stream sets are homogeneous collections of streams, implementing common set
45/// operations like union, intersection, and difference. They are represented
46/// as a vector of streams, in order to preserve the ordering in which they
47/// were added, which is crucial for operators like [`Stream::coalesce`].
48///
49/// Operators implemented with stream sets include:
50///
51/// - [`Stream::union`]
52/// - [`Stream::intersection`]
53/// - [`Stream::difference`]
54/// - [`Stream::coalesce`]
55///
56/// Note that stream sets implement set operations themselves, including union,
57/// intersection, and difference, which means they can be combined with other
58/// stream sets to implement combinatorical structural operators. However, the
59/// union set operation is likely the most common use case.
60#[derive(Clone, Debug)]
61pub struct StreamSet<I, T> {
62    /// Vector of streams.
63    inner: Vec<Stream<I, T>>,
64}
65
66// ----------------------------------------------------------------------------
67// Structs
68// ----------------------------------------------------------------------------
69
70impl<I, T> StreamSet<I, T> {
71    /// Creates the union of two stream sets.
72    ///
73    /// # Examples
74    ///
75    /// ```
76    /// use zrx_stream::combinator::IntoStreamSet;
77    /// use zrx_stream::workspace::Workspace;
78    ///
79    /// // Create workspace and workflow
80    /// let workspace = Workspace::<&str>::new();
81    /// let workflow = workspace.add_workflow();
82    ///
83    /// // Create streams (homogeneous)
84    /// let a = workflow.add_source::<i32>();
85    /// let b = workflow.add_source::<i32>();
86    /// let c = workflow.add_source::<i32>();
87    ///
88    /// // Create union of stream sets
89    /// let set = [&a, &b].into_stream_set().union([&b, &c]);
90    /// assert_eq!(set.len(), 3);
91    /// ```
92    #[must_use]
93    pub fn union<S>(mut self, streams: S) -> Self
94    where
95        S: IntoStreamSet<I, T>,
96    {
97        let streams = streams.into_stream_set();
98        for stream in streams {
99            if !self.inner.contains(&stream) {
100                self.inner.push(stream);
101            }
102        }
103        self
104    }
105
106    /// Creates the intersection of two stream sets.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use zrx_stream::combinator::IntoStreamSet;
112    /// use zrx_stream::workspace::Workspace;
113    ///
114    /// // Create workspace and workflow
115    /// let workspace = Workspace::<&str>::new();
116    /// let workflow = workspace.add_workflow();
117    ///
118    /// // Create streams (homogeneous)
119    /// let a = workflow.add_source::<i32>();
120    /// let b = workflow.add_source::<i32>();
121    /// let c = workflow.add_source::<i32>();
122    ///
123    /// // Create intersection of stream sets
124    /// let set = [&a, &b].into_stream_set().intersection([&b, &c]);
125    /// assert_eq!(set.len(), 1);
126    /// ```
127    #[must_use]
128    pub fn intersection<S>(self, streams: S) -> Self
129    where
130        S: IntoStreamSet<I, T>,
131    {
132        let streams = streams.into_stream_set();
133        self.into_iter()
134            .filter(|stream| streams.contains(stream))
135            .collect()
136    }
137
138    /// Creates the difference of two stream sets.
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// use zrx_stream::combinator::IntoStreamSet;
144    /// use zrx_stream::workspace::Workspace;
145    ///
146    /// // Create workspace and workflow
147    /// let workspace = Workspace::<&str>::new();
148    /// let workflow = workspace.add_workflow();
149    ///
150    /// // Create streams (homogeneous)
151    /// let a = workflow.add_source::<i32>();
152    /// let b = workflow.add_source::<i32>();
153    /// let c = workflow.add_source::<i32>();
154    ///
155    /// // Create difference of stream sets
156    /// let set = [&a, &b].into_stream_set().difference([&b, &c]);
157    /// assert_eq!(set.len(), 1);
158    /// ```
159    #[must_use]
160    pub fn difference<S>(self, streams: S) -> Self
161    where
162        S: IntoStreamSet<I, T>,
163    {
164        let streams = streams.into_stream_set();
165        self.into_iter()
166            .filter(|stream| !streams.contains(stream))
167            .collect()
168    }
169
170    /// Returns whether the stream set is a subset.
171    ///
172    /// # Examples
173    ///
174    /// ```
175    /// use zrx_stream::combinator::IntoStreamSet;
176    /// use zrx_stream::workspace::Workspace;
177    ///
178    /// // Create workspace and workflow
179    /// let workspace = Workspace::<&str>::new();
180    /// let workflow = workspace.add_workflow();
181    ///
182    /// // Create streams (homogeneous)
183    /// let a = workflow.add_source::<i32>();
184    /// let b = workflow.add_source::<i32>();
185    ///
186    /// // Create stream set and check for subset
187    /// let set = a.into_stream_set();
188    /// assert!(set.is_subset([&a, &b]));
189    /// ```
190    pub fn is_subset<S>(&self, streams: S) -> bool
191    where
192        S: IntoStreamSet<I, T>,
193    {
194        let streams = streams.into_stream_set();
195        let mut iter = self.inner.iter();
196        iter.all(|stream| streams.contains(stream))
197    }
198
199    /// Returns whether the stream set is a superset.
200    ///
201    /// # Examples
202    ///
203    /// ```
204    /// use zrx_stream::combinator::IntoStreamSet;
205    /// use zrx_stream::workspace::Workspace;
206    ///
207    /// // Create workspace and workflow
208    /// let workspace = Workspace::<&str>::new();
209    /// let workflow = workspace.add_workflow();
210    ///
211    /// // Create streams (homogeneous)
212    /// let a = workflow.add_source::<i32>();
213    /// let b = workflow.add_source::<i32>();
214    ///
215    /// // Create stream set and check for superset
216    /// let set = [&a, &b].into_stream_set();
217    /// assert!(set.is_superset(&a));
218    /// ```
219    pub fn is_superset<S>(&self, streams: S) -> bool
220    where
221        S: IntoStreamSet<I, T>,
222    {
223        let streams = streams.into_stream_set();
224        let mut iter = streams.into_iter();
225        iter.all(|stream| self.contains(&stream))
226    }
227
228    /// Returns a reference to the stream at the given index.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use zrx_stream::combinator::IntoStreamSet;
234    /// use zrx_stream::workspace::Workspace;
235    ///
236    /// // Create workspace and workflow
237    /// let workspace = Workspace::<&str>::new();
238    /// let workflow = workspace.add_workflow();
239    ///
240    /// // Create streams (homogeneous)
241    /// let a = workflow.add_source::<i32>();
242    /// let b = workflow.add_source::<i32>();
243    ///
244    /// // Create stream set and obtain stream reference
245    /// let set = [&a, &b].into_stream_set();
246    /// assert_eq!(set.get(0), Some(&a));
247    /// ```
248    #[inline]
249    #[must_use]
250    pub fn get(&self, index: usize) -> Option<&Stream<I, T>> {
251        self.inner.get(index)
252    }
253
254    /// Returns whether the stream set contains the given stream.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// use zrx_stream::combinator::IntoStreamSet;
260    /// use zrx_stream::workspace::Workspace;
261    ///
262    /// // Create workspace and workflow
263    /// let workspace = Workspace::<&str>::new();
264    /// let workflow = workspace.add_workflow();
265    ///
266    /// // Create streams (homogeneous)
267    /// let a = workflow.add_source::<i32>();
268    /// let b = workflow.add_source::<i32>();
269    ///
270    /// // Create stream set and ensure presence of stream
271    /// let set = [&a, &b].into_stream_set();
272    /// assert!(set.contains(&a));
273    /// ```
274    #[inline]
275    #[must_use]
276    pub fn contains(&self, stream: &Stream<I, T>) -> bool {
277        self.inner.contains(stream)
278    }
279}
280
281#[allow(clippy::must_use_candidate)]
282impl<I, T> StreamSet<I, T> {
283    /// Returns the number of streams.
284    #[inline]
285    pub fn len(&self) -> usize {
286        self.inner.len()
287    }
288
289    /// Returns whether there are any streams.
290    #[inline]
291    pub fn is_empty(&self) -> bool {
292        self.inner.is_empty()
293    }
294}
295
296// ----------------------------------------------------------------------------
297// Trait implementations
298// ----------------------------------------------------------------------------
299
300impl<I, T> FromIterator<Stream<I, T>> for StreamSet<I, T> {
301    /// Creates a stream set from an iterator.
302    ///
303    /// # Examples
304    ///
305    /// ```
306    /// use zrx_stream::combinator::{IntoStreamSet, StreamSet};
307    /// use zrx_stream::workspace::Workspace;
308    ///
309    /// // Create workspace and workflow
310    /// let workspace = Workspace::<&str>::new();
311    /// let workflow = workspace.add_workflow();
312    ///
313    /// // Create streams (homogeneous)
314    /// let a = workflow.add_source::<i32>();
315    /// let b = workflow.add_source::<i32>();
316    ///
317    /// // Create stream set from iterator
318    /// let set = StreamSet::from_iter([a, b]);
319    /// ```
320    #[inline]
321    fn from_iter<S>(iter: S) -> Self
322    where
323        S: IntoIterator<Item = Stream<I, T>>,
324    {
325        Self {
326            inner: iter.into_iter().collect(),
327        }
328    }
329}
330
331impl<I, T> IntoIterator for StreamSet<I, T> {
332    type Item = Stream<I, T>;
333    type IntoIter = IntoIter<Self::Item>;
334
335    /// Creates an iterator over the stream set.
336    ///
337    /// # Examples
338    ///
339    /// ```
340    /// use zrx_stream::combinator::{IntoStreamSet, StreamSet};
341    /// use zrx_stream::workspace::Workspace;
342    ///
343    /// // Create workspace and workflow
344    /// let workspace = Workspace::<&str>::new();
345    /// let workflow = workspace.add_workflow();
346    ///
347    /// // Create streams (homogeneous)
348    /// let a = workflow.add_source::<i32>();
349    /// let b = workflow.add_source::<i32>();
350    ///
351    /// // Create and iterate over stream set
352    /// let set = StreamSet::from_iter([a, b]);
353    /// for stream in set {
354    ///     println!("{stream:?}");
355    /// }
356    /// ```
357    #[inline]
358    fn into_iter(self) -> Self::IntoIter {
359        self.inner.into_iter()
360    }
361}