Skip to main content

http_nu/
store.rs

1use tokio::sync::mpsc;
2
3/// Wraps a cross.stream store, providing engine configuration and topic-based script loading.
4///
5/// When the `cross-stream` feature is disabled, `Store` is an empty struct that is never
6/// constructed. All methods exist as stubs so callers compile without `#[cfg]` annotations.
7#[cfg(feature = "cross-stream")]
8pub struct Store {
9    inner: xs::store::Store,
10    path: std::path::PathBuf,
11}
12
13#[cfg(not(feature = "cross-stream"))]
14pub struct Store {
15    _private: (), // prevent construction
16}
17
18// --- cross-stream implementation ---
19
20#[cfg(feature = "cross-stream")]
21impl Store {
22    /// Create the store and spawn the API server and optional services.
23    pub async fn init(
24        path: std::path::PathBuf,
25        services: bool,
26        expose: Option<String>,
27    ) -> Result<Self, xs::store::StoreError> {
28        let inner = xs::store::Store::new(path.clone())?;
29
30        // API server
31        let store_for_api = inner.clone();
32        tokio::spawn(async move {
33            let engine = xs::nu::Engine::new().expect("Failed to create xs nu::Engine");
34            if let Err(e) = xs::api::serve(store_for_api, engine, expose).await {
35                eprintln!("Store API server error: {e}");
36            }
37        });
38
39        // Processors (actor, service, action) -- each gets its own subscription
40        if services {
41            let s = inner.clone();
42            tokio::spawn(async move {
43                if let Err(e) = xs::processor::actor::run(s).await {
44                    eprintln!("Actor processor error: {e}");
45                }
46            });
47
48            let s = inner.clone();
49            tokio::spawn(async move {
50                if let Err(e) = xs::processor::service::run(s).await {
51                    eprintln!("Service processor error: {e}");
52                }
53            });
54
55            let s = inner.clone();
56            tokio::spawn(async move {
57                if let Err(e) = xs::processor::action::run(s).await {
58                    eprintln!("Action processor error: {e}");
59                }
60            });
61        }
62
63        Ok(Self { inner, path })
64    }
65
66    /// Add store commands (.cat, .append, .cas, .last, etc.) to the engine.
67    pub fn configure_engine(&self, engine: &mut crate::Engine) -> Result<(), crate::Error> {
68        engine.add_store_commands(&self.inner)
69    }
70
71    /// Load a handler closure from a store topic, enrich with VFS modules from
72    /// the stream, and send the resulting engine through `tx`.
73    ///
74    /// If `watch` is true, spawns a background task that reloads on topic updates.
75    pub async fn topic_source(
76        &self,
77        topic: &str,
78        watch: bool,
79        base_engine: crate::Engine,
80        tx: mpsc::Sender<crate::Engine>,
81    ) {
82        let store_path = self.path.display().to_string();
83
84        let (initial_script, last_id) = match self.read_topic_content(topic) {
85            Some((content, id)) => (content, Some(id)),
86            None => (placeholder_closure(topic, &store_path), None),
87        };
88
89        let enriched = enrich_engine(&base_engine, &self.inner, last_id.as_ref());
90        if let Some(engine) = crate::engine::script_to_engine(&enriched, &initial_script) {
91            tx.send(engine).await.expect("channel closed unexpectedly");
92        }
93
94        if watch {
95            spawn_topic_watcher(
96                self.inner.clone(),
97                topic.to_string(),
98                last_id,
99                base_engine,
100                tx,
101            );
102        }
103    }
104
105    fn read_topic_content(&self, topic: &str) -> Option<(String, scru128::Scru128Id)> {
106        let options = xs::store::ReadOptions::builder()
107            .follow(xs::store::FollowOption::Off)
108            .topic(topic.to_string())
109            .last(1_usize)
110            .build();
111        let frame = self.inner.read_sync(options).last()?;
112        let id = frame.id;
113        let hash = frame.hash?;
114        let bytes = self.inner.cas_read_sync(&hash).ok()?;
115        let content = String::from_utf8(bytes).ok()?;
116        Some((content, id))
117    }
118}
119
120/// Clone the base engine and load VFS modules from the stream.
121#[cfg(feature = "cross-stream")]
122fn enrich_engine(
123    base: &crate::Engine,
124    store: &xs::store::Store,
125    as_of: Option<&scru128::Scru128Id>,
126) -> crate::Engine {
127    let mut engine = base.clone();
128    if let Some(id) = as_of {
129        let modules = store.nu_modules_at(id);
130        if let Err(e) = xs::nu::load_modules(&mut engine.state, store, &modules) {
131            eprintln!("Error loading stream modules: {e}");
132        }
133    }
134    engine
135}
136
137#[cfg(feature = "cross-stream")]
138fn placeholder_closure(topic: &str, store_path: &str) -> String {
139    include_str!("../examples/topic-placeholder.nu")
140        .replace("__TOPIC__", topic)
141        .replace("__STORE_PATH__", store_path)
142}
143
144#[cfg(feature = "cross-stream")]
145fn spawn_topic_watcher(
146    store: xs::store::Store,
147    topic: String,
148    after: Option<scru128::Scru128Id>,
149    base_engine: crate::Engine,
150    tx: mpsc::Sender<crate::Engine>,
151) {
152    tokio::spawn(async move {
153        let options = xs::store::ReadOptions::builder()
154            .follow(xs::store::FollowOption::On)
155            .topic(topic.clone())
156            .maybe_after(after)
157            .build();
158
159        let mut receiver = store.read(options).await;
160
161        while let Some(frame) = receiver.recv().await {
162            if frame.topic != topic {
163                continue;
164            }
165
166            let Some(hash) = frame.hash else {
167                continue;
168            };
169
170            let script = match store.cas_read(&hash).await {
171                Ok(bytes) => match String::from_utf8(bytes) {
172                    Ok(s) => s,
173                    Err(e) => {
174                        eprintln!("Error decoding topic content: {e}");
175                        continue;
176                    }
177                },
178                Err(e) => {
179                    eprintln!("Error reading topic content: {e}");
180                    continue;
181                }
182            };
183
184            let enriched = enrich_engine(&base_engine, &store, Some(&frame.id));
185            if let Some(engine) = crate::engine::script_to_engine(&enriched, &script) {
186                if tx.send(engine).await.is_err() {
187                    break;
188                }
189            }
190        }
191    });
192}
193
194// --- stubs when cross-stream is disabled ---
195
196#[cfg(not(feature = "cross-stream"))]
197impl Store {
198    pub fn configure_engine(&self, _engine: &mut crate::Engine) -> Result<(), crate::Error> {
199        unreachable!("Store is never constructed without cross-stream feature")
200    }
201
202    pub async fn topic_source(
203        &self,
204        _topic: &str,
205        _watch: bool,
206        _base_engine: crate::Engine,
207        _tx: mpsc::Sender<crate::Engine>,
208    ) {
209        unreachable!("Store is never constructed without cross-stream feature")
210    }
211}