zrx_stream/stream/operator/product.rs
1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Product operator.
27
28use ahash::HashMap;
29
30use zrx_scheduler::action::output::IntoOutputs;
31use zrx_scheduler::action::Descriptor;
32use zrx_scheduler::effect::Item;
33use zrx_scheduler::{Id, Value};
34use zrx_store::StoreMut;
35
36use crate::stream::operator::Operator;
37use crate::stream::value::Delta;
38use crate::stream::Stream;
39
40// ----------------------------------------------------------------------------
41// Structs
42// ----------------------------------------------------------------------------
43
/// Product operator.
///
/// Holds the state needed to compute the product differentially: for each of
/// the two inputs, the most recently seen value is kept per identifier, so an
/// update on one side can be combined with everything known on the other.
struct Product<I, T, U> {
    /// Left store, mapping identifiers to the latest left values.
    this: HashMap<I, T>,
    /// Right store, mapping identifiers to the latest right values.
    that: HashMap<I, U>,
}
51
52// ----------------------------------------------------------------------------
53// Implementations
54// ----------------------------------------------------------------------------
55
56impl<I, T> Stream<I, T>
57where
58 I: Id,
59 T: Value + Clone + Eq,
60{
61 pub fn product<U>(
62 &self, stream: &Stream<I, U>,
63 ) -> Stream<I, Delta<I, (T, U)>>
64 where
65 U: Value + Clone + Eq,
66 {
67 self.workflow.add_operator(
68 [self.id, stream.id],
69 Product::<I, T, U> {
70 this: HashMap::default(),
71 that: HashMap::default(),
72 },
73 )
74 }
75}
76
77// ----------------------------------------------------------------------------
78// Trait implementations
79// ----------------------------------------------------------------------------
80
81impl<I, T, U> Operator<I, T> for Product<I, T, U>
82where
83 I: Id,
84 T: Value + Clone + Eq,
85 U: Value + Clone + Eq,
86{
87 type Item<'a> = Item<&'a I, (Option<&'a T>, Option<&'a U>)>;
88
89 /// Handles the given item.
90 ///
91 /// Computing the cartesian product of two streams is a stateful operation
92 /// that requires maintaining internal stores for both input streams. When
93 /// an item is received from either stream, we'll update the corresponding
94 /// store and compute the product with all items in the other store. This
95 /// ensures that all combinations of items from both streams are emitted
96 /// differentially.
97 ///
98 /// This implementation assumes that the streams are synchronized, which is
99 /// given if both input streams either belong to the same frontier, or if
100 /// the scheduler ensures that both streams are backed by stores that are
101 /// synchronized. If this assumption does not hold, this operator may yield
102 /// duplicate emissions, as there's not way for the operator to know whether
103 /// absence of one of the input streams is due to a deletion, or because the
104 /// item has not yet arrived. We deem this acceptable for now, as it only
105 /// affects the efficiency of the operator, but not its correctness.
106 #[cfg_attr(
107 feature = "tracing",
108 tracing::instrument(level = "debug" skip_all, fields(id = %item.id))
109 )]
110 fn handle(&mut self, item: Self::Item<'_>) -> impl IntoOutputs<I> {
111 let mut items = Vec::new();
112
113 // The left (this) and right (that) values are optional, indicating
114 // whether there is an insertion or a deletion. Note that both values
115 // can be present or absent at the same time, which can indicate a
116 // self-join, but does not necessarily have to.
117 let (this, that) = item.data;
118 if let Some(data) = this {
119 // If the left item value is present, update the left store, and if
120 // it changed, compute the product of the item and the right store
121 if self.this.insert_if_changed(item.id, data)
122 && !self.that.is_empty()
123 {
124 let iter = self.that.iter().map(|(id, that)| {
125 Item::new(id.clone(), Some((data.clone(), that.clone())))
126 });
127 items.push(Item::new(
128 item.id.clone(),
129 Some(iter.collect::<Delta<_, _>>()),
130 ));
131 }
132 } else if that.is_none() && self.this.remove(item.id).is_some() {
133 // An item was present and was successfully removed, so we compute
134 // the product of the removed item and the right store
135 let iter = self.that.keys().map(|id| Item::new(id.clone(), None));
136 items.push(Item::new(
137 item.id.clone(),
138 Some(iter.collect::<Delta<_, _>>()),
139 ));
140 }
141
142 // After processing the left value, we do the same for the right value
143 // by updating the right store - basically the other way round
144 if let Some(data) = that {
145 // If the right item value is present, update the right store, and
146 // if it changed, compute the product of the left store and the item
147 if self.that.insert_if_changed(item.id, data) {
148 items.extend(self.this.iter().map(|(id, this)| {
149 Item::new(
150 id.clone(),
151 Some(Delta::from([Item::new(
152 item.id.clone(),
153 Some((this.clone(), data.clone())),
154 )])),
155 )
156 }));
157 }
158 } else if this.is_none() && self.that.remove(item.id).is_some() {
159 // An item was present, and could be removed, so we compute the
160 // product of the removed item and all items in the left store
161 items.extend(self.this.keys().map(|id| {
162 let inner = Item::new(item.id.clone(), None);
163 Item::new(id.clone(), Some(Delta::from([inner])))
164 }));
165 }
166
167 // Return deltas of items
168 items
169 }
170
171 /// Returns the descriptor.
172 #[inline]
173 fn descriptor(&self) -> Descriptor {
174 Descriptor::default()
175 }
176}