cu_ratelimit/
lib.rs

1use bincode::de::Decoder;
2use bincode::enc::Encoder;
3use bincode::error::{DecodeError, EncodeError};
4use bincode::{Decode, Encode};
5use cu29::prelude::*;
6use std::marker::PhantomData;
7
8pub struct CuRateLimit<T>
9where
10    T: for<'a> CuMsgPayload + 'static,
11{
12    _marker: PhantomData<T>,
13    interval: CuDuration,
14    last_tov: Option<CuTime>,
15}
16
17impl<T> Freezable for CuRateLimit<T>
18where
19    T: CuMsgPayload,
20{
21    fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
22        Encode::encode(&self.last_tov, encoder)
23    }
24
25    fn thaw<D: Decoder>(&mut self, decoder: &mut D) -> Result<(), DecodeError> {
26        self.last_tov = Decode::decode(decoder)?;
27        Ok(())
28    }
29}
30
31impl<T> CuTask for CuRateLimit<T>
32where
33    T: CuMsgPayload,
34{
35    type Input<'m> = input_msg!(T);
36    type Output<'m> = output_msg!(T);
37
38    fn new(config: Option<&ComponentConfig>) -> CuResult<Self> {
39        let hz = config
40            .and_then(|cfg| cfg.get::<f64>("rate"))
41            .ok_or("Missing required 'rate' config for CuRateLimiter")?;
42        let interval_ns = (1e9 / hz) as u64;
43        Ok(Self {
44            _marker: PhantomData,
45            interval: CuDuration::from(interval_ns),
46            last_tov: None,
47        })
48    }
49
50    fn process<'m>(
51        &mut self,
52        _clock: &RobotClock,
53        input: &Self::Input<'m>,
54        output: &mut Self::Output<'m>,
55    ) -> CuResult<()> {
56        let tov = match input.tov {
57            Tov::Time(ts) => ts,
58            _ => return Err("Expected single timestamp TOV".into()),
59        };
60
61        let allow = match self.last_tov {
62            None => true,
63            Some(last) => (tov - last) >= self.interval,
64        };
65
66        if allow {
67            self.last_tov = Some(tov);
68            if let Some(payload) = input.payload() {
69                output.set_payload(payload.clone());
70            } else {
71                output.clear_payload();
72            }
73        } else {
74            output.clear_payload();
75        }
76
77        Ok(())
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use super::*;
84
85    fn create_test_ratelimiter(rate: f64) -> CuRateLimit<i32> {
86        let mut cfg = ComponentConfig::new();
87        cfg.set("rate", rate);
88        CuRateLimit::new(Some(&cfg)).unwrap()
89    }
90
91    #[test]
92    fn test_rate_limiting() {
93        let (clock, _) = RobotClock::mock();
94        let mut limiter = create_test_ratelimiter(10.0); // 10 Hz = 100ms interval
95        let mut input = CuMsg::<i32>::new(Some(42));
96        let mut output = CuMsg::<i32>::new(None);
97
98        // First message should pass
99        input.tov = Tov::Time(CuTime::from(0));
100        limiter.process(&clock, &input, &mut output).unwrap();
101        assert_eq!(output.payload(), Some(&42));
102
103        // Message within the interval should be blocked
104        input.tov = Tov::Time(CuTime::from(50_000_000)); // 50ms
105        limiter.process(&clock, &input, &mut output).unwrap();
106        assert_eq!(output.payload(), None);
107
108        // Message after the interval should pass
109        input.tov = Tov::Time(CuTime::from(100_000_000)); // 100ms
110        limiter.process(&clock, &input, &mut output).unwrap();
111        assert_eq!(output.payload(), Some(&42));
112    }
113
114    #[test]
115    fn test_payload_propagation() {
116        let (clock, _) = RobotClock::mock();
117        let mut limiter = create_test_ratelimiter(10.0);
118        let mut input = CuMsg::<i32>::new(None);
119        let mut output = CuMsg::<i32>::new(None);
120
121        // Test payload propagation
122        input.set_payload(123);
123        input.tov = Tov::Time(CuTime::from(0));
124        limiter.process(&clock, &input, &mut output).unwrap();
125        assert_eq!(output.payload(), Some(&123));
126
127        // Test empty payload propagation
128        input.clear_payload();
129        input.tov = Tov::Time(CuTime::from(100_000_000));
130        limiter.process(&clock, &input, &mut output).unwrap();
131        assert_eq!(output.payload(), None);
132    }
133}