Skip to main content

zrx_stream/stream/operator/
throttle.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Throttle operator.
27
28use std::marker::PhantomData;
29
30use zrx_scheduler::action::descriptor::Property;
31use zrx_scheduler::action::output::IntoOutputs;
32use zrx_scheduler::action::Descriptor;
33use zrx_scheduler::effect::timer::IntoDuration;
34use zrx_scheduler::effect::{Item, Timer};
35use zrx_scheduler::{outputs, Id, Value};
36
37use crate::stream::function::SelectFn;
38use crate::stream::Stream;
39
40use super::{Operator, OperatorExt};
41
42// ----------------------------------------------------------------------------
43// Structs
44// ----------------------------------------------------------------------------
45
46/// Throttle operator.
47struct Throttle<F, D> {
48    /// Operator function.
49    function: F,
50    /// Capture types.
51    marker: PhantomData<D>,
52}
53
54// ----------------------------------------------------------------------------
55// Implementations
56// ----------------------------------------------------------------------------
57
58impl<I, T> Stream<I, T>
59where
60    I: Id,
61    T: Value + Clone,
62{
63    pub fn throttle<D>(&self, duration: D) -> Stream<I, T>
64    where
65        D: IntoDuration,
66    {
67        let duration = duration.into_duration();
68        self.with_operator(Throttle {
69            function: move |_: &T| duration,
70            marker: PhantomData,
71        })
72    }
73
74    pub fn throttle_with<F, D>(&self, f: F) -> Stream<I, T>
75    where
76        F: SelectFn<I, T, D>,
77        D: IntoDuration,
78    {
79        self.with_operator(Throttle {
80            function: f,
81            marker: PhantomData,
82        })
83    }
84}
85
86// ----------------------------------------------------------------------------
87// Trait implementations
88// ----------------------------------------------------------------------------
89
90impl<I, T, F, D> Operator<I, T> for Throttle<F, D>
91where
92    I: Id,
93    T: Value + Clone,
94    F: SelectFn<I, T, D>,
95    D: IntoDuration,
96{
97    type Item<'a> = Item<&'a I, &'a T>;
98
99    /// Handles the given item.
100    ///
101    /// Throttling is implemented with the help of two timers, where the first
102    /// timer acts as a guard to ensure that the second timer is only set if the
103    /// first timer hasn't already been set. The most recent item is emitted
104    /// only if the second timer isn't active.
105    #[cfg_attr(
106        feature = "tracing",
107        tracing::instrument(level = "debug", skip_all, fields(id = %item.id))
108    )]
109    fn handle(&mut self, item: Self::Item<'_>) -> impl IntoOutputs<I> {
110        self.function.execute(item.id, item.data).map(|report| {
111            report.map(|duration| {
112                let timer = Timer::set(duration, None);
113                Timer::set(
114                    0,
115                    Some(outputs![timer, item.into_owned().map(Some)]),
116                )
117            })
118        })
119    }
120
121    /// Returns the descriptor.
122    #[inline]
123    fn descriptor(&self) -> Descriptor {
124        Descriptor::builder()
125            .property(Property::Pure)
126            .property(Property::Stable)
127            .property(Property::Flush)
128            .build()
129    }
130}