1use tokio::sync::mpsc;
2
3#[cfg(feature = "cross-stream")]
8pub struct Store {
9 inner: xs::store::Store,
10 path: std::path::PathBuf,
11}
12
13#[cfg(not(feature = "cross-stream"))]
14pub struct Store {
15 _private: (), }
17
18#[cfg(feature = "cross-stream")]
21impl Store {
22 pub async fn init(
24 path: std::path::PathBuf,
25 services: bool,
26 expose: Option<String>,
27 ) -> Result<Self, xs::store::StoreError> {
28 let inner = xs::store::Store::new(path.clone())?;
29
30 let store_for_api = inner.clone();
32 tokio::spawn(async move {
33 let engine = xs::nu::Engine::new().expect("Failed to create xs nu::Engine");
34 if let Err(e) = xs::api::serve(store_for_api, engine, expose).await {
35 eprintln!("Store API server error: {e}");
36 }
37 });
38
39 if services {
41 let s = inner.clone();
42 tokio::spawn(async move {
43 if let Err(e) = xs::processor::actor::run(s).await {
44 eprintln!("Actor processor error: {e}");
45 }
46 });
47
48 let s = inner.clone();
49 tokio::spawn(async move {
50 if let Err(e) = xs::processor::service::run(s).await {
51 eprintln!("Service processor error: {e}");
52 }
53 });
54
55 let s = inner.clone();
56 tokio::spawn(async move {
57 if let Err(e) = xs::processor::action::run(s).await {
58 eprintln!("Action processor error: {e}");
59 }
60 });
61 }
62
63 Ok(Self { inner, path })
64 }
65
66 pub fn configure_engine(&self, engine: &mut crate::Engine) -> Result<(), crate::Error> {
68 engine.add_store_commands(&self.inner)?;
69 engine.add_store_mj_commands(&self.inner)
70 }
71
72 pub async fn topic_source(
77 &self,
78 topic: &str,
79 watch: bool,
80 base_engine: crate::Engine,
81 tx: mpsc::Sender<crate::Engine>,
82 ) {
83 let store_path = self.path.display().to_string();
84
85 let (initial_script, last_id) = match self.read_topic_content(topic) {
86 Some((content, id)) => (content, Some(id)),
87 None => (placeholder_closure(topic, &store_path), None),
88 };
89
90 let enriched = enrich_engine(&base_engine, &self.inner, last_id.as_ref());
91 if let Some(engine) = crate::engine::script_to_engine(&enriched, &initial_script, None) {
92 tx.send(engine).await.expect("channel closed unexpectedly");
93 }
94
95 if watch {
96 spawn_topic_watcher(
97 self.inner.clone(),
98 topic.to_string(),
99 last_id,
100 base_engine,
101 tx,
102 );
103 }
104 }
105
106 fn read_topic_content(&self, topic: &str) -> Option<(String, scru128::Scru128Id)> {
107 let options = xs::store::ReadOptions::builder()
108 .follow(xs::store::FollowOption::Off)
109 .topic(topic.to_string())
110 .last(1_usize)
111 .build();
112 let frame = self.inner.read_sync(options).last()?;
113 let id = frame.id;
114 let hash = frame.hash?;
115 let bytes = self.inner.cas_read_sync(&hash).ok()?;
116 let content = String::from_utf8(bytes).ok()?;
117 Some((content, id))
118 }
119}
120
121#[cfg(feature = "cross-stream")]
123fn enrich_engine(
124 base: &crate::Engine,
125 store: &xs::store::Store,
126 as_of: Option<&scru128::Scru128Id>,
127) -> crate::Engine {
128 let mut engine = base.clone();
129 if let Some(id) = as_of {
130 let modules = store.nu_modules_at(id);
131 if let Err(e) = xs::nu::load_modules(&mut engine.state, store, &modules) {
132 eprintln!("Error loading stream modules: {e}");
133 }
134 }
135 engine
136}
137
138#[cfg(feature = "cross-stream")]
139fn placeholder_closure(topic: &str, store_path: &str) -> String {
140 include_str!("../examples/topic-placeholder.nu")
141 .replace("__TOPIC__", topic)
142 .replace("__STORE_PATH__", store_path)
143}
144
145#[cfg(feature = "cross-stream")]
146fn spawn_topic_watcher(
147 store: xs::store::Store,
148 topic: String,
149 after: Option<scru128::Scru128Id>,
150 base_engine: crate::Engine,
151 tx: mpsc::Sender<crate::Engine>,
152) {
153 tokio::spawn(async move {
154 let options = xs::store::ReadOptions::builder()
155 .follow(xs::store::FollowOption::On)
156 .topic(topic.clone())
157 .maybe_after(after)
158 .build();
159
160 let mut receiver = store.read(options).await;
161
162 while let Some(frame) = receiver.recv().await {
163 if frame.topic != topic {
164 continue;
165 }
166
167 let Some(hash) = frame.hash else {
168 continue;
169 };
170
171 let script = match store.cas_read(&hash).await {
172 Ok(bytes) => match String::from_utf8(bytes) {
173 Ok(s) => s,
174 Err(e) => {
175 eprintln!("Error decoding topic content: {e}");
176 continue;
177 }
178 },
179 Err(e) => {
180 eprintln!("Error reading topic content: {e}");
181 continue;
182 }
183 };
184
185 let enriched = enrich_engine(&base_engine, &store, Some(&frame.id));
186 if let Some(engine) = crate::engine::script_to_engine(&enriched, &script, None) {
187 if tx.send(engine).await.is_err() {
188 break;
189 }
190 }
191 }
192 });
193}
194
195#[cfg(not(feature = "cross-stream"))]
198impl Store {
199 pub fn configure_engine(&self, _engine: &mut crate::Engine) -> Result<(), crate::Error> {
200 unreachable!("Store is never constructed without cross-stream feature")
201 }
202
203 pub async fn topic_source(
204 &self,
205 _topic: &str,
206 _watch: bool,
207 _base_engine: crate::Engine,
208 _tx: mpsc::Sender<crate::Engine>,
209 ) {
210 unreachable!("Store is never constructed without cross-stream feature")
211 }
212}