Skip to main content

xs/processor/
mod.rs

1pub mod action;
2pub mod actor;
3pub mod service;
4
5use crate::nu;
6use crate::store::{Frame, Store};
7use scru128::Scru128Id;
8use tokio::sync::mpsc;
9
10pub fn build_engine(
11    store: &Store,
12    as_of: &Scru128Id,
13) -> Result<nu::Engine, Box<dyn std::error::Error + Send + Sync>> {
14    let mut engine = nu::Engine::new()?;
15    nu::add_core_commands(&mut engine, store)?;
16    engine.add_alias(".rm", ".remove")?;
17    let modules = store.nu_modules_at(as_of);
18    nu::load_modules(&mut engine.state, store, &modules)?;
19    Ok(engine)
20}
21
22pub enum Lifecycle {
23    Historical(Frame),
24    Threshold(Frame),
25    Live(Frame),
26}
27
28pub struct LifecycleReader {
29    rx: mpsc::Receiver<Frame>,
30    past_threshold: bool,
31}
32
33impl LifecycleReader {
34    pub fn new(rx: mpsc::Receiver<Frame>) -> Self {
35        Self {
36            rx,
37            past_threshold: false,
38        }
39    }
40
41    pub async fn recv(&mut self) -> Option<Lifecycle> {
42        let frame = self.rx.recv().await?;
43        if !self.past_threshold {
44            if frame.topic == "xs.threshold" {
45                self.past_threshold = true;
46                return Some(Lifecycle::Threshold(frame));
47            }
48            return Some(Lifecycle::Historical(frame));
49        }
50        Some(Lifecycle::Live(frame))
51    }
52}