1use tokio::sync::mpsc;
2
/// Handle to an embedded cross-stream (`xs`) store.
///
/// Bundles the underlying store with the filesystem path it is associated with. The path is
/// the one handed to [`Store::init`] or [`Store::from_inner`].
#[cfg(feature = "cross-stream")]
pub struct Store {
    /// The wrapped `xs` store that all reads are issued against.
    inner: xs::store::Store,
    /// Path associated with the store; rendered into the placeholder script by `topic_source`.
    path: std::path::PathBuf,
}
12
/// Stand-in for the store handle when the `cross-stream` feature is disabled.
///
/// The single private unit field keeps the type from being constructed outside this module,
/// so the stub methods on it are never expected to run.
#[cfg(not(feature = "cross-stream"))]
pub struct Store {
    /// Zero-sized private marker; exists only to block construction from outside the module.
    _private: (),
}
17
#[cfg(feature = "cross-stream")]
impl Store {
    /// Builds a `Store` from an already-opened `xs` store and the path to associate with it.
    pub fn from_inner(inner: xs::store::Store, path: std::path::PathBuf) -> Self {
        Self { inner, path }
    }

    /// Opens the store at `path` and spawns its background tasks.
    ///
    /// The API server task (`xs::api::serve`) is always spawned and receives `expose`
    /// unchanged. When `services` is `true`, the actor, service and action processors are
    /// spawned as well. Each task runs detached; an error it returns is printed to stderr
    /// and does not affect the returned `Store`.
    ///
    /// # Errors
    ///
    /// Returns the error from `xs::store::Store::new` when the store cannot be opened.
    ///
    /// # Panics
    ///
    /// The spawned API task panics if `xs::nu::Engine::new()` fails.
    pub async fn init(
        path: std::path::PathBuf,
        services: bool,
        expose: Option<String>,
    ) -> Result<Self, xs::store::StoreError> {
        // `Store::new` takes the path by value, so it gets a clone and we keep the original.
        let store = Self {
            inner: xs::store::Store::new(path.clone())?,
            path,
        };

        let api_store = store.inner.clone();
        tokio::spawn(async move {
            let engine = xs::nu::Engine::new().expect("Failed to create xs nu::Engine");
            if let Err(e) = xs::api::serve(api_store, engine, expose).await {
                eprintln!("Store API server error: {e}");
            }
        });

        if !services {
            return Ok(store);
        }

        let actor_store = store.inner.clone();
        tokio::spawn(async move {
            if let Err(e) = xs::processor::actor::run(actor_store).await {
                eprintln!("Actor processor error: {e}");
            }
        });

        let service_store = store.inner.clone();
        tokio::spawn(async move {
            if let Err(e) = xs::processor::service::run(service_store).await {
                eprintln!("Service processor error: {e}");
            }
        });

        let action_store = store.inner.clone();
        tokio::spawn(async move {
            if let Err(e) = xs::processor::action::run(action_store).await {
                eprintln!("Action processor error: {e}");
            }
        });

        Ok(store)
    }

    /// Registers the store-backed commands on `engine`.
    ///
    /// Calls `add_store_commands` and then `add_store_mj_commands`, stopping at the first
    /// failure.
    ///
    /// # Errors
    ///
    /// Propagates the error of whichever registration call fails first.
    pub fn configure_engine(&self, engine: &mut crate::Engine) -> Result<(), crate::Error> {
        let store = &self.inner;
        engine.add_store_commands(store)?;
        engine.add_store_mj_commands(store)
    }

    /// Feeds `tx` with engines built from the scripts stored under `topic`.
    ///
    /// The most recent readable content of `topic` is used as the initial script. If there
    /// is none, a placeholder script mentioning the topic and the store path is used
    /// instead. The script is turned into an engine via `crate::engine::script_to_engine`;
    /// if that yields `None`, nothing is sent for the initial script.
    ///
    /// With `watch` set, a background task is spawned that sends a fresh engine for every
    /// later frame on the topic, starting after the frame used for the initial script (if
    /// any).
    ///
    /// # Panics
    ///
    /// Panics if the receiving half of `tx` is already closed when the initial engine is
    /// sent.
    pub async fn topic_source(
        &self,
        topic: &str,
        watch: bool,
        base_engine: crate::Engine,
        tx: mpsc::Sender<crate::Engine>,
    ) {
        let stored = self.read_topic_content(topic);
        let (initial_script, last_id) = match stored {
            None => {
                // Nothing usable in the store yet: fall back to the bundled placeholder.
                let store_path = self.path.display().to_string();
                (placeholder_closure(topic, &store_path), None)
            }
            Some((content, id)) => (content, Some(id)),
        };

        let enriched = enrich_engine(&base_engine, &self.inner, last_id.as_ref());
        if let Some(engine) = crate::engine::script_to_engine(&enriched, &initial_script, None) {
            tx.send(engine).await.expect("channel closed unexpectedly");
        }

        if !watch {
            return;
        }
        spawn_topic_watcher(self.inner.clone(), topic.to_string(), last_id, base_engine, tx);
    }

    /// Reads the newest script stored under `topic`, together with the id of its frame.
    ///
    /// Returns `None` when the read yields no frame, the frame carries no content hash, the
    /// content cannot be read from the CAS, or the bytes are not valid UTF-8.
    fn read_topic_content(&self, topic: &str) -> Option<(String, scru128::Scru128Id)> {
        // Non-following read; `last(1)` presumably limits it to the newest frame — confirm
        // against the `xs` ReadOptions documentation.
        let options = xs::store::ReadOptions::builder()
            .follow(xs::store::FollowOption::Off)
            .topic(topic.to_string())
            .last(1_usize)
            .build();

        let newest = self.inner.read_sync(options).last()?;
        let (id, hash) = (newest.id, newest.hash?);
        let bytes = self.inner.cas_read_sync(&hash).ok()?;
        String::from_utf8(bytes).ok().map(|content| (content, id))
    }
}
125
/// Returns a clone of `base`, extended with the store's modules as of frame `as_of`.
///
/// With `as_of == None` the clone is returned untouched. Otherwise the modules reported by
/// `store.nu_modules_at` are loaded into the clone's state. A load failure is printed to
/// stderr and the clone is still returned.
#[cfg(feature = "cross-stream")]
fn enrich_engine(
    base: &crate::Engine,
    store: &xs::store::Store,
    as_of: Option<&scru128::Scru128Id>,
) -> crate::Engine {
    let mut enriched = base.clone();

    let Some(id) = as_of else {
        return enriched;
    };

    let modules = store.nu_modules_at(id);
    if let Err(e) = xs::nu::load_modules(&mut enriched.state, store, &modules) {
        eprintln!("Error loading stream modules: {e}");
    }

    enriched
}
142
/// Renders the bundled placeholder script for a topic that has no stored content.
///
/// Every `__TOPIC__` marker in the template is replaced with `topic` first; every
/// `__STORE_PATH__` marker in the result is then replaced with `store_path`.
#[cfg(feature = "cross-stream")]
fn placeholder_closure(topic: &str, store_path: &str) -> String {
    const TEMPLATE: &str = include_str!("../examples/topic-placeholder.nu");

    let with_topic = TEMPLATE.replace("__TOPIC__", topic);
    with_topic.replace("__STORE_PATH__", store_path)
}
149
/// Spawns a background task that turns new frames on `topic` into engines sent over `tx`.
///
/// The task issues a following read on `store` for `topic`, positioned after `after` when
/// one is given. For each frame received it:
///
/// 1. skips the frame if its topic differs from `topic` or it has no content hash;
/// 2. reads the content from the CAS and decodes it as UTF-8, logging to stderr and
///    skipping the frame on failure;
/// 3. builds an engine from `base_engine` enriched as of that frame and sends it on `tx`.
///
/// The task ends when the read stream ends or the receiving half of `tx` is closed.
#[cfg(feature = "cross-stream")]
fn spawn_topic_watcher(
    store: xs::store::Store,
    topic: String,
    after: Option<scru128::Scru128Id>,
    base_engine: crate::Engine,
    tx: mpsc::Sender<crate::Engine>,
) {
    tokio::spawn(async move {
        let options = xs::store::ReadOptions::builder()
            .follow(xs::store::FollowOption::On)
            .topic(topic.clone())
            .maybe_after(after)
            .build();
        let mut frames = store.read(options).await;

        while let Some(frame) = frames.recv().await {
            // Only exact matches on the topic are of interest.
            if frame.topic != topic {
                continue;
            }
            let Some(hash) = frame.hash else {
                continue;
            };

            let bytes = match store.cas_read(&hash).await {
                Ok(bytes) => bytes,
                Err(e) => {
                    eprintln!("Error reading topic content: {e}");
                    continue;
                }
            };
            let script = match String::from_utf8(bytes) {
                Ok(text) => text,
                Err(e) => {
                    eprintln!("Error decoding topic content: {e}");
                    continue;
                }
            };

            let enriched = enrich_engine(&base_engine, &store, Some(&frame.id));
            let Some(engine) = crate::engine::script_to_engine(&enriched, &script, None) else {
                continue;
            };

            // A closed channel means nobody is listening any more; stop watching.
            if tx.send(engine).await.is_err() {
                break;
            }
        }
    });
}
199
200#[cfg(not(feature = "cross-stream"))]
203impl Store {
204 pub fn configure_engine(&self, _engine: &mut crate::Engine) -> Result<(), crate::Error> {
205 unreachable!("Store is never constructed without cross-stream feature")
206 }
207
208 pub async fn topic_source(
209 &self,
210 _topic: &str,
211 _watch: bool,
212 _base_engine: crate::Engine,
213 _tx: mpsc::Sender<crate::Engine>,
214 ) {
215 unreachable!("Store is never constructed without cross-stream feature")
216 }
217}