zrx_stream/stream/operator.rs
1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Stream operators.
27
28#![allow(clippy::new_without_default)]
29
30use std::marker::PhantomData;
31
32use zrx_scheduler::action::Action;
33use zrx_scheduler::schedule::Subscriber;
34use zrx_scheduler::Id;
35
36use super::Stream;
37
38mod filter;
39mod filter_map;
40mod join;
41mod map;
42
43pub use filter::Filter;
44pub use filter_map::FilterMap;
45pub use join::Join;
46pub use map::Map;
47
48// ----------------------------------------------------------------------------
49// Traits
50// ----------------------------------------------------------------------------
51
52/// Operator.
53pub trait Operator<'a, I, A>
54where
55 A: Action<I>,
56{
57 /// Subscribe the given subscriber.
58 fn subscribe<S>(&self, subscriber: S) -> Stream<I, A::Output<'a>>
59 where
60 S: Into<Subscriber<'a, I, A>>;
61}
62
63// ----------------------------------------------------------------------------
64// Trait implementations
65// ----------------------------------------------------------------------------
66
67impl<'a, I, A, T> Operator<'a, I, A> for Stream<I, T>
68where
69 I: Id,
70 A: Action<I, Inputs = (T,)> + 'static,
71{
72 /// Subscribe the given subscriber.
73 #[inline]
74 fn subscribe<S>(&self, subscriber: S) -> Stream<I, A::Output<'a>>
75 where
76 S: Into<Subscriber<'a, I, A>>,
77 {
78 // We can safely use expect here, as the stream interface prevents us
79 // from creating subscribers with invalid nodes. Otherwise, it's a bug.
80 let id = self.workflow.with(|builder| {
81 builder.add([self.id], subscriber).expect("invariant")
82 });
83 Stream {
84 id,
85 workflow: self.workflow.clone(),
86 marker: PhantomData,
87 }
88 }
89}
90
91// ----------------------------------------------------------------------------
92// Macros
93// ----------------------------------------------------------------------------
94
95/// Implements operator trait for a tuple.
96macro_rules! impl_operator_for_tuple {
97 ($T1:ident $(, $T:ident)+ $(,)?) => {
98 impl<'a, I, A, $T1, $($T,)+> Operator<'a, I, A>
99 for (Stream<I, $T1>, $(Stream<I, $T>,)+)
100 where
101 I: Id,
102 A: Action<I, Inputs = ($T1, $($T,)+)> + 'static,
103 {
104 #[inline]
105 fn subscribe<S>(&self, subscriber: S) -> Stream<I, A::Output<'a>>
106 where
107 S: Into<Subscriber<'a, I, A>>,
108 {
109 #[allow(non_snake_case)]
110 let ($T1, $($T,)*) = self;
111 // Albeit this can practically never happen, we can technically
112 // have different workflows here if we somehow alter the stream
113 // interface. Thus, this assertion is just a cautionary measure
114 // to prevent our future selves from breaking it by accident.
115 $(assert_eq!($T1.workflow, $T.workflow);)+
116 let id = $T1.workflow.with(|builder| {
117 builder
118 .add([$T1.id, $($T.id,)*], subscriber)
119 .expect("invariant")
120 });
121 Stream {
122 id,
123 workflow: $T1.workflow.clone(),
124 marker: PhantomData,
125 }
126 }
127 }
128 };
129}
130
131// ----------------------------------------------------------------------------
132
133impl_operator_for_tuple!(T1, T2);
134impl_operator_for_tuple!(T1, T2, T3);
135impl_operator_for_tuple!(T1, T2, T3, T4);
136impl_operator_for_tuple!(T1, T2, T3, T4, T5);
137impl_operator_for_tuple!(T1, T2, T3, T4, T5, T6);
138impl_operator_for_tuple!(T1, T2, T3, T4, T5, T6, T7);
139impl_operator_for_tuple!(T1, T2, T3, T4, T5, T6, T7, T8);