Skip to main content

xs/processor/
mod.rs

1pub mod action;
2pub mod actor;
3pub mod lifecycle;
4pub mod service;
5
6use crate::nu;
7use crate::store::{Frame, Store};
8use scru128::Scru128Id;
9use tokio::sync::mpsc;
10
11pub fn build_engine(
12    store: &Store,
13    as_of: &Scru128Id,
14) -> Result<nu::Engine, Box<dyn std::error::Error + Send + Sync>> {
15    let mut engine = nu::Engine::new()?;
16    nu::add_core_commands(&mut engine, store)?;
17    engine.add_alias(".rm", ".remove")?;
18    let modules = store.nu_modules_at(as_of);
19    nu::load_modules(&mut engine.state, store, &modules)?;
20    Ok(engine)
21}
22
23pub enum Lifecycle {
24    Historical(Frame),
25    Threshold(Frame),
26    Live(Frame),
27}
28
29pub struct LifecycleReader {
30    rx: mpsc::Receiver<Frame>,
31    past_threshold: bool,
32}
33
34impl LifecycleReader {
35    pub fn new(rx: mpsc::Receiver<Frame>) -> Self {
36        Self {
37            rx,
38            past_threshold: false,
39        }
40    }
41
42    pub async fn recv(&mut self) -> Option<Lifecycle> {
43        let frame = self.rx.recv().await?;
44        if !self.past_threshold {
45            if frame.topic == "xs.threshold" {
46                self.past_threshold = true;
47                return Some(Lifecycle::Threshold(frame));
48            }
49            return Some(Lifecycle::Historical(frame));
50        }
51        Some(Lifecycle::Live(frame))
52    }
53}