1use scru128::Scru128Id;
2use tokio::task::JoinHandle;
3
4use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Signals, Span, Value};
5use std::io::Read;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::io::AsyncReadExt;
10
11use crate::nu;
12use crate::nu::{value_to_json, ReturnOptions};
13use crate::store::{FollowOption, Frame, ReadOptions, Store};
14use serde_json::json;
15
16#[derive(Clone, Debug, serde::Deserialize, Default)]
17pub struct ServiceScriptOptions {
18 pub duplex: Option<bool>,
19 pub return_options: Option<ReturnOptions>,
20}
21
/// Identifies the service a loop is running for.
#[derive(Clone)]
pub struct ServiceLoop {
    /// Service name: the spawn frame's topic with the `xs.service.` prefix
    /// and the `.create` suffix removed (or the full topic when it does not
    /// have that shape).
    pub topic: String,
}
26
27#[derive(Clone)]
28pub struct Task {
29 pub id: Scru128Id,
30 pub run_closure: nu_protocol::engine::Closure,
31 pub return_options: Option<ReturnOptions>,
32 pub duplex: bool,
33 pub engine: nu::Engine,
34}
35
36#[cfg_attr(not(test), allow(dead_code))]
37#[derive(Debug, Clone)]
38pub enum ServiceEventKind {
39 Running,
40 Recv {
42 suffix: String,
43 data: Vec<u8>,
44 },
45 RecvMeta {
47 suffix: String,
48 meta: serde_json::Value,
49 },
50 Stopped(StopReason),
51 ParseError {
52 message: String,
53 },
54 Shutdown,
55}
56
57#[cfg_attr(not(test), allow(dead_code))]
58#[derive(Debug, Clone)]
59pub struct ServiceEvent {
60 pub kind: ServiceEventKind,
61 pub frame: Frame,
62}
63
64#[cfg_attr(not(test), allow(dead_code))]
65#[derive(Debug, Clone)]
66pub enum StopReason {
67 Finished,
68 Error { message: String },
69 Terminate,
70 Shutdown,
71 Update { update_id: Scru128Id },
72}
73
74pub(crate) fn emit_event(
75 store: &Store,
76 loop_ctx: &ServiceLoop,
77 source_id: Scru128Id,
78 return_opts: Option<&ReturnOptions>,
79 kind: ServiceEventKind,
80) -> Result<ServiceEvent, Box<dyn std::error::Error + Send + Sync>> {
81 let frame = match &kind {
82 ServiceEventKind::Running => store.append(
83 Frame::builder(format!("xs.service.{}.active", loop_ctx.topic))
84 .meta(json!({ "source_id": source_id.to_string() }))
85 .build(),
86 )?,
87
88 ServiceEventKind::Recv { suffix, data } => {
89 let hash = store.cas_insert_bytes_sync(data)?;
91 store.append(
92 Frame::builder(format!(
93 "{topic}{suffix}",
94 topic = loop_ctx.topic,
95 suffix = suffix
96 ))
97 .hash(hash)
98 .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
99 .meta(json!({ "source_id": source_id.to_string() }))
100 .build(),
101 )?
102 }
103
104 ServiceEventKind::RecvMeta { suffix, meta } => {
105 let mut merged = meta.clone();
107 merged["source_id"] = json!(source_id.to_string());
108 store.append(
109 Frame::builder(format!(
110 "{topic}{suffix}",
111 topic = loop_ctx.topic,
112 suffix = suffix
113 ))
114 .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
115 .meta(merged)
116 .build(),
117 )?
118 }
119
120 ServiceEventKind::Stopped(reason) => {
121 let event_suffix = match reason {
130 StopReason::Finished => "fin.ok",
131 StopReason::Error { .. } => "fin.error",
132 StopReason::Terminate => "fin.term",
133 StopReason::Update { .. } => "replaced",
134 StopReason::Shutdown => {
135 return Ok(ServiceEvent {
139 kind,
140 frame: Frame::builder("").build(),
141 });
142 }
143 };
144 let mut meta = json!({
145 "source_id": source_id.to_string(),
146 });
147 if let StopReason::Update { update_id } = reason {
148 meta["update_id"] = json!(update_id.to_string());
149 }
150 if let StopReason::Error { message } = reason {
151 meta["message"] = json!(message);
152 }
153 store.append(
154 Frame::builder(format!("xs.service.{}.{event_suffix}", loop_ctx.topic))
155 .meta(meta)
156 .build(),
157 )?
158 }
159
160 ServiceEventKind::ParseError { message } => store.append(
161 Frame::builder(format!("xs.service.{}.invalid", loop_ctx.topic))
162 .meta(json!({
163 "source_id": source_id.to_string(),
164 "reason": message,
165 }))
166 .build(),
167 )?,
168
169 ServiceEventKind::Shutdown => store.append(
170 Frame::builder(format!("xs.service.{}.stopped", loop_ctx.topic))
171 .meta(json!({ "source_id": source_id.to_string() }))
172 .build(),
173 )?,
174 };
175
176 Ok(ServiceEvent { kind, frame })
177}
178
179pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> {
180 tokio::spawn(async move { run(store, spawn_frame).await })
181}
182
183fn specialize(
188 base: &nu::Engine,
189 store: &Store,
190 create_id: Scru128Id,
191) -> Result<nu::Engine, Box<dyn std::error::Error + Send + Sync>> {
192 let mut engine = base.clone();
193 let modules = store.nu_modules_at(&create_id);
194 nu::load_modules(&mut engine.state, store, &modules)?;
195 engine.set_append_meta(&serde_json::json!({ "service_id": create_id.to_string() }));
196 Ok(engine)
197}
198
199async fn run(store: Store, spawn_frame: Frame) {
200 let base = match nu::prepared_base(&store, nu::ReadMode::Stream, true) {
203 Ok(e) => e,
204 Err(_) => return,
205 };
206 let mut engine = match specialize(&base, &store, spawn_frame.id) {
207 Ok(e) => e,
208 Err(_) => return,
209 };
210
211 let hash = match spawn_frame.hash.clone() {
212 Some(h) => h,
213 None => return,
214 };
215 let mut reader = match store.cas_reader(hash).await {
216 Ok(r) => r,
217 Err(_) => return,
218 };
219 let mut script = String::new();
220 if reader.read_to_string(&mut script).await.is_err() {
221 return;
222 }
223
224 let loop_ctx = ServiceLoop {
225 topic: spawn_frame
227 .topic
228 .strip_prefix("xs.service.")
229 .and_then(|rest| rest.strip_suffix(".create"))
230 .unwrap_or(&spawn_frame.topic)
231 .to_string(),
232 };
233
234 let nu_config = match nu::parse_config(&mut engine, &script) {
235 Ok(cfg) => cfg,
236 Err(e) => {
237 let _ = emit_event(
238 &store,
239 &loop_ctx,
240 spawn_frame.id,
241 None,
242 ServiceEventKind::ParseError {
243 message: e.to_string(),
244 },
245 );
246 return;
247 }
248 };
249 let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default();
250
251 let interrupt = Arc::new(AtomicBool::new(false));
253 engine.state.set_signals(Signals::new(interrupt.clone()));
254
255 let task = Task {
256 id: spawn_frame.id,
257 run_closure: nu_config.run_closure,
258 return_options: opts.return_options,
259 duplex: opts.duplex.unwrap_or(false),
260 engine,
261 };
262
263 run_loop(store, loop_ctx, task, base).await;
264}
265
266async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task, base: nu::Engine) {
267 let start_event = emit_event(
269 &store,
270 &loop_ctx,
271 task.id,
272 task.return_options.as_ref(),
273 ServiceEventKind::Running,
274 )
275 .expect("failed to emit running event");
276 let mut start_id = start_event.frame.id;
277
278 let control_rx_options = ReadOptions::builder()
279 .follow(FollowOption::On)
280 .after(start_id)
281 .build();
282
283 let mut control_rx = store.read(control_rx_options).await;
284
285 enum LoopOutcome {
286 Continue,
287 Update(Box<Task>, Scru128Id),
288 Terminate,
289 Shutdown,
290 Error(String),
291 }
292
293 impl core::fmt::Debug for LoopOutcome {
294 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
295 match self {
296 LoopOutcome::Continue => write!(f, "Continue"),
297 LoopOutcome::Update(_, id) => f.debug_tuple("Update").field(id).finish(),
298 LoopOutcome::Terminate => write!(f, "Terminate"),
299 LoopOutcome::Shutdown => write!(f, "Shutdown"),
300 LoopOutcome::Error(e) => f.debug_tuple("Error").field(e).finish(),
301 }
302 }
303 }
304
305 impl From<&LoopOutcome> for StopReason {
306 fn from(value: &LoopOutcome) -> Self {
307 match value {
308 LoopOutcome::Continue => StopReason::Finished,
309 LoopOutcome::Update(_, id) => StopReason::Update { update_id: *id },
310 LoopOutcome::Terminate => StopReason::Terminate,
311 LoopOutcome::Shutdown => StopReason::Shutdown,
312 LoopOutcome::Error(e) => StopReason::Error { message: e.clone() },
313 }
314 }
315 }
316
317 loop {
318 let input_pipeline = if task.duplex {
319 let options = ReadOptions::builder()
320 .follow(FollowOption::On)
321 .after(start_id)
322 .build();
323 let send_rx = store.read(options).await;
324 build_input_pipeline(store.clone(), &loop_ctx, &task, send_rx).await
325 } else {
326 PipelineData::empty()
327 };
328
329 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
330 spawn_thread(
331 store.clone(),
332 loop_ctx.clone(),
333 task.clone(),
334 input_pipeline,
335 done_tx,
336 );
337
338 let terminate_topic = format!("xs.service.{}.term", loop_ctx.topic);
339 let spawn_topic = format!("xs.service.{}.create", loop_ctx.topic);
340 tokio::pin!(done_rx);
341
342 let outcome = 'ctrl: loop {
343 tokio::select! {
344 biased;
345 maybe = control_rx.recv() => {
346 match maybe {
347 Some(frame) if frame.topic == terminate_topic => {
348 task.engine.state.signals().trigger();
349 task.engine.kill_job_by_name(&task.id.to_string());
350 let _ = (&mut done_rx).await;
351 break 'ctrl LoopOutcome::Terminate;
352 }
353 Some(frame) if frame.topic == "xs.stopping" => {
354 task.engine.state.signals().trigger();
355 task.engine.kill_job_by_name(&task.id.to_string());
356 let _ = (&mut done_rx).await;
357 break 'ctrl LoopOutcome::Shutdown;
358 }
359 Some(frame) if frame.topic == spawn_topic => {
360 if let Some(hash) = frame.hash.clone() {
361 if let Ok(mut reader) = store.cas_reader(hash).await {
362 let mut script = String::new();
363 if reader.read_to_string(&mut script).await.is_ok() {
364 let mut new_engine = match specialize(&base, &store, frame.id) {
368 Ok(e) => e,
369 Err(_) => continue,
370 };
371 match nu::parse_config(&mut new_engine, &script) {
372 Ok(cfg) => {
373 let opts: ServiceScriptOptions = cfg.deserialize_options().unwrap_or_default();
374 let interrupt = Arc::new(AtomicBool::new(false));
375 new_engine.state.set_signals(Signals::new(interrupt.clone()));
376
377 task.engine.state.signals().trigger();
378 task.engine.kill_job_by_name(&task.id.to_string());
379 let _ = (&mut done_rx).await;
380
381 let new_task = Task {
382 id: frame.id,
383 run_closure: cfg.run_closure,
384 return_options: opts.return_options,
385 duplex: opts.duplex.unwrap_or(false),
386 engine: new_engine,
387 };
388
389 break 'ctrl LoopOutcome::Update(Box::new(new_task), frame.id);
390 }
391 Err(e) => {
392 let _ = emit_event(
393 &store,
394 &loop_ctx,
395 frame.id,
396 None,
397 ServiceEventKind::ParseError { message: e.to_string() },
398 );
399 }
400 }
401 }
402 }
403 }
404 }
405 Some(_) => {}
406 None => break 'ctrl LoopOutcome::Error("control".into()),
407 }
408 }
409 res = &mut done_rx => {
410 break 'ctrl match res.unwrap_or(Err("thread failed".into())) {
411 Ok(()) => LoopOutcome::Continue,
412 Err(e) => LoopOutcome::Error(e),
413 };
414 }
415 }
416 };
417
418 let reason: StopReason = (&outcome).into();
419 match &reason {
430 StopReason::Finished | StopReason::Shutdown => {}
431 _ => {
432 let _ = emit_event(
433 &store,
434 &loop_ctx,
435 task.id,
436 task.return_options.as_ref(),
437 ServiceEventKind::Stopped(reason.clone()),
438 );
439 }
440 }
441
442 match outcome {
443 LoopOutcome::Continue => {
444 tokio::time::sleep(Duration::from_secs(1)).await;
445 if let Ok(event) = emit_event(
446 &store,
447 &loop_ctx,
448 task.id,
449 task.return_options.as_ref(),
450 ServiceEventKind::Running,
451 ) {
452 start_id = event.frame.id;
453 }
454 }
455 LoopOutcome::Update(new_task, _) => {
456 task = *new_task;
457 if let Ok(event) = emit_event(
458 &store,
459 &loop_ctx,
460 task.id,
461 task.return_options.as_ref(),
462 ServiceEventKind::Running,
463 ) {
464 start_id = event.frame.id;
465 }
466 }
467 LoopOutcome::Terminate | LoopOutcome::Error(_) => {
468 break;
469 }
470 LoopOutcome::Shutdown => {
471 let _ = emit_event(
472 &store,
473 &loop_ctx,
474 task.id,
475 task.return_options.as_ref(),
476 ServiceEventKind::Shutdown,
477 );
478 break;
479 }
480 }
481 }
482}
483
484async fn build_input_pipeline(
485 store: Store,
486 loop_ctx: &ServiceLoop,
487 task: &Task,
488 rx: tokio::sync::mpsc::Receiver<Frame>,
489) -> PipelineData {
490 let topic = format!("{loop_topic}.send", loop_topic = loop_ctx.topic);
491 let signals = task.engine.state.signals().clone();
492 let mut rx = rx;
493 let iter = std::iter::from_fn(move || loop {
494 if signals.interrupted() {
495 return None;
496 }
497
498 match rx.try_recv() {
499 Ok(frame) => {
500 if frame.topic == topic {
501 if let Some(hash) = frame.hash {
502 if let Ok(bytes) = store.cas_read_sync(&hash) {
503 if let Ok(content) = String::from_utf8(bytes) {
504 return Some(content);
505 }
506 }
507 }
508 }
509 }
510 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
511 std::thread::sleep(std::time::Duration::from_millis(10));
512 continue;
513 }
514 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
515 return None;
516 }
517 }
518 });
519
520 ByteStream::from_iter(
521 iter,
522 Span::unknown(),
523 task.engine.state.signals().clone(),
524 ByteStreamType::Unknown,
525 )
526 .into()
527}
528
529fn spawn_thread(
530 store: Store,
531 loop_ctx: ServiceLoop,
532 mut task: Task,
533 input_pipeline: PipelineData,
534 done_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
535) {
536 let handle = tokio::runtime::Handle::current();
537 std::thread::spawn(move || {
538 let res = run_pipeline(&handle, &store, &loop_ctx, &mut task, input_pipeline);
539 let _ = done_tx.send(res);
540 });
541}
542
543fn run_pipeline(
544 handle: &tokio::runtime::Handle,
545 store: &Store,
546 loop_ctx: &ServiceLoop,
547 task: &mut Task,
548 input_pipeline: PipelineData,
549) -> Result<(), String> {
550 let pipeline = task
551 .engine
552 .run_closure_in_job(
553 &task.run_closure,
554 vec![],
555 Some(input_pipeline),
556 task.id.to_string(),
557 )
558 .map_err(|e| {
559 let working_set = nu_protocol::engine::StateWorkingSet::new(&task.engine.state);
560 nu_protocol::format_cli_error(None, &working_set, &*e, None)
561 })?;
562
563 let suffix = task
564 .return_options
565 .as_ref()
566 .and_then(|o| o.suffix.clone())
567 .unwrap_or_else(|| ".recv".into());
568 let use_cas = task
569 .return_options
570 .as_ref()
571 .and_then(|o| o.target.as_deref())
572 .is_some_and(|t| t == "cas");
573
574 let emit = |event| {
575 handle.block_on(async {
576 let _ = emit_event(
577 store,
578 loop_ctx,
579 task.id,
580 task.return_options.as_ref(),
581 event,
582 );
583 });
584 };
585
586 match pipeline {
587 PipelineData::Empty => {}
588 PipelineData::Value(value, _) => {
589 if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
590 emit(event);
591 }
592 }
593 PipelineData::ListStream(mut stream, _) => {
594 while let Some(value) = stream.next_value() {
595 if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
596 emit(event);
597 }
598 }
599 }
600 PipelineData::ByteStream(stream, _) => {
601 if let Some(mut reader) = stream.reader() {
602 let mut buf = [0u8; 8192];
603 loop {
604 match reader.read(&mut buf) {
605 Ok(0) => break,
606 Ok(n) => {
607 emit(ServiceEventKind::Recv {
608 suffix: suffix.clone(),
609 data: buf[..n].to_vec(),
610 });
611 }
612 Err(_) => break,
613 }
614 }
615 }
616 }
617 }
618 Ok(())
619}
620
621pub(crate) fn value_to_event(
622 value: &Value,
623 suffix: &str,
624 use_cas: bool,
625) -> Result<Option<ServiceEventKind>, String> {
626 match value {
627 Value::Nothing { .. } => Ok(None),
628 Value::Record { .. } if !use_cas => Ok(Some(ServiceEventKind::RecvMeta {
629 suffix: suffix.to_string(),
630 meta: value_to_json(value),
631 })),
632 _ if use_cas => {
633 let data = match value {
634 Value::String { val, .. } => val.as_bytes().to_vec(),
635 Value::Binary { val, .. } => val.clone(),
636 _ => value_to_json(value).to_string().into_bytes(),
637 };
638 Ok(Some(ServiceEventKind::Recv {
639 suffix: suffix.to_string(),
640 data,
641 }))
642 }
643 _ => Err(format!(
644 "Service output must be a record when target is not \"cas\"; got {}. \
645 Set return_options.target to \"cas\" for non-record output.",
646 value.get_type()
647 )),
648 }
649}