Skip to main content

http_nu/
store.rs

1use tokio::sync::mpsc;
2
/// Handle around a cross.stream store that configures engines and loads scripts from topics.
///
/// This definition is compiled only when the `cross-stream` feature is enabled. With the
/// feature disabled, `Store` is instead an empty struct that is never constructed, and its
/// methods exist purely as stubs so callers compile without `#[cfg]` annotations.
#[cfg(feature = "cross-stream")]
pub struct Store {
    /// The wrapped cross.stream store that every operation is delegated to.
    inner: xs::store::Store,
    /// Path supplied when the store was created or wrapped; it is rendered into the
    /// placeholder script when a topic has no readable content.
    path: std::path::PathBuf,
}
12
/// Stand-in for the real store when the `cross-stream` feature is disabled.
///
/// The single private unit field prevents code outside this module from constructing a
/// value.
#[cfg(not(feature = "cross-stream"))]
pub struct Store {
    _private: (),
}
17
18// --- cross-stream implementation ---
19
#[cfg(feature = "cross-stream")]
impl Store {
    /// Wrap an existing `xs::store::Store` without starting the API server or any
    /// processors.
    ///
    /// `path` is kept so it can be rendered into the placeholder script produced by
    /// [`Store::topic_source`] when the topic has no readable content.
    pub fn from_inner(inner: xs::store::Store, path: std::path::PathBuf) -> Self {
        Self { inner, path }
    }

    /// Create the store at `path`, then spawn the API server and, optionally, the
    /// processors.
    ///
    /// * `services` - when `true`, the actor, service and action processors are each
    ///   spawned as their own background task.
    /// * `expose` - forwarded unchanged to `xs::api::serve`.
    ///
    /// Every background task is detached: a failure inside one of them is reported on
    /// stderr and does not affect the returned `Store`.
    ///
    /// # Errors
    ///
    /// Returns the error from `xs::store::Store::new` when the store cannot be created.
    pub async fn init(
        path: std::path::PathBuf,
        services: bool,
        expose: Option<String>,
    ) -> Result<Self, xs::store::StoreError> {
        let inner = xs::store::Store::new(path.clone())?;

        // API server
        let store_for_api = inner.clone();
        tokio::spawn(async move {
            // Report an engine construction failure the same way as every other
            // background error instead of panicking inside a detached task.
            let engine = match xs::nu::Engine::new() {
                Ok(engine) => engine,
                Err(e) => {
                    eprintln!("Failed to create xs nu::Engine: {e:?}");
                    return;
                }
            };
            if let Err(e) = xs::api::serve(store_for_api, engine, expose).await {
                eprintln!("Store API server error: {e}");
            }
        });

        // Processors (actor, service, action) -- each gets its own subscription
        if services {
            let s = inner.clone();
            tokio::spawn(async move {
                if let Err(e) = xs::processor::actor::run(s).await {
                    eprintln!("Actor processor error: {e}");
                }
            });

            let s = inner.clone();
            tokio::spawn(async move {
                if let Err(e) = xs::processor::service::run(s).await {
                    eprintln!("Service processor error: {e}");
                }
            });

            let s = inner.clone();
            tokio::spawn(async move {
                if let Err(e) = xs::processor::action::run(s).await {
                    eprintln!("Action processor error: {e}");
                }
            });
        }

        Ok(Self { inner, path })
    }

    /// Add store commands (.cat, .append, .cas, .last, etc.) to the engine.
    ///
    /// Calls `add_store_commands` and then `add_store_mj_commands` on `engine`, both
    /// backed by the wrapped store.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by either registration call.
    pub fn configure_engine(&self, engine: &mut crate::Engine) -> Result<(), crate::Error> {
        engine.add_store_commands(&self.inner)?;
        engine.add_store_mj_commands(&self.inner)
    }

    /// Load a handler closure from a store topic, enrich with VFS modules from
    /// the stream, and send the resulting engine through `tx`.
    ///
    /// When the topic has no readable content, a placeholder script (rendered with the
    /// topic name and store path) is used instead and no stream modules are loaded.
    /// Nothing is sent if the script cannot be turned into an engine.
    ///
    /// If `watch` is true, spawns a background task that reloads on topic updates.
    ///
    /// If the receiving side of `tx` has already been dropped, the failure is logged to
    /// stderr and the function returns without spawning the watcher. The watcher treats
    /// a closed channel as its stop signal too.
    pub async fn topic_source(
        &self,
        topic: &str,
        watch: bool,
        base_engine: crate::Engine,
        tx: mpsc::Sender<crate::Engine>,
    ) {
        let (initial_script, last_id) = match self.read_topic_content(topic) {
            Some((content, id)) => (content, Some(id)),
            None => {
                // Only the placeholder needs the store path, so render it lazily.
                let store_path = self.path.display().to_string();
                (placeholder_closure(topic, &store_path), None)
            }
        };

        let enriched = enrich_engine(&base_engine, &self.inner, last_id.as_ref());
        if let Some(engine) = crate::engine::script_to_engine(&enriched, &initial_script, None) {
            if tx.send(engine).await.is_err() {
                // Nobody is listening any more; a watcher would have nothing to feed.
                eprintln!("Engine channel closed before initial script for topic '{topic}' was sent");
                return;
            }
        }

        if watch {
            spawn_topic_watcher(
                self.inner.clone(),
                topic.to_string(),
                last_id,
                base_engine,
                tx,
            );
        }
    }

    /// Read the content of the most recent frame on `topic`.
    ///
    /// Returns the content decoded as UTF-8 together with the id of the frame it came
    /// from. Returns `None` when the read yields no frame, the frame has no hash, the CAS
    /// read fails, or the bytes are not valid UTF-8.
    ///
    /// NOTE(review): in the last three cases the frame id is discarded as well, so the
    /// caller starts its watcher without an `after` id -- confirm that is intended.
    fn read_topic_content(&self, topic: &str) -> Option<(String, scru128::Scru128Id)> {
        let options = xs::store::ReadOptions::builder()
            .follow(xs::store::FollowOption::Off)
            .topic(topic.to_string())
            .last(1_usize)
            .build();
        let frame = self.inner.read_sync(options).last()?;
        let id = frame.id;
        let hash = frame.hash?;
        let bytes = self.inner.cas_read_sync(&hash).ok()?;
        let content = String::from_utf8(bytes).ok()?;
        Some((content, id))
    }
}
125
/// Produce a copy of `base`, extended with the stream's VFS modules when `as_of` is given.
///
/// With `as_of == None` the clone is returned untouched. Otherwise the modules reported by
/// `store.nu_modules_at(id)` are loaded into the clone's state; a load failure is printed
/// to stderr and the engine is returned anyway.
#[cfg(feature = "cross-stream")]
fn enrich_engine(
    base: &crate::Engine,
    store: &xs::store::Store,
    as_of: Option<&scru128::Scru128Id>,
) -> crate::Engine {
    let mut enriched = base.clone();

    // No reference frame: there is nothing to load on top of the base engine.
    let Some(frame_id) = as_of else {
        return enriched;
    };

    let stream_modules = store.nu_modules_at(frame_id);
    if let Err(e) = xs::nu::load_modules(&mut enriched.state, store, &stream_modules) {
        eprintln!("Error loading stream modules: {e}");
    }
    enriched
}
142
/// Render the placeholder script used while a topic has no content.
///
/// The bundled template has every `__TOPIC__` marker replaced by `topic` first, and then
/// every `__STORE_PATH__` marker replaced by `store_path`.
#[cfg(feature = "cross-stream")]
fn placeholder_closure(topic: &str, store_path: &str) -> String {
    const TEMPLATE: &str = include_str!("../examples/topic-placeholder.nu");

    let with_topic = TEMPLATE.replace("__TOPIC__", topic);
    with_topic.replace("__STORE_PATH__", store_path)
}
149
/// Spawn a detached task that follows `topic` and sends a freshly built engine through
/// `tx` for each new frame that carries content.
///
/// * `after` - passed to the read options via `maybe_after`.
/// * `base_engine` - cloned and enriched with the stream modules as of each frame.
///
/// Frames for a different topic or without a hash are skipped. CAS read failures and
/// invalid UTF-8 are logged to stderr and skipped, and a script that does not yield an
/// engine is skipped as well. The task ends when the frame receiver yields `None` or when
/// `tx` is closed.
#[cfg(feature = "cross-stream")]
fn spawn_topic_watcher(
    store: xs::store::Store,
    topic: String,
    after: Option<scru128::Scru128Id>,
    base_engine: crate::Engine,
    tx: mpsc::Sender<crate::Engine>,
) {
    tokio::spawn(async move {
        let follow_options = xs::store::ReadOptions::builder()
            .follow(xs::store::FollowOption::On)
            .topic(topic.clone())
            .maybe_after(after)
            .build();

        let mut frames = store.read(follow_options).await;

        loop {
            let Some(frame) = frames.recv().await else {
                break;
            };

            if frame.topic != topic {
                continue;
            }

            let Some(hash) = frame.hash else {
                continue;
            };

            let bytes = match store.cas_read(&hash).await {
                Ok(bytes) => bytes,
                Err(e) => {
                    eprintln!("Error reading topic content: {e}");
                    continue;
                }
            };

            let script = match String::from_utf8(bytes) {
                Ok(text) => text,
                Err(e) => {
                    eprintln!("Error decoding topic content: {e}");
                    continue;
                }
            };

            let enriched = enrich_engine(&base_engine, &store, Some(&frame.id));
            let Some(engine) = crate::engine::script_to_engine(&enriched, &script, None) else {
                continue;
            };

            // A closed channel means nobody consumes engines any more.
            if tx.send(engine).await.is_err() {
                break;
            }
        }
    });
}
199
200// --- stubs when cross-stream is disabled ---
201
202#[cfg(not(feature = "cross-stream"))]
203impl Store {
204    pub fn configure_engine(&self, _engine: &mut crate::Engine) -> Result<(), crate::Error> {
205        unreachable!("Store is never constructed without cross-stream feature")
206    }
207
208    pub async fn topic_source(
209        &self,
210        _topic: &str,
211        _watch: bool,
212        _base_engine: crate::Engine,
213        _tx: mpsc::Sender<crate::Engine>,
214    ) {
215        unreachable!("Store is never constructed without cross-stream feature")
216    }
217}