Skip to main content

xs/processor/
mod.rs

1pub mod action;
2pub mod actor;
3pub mod lifecycle;
4pub mod service;
5
6use crate::store::Frame;
7use tokio::sync::mpsc;
8
/// A [`Frame`] tagged with its position relative to the `xs.threshold` marker.
///
/// [`LifecycleReader::recv`] picks the variant depending on whether a frame
/// with topic `xs.threshold` has already been received.
pub enum Lifecycle {
    /// A frame received before any frame with topic `xs.threshold`.
    Historical(Frame),
    /// The first received frame whose topic is `xs.threshold`.
    Threshold(Frame),
    /// Any frame received after the threshold frame, including later frames
    /// that themselves carry the `xs.threshold` topic.
    Live(Frame),
}
14
/// Wraps a frame receiver and classifies each received frame as a
/// [`Lifecycle`] event.
pub struct LifecycleReader {
    /// Channel the frames are read from.
    rx: mpsc::Receiver<Frame>,
    /// Becomes `true` once a frame with topic `xs.threshold` has been received.
    past_threshold: bool,
}
19
20impl LifecycleReader {
21    pub fn new(rx: mpsc::Receiver<Frame>) -> Self {
22        Self {
23            rx,
24            past_threshold: false,
25        }
26    }
27
28    pub async fn recv(&mut self) -> Option<Lifecycle> {
29        let frame = self.rx.recv().await?;
30        if !self.past_threshold {
31            if frame.topic == "xs.threshold" {
32                self.past_threshold = true;
33                return Some(Lifecycle::Threshold(frame));
34            }
35            return Some(Lifecycle::Historical(frame));
36        }
37        Some(Lifecycle::Live(frame))
38    }
39}