1use std::str::FromStr;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use tracing::instrument;
6
7use serde::{Deserialize, Serialize};
8
9use tokio::io::AsyncReadExt;
10
11use nu_protocol::Value;
12
13use scru128::Scru128Id;
14
15use crate::error::Error;
16use crate::nu;
17use crate::nu::commands;
18use crate::nu::value_to_json;
19use crate::nu::{NuScriptConfig, ReturnOptions};
20use crate::store::{FollowOption, Frame, ReadOptions, Store};
21
22#[derive(Clone)]
23pub struct Handler {
24 pub id: Scru128Id,
25 pub context_id: Scru128Id,
26 pub topic: String,
27 config: HandlerConfig,
28 engine_worker: Arc<EngineWorker>,
29 output: Arc<Mutex<Vec<Frame>>>,
30}
31
32#[derive(Clone, Debug)]
33struct HandlerConfig {
34 resume_from: ResumeFrom,
35 pulse: Option<u64>,
36 return_options: Option<ReturnOptions>,
37}
38
39#[derive(Clone, Debug, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41enum ResumeFrom {
42 Head,
43 Tail,
44 After(Scru128Id),
45}
46
47impl Default for ResumeFrom {
48 fn default() -> Self {
49 Self::Tail
50 }
51}
52
53#[derive(Deserialize, Debug, Default)]
55#[serde(default)] struct HandlerScriptOptions {
57 resume_from: Option<String>,
59 pulse: Option<u64>,
61 return_options: Option<ReturnOptions>,
63}
64
65impl Handler {
66 pub async fn new(
67 id: Scru128Id,
68 context_id: Scru128Id,
69 topic: String,
70 mut engine: nu::Engine,
71 expression: String,
72 store: Store,
73 ) -> Result<Self, Error> {
74 let output = Arc::new(Mutex::new(Vec::new()));
75 engine.add_commands(vec![
76 Box::new(commands::cat_command::CatCommand::new(
77 store.clone(),
78 context_id,
79 )),
80 Box::new(commands::head_command::HeadCommand::new(
81 store.clone(),
82 context_id,
83 )),
84 Box::new(commands::append_command_buffered::AppendCommand::new(
85 store.clone(),
86 output.clone(),
87 )),
88 ])?;
89
90 let nu_script_config = nu::parse_config(&mut engine, &expression)?;
92
93 let handler_config = extract_handler_config(&nu_script_config)?;
95
96 let block = engine
98 .state
99 .get_block(nu_script_config.run_closure.block_id);
100 if block.signature.required_positional.len() != 1 {
101 return Err(format!(
102 "Closure must accept exactly one frame argument, found {count}",
103 count = block.signature.required_positional.len()
104 )
105 .into());
106 }
107
108 let engine_worker = Arc::new(EngineWorker::new(engine, nu_script_config.run_closure));
109
110 Ok(Self {
111 id,
112 context_id,
113 topic,
114 config: handler_config,
115 engine_worker,
116 output,
117 })
118 }
119
120 pub async fn eval_in_thread(&self, frame: &crate::store::Frame) -> Result<Value, Error> {
121 self.engine_worker.eval(frame.clone()).await
122 }
123
124 #[instrument(
125 level = "info",
126 skip(self, frame, store),
127 fields(
128 message = %format!(
129 "handler={handler_id}:{topic} frame={frame_id}:{frame_topic}",
130 handler_id = self.id, topic = self.topic, frame_id = frame.id, frame_topic = frame.topic)
131 )
132 )]
133 async fn process_frame(&mut self, frame: &Frame, store: &Store) -> Result<(), Error> {
134 let frame_clone = frame.clone();
135
136 let value = self.eval_in_thread(&frame_clone).await?;
137
138 let additional_frame = if !is_value_an_append_frame_from_handler(&value, &self.id)
140 && !matches!(value, Value::Nothing { .. })
141 {
142 let return_options = self.config.return_options.as_ref();
143 let suffix = return_options
144 .and_then(|ro| ro.suffix.as_deref())
145 .unwrap_or(".out");
146
147 let hash = match &value {
148 Value::Binary { val, .. } => {
149 store.cas_insert(val).await?
151 }
152 _ => {
153 store.cas_insert(&value_to_json(&value).to_string()).await?
155 }
156 };
157 Some(
158 Frame::builder(
159 format!("{topic}{suffix}", topic = self.topic, suffix = suffix),
160 self.context_id,
161 )
162 .maybe_ttl(return_options.and_then(|ro| ro.ttl.clone()))
163 .maybe_hash(Some(hash))
164 .build(),
165 )
166 } else {
167 None
168 };
169
170 let output_to_process: Vec<_> = {
172 let mut output = self.output.lock().unwrap();
173 output
174 .drain(..)
175 .chain(additional_frame.into_iter())
176 .collect()
177 };
178
179 for mut output_frame in output_to_process {
180 let meta_obj = output_frame
181 .meta
182 .get_or_insert_with(|| serde_json::Value::Object(Default::default()))
183 .as_object_mut()
184 .expect("meta should be an object");
185
186 meta_obj.insert(
187 "handler_id".to_string(),
188 serde_json::Value::String(self.id.to_string()),
189 );
190 meta_obj.insert(
191 "frame_id".to_string(),
192 serde_json::Value::String(frame.id.to_string()),
193 );
194
195 output_frame.context_id = self.context_id;
197 let _ = store.append(output_frame);
198 }
199
200 Ok(())
201 }
202
203 async fn serve(&mut self, store: &Store, options: ReadOptions) {
204 let mut recver = store.read(options).await;
205
206 while let Some(frame) = recver.recv().await {
207 if (frame.topic == format!("{topic}.register", topic = self.topic)
209 || frame.topic == format!("{topic}.unregister", topic = self.topic))
210 && frame.id <= self.id
211 {
212 continue;
213 }
214
215 if frame.topic == format!("{topic}.register", topic = &self.topic)
216 || frame.topic == format!("{topic}.unregister", topic = &self.topic)
217 {
218 let _ = store.append(
219 Frame::builder(
220 format!("{topic}.unregistered", topic = &self.topic),
221 self.context_id,
222 )
223 .meta(serde_json::json!({
224 "handler_id": self.id.to_string(),
225 "frame_id": frame.id.to_string(),
226 }))
227 .build(),
228 );
229 break;
230 }
231
232 if frame
234 .meta
235 .as_ref()
236 .and_then(|meta| meta.get("handler_id"))
237 .and_then(|handler_id| handler_id.as_str())
238 .filter(|handler_id| *handler_id == self.id.to_string())
239 .is_some()
240 {
241 continue;
242 }
243
244 if let Err(err) = self.process_frame(&frame, store).await {
245 let _ = store.append(
246 Frame::builder(
247 format!("{topic}.unregistered", topic = self.topic),
248 self.context_id,
249 )
250 .meta(serde_json::json!({
251 "handler_id": self.id.to_string(),
252 "frame_id": frame.id.to_string(),
253 "error": err.to_string(),
254 }))
255 .build(),
256 );
257 break;
258 }
259 }
260 }
261
262 pub async fn spawn(&self, store: Store) -> Result<(), Error> {
263 let options = self.configure_read_options().await;
264
265 {
266 let store = store.clone();
267 let options = options.clone();
268 let mut handler = self.clone();
269
270 tokio::spawn(async move {
271 handler.serve(&store, options).await;
272 });
273 }
274
275 let _ = store.append(
276 Frame::builder(
277 format!("{topic}.active", topic = &self.topic),
278 self.context_id,
279 )
280 .meta(serde_json::json!({
281 "handler_id": self.id.to_string(),
282 "tail": options.tail,
283 "last_id": options.last_id.map(|id| id.to_string()),
284 }))
285 .build(),
286 );
287
288 Ok(())
289 }
290
291 pub async fn from_frame(
292 frame: &Frame,
293 store: &Store,
294 engine: nu::Engine,
295 ) -> Result<Self, Error> {
296 let topic = frame
297 .topic
298 .strip_suffix(".register")
299 .ok_or("Frame topic must end with .register")?;
300
301 let hash = frame.hash.as_ref().ok_or("Missing hash field")?;
303 let mut reader = store
304 .cas_reader(hash.clone())
305 .await
306 .map_err(|e| format!("Failed to get cas reader: {e}"))?;
307
308 let mut expression = String::new();
309 reader
310 .read_to_string(&mut expression)
311 .await
312 .map_err(|e| format!("Failed to read expression: {e}"))?;
313
314 let handler = Handler::new(
315 frame.id,
316 frame.context_id,
317 topic.to_string(),
318 engine,
319 expression,
320 store.clone(),
321 )
322 .await?;
323
324 Ok(handler)
325 }
326
327 async fn configure_read_options(&self) -> ReadOptions {
328 let (last_id, is_tail) = match &self.config.resume_from {
330 ResumeFrom::Head => (None, false),
331 ResumeFrom::Tail => (None, true),
332 ResumeFrom::After(id) => (Some(*id), false),
333 };
334
335 let follow_option = self
337 .config
338 .pulse
339 .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
340 .unwrap_or(FollowOption::On);
341
342 ReadOptions::builder()
343 .follow(follow_option)
344 .tail(is_tail)
345 .maybe_last_id(last_id)
346 .context_id(self.context_id)
347 .build()
348 }
349}
350
351use tokio::sync::{mpsc, oneshot};
352
353pub struct EngineWorker {
354 work_tx: mpsc::Sender<WorkItem>,
355}
356
357struct WorkItem {
358 frame: Frame,
359 resp_tx: oneshot::Sender<Result<Value, Error>>,
360}
361
362impl EngineWorker {
363 pub fn new(engine: nu::Engine, closure: nu_protocol::engine::Closure) -> Self {
364 let (work_tx, mut work_rx) = mpsc::channel(32);
365
366 std::thread::spawn(move || {
367 let mut engine = engine;
368
369 while let Some(WorkItem { frame, resp_tx }) = work_rx.blocking_recv() {
370 let arg_val = crate::nu::frame_to_value(&frame, nu_protocol::Span::unknown());
371
372 let pipeline = engine.run_closure_in_job(
373 &closure,
374 Some(arg_val), None, format!("handler {topic}", topic = frame.topic),
377 );
378
379 let output = pipeline
380 .map_err(|e| {
381 let working_set = nu_protocol::engine::StateWorkingSet::new(&engine.state);
382 Error::from(nu_protocol::format_cli_error(&working_set, &*e, None))
383 })
384 .and_then(|pd| {
385 pd.into_value(nu_protocol::Span::unknown())
386 .map_err(Error::from)
387 });
388
389 let _ = resp_tx.send(output);
390 }
391 });
392
393 Self { work_tx }
394 }
395
396 pub async fn eval(&self, frame: Frame) -> Result<Value, Error> {
397 let (resp_tx, resp_rx) = oneshot::channel();
398 let work_item = WorkItem { frame, resp_tx };
399
400 self.work_tx
401 .send(work_item)
402 .await
403 .map_err(|_| Error::from("Engine worker thread has terminated"))?;
404
405 resp_rx
406 .await
407 .map_err(|_| Error::from("Engine worker thread has terminated"))?
408 }
409}
410
411fn is_value_an_append_frame_from_handler(value: &Value, handler_id: &Scru128Id) -> bool {
412 value
413 .as_record()
414 .ok()
415 .filter(|record| record.get("id").is_some() && record.get("topic").is_some())
416 .and_then(|record| record.get("meta"))
417 .and_then(|meta| meta.as_record().ok())
418 .and_then(|meta_record| meta_record.get("handler_id"))
419 .and_then(|id| id.as_str().ok())
420 .filter(|id| *id == handler_id.to_string())
421 .is_some()
422}
423
424fn extract_handler_config(script_config: &NuScriptConfig) -> Result<HandlerConfig, Error> {
426 let script_options: HandlerScriptOptions = script_config.deserialize_options()?;
428
429 let resume_from = match script_options.resume_from.as_deref() {
431 Some("head") => ResumeFrom::Head,
432 Some("tail") => ResumeFrom::Tail,
433 Some(id_str) => ResumeFrom::After(Scru128Id::from_str(id_str).map_err(|_| -> Error {
434 format!("Invalid scru128 ID for resume_from: {id_str}").into()
435 })?),
436 None => ResumeFrom::default(), };
438
439 Ok(HandlerConfig {
441 resume_from,
442 pulse: script_options.pulse,
443 return_options: script_options.return_options,
444 })
445}