Skip to main content

http_nu/
store.rs

1use tokio::sync::mpsc;
2
3/// Wraps a cross.stream store, providing engine configuration and topic-based script loading.
4///
5/// When the `cross-stream` feature is disabled, `Store` is an empty struct that is never
6/// constructed. All methods exist as stubs so callers compile without `#[cfg]` annotations.
7#[cfg(feature = "cross-stream")]
8pub struct Store {
9    inner: xs::store::Store,
10    path: std::path::PathBuf,
11}
12
13#[cfg(not(feature = "cross-stream"))]
14pub struct Store {
15    _private: (), // prevent construction
16}
17
18// --- cross-stream implementation ---
19
20#[cfg(feature = "cross-stream")]
21impl Store {
22    /// Create the store and spawn the API server and optional services.
23    pub async fn init(
24        path: std::path::PathBuf,
25        services: bool,
26        expose: Option<String>,
27    ) -> Result<Self, xs::store::StoreError> {
28        let inner = xs::store::Store::new(path.clone())?;
29
30        // API server
31        let store_for_api = inner.clone();
32        tokio::spawn(async move {
33            let engine = xs::nu::Engine::new().expect("Failed to create xs nu::Engine");
34            if let Err(e) = xs::api::serve(store_for_api, engine, expose).await {
35                eprintln!("Store API server error: {e}");
36            }
37        });
38
39        // Processors (actor, service, action) -- each gets its own subscription
40        if services {
41            let s = inner.clone();
42            tokio::spawn(async move {
43                if let Err(e) = xs::processor::actor::run(s).await {
44                    eprintln!("Actor processor error: {e}");
45                }
46            });
47
48            let s = inner.clone();
49            tokio::spawn(async move {
50                if let Err(e) = xs::processor::service::run(s).await {
51                    eprintln!("Service processor error: {e}");
52                }
53            });
54
55            let s = inner.clone();
56            tokio::spawn(async move {
57                if let Err(e) = xs::processor::action::run(s).await {
58                    eprintln!("Action processor error: {e}");
59                }
60            });
61        }
62
63        Ok(Self { inner, path })
64    }
65
66    /// Add store commands (.cat, .append, .cas, .last, etc.) to the engine.
67    pub fn configure_engine(&self, engine: &mut crate::Engine) -> Result<(), crate::Error> {
68        engine.add_store_commands(&self.inner)?;
69        engine.add_store_mj_commands(&self.inner)
70    }
71
72    /// Load a handler closure from a store topic, enrich with VFS modules from
73    /// the stream, and send the resulting engine through `tx`.
74    ///
75    /// If `watch` is true, spawns a background task that reloads on topic updates.
76    pub async fn topic_source(
77        &self,
78        topic: &str,
79        watch: bool,
80        base_engine: crate::Engine,
81        tx: mpsc::Sender<crate::Engine>,
82    ) {
83        let store_path = self.path.display().to_string();
84
85        let (initial_script, last_id) = match self.read_topic_content(topic) {
86            Some((content, id)) => (content, Some(id)),
87            None => (placeholder_closure(topic, &store_path), None),
88        };
89
90        let enriched = enrich_engine(&base_engine, &self.inner, last_id.as_ref());
91        if let Some(engine) = crate::engine::script_to_engine(&enriched, &initial_script, None) {
92            tx.send(engine).await.expect("channel closed unexpectedly");
93        }
94
95        if watch {
96            spawn_topic_watcher(
97                self.inner.clone(),
98                topic.to_string(),
99                last_id,
100                base_engine,
101                tx,
102            );
103        }
104    }
105
106    fn read_topic_content(&self, topic: &str) -> Option<(String, scru128::Scru128Id)> {
107        let options = xs::store::ReadOptions::builder()
108            .follow(xs::store::FollowOption::Off)
109            .topic(topic.to_string())
110            .last(1_usize)
111            .build();
112        let frame = self.inner.read_sync(options).last()?;
113        let id = frame.id;
114        let hash = frame.hash?;
115        let bytes = self.inner.cas_read_sync(&hash).ok()?;
116        let content = String::from_utf8(bytes).ok()?;
117        Some((content, id))
118    }
119}
120
121/// Clone the base engine and load VFS modules from the stream.
122#[cfg(feature = "cross-stream")]
123fn enrich_engine(
124    base: &crate::Engine,
125    store: &xs::store::Store,
126    as_of: Option<&scru128::Scru128Id>,
127) -> crate::Engine {
128    let mut engine = base.clone();
129    if let Some(id) = as_of {
130        let modules = store.nu_modules_at(id);
131        if let Err(e) = xs::nu::load_modules(&mut engine.state, store, &modules) {
132            eprintln!("Error loading stream modules: {e}");
133        }
134    }
135    engine
136}
137
138#[cfg(feature = "cross-stream")]
139fn placeholder_closure(topic: &str, store_path: &str) -> String {
140    include_str!("../examples/topic-placeholder.nu")
141        .replace("__TOPIC__", topic)
142        .replace("__STORE_PATH__", store_path)
143}
144
145#[cfg(feature = "cross-stream")]
146fn spawn_topic_watcher(
147    store: xs::store::Store,
148    topic: String,
149    after: Option<scru128::Scru128Id>,
150    base_engine: crate::Engine,
151    tx: mpsc::Sender<crate::Engine>,
152) {
153    tokio::spawn(async move {
154        let options = xs::store::ReadOptions::builder()
155            .follow(xs::store::FollowOption::On)
156            .topic(topic.clone())
157            .maybe_after(after)
158            .build();
159
160        let mut receiver = store.read(options).await;
161
162        while let Some(frame) = receiver.recv().await {
163            if frame.topic != topic {
164                continue;
165            }
166
167            let Some(hash) = frame.hash else {
168                continue;
169            };
170
171            let script = match store.cas_read(&hash).await {
172                Ok(bytes) => match String::from_utf8(bytes) {
173                    Ok(s) => s,
174                    Err(e) => {
175                        eprintln!("Error decoding topic content: {e}");
176                        continue;
177                    }
178                },
179                Err(e) => {
180                    eprintln!("Error reading topic content: {e}");
181                    continue;
182                }
183            };
184
185            let enriched = enrich_engine(&base_engine, &store, Some(&frame.id));
186            if let Some(engine) = crate::engine::script_to_engine(&enriched, &script, None) {
187                if tx.send(engine).await.is_err() {
188                    break;
189                }
190            }
191        }
192    });
193}
194
195// --- stubs when cross-stream is disabled ---
196
197#[cfg(not(feature = "cross-stream"))]
198impl Store {
199    pub fn configure_engine(&self, _engine: &mut crate::Engine) -> Result<(), crate::Error> {
200        unreachable!("Store is never constructed without cross-stream feature")
201    }
202
203    pub async fn topic_source(
204        &self,
205        _topic: &str,
206        _watch: bool,
207        _base_engine: crate::Engine,
208        _tx: mpsc::Sender<crate::Engine>,
209    ) {
210        unreachable!("Store is never constructed without cross-stream feature")
211    }
212}