zrx_stream/stream/operator/
select.rs

1// Copyright (c) Zensical LLC <https://zensical.org>
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under CLA
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Select operator.
27
28use ahash::{HashMap, HashSet};
29use zrx_scheduler::action::descriptor::Interest;
30use zrx_scheduler::action::output::IntoOutputs;
31use zrx_scheduler::action::Descriptor;
32use zrx_scheduler::effect::{Item, Signal};
33use zrx_scheduler::{Id, Value};
34
35use crate::stream::barrier::{Barrier, Condition};
36use crate::stream::value::Delta;
37use crate::stream::Stream;
38
39use super::Operator;
40
41// ----------------------------------------------------------------------------
42// Structs
43// ----------------------------------------------------------------------------
44
45/// Select operator.
46struct Select<I, T>
47where
48    I: Id,
49{
50    /// Items.
51    items: HashMap<I, T>,
52    /// Barriers.
53    barriers: HashMap<I, Barrier<I>>,
54    /// Observed ids.
55    ids: HashSet<I>,
56}
57
58// ----------------------------------------------------------------------------
59// Implementations
60// ----------------------------------------------------------------------------
61
62impl<I, T> Stream<I, T>
63where
64    I: Id,
65    T: Value + Clone,
66{
67    pub fn select(
68        &self, selector: &Stream<I, Condition<I>>,
69    ) -> Stream<I, Delta<I, T>> {
70        self.workflow.add_operator(
71            [self.id, selector.id],
72            Select::<I, T> {
73                items: HashMap::default(),
74                barriers: HashMap::default(),
75                ids: HashSet::default(),
76            },
77        )
78    }
79}
80
81// ----------------------------------------------------------------------------
82// Trait implementations
83// ----------------------------------------------------------------------------
84
85impl<I, T> Operator<I, T> for Select<I, T>
86where
87    I: Id,
88    T: Value + Clone,
89{
90    type Item<'a> = Item<&'a I, (Option<&'a T>, Option<&'a Condition<I>>)>;
91
92    #[cfg_attr(
93        feature = "tracing",
94        tracing::instrument(level = "debug", skip_all, fields(id = %item.id))
95    )]
96    fn handle(&mut self, item: Self::Item<'_>) -> impl IntoOutputs<I> {
97        let (data, condition) = item.data;
98
99        // Update internal state
100        if let Some(data) = data {
101            self.items.insert(item.id.clone(), data.clone());
102        } else {
103            self.items.remove(item.id);
104        }
105
106        // In case there's a condition, register a new barrier
107        if let Some(condition) = condition {
108            self.barriers
109                .insert(item.id.clone(), Barrier::new(condition.clone()));
110
111            // Register items in barrier
112            for id in &self.ids {
113                if let Some(barrier) = self.barriers.get_mut(item.id) {
114                    if !self.items.contains_key(id) {
115                        barrier.insert(id);
116                    }
117                }
118            }
119        }
120
121        // Check barriers
122        let mut items = vec![];
123        for (id, barrier) in &mut self.barriers {
124            barrier.remove(item.id);
125            if barrier.is_empty() {
126                let iter = self.items.iter();
127                let delta = iter
128                    .filter(|(key, _)| barrier.satisfies(key))
129                    .map(|(key, value)| {
130                        Item::new((*key).clone(), Some(value.clone()))
131                    })
132                    .collect::<Delta<_, _>>();
133
134                // Add delta to items
135                items.push(Item::new(id.clone(), Some(delta)));
136            }
137        }
138
139        // Return selected items.
140        items
141    }
142
143    /// Notifies the operator of a signal.
144    #[cfg_attr(
145        feature = "tracing",
146        tracing::instrument(level = "debug", skip_all)
147    )]
148    fn notify(&mut self, signal: Signal<I>) -> impl IntoOutputs<I> {
149        if let Signal::Submit(id) = signal {
150            self.ids.insert(id.clone());
151            for barrier in self.barriers.values_mut() {
152                barrier.insert(id);
153            }
154        }
155    }
156
157    /// Returns the descriptor.
158    #[inline]
159    fn descriptor(&self) -> Descriptor {
160        Descriptor::builder() // fmt
161            .interest(Interest::Submit)
162            .build()
163    }
164}