1use scru128::Scru128Id;
2use tokio::task::JoinHandle;
3
4use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Signals, Span, Value};
5use std::io::Read;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::io::AsyncReadExt;
10
11use crate::nu;
12use crate::nu::{value_to_json, ReturnOptions};
13use crate::store::{FollowOption, Frame, ReadOptions, Store};
14use serde_json::json;
15
16#[derive(Clone, Debug, serde::Deserialize, Default)]
17pub struct ServiceScriptOptions {
18 pub duplex: Option<bool>,
19 pub return_options: Option<ReturnOptions>,
20}
21
/// Identifies the service a supervision loop belongs to.
#[derive(Clone)]
pub struct ServiceLoop {
    /// Base topic of the service; event frames are appended under
    /// `<topic>.<suffix>` (for example `<topic>.running`).
    pub topic: String,
}
26
27#[derive(Clone)]
28pub struct Task {
29 pub id: Scru128Id,
30 pub run_closure: nu_protocol::engine::Closure,
31 pub return_options: Option<ReturnOptions>,
32 pub duplex: bool,
33 pub engine: nu::Engine,
34}
35
36#[cfg_attr(not(test), allow(dead_code))]
37#[derive(Debug, Clone)]
38pub enum ServiceEventKind {
39 Running,
40 Recv {
42 suffix: String,
43 data: Vec<u8>,
44 },
45 RecvMeta {
47 suffix: String,
48 meta: serde_json::Value,
49 },
50 Stopped(StopReason),
51 ParseError {
52 message: String,
53 },
54 Shutdown,
55}
56
57#[cfg_attr(not(test), allow(dead_code))]
58#[derive(Debug, Clone)]
59pub struct ServiceEvent {
60 pub kind: ServiceEventKind,
61 pub frame: Frame,
62}
63
64#[cfg_attr(not(test), allow(dead_code))]
65#[derive(Debug, Clone)]
66pub enum StopReason {
67 Finished,
68 Error { message: String },
69 Terminate,
70 Shutdown,
71 Update { update_id: Scru128Id },
72}
73
74pub(crate) fn emit_event(
75 store: &Store,
76 loop_ctx: &ServiceLoop,
77 source_id: Scru128Id,
78 return_opts: Option<&ReturnOptions>,
79 kind: ServiceEventKind,
80) -> Result<ServiceEvent, Box<dyn std::error::Error + Send + Sync>> {
81 let frame = match &kind {
82 ServiceEventKind::Running => store.append(
83 Frame::builder(format!("{topic}.running", topic = loop_ctx.topic))
84 .meta(json!({ "source_id": source_id.to_string() }))
85 .build(),
86 )?,
87
88 ServiceEventKind::Recv { suffix, data } => {
89 let hash = store.cas_insert_bytes_sync(data)?;
90 store.append(
91 Frame::builder(format!(
92 "{topic}.{suffix}",
93 topic = loop_ctx.topic,
94 suffix = suffix
95 ))
96 .hash(hash)
97 .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
98 .meta(json!({ "source_id": source_id.to_string() }))
99 .build(),
100 )?
101 }
102
103 ServiceEventKind::RecvMeta { suffix, meta } => {
104 let mut merged = meta.clone();
105 merged["source_id"] = json!(source_id.to_string());
106 store.append(
107 Frame::builder(format!(
108 "{topic}.{suffix}",
109 topic = loop_ctx.topic,
110 suffix = suffix
111 ))
112 .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
113 .meta(merged)
114 .build(),
115 )?
116 }
117
118 ServiceEventKind::Stopped(reason) => {
119 let mut meta = json!({
120 "source_id": source_id.to_string(),
121 "reason": stop_reason_str(reason),
122 });
123 if let StopReason::Update { update_id } = reason {
124 meta["update_id"] = json!(update_id.to_string());
125 }
126 if let StopReason::Error { message } = reason {
127 meta["message"] = json!(message);
128 }
129 store.append(
130 Frame::builder(format!("{topic}.stopped", topic = loop_ctx.topic))
131 .meta(meta)
132 .build(),
133 )?
134 }
135
136 ServiceEventKind::ParseError { message } => store.append(
137 Frame::builder(format!("{topic}.parse.error", topic = loop_ctx.topic))
138 .meta(json!({
139 "source_id": source_id.to_string(),
140 "reason": message,
141 }))
142 .build(),
143 )?,
144
145 ServiceEventKind::Shutdown => store.append(
146 Frame::builder(format!("{topic}.shutdown", topic = loop_ctx.topic))
147 .meta(json!({ "source_id": source_id.to_string() }))
148 .build(),
149 )?,
150 };
151
152 Ok(ServiceEvent { kind, frame })
153}
154
155fn stop_reason_str(r: &StopReason) -> &'static str {
156 match r {
157 StopReason::Finished => "finished",
158 StopReason::Error { .. } => "error",
159 StopReason::Terminate => "terminate",
160 StopReason::Shutdown => "shutdown",
161 StopReason::Update { .. } => "update",
162 }
163}
164
165pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> {
166 tokio::spawn(async move { run(store, spawn_frame).await })
167}
168
169async fn run(store: Store, spawn_frame: Frame) {
170 let mut engine = match crate::processor::build_engine(&store, &spawn_frame.id) {
171 Ok(e) => e,
172 Err(_) => return,
173 };
174
175 let hash = match spawn_frame.hash.clone() {
176 Some(h) => h,
177 None => return,
178 };
179 let mut reader = match store.cas_reader(hash).await {
180 Ok(r) => r,
181 Err(_) => return,
182 };
183 let mut script = String::new();
184 if reader.read_to_string(&mut script).await.is_err() {
185 return;
186 }
187
188 let loop_ctx = ServiceLoop {
189 topic: spawn_frame
190 .topic
191 .strip_suffix(".spawn")
192 .unwrap_or(&spawn_frame.topic)
193 .to_string(),
194 };
195
196 let nu_config = match nu::parse_config(&mut engine, &script) {
197 Ok(cfg) => cfg,
198 Err(e) => {
199 let _ = emit_event(
200 &store,
201 &loop_ctx,
202 spawn_frame.id,
203 None,
204 ServiceEventKind::ParseError {
205 message: e.to_string(),
206 },
207 );
208 return;
209 }
210 };
211 let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default();
212
213 let interrupt = Arc::new(AtomicBool::new(false));
215 engine.state.set_signals(Signals::new(interrupt.clone()));
216
217 let task = Task {
218 id: spawn_frame.id,
219 run_closure: nu_config.run_closure,
220 return_options: opts.return_options,
221 duplex: opts.duplex.unwrap_or(false),
222 engine,
223 };
224
225 run_loop(store, loop_ctx, task).await;
226}
227
228async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task) {
229 let start_event = emit_event(
231 &store,
232 &loop_ctx,
233 task.id,
234 task.return_options.as_ref(),
235 ServiceEventKind::Running,
236 )
237 .expect("failed to emit running event");
238 let mut start_id = start_event.frame.id;
239
240 let control_rx_options = ReadOptions::builder()
241 .follow(FollowOption::On)
242 .after(start_id)
243 .build();
244
245 let mut control_rx = store.read(control_rx_options).await;
246
247 enum LoopOutcome {
248 Continue,
249 Update(Box<Task>, Scru128Id),
250 Terminate,
251 Shutdown,
252 Error(String),
253 }
254
255 impl core::fmt::Debug for LoopOutcome {
256 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
257 match self {
258 LoopOutcome::Continue => write!(f, "Continue"),
259 LoopOutcome::Update(_, id) => f.debug_tuple("Update").field(id).finish(),
260 LoopOutcome::Terminate => write!(f, "Terminate"),
261 LoopOutcome::Shutdown => write!(f, "Shutdown"),
262 LoopOutcome::Error(e) => f.debug_tuple("Error").field(e).finish(),
263 }
264 }
265 }
266
267 impl From<&LoopOutcome> for StopReason {
268 fn from(value: &LoopOutcome) -> Self {
269 match value {
270 LoopOutcome::Continue => StopReason::Finished,
271 LoopOutcome::Update(_, id) => StopReason::Update { update_id: *id },
272 LoopOutcome::Terminate => StopReason::Terminate,
273 LoopOutcome::Shutdown => StopReason::Shutdown,
274 LoopOutcome::Error(e) => StopReason::Error { message: e.clone() },
275 }
276 }
277 }
278
279 loop {
280 let input_pipeline = if task.duplex {
281 let options = ReadOptions::builder()
282 .follow(FollowOption::On)
283 .after(start_id)
284 .build();
285 let send_rx = store.read(options).await;
286 build_input_pipeline(store.clone(), &loop_ctx, &task, send_rx).await
287 } else {
288 PipelineData::empty()
289 };
290
291 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
292 spawn_thread(
293 store.clone(),
294 loop_ctx.clone(),
295 task.clone(),
296 input_pipeline,
297 done_tx,
298 );
299
300 let terminate_topic = format!("{topic}.terminate", topic = loop_ctx.topic);
301 let spawn_topic = format!("{topic}.spawn", topic = loop_ctx.topic);
302 tokio::pin!(done_rx);
303
304 let outcome = 'ctrl: loop {
305 tokio::select! {
306 biased;
307 maybe = control_rx.recv() => {
308 match maybe {
309 Some(frame) if frame.topic == terminate_topic => {
310 task.engine.state.signals().trigger();
311 task.engine.kill_job_by_name(&task.id.to_string());
312 let _ = (&mut done_rx).await;
313 break 'ctrl LoopOutcome::Terminate;
314 }
315 Some(frame) if frame.topic == "xs.stopping" => {
316 task.engine.state.signals().trigger();
317 task.engine.kill_job_by_name(&task.id.to_string());
318 let _ = (&mut done_rx).await;
319 break 'ctrl LoopOutcome::Shutdown;
320 }
321 Some(frame) if frame.topic == spawn_topic => {
322 if let Some(hash) = frame.hash.clone() {
323 if let Ok(mut reader) = store.cas_reader(hash).await {
324 let mut script = String::new();
325 if reader.read_to_string(&mut script).await.is_ok() {
326 let mut new_engine = match crate::processor::build_engine(&store, &frame.id) {
327 Ok(e) => e,
328 Err(_) => continue,
329 };
330 match nu::parse_config(&mut new_engine, &script) {
331 Ok(cfg) => {
332 let opts: ServiceScriptOptions = cfg.deserialize_options().unwrap_or_default();
333 let interrupt = Arc::new(AtomicBool::new(false));
334 new_engine.state.set_signals(Signals::new(interrupt.clone()));
335
336 task.engine.state.signals().trigger();
337 task.engine.kill_job_by_name(&task.id.to_string());
338 let _ = (&mut done_rx).await;
339
340 let new_task = Task {
341 id: frame.id,
342 run_closure: cfg.run_closure,
343 return_options: opts.return_options,
344 duplex: opts.duplex.unwrap_or(false),
345 engine: new_engine,
346 };
347
348 break 'ctrl LoopOutcome::Update(Box::new(new_task), frame.id);
349 }
350 Err(e) => {
351 let _ = emit_event(
352 &store,
353 &loop_ctx,
354 frame.id,
355 None,
356 ServiceEventKind::ParseError { message: e.to_string() },
357 );
358 }
359 }
360 }
361 }
362 }
363 }
364 Some(_) => {}
365 None => break 'ctrl LoopOutcome::Error("control".into()),
366 }
367 }
368 res = &mut done_rx => {
369 break 'ctrl match res.unwrap_or(Err("thread failed".into())) {
370 Ok(()) => LoopOutcome::Continue,
371 Err(e) => LoopOutcome::Error(e),
372 };
373 }
374 }
375 };
376
377 let reason: StopReason = (&outcome).into();
378 let _ = emit_event(
379 &store,
380 &loop_ctx,
381 task.id,
382 task.return_options.as_ref(),
383 ServiceEventKind::Stopped(reason.clone()),
384 );
385
386 match outcome {
387 LoopOutcome::Continue => {
388 tokio::time::sleep(Duration::from_secs(1)).await;
389 if let Ok(event) = emit_event(
390 &store,
391 &loop_ctx,
392 task.id,
393 task.return_options.as_ref(),
394 ServiceEventKind::Running,
395 ) {
396 start_id = event.frame.id;
397 }
398 }
399 LoopOutcome::Update(new_task, _) => {
400 task = *new_task;
401 if let Ok(event) = emit_event(
402 &store,
403 &loop_ctx,
404 task.id,
405 task.return_options.as_ref(),
406 ServiceEventKind::Running,
407 ) {
408 start_id = event.frame.id;
409 }
410 }
411 LoopOutcome::Terminate | LoopOutcome::Shutdown | LoopOutcome::Error(_) => {
412 let _ = emit_event(
413 &store,
414 &loop_ctx,
415 task.id,
416 task.return_options.as_ref(),
417 ServiceEventKind::Shutdown,
418 );
419 break;
420 }
421 }
422 }
423}
424
425async fn build_input_pipeline(
426 store: Store,
427 loop_ctx: &ServiceLoop,
428 task: &Task,
429 rx: tokio::sync::mpsc::Receiver<Frame>,
430) -> PipelineData {
431 let topic = format!("{loop_topic}.send", loop_topic = loop_ctx.topic);
432 let signals = task.engine.state.signals().clone();
433 let mut rx = rx;
434 let iter = std::iter::from_fn(move || loop {
435 if signals.interrupted() {
436 return None;
437 }
438
439 match rx.try_recv() {
440 Ok(frame) => {
441 if frame.topic == topic {
442 if let Some(hash) = frame.hash {
443 if let Ok(bytes) = store.cas_read_sync(&hash) {
444 if let Ok(content) = String::from_utf8(bytes) {
445 return Some(content);
446 }
447 }
448 }
449 }
450 }
451 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
452 std::thread::sleep(std::time::Duration::from_millis(10));
453 continue;
454 }
455 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
456 return None;
457 }
458 }
459 });
460
461 ByteStream::from_iter(
462 iter,
463 Span::unknown(),
464 task.engine.state.signals().clone(),
465 ByteStreamType::Unknown,
466 )
467 .into()
468}
469
470fn spawn_thread(
471 store: Store,
472 loop_ctx: ServiceLoop,
473 mut task: Task,
474 input_pipeline: PipelineData,
475 done_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
476) {
477 let handle = tokio::runtime::Handle::current();
478 std::thread::spawn(move || {
479 let res = run_pipeline(&handle, &store, &loop_ctx, &mut task, input_pipeline);
480 let _ = done_tx.send(res);
481 });
482}
483
484fn run_pipeline(
485 handle: &tokio::runtime::Handle,
486 store: &Store,
487 loop_ctx: &ServiceLoop,
488 task: &mut Task,
489 input_pipeline: PipelineData,
490) -> Result<(), String> {
491 let pipeline = task
492 .engine
493 .run_closure_in_job(
494 &task.run_closure,
495 vec![],
496 Some(input_pipeline),
497 task.id.to_string(),
498 )
499 .map_err(|e| {
500 let working_set = nu_protocol::engine::StateWorkingSet::new(&task.engine.state);
501 nu_protocol::format_cli_error(None, &working_set, &*e, None)
502 })?;
503
504 let suffix = task
505 .return_options
506 .as_ref()
507 .and_then(|o| o.suffix.clone())
508 .unwrap_or_else(|| "recv".into());
509 let use_cas = task
510 .return_options
511 .as_ref()
512 .and_then(|o| o.target.as_deref())
513 .is_some_and(|t| t == "cas");
514
515 let emit = |event| {
516 handle.block_on(async {
517 let _ = emit_event(
518 store,
519 loop_ctx,
520 task.id,
521 task.return_options.as_ref(),
522 event,
523 );
524 });
525 };
526
527 match pipeline {
528 PipelineData::Empty => {}
529 PipelineData::Value(value, _) => {
530 if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
531 emit(event);
532 }
533 }
534 PipelineData::ListStream(mut stream, _) => {
535 while let Some(value) = stream.next_value() {
536 if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
537 emit(event);
538 }
539 }
540 }
541 PipelineData::ByteStream(stream, _) => {
542 if let Some(mut reader) = stream.reader() {
543 let mut buf = [0u8; 8192];
544 loop {
545 match reader.read(&mut buf) {
546 Ok(0) => break,
547 Ok(n) => {
548 emit(ServiceEventKind::Recv {
549 suffix: suffix.clone(),
550 data: buf[..n].to_vec(),
551 });
552 }
553 Err(_) => break,
554 }
555 }
556 }
557 }
558 }
559 Ok(())
560}
561
562fn value_to_event(
563 value: &Value,
564 suffix: &str,
565 use_cas: bool,
566) -> Result<Option<ServiceEventKind>, String> {
567 match value {
568 Value::Nothing { .. } => Ok(None),
569 Value::Record { .. } if !use_cas => Ok(Some(ServiceEventKind::RecvMeta {
570 suffix: suffix.to_string(),
571 meta: value_to_json(value),
572 })),
573 _ if use_cas => {
574 let data = match value {
575 Value::String { val, .. } => val.as_bytes().to_vec(),
576 Value::Binary { val, .. } => val.clone(),
577 _ => value_to_json(value).to_string().into_bytes(),
578 };
579 Ok(Some(ServiceEventKind::Recv {
580 suffix: suffix.to_string(),
581 data,
582 }))
583 }
584 _ => Err(format!(
585 "Service output must be a record when target is not \"cas\"; got {}. \
586 Set return_options.target to \"cas\" for non-record output.",
587 value.get_type()
588 )),
589 }
590}