1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::commands;
18use crate::nu::value_to_json;
19use crate::nu::{NuScriptConfig, ReturnOptions};
20use crate::store::{FollowOption, Frame, ReadOptions, Store};
21
22#[derive(Clone)]
23pub struct Handler {
24 pub id: Scru128Id,
25 pub topic: String,
26 config: HandlerConfig,
27 engine_worker: Arc<EngineWorker>,
28 output: Arc<Mutex<Vec<Frame>>>,
29}
30
31#[derive(Clone, Debug)]
32struct HandlerConfig {
33 start: Start,
34 pulse: Option<u64>,
35 return_options: Option<ReturnOptions>,
36}
37
38#[derive(Clone, Debug, Default, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40enum Start {
41 First,
42 #[default]
43 New,
44 After(Scru128Id),
45}
46
47#[derive(Deserialize, Debug, Default)]
49#[serde(default)] struct HandlerScriptOptions {
51 start: Option<String>,
53 pulse: Option<u64>,
55 return_options: Option<ReturnOptions>,
57}
58
59impl Handler {
60 pub async fn new(
61 id: Scru128Id,
62 topic: String,
63 mut engine: nu::Engine,
64 expression: String,
65 store: Store,
66 ) -> Result<Self, Error> {
67 let output = Arc::new(Mutex::new(Vec::new()));
68 engine.add_commands(vec![
69 Box::new(commands::cat_command::CatCommand::new(store.clone())),
70 Box::new(commands::last_command::LastCommand::new(store.clone())),
71 Box::new(commands::append_command_buffered::AppendCommand::new(
72 store.clone(),
73 output.clone(),
74 )),
75 ])?;
76
77 let nu_script_config = nu::parse_config(&mut engine, &expression)?;
79
80 let handler_config = extract_handler_config(&nu_script_config)?;
82
83 let block = engine
85 .state
86 .get_block(nu_script_config.run_closure.block_id);
87 if block.signature.required_positional.len() != 1 {
88 return Err(format!(
89 "Closure must accept exactly one frame argument, found {count}",
90 count = block.signature.required_positional.len()
91 )
92 .into());
93 }
94
95 let engine_worker = Arc::new(EngineWorker::new(engine, nu_script_config.run_closure));
96
97 Ok(Self {
98 id,
99 topic,
100 config: handler_config,
101 engine_worker,
102 output,
103 })
104 }
105
106 pub async fn eval_in_thread(&self, frame: &crate::store::Frame) -> Result<Value, Error> {
107 self.engine_worker.eval(frame.clone()).await
108 }
109
110 #[instrument(
111 level = "info",
112 skip(self, frame, store),
113 fields(
114 message = %format!(
115 "handler={handler_id}:{topic} frame={frame_id}:{frame_topic}",
116 handler_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
117 )
118 )]
119 async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<(), Error> {
120 let frame_clone = frame.clone();
121
122 let value = self.eval_in_thread(&frame_clone).await?;
123
124 let additional_frame = if !is_value_an_append_frame_from_handler(&value, &self.id)
126 && !matches!(value, Value::Nothing { .. })
127 {
128 let return_options = self.config.return_options.as_ref();
129 let suffix = return_options
130 .and_then(|ro| ro.suffix.as_deref())
131 .unwrap_or(".out");
132
133 let hash = match &value {
134 Value::Binary { val, .. } => {
135 store.cas_insert(val).await?
137 }
138 _ => {
139 store.cas_insert(&value_to_json(&value).to_string()).await?
141 }
142 };
143 Some(
144 Frame::builder(format!(
145 "{topic}{suffix}",
146 topic = self.topic,
147 suffix = suffix
148 ))
149 .maybe_ttl(return_options.and_then(|ro| ro.ttl.clone()))
150 .maybe_hash(Some(hash))
151 .build(),
152 )
153 } else {
154 None
155 };
156
157 let output_to_process: Vec<_> = {
159 let mut output = self.output.lock().unwrap();
160 output
161 .drain(..)
162 .chain(additional_frame.into_iter())
163 .collect()
164 };
165
166 for mut output_frame in output_to_process {
167 let meta_obj = output_frame
168 .meta
169 .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
170 .as_object_mut()
171 .expect("meta should be an object");
172
173 meta_obj.insert(
174 "handler_id".to_string(),
175 serde_json::Value::String(self.id.to_string()),
176 );
177 meta_obj.insert(
178 "frame_id".to_string(),
179 serde_json::Value::String(frame.id.to_string()),
180 );
181
182 let _ = store.append(output_frame);
183 }
184
185 Ok(())
186 }
187
188 async fn serve(&mut self, store: &Store, options: ReadOptions) {
189 let mut recver = store.read(options).await;
190
191 while let Some(frame) = recver.recv().await {
192 if (frame.topic == format!("{topic}.register", topic = self.topic)
194 || frame.topic == format!("{topic}.unregister", topic = self.topic))
195 && frame.id <= self.id
196 {
197 continue;
198 }
199
200 if frame.topic == format!("{topic}.register", topic = &self.topic)
201 || frame.topic == format!("{topic}.unregister", topic = &self.topic)
202 {
203 let _ = store.append(
204 Frame::builder(format!("{topic}.unregistered", topic = &self.topic))
205 .meta(serde_json::json!({
206 "handler_id": self.id.to_string(),
207 "frame_id": frame.id.to_string(),
208 }))
209 .build(),
210 );
211 break;
212 }
213
214 if frame
216 .meta
217 .as_ref()
218 .and_then(|meta| meta.get("handler_id"))
219 .and_then(|handler_id| handler_id.as_str())
220 .filter(|handler_id| *handler_id == self.id.to_string())
221 .is_some()
222 {
223 continue;
224 }
225
226 if let Err(err) = self.process_frame(&frame, store).await {
227 let _ = store.append(
228 Frame::builder(format!("{topic}.unregistered", topic = self.topic))
229 .meta(serde_json::json!({
230 "handler_id": self.id.to_string(),
231 "frame_id": frame.id.to_string(),
232 "error": err.to_string(),
233 }))
234 .build(),
235 );
236 break;
237 }
238 }
239 }
240
241 pub async fn spawn(&self, store: Store) -> Result<(), Error> {
242 let options = self.configure_read_options().await;
243
244 {
245 let store = store.clone();
246 let options = options.clone();
247 let mut handler = self.clone();
248
249 tokio::spawn(async move {
250 handler.serve(&store, options).await;
251 });
252 }
253
254 let _ = store.append(
255 Frame::builder(format!("{topic}.active", topic = &self.topic))
256 .meta(serde_json::json!({
257 "handler_id": self.id.to_string(),
258 "new": options.new,
259 "after": options.after.map(|id| id.to_string()),
260 }))
261 .build(),
262 );
263
264 Ok(())
265 }
266
267 pub async fn from_frame(
268 frame: &Frame,
269 store: &Store,
270 engine: nu::Engine,
271 ) -> Result<Self, Error> {
272 let topic = frame
273 .topic
274 .strip_suffix(".register")
275 .ok_or("Frame topic must end with .register")?;
276
277 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
279 let mut reader = store
280 .cas_reader(hash.clone())
281 .await
282 .map_err(|e| format!("Failed to get cas reader: {e}"))?;
283
284 let mut expression = String::new();
285 reader
286 .read_to_string(&mut expression)
287 .await
288 .map_err(|e| format!("Failed to read expression: {e}"))?;
289
290 let handler = Handler::new(
291 frame.id,
292 topic.to_string(),
293 engine,
294 expression,
295 store.clone(),
296 )
297 .await?;
298
299 Ok(handler)
300 }
301
302 async fn configure_read_options(&self) -> ReadOptions {
303 let (after, is_new) = match &self.config.start {
305 Start::First => (None, false),
306 Start::New => (None, true),
307 Start::After(id) => (Some(*id), false),
308 };
309
310 let follow_option = self
312 .config
313 .pulse
314 .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
315 .unwrap_or(FollowOption::On);
316
317 ReadOptions::builder()
318 .follow(follow_option)
319 .new(is_new)
320 .maybe_after(after)
321 .build()
322 }
323}
324
325use tokio::sync::{mpsc, oneshot};
326
327pub struct EngineWorker {
328 work_tx: mpsc::Sender<WorkItem>,
329}
330
331struct WorkItem {
332 frame: Frame,
333 resp_tx: oneshot::Sender<Result<Value, Error>>,
334}
335
336impl EngineWorker {
337 pub fn new(engine: nu::Engine, closure: nu_protocol::engine::Closure) -> Self {
338 let (work_tx, mut work_rx) = mpsc::channel(32);
339
340 std::thread::spawn(move || {
341 let mut engine = engine;
342
343 while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
344 let arg_val = crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown());
345
346 let pipeline = engine.run_closure_in_job(
347 &closure,
348 Some(arg_val), None, format!("handler {topic}", topic = frame.topic),
351 );
352
353 let output = pipeline
354 .map_err(|e| {
355 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
356 Error::from(nu_protocol::format_cli_error(None, &working_set, &*e, None))
357 })
358 .and_then(|pd| {
359 pd.into_value(nu_protocol::Span::unknown())
360 .map_err(Error::from)
361 });
362
363 let _ = resp_tx.send(output);
364 }
365 });
366
367 Self { work_tx }
368 }
369
370 pub async fn eval(&self, frame: Frame) -> Result<Value, Error> {
371 let (resp_tx, resp_rx) = oneshot::channel();
372 let work_item = WorkItem { frame, resp_tx };
373
374 self.work_tx
375 .send(work_item)
376 .await
377 .map_err(|_| Error::from("Engine worker thread has terminated"))?;
378
379 resp_rx
380 .await
381 .map_err(|_| Error::from("Engine worker thread has terminated"))?
382 }
383}
384
385fn is_value_an_append_frame_from_handler(value: &Value, handler_id: &Scru128Id) -> bool {
386 value
387 .as_record()
388 .ok()
389 .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
390 .and_then(|record| record.get("meta"))
391 .and_then(|meta| meta.as_record().ok())
392 .and_then(|meta_record| meta_record.get("handler_id"))
393 .and_then(|id| id.as_str().ok())
394 .filter(|id| *id == handler_id.to_string())
395 .is_some()
396}
397
398fn extract_handler_config(script_config: &NuScriptConfig) -> Result<HandlerConfig, Error> {
400 let script_options: HandlerScriptOptions = script_config.deserialize_options()?;
402
403 let start =
405 match script_options.start.as_deref() {
406 Some("first") => Start::First,
407 Some("new") => Start::New,
408 Some(id_str) => Start::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
409 format!("Invalid scru128 ID for start: {id_str}").into()
410 })?),
411 None => Start::default(), };
413
414 Ok(HandlerConfig {
416 start,
417 pulse: script_options.pulse,
418 return_options: script_options.return_options,
419 })
420}