1use scru128::Scru128Id;
2use tokio::task::JoinHandle;
3
4use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Signals, Span, Value};
5use std::io::Read;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::io::AsyncReadExt;
10
11use crate::nu;
12use crate::nu::{value_to_json, ReturnOptions};
13use crate::store::{FollowOption, Frame, ReadOptions, Store};
14use serde_json::json;
15
16#[derive(Clone, Debug, serde::Deserialize, Default)]
17pub struct ServiceScriptOptions {
18 pub duplex: Option<bool>,
19 pub return_options: Option<ReturnOptions>,
20}
21
/// Identity of a running service loop.
///
/// `Debug`, `PartialEq` and `Eq` are derived in addition to `Clone` so this
/// type can be logged and compared like the other public types in this file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServiceLoop {
    /// Bare service name: the spawn frame's topic with the `xs.service.`
    /// prefix and `.create` suffix removed (or the full topic when it does
    /// not have that shape).
    pub topic: String,
}
26
27#[derive(Clone)]
28pub struct Task {
29 pub id: Scru128Id,
30 pub run_closure: nu_protocol::engine::Closure,
31 pub return_options: Option<ReturnOptions>,
32 pub duplex: bool,
33 pub engine: nu::Engine,
34}
35
36#[cfg_attr(not(test), allow(dead_code))]
37#[derive(Debug, Clone)]
38pub enum ServiceEventKind {
39 Running,
40 Recv {
42 suffix: String,
43 data: Vec<u8>,
44 },
45 RecvMeta {
47 suffix: String,
48 meta: serde_json::Value,
49 },
50 Stopped(StopReason),
51 ParseError {
52 message: String,
53 },
54 Shutdown,
55}
56
57#[cfg_attr(not(test), allow(dead_code))]
58#[derive(Debug, Clone)]
59pub struct ServiceEvent {
60 pub kind: ServiceEventKind,
61 pub frame: Frame,
62}
63
64#[cfg_attr(not(test), allow(dead_code))]
65#[derive(Debug, Clone)]
66pub enum StopReason {
67 Finished,
68 Error { message: String },
69 Terminate,
70 Shutdown,
71 Update { update_id: Scru128Id },
72}
73
74pub(crate) fn emit_event(
75 store: &Store,
76 loop_ctx: &ServiceLoop,
77 source_id: Scru128Id,
78 return_opts: Option<&ReturnOptions>,
79 kind: ServiceEventKind,
80) -> Result<ServiceEvent, Box<dyn std::error::Error + Send + Sync>> {
81 let frame = match &kind {
82 ServiceEventKind::Running => store.append(
83 Frame::builder(format!("xs.service.{}.active", loop_ctx.topic))
84 .meta(json!({ "source_id": source_id.to_string() }))
85 .build(),
86 )?,
87
88 ServiceEventKind::Recv { suffix, data } => {
89 let hash = store.cas_insert_bytes_sync(data)?;
91 store.append(
92 Frame::builder(format!(
93 "{topic}{suffix}",
94 topic = loop_ctx.topic,
95 suffix = suffix
96 ))
97 .hash(hash)
98 .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
99 .meta(json!({ "source_id": source_id.to_string() }))
100 .build(),
101 )?
102 }
103
104 ServiceEventKind::RecvMeta { suffix, meta } => {
105 let mut merged = meta.clone();
107 merged["source_id"] = json!(source_id.to_string());
108 store.append(
109 Frame::builder(format!(
110 "{topic}{suffix}",
111 topic = loop_ctx.topic,
112 suffix = suffix
113 ))
114 .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
115 .meta(merged)
116 .build(),
117 )?
118 }
119
120 ServiceEventKind::Stopped(reason) => {
121 let event_suffix = match reason {
130 StopReason::Finished => "fin.ok",
131 StopReason::Error { .. } => "fin.error",
132 StopReason::Terminate => "fin.term",
133 StopReason::Update { .. } => "replaced",
134 StopReason::Shutdown => {
135 return Ok(ServiceEvent {
139 kind,
140 frame: Frame::builder("").build(),
141 });
142 }
143 };
144 let mut meta = json!({
145 "source_id": source_id.to_string(),
146 });
147 if let StopReason::Update { update_id } = reason {
148 meta["update_id"] = json!(update_id.to_string());
149 }
150 if let StopReason::Error { message } = reason {
151 meta["message"] = json!(message);
152 }
153 store.append(
154 Frame::builder(format!("xs.service.{}.{event_suffix}", loop_ctx.topic))
155 .meta(meta)
156 .build(),
157 )?
158 }
159
160 ServiceEventKind::ParseError { message } => store.append(
161 Frame::builder(format!("xs.service.{}.invalid", loop_ctx.topic))
162 .meta(json!({
163 "source_id": source_id.to_string(),
164 "reason": message,
165 }))
166 .build(),
167 )?,
168
169 ServiceEventKind::Shutdown => store.append(
170 Frame::builder(format!("xs.service.{}.stopped", loop_ctx.topic))
171 .meta(json!({ "source_id": source_id.to_string() }))
172 .build(),
173 )?,
174 };
175
176 Ok(ServiceEvent { kind, frame })
177}
178
179pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> {
180 tokio::spawn(async move { run(store, spawn_frame).await })
181}
182
183async fn run(store: Store, spawn_frame: Frame) {
184 let mut engine = match crate::processor::build_engine(&store, &spawn_frame.id) {
185 Ok(e) => e,
186 Err(_) => return,
187 };
188
189 let base_meta = serde_json::json!({ "service_id": spawn_frame.id.to_string() });
192 if crate::nu::add_read_commands(&mut engine, &store, crate::nu::ReadMode::Stream).is_err()
193 || crate::nu::add_write_commands(
194 &mut engine,
195 &store,
196 crate::nu::AppendMode::Direct(base_meta),
197 )
198 .is_err()
199 {
200 return;
201 }
202
203 let hash = match spawn_frame.hash.clone() {
204 Some(h) => h,
205 None => return,
206 };
207 let mut reader = match store.cas_reader(hash).await {
208 Ok(r) => r,
209 Err(_) => return,
210 };
211 let mut script = String::new();
212 if reader.read_to_string(&mut script).await.is_err() {
213 return;
214 }
215
216 let loop_ctx = ServiceLoop {
217 topic: spawn_frame
219 .topic
220 .strip_prefix("xs.service.")
221 .and_then(|rest| rest.strip_suffix(".create"))
222 .unwrap_or(&spawn_frame.topic)
223 .to_string(),
224 };
225
226 let nu_config = match nu::parse_config(&mut engine, &script) {
227 Ok(cfg) => cfg,
228 Err(e) => {
229 let _ = emit_event(
230 &store,
231 &loop_ctx,
232 spawn_frame.id,
233 None,
234 ServiceEventKind::ParseError {
235 message: e.to_string(),
236 },
237 );
238 return;
239 }
240 };
241 let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default();
242
243 let interrupt = Arc::new(AtomicBool::new(false));
245 engine.state.set_signals(Signals::new(interrupt.clone()));
246
247 let task = Task {
248 id: spawn_frame.id,
249 run_closure: nu_config.run_closure,
250 return_options: opts.return_options,
251 duplex: opts.duplex.unwrap_or(false),
252 engine,
253 };
254
255 run_loop(store, loop_ctx, task).await;
256}
257
258async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task) {
259 let start_event = emit_event(
261 &store,
262 &loop_ctx,
263 task.id,
264 task.return_options.as_ref(),
265 ServiceEventKind::Running,
266 )
267 .expect("failed to emit running event");
268 let mut start_id = start_event.frame.id;
269
270 let control_rx_options = ReadOptions::builder()
271 .follow(FollowOption::On)
272 .after(start_id)
273 .build();
274
275 let mut control_rx = store.read(control_rx_options).await;
276
277 enum LoopOutcome {
278 Continue,
279 Update(Box<Task>, Scru128Id),
280 Terminate,
281 Shutdown,
282 Error(String),
283 }
284
285 impl core::fmt::Debug for LoopOutcome {
286 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
287 match self {
288 LoopOutcome::Continue => write!(f, "Continue"),
289 LoopOutcome::Update(_, id) => f.debug_tuple("Update").field(id).finish(),
290 LoopOutcome::Terminate => write!(f, "Terminate"),
291 LoopOutcome::Shutdown => write!(f, "Shutdown"),
292 LoopOutcome::Error(e) => f.debug_tuple("Error").field(e).finish(),
293 }
294 }
295 }
296
297 impl From<&LoopOutcome> for StopReason {
298 fn from(value: &LoopOutcome) -> Self {
299 match value {
300 LoopOutcome::Continue => StopReason::Finished,
301 LoopOutcome::Update(_, id) => StopReason::Update { update_id: *id },
302 LoopOutcome::Terminate => StopReason::Terminate,
303 LoopOutcome::Shutdown => StopReason::Shutdown,
304 LoopOutcome::Error(e) => StopReason::Error { message: e.clone() },
305 }
306 }
307 }
308
309 loop {
310 let input_pipeline = if task.duplex {
311 let options = ReadOptions::builder()
312 .follow(FollowOption::On)
313 .after(start_id)
314 .build();
315 let send_rx = store.read(options).await;
316 build_input_pipeline(store.clone(), &loop_ctx, &task, send_rx).await
317 } else {
318 PipelineData::empty()
319 };
320
321 let (done_tx, done_rx) = tokio::sync::oneshot::channel();
322 spawn_thread(
323 store.clone(),
324 loop_ctx.clone(),
325 task.clone(),
326 input_pipeline,
327 done_tx,
328 );
329
330 let terminate_topic = format!("xs.service.{}.term", loop_ctx.topic);
331 let spawn_topic = format!("xs.service.{}.create", loop_ctx.topic);
332 tokio::pin!(done_rx);
333
334 let outcome = 'ctrl: loop {
335 tokio::select! {
336 biased;
337 maybe = control_rx.recv() => {
338 match maybe {
339 Some(frame) if frame.topic == terminate_topic => {
340 task.engine.state.signals().trigger();
341 task.engine.kill_job_by_name(&task.id.to_string());
342 let _ = (&mut done_rx).await;
343 break 'ctrl LoopOutcome::Terminate;
344 }
345 Some(frame) if frame.topic == "xs.stopping" => {
346 task.engine.state.signals().trigger();
347 task.engine.kill_job_by_name(&task.id.to_string());
348 let _ = (&mut done_rx).await;
349 break 'ctrl LoopOutcome::Shutdown;
350 }
351 Some(frame) if frame.topic == spawn_topic => {
352 if let Some(hash) = frame.hash.clone() {
353 if let Ok(mut reader) = store.cas_reader(hash).await {
354 let mut script = String::new();
355 if reader.read_to_string(&mut script).await.is_ok() {
356 let mut new_engine = match crate::processor::build_engine(&store, &frame.id) {
357 Ok(e) => e,
358 Err(_) => continue,
359 };
360 match nu::parse_config(&mut new_engine, &script) {
361 Ok(cfg) => {
362 let opts: ServiceScriptOptions = cfg.deserialize_options().unwrap_or_default();
363 let interrupt = Arc::new(AtomicBool::new(false));
364 new_engine.state.set_signals(Signals::new(interrupt.clone()));
365
366 task.engine.state.signals().trigger();
367 task.engine.kill_job_by_name(&task.id.to_string());
368 let _ = (&mut done_rx).await;
369
370 let new_task = Task {
371 id: frame.id,
372 run_closure: cfg.run_closure,
373 return_options: opts.return_options,
374 duplex: opts.duplex.unwrap_or(false),
375 engine: new_engine,
376 };
377
378 break 'ctrl LoopOutcome::Update(Box::new(new_task), frame.id);
379 }
380 Err(e) => {
381 let _ = emit_event(
382 &store,
383 &loop_ctx,
384 frame.id,
385 None,
386 ServiceEventKind::ParseError { message: e.to_string() },
387 );
388 }
389 }
390 }
391 }
392 }
393 }
394 Some(_) => {}
395 None => break 'ctrl LoopOutcome::Error("control".into()),
396 }
397 }
398 res = &mut done_rx => {
399 break 'ctrl match res.unwrap_or(Err("thread failed".into())) {
400 Ok(()) => LoopOutcome::Continue,
401 Err(e) => LoopOutcome::Error(e),
402 };
403 }
404 }
405 };
406
407 let reason: StopReason = (&outcome).into();
408 match &reason {
419 StopReason::Finished | StopReason::Shutdown => {}
420 _ => {
421 let _ = emit_event(
422 &store,
423 &loop_ctx,
424 task.id,
425 task.return_options.as_ref(),
426 ServiceEventKind::Stopped(reason.clone()),
427 );
428 }
429 }
430
431 match outcome {
432 LoopOutcome::Continue => {
433 tokio::time::sleep(Duration::from_secs(1)).await;
434 if let Ok(event) = emit_event(
435 &store,
436 &loop_ctx,
437 task.id,
438 task.return_options.as_ref(),
439 ServiceEventKind::Running,
440 ) {
441 start_id = event.frame.id;
442 }
443 }
444 LoopOutcome::Update(new_task, _) => {
445 task = *new_task;
446 if let Ok(event) = emit_event(
447 &store,
448 &loop_ctx,
449 task.id,
450 task.return_options.as_ref(),
451 ServiceEventKind::Running,
452 ) {
453 start_id = event.frame.id;
454 }
455 }
456 LoopOutcome::Terminate | LoopOutcome::Error(_) => {
457 break;
458 }
459 LoopOutcome::Shutdown => {
460 let _ = emit_event(
461 &store,
462 &loop_ctx,
463 task.id,
464 task.return_options.as_ref(),
465 ServiceEventKind::Shutdown,
466 );
467 break;
468 }
469 }
470 }
471}
472
473async fn build_input_pipeline(
474 store: Store,
475 loop_ctx: &ServiceLoop,
476 task: &Task,
477 rx: tokio::sync::mpsc::Receiver<Frame>,
478) -> PipelineData {
479 let topic = format!("{loop_topic}.send", loop_topic = loop_ctx.topic);
480 let signals = task.engine.state.signals().clone();
481 let mut rx = rx;
482 let iter = std::iter::from_fn(move || loop {
483 if signals.interrupted() {
484 return None;
485 }
486
487 match rx.try_recv() {
488 Ok(frame) => {
489 if frame.topic == topic {
490 if let Some(hash) = frame.hash {
491 if let Ok(bytes) = store.cas_read_sync(&hash) {
492 if let Ok(content) = String::from_utf8(bytes) {
493 return Some(content);
494 }
495 }
496 }
497 }
498 }
499 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
500 std::thread::sleep(std::time::Duration::from_millis(10));
501 continue;
502 }
503 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
504 return None;
505 }
506 }
507 });
508
509 ByteStream::from_iter(
510 iter,
511 Span::unknown(),
512 task.engine.state.signals().clone(),
513 ByteStreamType::Unknown,
514 )
515 .into()
516}
517
518fn spawn_thread(
519 store: Store,
520 loop_ctx: ServiceLoop,
521 mut task: Task,
522 input_pipeline: PipelineData,
523 done_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
524) {
525 let handle = tokio::runtime::Handle::current();
526 std::thread::spawn(move || {
527 let res = run_pipeline(&handle, &store, &loop_ctx, &mut task, input_pipeline);
528 let _ = done_tx.send(res);
529 });
530}
531
532fn run_pipeline(
533 handle: &tokio::runtime::Handle,
534 store: &Store,
535 loop_ctx: &ServiceLoop,
536 task: &mut Task,
537 input_pipeline: PipelineData,
538) -> Result<(), String> {
539 let pipeline = task
540 .engine
541 .run_closure_in_job(
542 &task.run_closure,
543 vec![],
544 Some(input_pipeline),
545 task.id.to_string(),
546 )
547 .map_err(|e| {
548 let working_set = nu_protocol::engine::StateWorkingSet::new(&task.engine.state);
549 nu_protocol::format_cli_error(None, &working_set, &*e, None)
550 })?;
551
552 let suffix = task
553 .return_options
554 .as_ref()
555 .and_then(|o| o.suffix.clone())
556 .unwrap_or_else(|| ".recv".into());
557 let use_cas = task
558 .return_options
559 .as_ref()
560 .and_then(|o| o.target.as_deref())
561 .is_some_and(|t| t == "cas");
562
563 let emit = |event| {
564 handle.block_on(async {
565 let _ = emit_event(
566 store,
567 loop_ctx,
568 task.id,
569 task.return_options.as_ref(),
570 event,
571 );
572 });
573 };
574
575 match pipeline {
576 PipelineData::Empty => {}
577 PipelineData::Value(value, _) => {
578 if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
579 emit(event);
580 }
581 }
582 PipelineData::ListStream(mut stream, _) => {
583 while let Some(value) = stream.next_value() {
584 if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
585 emit(event);
586 }
587 }
588 }
589 PipelineData::ByteStream(stream, _) => {
590 if let Some(mut reader) = stream.reader() {
591 let mut buf = [0u8; 8192];
592 loop {
593 match reader.read(&mut buf) {
594 Ok(0) => break,
595 Ok(n) => {
596 emit(ServiceEventKind::Recv {
597 suffix: suffix.clone(),
598 data: buf[..n].to_vec(),
599 });
600 }
601 Err(_) => break,
602 }
603 }
604 }
605 }
606 }
607 Ok(())
608}
609
610pub(crate) fn value_to_event(
611 value: &Value,
612 suffix: &str,
613 use_cas: bool,
614) -> Result<Option<ServiceEventKind>, String> {
615 match value {
616 Value::Nothing { .. } => Ok(None),
617 Value::Record { .. } if !use_cas => Ok(Some(ServiceEventKind::RecvMeta {
618 suffix: suffix.to_string(),
619 meta: value_to_json(value),
620 })),
621 _ if use_cas => {
622 let data = match value {
623 Value::String { val, .. } => val.as_bytes().to_vec(),
624 Value::Binary { val, .. } => val.clone(),
625 _ => value_to_json(value).to_string().into_bytes(),
626 };
627 Ok(Some(ServiceEventKind::Recv {
628 suffix: suffix.to_string(),
629 data,
630 }))
631 }
632 _ => Err(format!(
633 "Service output must be a record when target is not \"cas\"; got {}. \
634 Set return_options.target to \"cas\" for non-record output.",
635 value.get_type()
636 )),
637 }
638}