Skip to main content

zrx_stream/stream/operator/
select.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Select operator.
27
28use ahash::{HashMap, HashSet};
29
30use zrx_scheduler::action::descriptor::Interest;
31use zrx_scheduler::action::output::IntoOutputs;
32use zrx_scheduler::action::Descriptor;
33use zrx_scheduler::effect::{Item, Signal};
34use zrx_scheduler::{Id, Value};
35
36use crate::stream::barrier::{Barrier, Condition};
37use crate::stream::value::Delta;
38use crate::stream::Stream;
39
40use super::Operator;
41
42// ----------------------------------------------------------------------------
43// Structs
44// ----------------------------------------------------------------------------
45
46/// Select operator.
47struct Select<I, T>
48where
49    I: Id,
50{
51    /// Items.
52    items: HashMap<I, T>,
53    /// Barriers.
54    barriers: HashMap<I, Barrier<I>>,
55    /// Observed ids.
56    ids: HashSet<I>,
57}
58
59// ----------------------------------------------------------------------------
60// Implementations
61// ----------------------------------------------------------------------------
62
63impl<I, T> Stream<I, T>
64where
65    I: Id,
66    T: Value + Clone,
67{
68    pub fn select(
69        &self, selector: &Stream<I, Condition<I>>,
70    ) -> Stream<I, Delta<I, T>> {
71        self.workflow.add_operator(
72            [self.id, selector.id],
73            Select::<I, T> {
74                items: HashMap::default(),
75                barriers: HashMap::default(),
76                ids: HashSet::default(),
77            },
78        )
79    }
80}
81
82// ----------------------------------------------------------------------------
83// Trait implementations
84// ----------------------------------------------------------------------------
85
86impl<I, T> Operator<I, T> for Select<I, T>
87where
88    I: Id,
89    T: Value + Clone,
90{
91    type Item<'a> = Item<&'a I, (Option<&'a T>, Option<&'a Condition<I>>)>;
92
93    #[cfg_attr(
94        feature = "tracing",
95        tracing::instrument(level = "debug", skip_all, fields(id = %item.id))
96    )]
97    fn handle(&mut self, item: Self::Item<'_>) -> impl IntoOutputs<I> {
98        let (data, condition) = item.data;
99        let mut deletions = Vec::new();
100
101        // Update internal state
102        if let Some(data) = data {
103            self.items.insert(item.id.clone(), data.clone());
104        } else if self.items.remove(item.id).is_some() {
105            deletions.push(Item::new(item.id.clone(), None));
106        }
107
108        // In case there's a condition, register a new barrier
109        if let Some(condition) = condition {
110            self.barriers
111                .insert(item.id.clone(), Barrier::new(condition.clone()));
112
113            // Register items in barrier
114            for id in &self.ids {
115                if let Some(barrier) = self.barriers.get_mut(item.id) {
116                    if !self.items.contains_key(id) {
117                        barrier.insert(id);
118                    }
119                }
120            }
121        }
122
123        // Check barriers
124        let mut items = vec![];
125        for (id, barrier) in &mut self.barriers {
126            barrier.remove(item.id);
127            if barrier.is_empty() {
128                let iter = self.items.iter();
129                let delta = iter
130                    .filter(|(key, _)| barrier.satisfies(key))
131                    .map(|(key, value)| {
132                        Item::new((*key).clone(), Some(value.clone()))
133                    })
134                    .chain(deletions.iter().cloned())
135                    .collect::<Delta<_, _>>();
136
137                // Add delta to items
138                items.push(Item::new(id.clone(), Some(delta)));
139            }
140        }
141
142        // Return selected items.
143        items
144    }
145
146    /// Notifies the operator of a signal.
147    #[cfg_attr(
148        feature = "tracing",
149        tracing::instrument(level = "debug", skip_all)
150    )]
151    fn notify(&mut self, signal: Signal<I>) -> impl IntoOutputs<I> {
152        if let Signal::Submit(id) = signal {
153            self.ids.insert(id.clone());
154            for barrier in self.barriers.values_mut() {
155                barrier.insert(id);
156            }
157        }
158    }
159
160    /// Returns the descriptor.
161    #[inline]
162    fn descriptor(&self) -> Descriptor {
163        Descriptor::builder() // fmt
164            .interest(Interest::Submit)
165            .build()
166    }
167}