1pub mod action;
2pub mod actor;
3pub mod service;
4
5use crate::nu;
6use crate::store::{Frame, Store};
7use scru128::Scru128Id;
8use tokio::sync::mpsc;
9
10pub fn build_engine(
11 store: &Store,
12 as_of: &Scru128Id,
13) -> Result<nu::Engine, Box<dyn std::error::Error + Send + Sync>> {
14 let mut engine = nu::Engine::new()?;
15 nu::add_core_commands(&mut engine, store)?;
16 engine.add_alias(".rm", ".remove")?;
17 let modules = store.nu_modules_at(as_of);
18 nu::load_modules(&mut engine.state, store, &modules)?;
19 Ok(engine)
20}
21
22pub enum Lifecycle {
23 Historical(Frame),
24 Threshold(Frame),
25 Live(Frame),
26}
27
28pub struct LifecycleReader {
29 rx: mpsc::Receiver<Frame>,
30 past_threshold: bool,
31}
32
33impl LifecycleReader {
34 pub fn new(rx: mpsc::Receiver<Frame>) -> Self {
35 Self {
36 rx,
37 past_threshold: false,
38 }
39 }
40
41 pub async fn recv(&mut self) -> Option<Lifecycle> {
42 let frame = self.rx.recv().await?;
43 if !self.past_threshold {
44 if frame.topic == "xs.threshold" {
45 self.past_threshold = true;
46 return Some(Lifecycle::Threshold(frame));
47 }
48 return Some(Lifecycle::Historical(frame));
49 }
50 Some(Lifecycle::Live(frame))
51 }
52}