Skip to main content

zrx_stream/stream/
operator.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Stream operators.
27
28#![allow(clippy::new_without_default)]
29
30use std::marker::PhantomData;
31
32use zrx_scheduler::action::Action;
33use zrx_scheduler::schedule::Subscriber;
34use zrx_scheduler::Id;
35
36use super::Stream;
37
38mod filter;
39mod filter_map;
40mod join;
41mod map;
42
43pub use filter::Filter;
44pub use filter_map::FilterMap;
45pub use join::Join;
46pub use map::Map;
47
48// ----------------------------------------------------------------------------
49// Traits
50// ----------------------------------------------------------------------------
51
52/// Operator.
53pub trait Operator<'a, I, A>
54where
55    A: Action<I>,
56{
57    /// Subscribe the given subscriber.
58    fn subscribe<S>(&self, subscriber: S) -> Stream<I, A::Output<'a>>
59    where
60        S: Into<Subscriber<'a, I, A>>;
61}
62
63// ----------------------------------------------------------------------------
64// Trait implementations
65// ----------------------------------------------------------------------------
66
67impl<'a, I, A, T> Operator<'a, I, A> for Stream<I, T>
68where
69    I: Id,
70    A: Action<I, Inputs = (T,)> + 'static,
71{
72    /// Subscribe the given subscriber.
73    #[inline]
74    fn subscribe<S>(&self, subscriber: S) -> Stream<I, A::Output<'a>>
75    where
76        S: Into<Subscriber<'a, I, A>>,
77    {
78        // We can safely use expect here, as the stream interface prevents us
79        // from creating subscribers with invalid nodes. Otherwise, it's a bug.
80        let id = self.workflow.with(|builder| {
81            builder.add([self.id], subscriber).expect("invariant")
82        });
83        Stream {
84            id,
85            workflow: self.workflow.clone(),
86            marker: PhantomData,
87        }
88    }
89}
90
91// ----------------------------------------------------------------------------
92// Macros
93// ----------------------------------------------------------------------------
94
95/// Implements operator trait for a tuple.
96macro_rules! impl_operator_for_tuple {
97    ($T1:ident $(, $T:ident)+ $(,)?) => {
98        impl<'a, I, A, $T1, $($T,)+> Operator<'a, I, A>
99            for (Stream<I, $T1>, $(Stream<I, $T>,)+)
100        where
101            I: Id,
102            A: Action<I, Inputs = ($T1, $($T,)+)> + 'static,
103        {
104            #[inline]
105            fn subscribe<S>(&self, subscriber: S) -> Stream<I, A::Output<'a>>
106            where
107                S: Into<Subscriber<'a, I, A>>,
108            {
109                #[allow(non_snake_case)]
110                let ($T1, $($T,)*) = self;
111                // Albeit this can practically never happen, we can technically
112                // have different workflows here if we somehow alter the stream
113                // interface. Thus, this assertion is just a cautionary measure
114                // to prevent our future selves from breaking it by accident.
115                $(assert_eq!($T1.workflow, $T.workflow);)+
116                let id = $T1.workflow.with(|builder| {
117                    builder
118                        .add([$T1.id, $($T.id,)*], subscriber)
119                        .expect("invariant")
120                });
121                Stream {
122                    id,
123                    workflow: $T1.workflow.clone(),
124                    marker: PhantomData,
125                }
126            }
127        }
128    };
129}
130
131// ----------------------------------------------------------------------------
132
133impl_operator_for_tuple!(T1, T2);
134impl_operator_for_tuple!(T1, T2, T3);
135impl_operator_for_tuple!(T1, T2, T3, T4);
136impl_operator_for_tuple!(T1, T2, T3, T4, T5);
137impl_operator_for_tuple!(T1, T2, T3, T4, T5, T6);
138impl_operator_for_tuple!(T1, T2, T3, T4, T5, T6, T7);
139impl_operator_for_tuple!(T1, T2, T3, T4, T5, T6, T7, T8);