1use tokio::sync::mpsc;
2
3#[cfg(feature = "cross-stream")]
8pub struct Store {
9 inner: xs::store::Store,
10 path: std::path::PathBuf,
11}
12
13#[cfg(not(feature = "cross-stream"))]
14pub struct Store {
15 _private: (), }
17
18#[cfg(feature = "cross-stream")]
21impl Store {
22 pub async fn init(
24 path: std::path::PathBuf,
25 services: bool,
26 expose: Option<String>,
27 ) -> Result<Self, xs::store::StoreError> {
28 let inner = xs::store::Store::new(path.clone())?;
29
30 let store_for_api = inner.clone();
32 tokio::spawn(async move {
33 let engine = xs::nu::Engine::new().expect("Failed to create xs nu::Engine");
34 if let Err(e) = xs::api::serve(store_for_api, engine, expose).await {
35 eprintln!("Store API server error: {e}");
36 }
37 });
38
39 if services {
41 let s = inner.clone();
42 tokio::spawn(async move {
43 if let Err(e) = xs::processor::actor::run(s).await {
44 eprintln!("Actor processor error: {e}");
45 }
46 });
47
48 let s = inner.clone();
49 tokio::spawn(async move {
50 if let Err(e) = xs::processor::service::run(s).await {
51 eprintln!("Service processor error: {e}");
52 }
53 });
54
55 let s = inner.clone();
56 tokio::spawn(async move {
57 if let Err(e) = xs::processor::action::run(s).await {
58 eprintln!("Action processor error: {e}");
59 }
60 });
61 }
62
63 Ok(Self { inner, path })
64 }
65
66 pub fn configure_engine(&self, engine: &mut crate::Engine) -> Result<(), crate::Error> {
68 engine.add_store_commands(&self.inner)
69 }
70
71 pub async fn topic_source(
76 &self,
77 topic: &str,
78 watch: bool,
79 base_engine: crate::Engine,
80 tx: mpsc::Sender<crate::Engine>,
81 ) {
82 let store_path = self.path.display().to_string();
83
84 let (initial_script, last_id) = match self.read_topic_content(topic) {
85 Some((content, id)) => (content, Some(id)),
86 None => (placeholder_closure(topic, &store_path), None),
87 };
88
89 let enriched = enrich_engine(&base_engine, &self.inner, last_id.as_ref());
90 if let Some(engine) = crate::engine::script_to_engine(&enriched, &initial_script) {
91 tx.send(engine).await.expect("channel closed unexpectedly");
92 }
93
94 if watch {
95 spawn_topic_watcher(
96 self.inner.clone(),
97 topic.to_string(),
98 last_id,
99 base_engine,
100 tx,
101 );
102 }
103 }
104
105 fn read_topic_content(&self, topic: &str) -> Option<(String, scru128::Scru128Id)> {
106 let options = xs::store::ReadOptions::builder()
107 .follow(xs::store::FollowOption::Off)
108 .topic(topic.to_string())
109 .last(1_usize)
110 .build();
111 let frame = self.inner.read_sync(options).last()?;
112 let id = frame.id;
113 let hash = frame.hash?;
114 let bytes = self.inner.cas_read_sync(&hash).ok()?;
115 let content = String::from_utf8(bytes).ok()?;
116 Some((content, id))
117 }
118}
119
120#[cfg(feature = "cross-stream")]
122fn enrich_engine(
123 base: &crate::Engine,
124 store: &xs::store::Store,
125 as_of: Option<&scru128::Scru128Id>,
126) -> crate::Engine {
127 let mut engine = base.clone();
128 if let Some(id) = as_of {
129 let modules = store.nu_modules_at(id);
130 if let Err(e) = xs::nu::load_modules(&mut engine.state, store, &modules) {
131 eprintln!("Error loading stream modules: {e}");
132 }
133 }
134 engine
135}
136
137#[cfg(feature = "cross-stream")]
138fn placeholder_closure(topic: &str, store_path: &str) -> String {
139 include_str!("../examples/topic-placeholder.nu")
140 .replace("__TOPIC__", topic)
141 .replace("__STORE_PATH__", store_path)
142}
143
144#[cfg(feature = "cross-stream")]
145fn spawn_topic_watcher(
146 store: xs::store::Store,
147 topic: String,
148 after: Option<scru128::Scru128Id>,
149 base_engine: crate::Engine,
150 tx: mpsc::Sender<crate::Engine>,
151) {
152 tokio::spawn(async move {
153 let options = xs::store::ReadOptions::builder()
154 .follow(xs::store::FollowOption::On)
155 .topic(topic.clone())
156 .maybe_after(after)
157 .build();
158
159 let mut receiver = store.read(options).await;
160
161 while let Some(frame) = receiver.recv().await {
162 if frame.topic != topic {
163 continue;
164 }
165
166 let Some(hash) = frame.hash else {
167 continue;
168 };
169
170 let script = match store.cas_read(&hash).await {
171 Ok(bytes) => match String::from_utf8(bytes) {
172 Ok(s) => s,
173 Err(e) => {
174 eprintln!("Error decoding topic content: {e}");
175 continue;
176 }
177 },
178 Err(e) => {
179 eprintln!("Error reading topic content: {e}");
180 continue;
181 }
182 };
183
184 let enriched = enrich_engine(&base_engine, &store, Some(&frame.id));
185 if let Some(engine) = crate::engine::script_to_engine(&enriched, &script) {
186 if tx.send(engine).await.is_err() {
187 break;
188 }
189 }
190 }
191 });
192}
193
194#[cfg(not(feature = "cross-stream"))]
197impl Store {
198 pub fn configure_engine(&self, _engine: &mut crate::Engine) -> Result<(), crate::Error> {
199 unreachable!("Store is never constructed without cross-stream feature")
200 }
201
202 pub async fn topic_source(
203 &self,
204 _topic: &str,
205 _watch: bool,
206 _base_engine: crate::Engine,
207 _tx: mpsc::Sender<crate::Engine>,
208 ) {
209 unreachable!("Store is never constructed without cross-stream feature")
210 }
211}