Skip to main content

xs/processor/service/
service.rs

1use scru128::Scru128Id;
2use tokio::task::JoinHandle;
3
4use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Signals, Span, Value};
5use std::io::Read;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::io::AsyncReadExt;
10
11use crate::nu;
12use crate::nu::{value_to_json, ReturnOptions};
13use crate::store::{FollowOption, Frame, ReadOptions, Store};
14use serde_json::json;
15
16#[derive(Clone, Debug, serde::Deserialize, Default)]
17pub struct ServiceScriptOptions {
18    pub duplex: Option<bool>,
19    pub return_options: Option<ReturnOptions>,
20}
21
/// Identity of a running service, shared by every event it emits.
#[derive(Clone)]
pub struct ServiceLoop {
    /// Service name used to build topics. For a spawn topic of the form
    /// `xs.service.<name>.create` this is `<name>`; any other spawn topic is
    /// stored unchanged.
    pub topic: String,
}
26
27#[derive(Clone)]
28pub struct Task {
29    pub id: Scru128Id,
30    pub run_closure: nu_protocol::engine::Closure,
31    pub return_options: Option<ReturnOptions>,
32    pub duplex: bool,
33    pub engine: nu::Engine,
34}
35
36#[cfg_attr(not(test), allow(dead_code))]
37#[derive(Debug, Clone)]
38pub enum ServiceEventKind {
39    Running,
40    /// output frame flushed; payload is raw bytes stored in CAS
41    Recv {
42        suffix: String,
43        data: Vec<u8>,
44    },
45    /// output frame flushed; payload is a JSON record stored as frame metadata
46    RecvMeta {
47        suffix: String,
48        meta: serde_json::Value,
49    },
50    Stopped(StopReason),
51    ParseError {
52        message: String,
53    },
54    Shutdown,
55}
56
57#[cfg_attr(not(test), allow(dead_code))]
58#[derive(Debug, Clone)]
59pub struct ServiceEvent {
60    pub kind: ServiceEventKind,
61    pub frame: Frame,
62}
63
64#[cfg_attr(not(test), allow(dead_code))]
65#[derive(Debug, Clone)]
66pub enum StopReason {
67    Finished,
68    Error { message: String },
69    Terminate,
70    Shutdown,
71    Update { update_id: Scru128Id },
72}
73
74pub(crate) fn emit_event(
75    store: &Store,
76    loop_ctx: &ServiceLoop,
77    source_id: Scru128Id,
78    return_opts: Option<&ReturnOptions>,
79    kind: ServiceEventKind,
80) -> Result<ServiceEvent, Box<dyn std::error::Error + Send + Sync>> {
81    let frame = match &kind {
82        ServiceEventKind::Running => store.append(
83            Frame::builder(format!("xs.service.{}.active", loop_ctx.topic))
84                .meta(json!({ "source_id": source_id.to_string() }))
85                .build(),
86        )?,
87
88        ServiceEventKind::Recv { suffix, data } => {
89            // Data topic: app namespace, unchanged.
90            let hash = store.cas_insert_bytes_sync(data)?;
91            store.append(
92                Frame::builder(format!(
93                    "{topic}{suffix}",
94                    topic = loop_ctx.topic,
95                    suffix = suffix
96                ))
97                .hash(hash)
98                .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
99                .meta(json!({ "source_id": source_id.to_string() }))
100                .build(),
101            )?
102        }
103
104        ServiceEventKind::RecvMeta { suffix, meta } => {
105            // Data topic: app namespace, unchanged.
106            let mut merged = meta.clone();
107            merged["source_id"] = json!(source_id.to_string());
108            store.append(
109                Frame::builder(format!(
110                    "{topic}{suffix}",
111                    topic = loop_ctx.topic,
112                    suffix = suffix
113                ))
114                .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
115                .meta(merged)
116                .build(),
117            )?
118        }
119
120        ServiceEventKind::Stopped(reason) => {
121            // Per ADR 0005:
122            //   Finished  -> .fin.ok       (terminal, won't restart)
123            //   Error     -> .fin.error    (terminal)
124            //   Terminate -> .fin.term     (terminal)
125            //   Update    -> .replaced     (transient; successor coming)
126            //   Shutdown  -> not a lifecycle frame on its own; the server-
127            //                shutdown ack is the separate ServiceEventKind::
128            //                Shutdown emission below. Skip here.
129            let event_suffix = match reason {
130                StopReason::Finished => "fin.ok",
131                StopReason::Error { .. } => "fin.error",
132                StopReason::Terminate => "fin.term",
133                StopReason::Update { .. } => "replaced",
134                StopReason::Shutdown => {
135                    // No frame: the run_loop emits ServiceEventKind::Shutdown
136                    // (which becomes the .stopped frame) for the xs-stopping
137                    // path; emitting here would double up.
138                    return Ok(ServiceEvent {
139                        kind,
140                        frame: Frame::builder("").build(),
141                    });
142                }
143            };
144            let mut meta = json!({
145                "source_id": source_id.to_string(),
146            });
147            if let StopReason::Update { update_id } = reason {
148                meta["update_id"] = json!(update_id.to_string());
149            }
150            if let StopReason::Error { message } = reason {
151                meta["message"] = json!(message);
152            }
153            store.append(
154                Frame::builder(format!("xs.service.{}.{event_suffix}", loop_ctx.topic))
155                    .meta(meta)
156                    .build(),
157            )?
158        }
159
160        ServiceEventKind::ParseError { message } => store.append(
161            Frame::builder(format!("xs.service.{}.invalid", loop_ctx.topic))
162                .meta(json!({
163                    "source_id": source_id.to_string(),
164                    "reason": message,
165                }))
166                .build(),
167        )?,
168
169        ServiceEventKind::Shutdown => store.append(
170            Frame::builder(format!("xs.service.{}.stopped", loop_ctx.topic))
171                .meta(json!({ "source_id": source_id.to_string() }))
172                .build(),
173        )?,
174    };
175
176    Ok(ServiceEvent { kind, frame })
177}
178
179pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> {
180    tokio::spawn(async move { run(store, spawn_frame).await })
181}
182
183async fn run(store: Store, spawn_frame: Frame) {
184    let mut engine = match crate::processor::build_engine(&store, &spawn_frame.id) {
185        Ok(e) => e,
186        Err(_) => return,
187    };
188
189    // Services get the same read/append surface as the other runners. Their
190    // appends are tagged with the spawning frame's id as `service_id`.
191    let base_meta = serde_json::json!({ "service_id": spawn_frame.id.to_string() });
192    if crate::nu::add_read_commands(&mut engine, &store, crate::nu::ReadMode::Stream).is_err()
193        || crate::nu::add_write_commands(
194            &mut engine,
195            &store,
196            crate::nu::AppendMode::Direct(base_meta),
197        )
198        .is_err()
199    {
200        return;
201    }
202
203    let hash = match spawn_frame.hash.clone() {
204        Some(h) => h,
205        None => return,
206    };
207    let mut reader = match store.cas_reader(hash).await {
208        Ok(r) => r,
209        Err(_) => return,
210    };
211    let mut script = String::new();
212    if reader.read_to_string(&mut script).await.is_err() {
213        return;
214    }
215
216    let loop_ctx = ServiceLoop {
217        // Topic is `xs.service.<name>.create`; strip both ends to get <name>.
218        topic: spawn_frame
219            .topic
220            .strip_prefix("xs.service.")
221            .and_then(|rest| rest.strip_suffix(".create"))
222            .unwrap_or(&spawn_frame.topic)
223            .to_string(),
224    };
225
226    let nu_config = match nu::parse_config(&mut engine, &script) {
227        Ok(cfg) => cfg,
228        Err(e) => {
229            let _ = emit_event(
230                &store,
231                &loop_ctx,
232                spawn_frame.id,
233                None,
234                ServiceEventKind::ParseError {
235                    message: e.to_string(),
236                },
237            );
238            return;
239        }
240    };
241    let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default();
242
243    // Create and set the interrupt signal on the engine state
244    let interrupt = Arc::new(AtomicBool::new(false));
245    engine.state.set_signals(Signals::new(interrupt.clone()));
246
247    let task = Task {
248        id: spawn_frame.id,
249        run_closure: nu_config.run_closure,
250        return_options: opts.return_options,
251        duplex: opts.duplex.unwrap_or(false),
252        engine,
253    };
254
255    run_loop(store, loop_ctx, task).await;
256}
257
258async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task) {
259    // Create the first start frame and set up a persistent control subscription
260    let start_event = emit_event(
261        &store,
262        &loop_ctx,
263        task.id,
264        task.return_options.as_ref(),
265        ServiceEventKind::Running,
266    )
267    .expect("failed to emit running event");
268    let mut start_id = start_event.frame.id;
269
270    let control_rx_options = ReadOptions::builder()
271        .follow(FollowOption::On)
272        .after(start_id)
273        .build();
274
275    let mut control_rx = store.read(control_rx_options).await;
276
277    enum LoopOutcome {
278        Continue,
279        Update(Box<Task>, Scru128Id),
280        Terminate,
281        Shutdown,
282        Error(String),
283    }
284
285    impl core::fmt::Debug for LoopOutcome {
286        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
287            match self {
288                LoopOutcome::Continue => write!(f, "Continue"),
289                LoopOutcome::Update(_, id) => f.debug_tuple("Update").field(id).finish(),
290                LoopOutcome::Terminate => write!(f, "Terminate"),
291                LoopOutcome::Shutdown => write!(f, "Shutdown"),
292                LoopOutcome::Error(e) => f.debug_tuple("Error").field(e).finish(),
293            }
294        }
295    }
296
297    impl From<&LoopOutcome> for StopReason {
298        fn from(value: &LoopOutcome) -> Self {
299            match value {
300                LoopOutcome::Continue => StopReason::Finished,
301                LoopOutcome::Update(_, id) => StopReason::Update { update_id: *id },
302                LoopOutcome::Terminate => StopReason::Terminate,
303                LoopOutcome::Shutdown => StopReason::Shutdown,
304                LoopOutcome::Error(e) => StopReason::Error { message: e.clone() },
305            }
306        }
307    }
308
309    loop {
310        let input_pipeline = if task.duplex {
311            let options = ReadOptions::builder()
312                .follow(FollowOption::On)
313                .after(start_id)
314                .build();
315            let send_rx = store.read(options).await;
316            build_input_pipeline(store.clone(), &loop_ctx, &task, send_rx).await
317        } else {
318            PipelineData::empty()
319        };
320
321        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
322        spawn_thread(
323            store.clone(),
324            loop_ctx.clone(),
325            task.clone(),
326            input_pipeline,
327            done_tx,
328        );
329
330        let terminate_topic = format!("xs.service.{}.term", loop_ctx.topic);
331        let spawn_topic = format!("xs.service.{}.create", loop_ctx.topic);
332        tokio::pin!(done_rx);
333
334        let outcome = 'ctrl: loop {
335            tokio::select! {
336                biased;
337                maybe = control_rx.recv() => {
338                    match maybe {
339                        Some(frame) if frame.topic == terminate_topic => {
340                            task.engine.state.signals().trigger();
341                            task.engine.kill_job_by_name(&task.id.to_string());
342                            let _ = (&mut done_rx).await;
343                            break 'ctrl LoopOutcome::Terminate;
344                        }
345                        Some(frame) if frame.topic == "xs.stopping" => {
346                            task.engine.state.signals().trigger();
347                            task.engine.kill_job_by_name(&task.id.to_string());
348                            let _ = (&mut done_rx).await;
349                            break 'ctrl LoopOutcome::Shutdown;
350                        }
351                        Some(frame) if frame.topic == spawn_topic => {
352                            if let Some(hash) = frame.hash.clone() {
353                                if let Ok(mut reader) = store.cas_reader(hash).await {
354                                    let mut script = String::new();
355                                    if reader.read_to_string(&mut script).await.is_ok() {
356                                        let mut new_engine = match crate::processor::build_engine(&store, &frame.id) {
357                                            Ok(e) => e,
358                                            Err(_) => continue,
359                                        };
360                                        match nu::parse_config(&mut new_engine, &script) {
361                                            Ok(cfg) => {
362                                                let opts: ServiceScriptOptions = cfg.deserialize_options().unwrap_or_default();
363                                                let interrupt = Arc::new(AtomicBool::new(false));
364                                                new_engine.state.set_signals(Signals::new(interrupt.clone()));
365
366                                                task.engine.state.signals().trigger();
367                                                task.engine.kill_job_by_name(&task.id.to_string());
368                                                let _ = (&mut done_rx).await;
369
370                                                let new_task = Task {
371                                                    id: frame.id,
372                                                    run_closure: cfg.run_closure,
373                                                    return_options: opts.return_options,
374                                                    duplex: opts.duplex.unwrap_or(false),
375                                                    engine: new_engine,
376                                                };
377
378                                                break 'ctrl LoopOutcome::Update(Box::new(new_task), frame.id);
379                                            }
380                                            Err(e) => {
381                                                let _ = emit_event(
382                                                    &store,
383                                                    &loop_ctx,
384                                                    frame.id,
385                                                    None,
386                                                    ServiceEventKind::ParseError { message: e.to_string() },
387                                                );
388                                            }
389                                        }
390                                    }
391                                }
392                            }
393                        }
394                        Some(_) => {}
395                        None => break 'ctrl LoopOutcome::Error("control".into()),
396                    }
397                }
398                res = &mut done_rx => {
399                    break 'ctrl match res.unwrap_or(Err("thread failed".into())) {
400                        Ok(()) => LoopOutcome::Continue,
401                        Err(e) => LoopOutcome::Error(e),
402                    };
403                }
404            }
405        };
406
407        let reason: StopReason = (&outcome).into();
408        // Pre-emit a Stopped frame only when it maps to a real lifecycle
409        // topic:
410        //   Update    -> .replaced (transient, successor coming)
411        //   Terminate -> .fin.term
412        //   Error     -> .fin.error
413        //   Finished  -> no frame: Continue auto-restarts, this isn't a
414        //                terminal stop.
415        //   Shutdown  -> no frame: the post-loop ServiceEventKind::Shutdown
416        //                emits the single .stopped frame for the xs-stopping
417        //                path.
418        match &reason {
419            StopReason::Finished | StopReason::Shutdown => {}
420            _ => {
421                let _ = emit_event(
422                    &store,
423                    &loop_ctx,
424                    task.id,
425                    task.return_options.as_ref(),
426                    ServiceEventKind::Stopped(reason.clone()),
427                );
428            }
429        }
430
431        match outcome {
432            LoopOutcome::Continue => {
433                tokio::time::sleep(Duration::from_secs(1)).await;
434                if let Ok(event) = emit_event(
435                    &store,
436                    &loop_ctx,
437                    task.id,
438                    task.return_options.as_ref(),
439                    ServiceEventKind::Running,
440                ) {
441                    start_id = event.frame.id;
442                }
443            }
444            LoopOutcome::Update(new_task, _) => {
445                task = *new_task;
446                if let Ok(event) = emit_event(
447                    &store,
448                    &loop_ctx,
449                    task.id,
450                    task.return_options.as_ref(),
451                    ServiceEventKind::Running,
452                ) {
453                    start_id = event.frame.id;
454                }
455            }
456            LoopOutcome::Terminate | LoopOutcome::Error(_) => {
457                break;
458            }
459            LoopOutcome::Shutdown => {
460                let _ = emit_event(
461                    &store,
462                    &loop_ctx,
463                    task.id,
464                    task.return_options.as_ref(),
465                    ServiceEventKind::Shutdown,
466                );
467                break;
468            }
469        }
470    }
471}
472
473async fn build_input_pipeline(
474    store: Store,
475    loop_ctx: &ServiceLoop,
476    task: &Task,
477    rx: tokio::sync::mpsc::Receiver<Frame>,
478) -> PipelineData {
479    let topic = format!("{loop_topic}.send", loop_topic = loop_ctx.topic);
480    let signals = task.engine.state.signals().clone();
481    let mut rx = rx;
482    let iter = std::iter::from_fn(move || loop {
483        if signals.interrupted() {
484            return None;
485        }
486
487        match rx.try_recv() {
488            Ok(frame) => {
489                if frame.topic == topic {
490                    if let Some(hash) = frame.hash {
491                        if let Ok(bytes) = store.cas_read_sync(&hash) {
492                            if let Ok(content) = String::from_utf8(bytes) {
493                                return Some(content);
494                            }
495                        }
496                    }
497                }
498            }
499            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
500                std::thread::sleep(std::time::Duration::from_millis(10));
501                continue;
502            }
503            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
504                return None;
505            }
506        }
507    });
508
509    ByteStream::from_iter(
510        iter,
511        Span::unknown(),
512        task.engine.state.signals().clone(),
513        ByteStreamType::Unknown,
514    )
515    .into()
516}
517
518fn spawn_thread(
519    store: Store,
520    loop_ctx: ServiceLoop,
521    mut task: Task,
522    input_pipeline: PipelineData,
523    done_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
524) {
525    let handle = tokio::runtime::Handle::current();
526    std::thread::spawn(move || {
527        let res = run_pipeline(&handle, &store, &loop_ctx, &mut task, input_pipeline);
528        let _ = done_tx.send(res);
529    });
530}
531
532fn run_pipeline(
533    handle: &tokio::runtime::Handle,
534    store: &Store,
535    loop_ctx: &ServiceLoop,
536    task: &mut Task,
537    input_pipeline: PipelineData,
538) -> Result<(), String> {
539    let pipeline = task
540        .engine
541        .run_closure_in_job(
542            &task.run_closure,
543            vec![],
544            Some(input_pipeline),
545            task.id.to_string(),
546        )
547        .map_err(|e| {
548            let working_set = nu_protocol::engine::StateWorkingSet::new(&task.engine.state);
549            nu_protocol::format_cli_error(None, &working_set, &*e, None)
550        })?;
551
552    let suffix = task
553        .return_options
554        .as_ref()
555        .and_then(|o| o.suffix.clone())
556        .unwrap_or_else(|| ".recv".into());
557    let use_cas = task
558        .return_options
559        .as_ref()
560        .and_then(|o| o.target.as_deref())
561        .is_some_and(|t| t == "cas");
562
563    let emit = |event| {
564        handle.block_on(async {
565            let _ = emit_event(
566                store,
567                loop_ctx,
568                task.id,
569                task.return_options.as_ref(),
570                event,
571            );
572        });
573    };
574
575    match pipeline {
576        PipelineData::Empty => {}
577        PipelineData::Value(value, _) => {
578            if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
579                emit(event);
580            }
581        }
582        PipelineData::ListStream(mut stream, _) => {
583            while let Some(value) = stream.next_value() {
584                if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
585                    emit(event);
586                }
587            }
588        }
589        PipelineData::ByteStream(stream, _) => {
590            if let Some(mut reader) = stream.reader() {
591                let mut buf = [0u8; 8192];
592                loop {
593                    match reader.read(&mut buf) {
594                        Ok(0) => break,
595                        Ok(n) => {
596                            emit(ServiceEventKind::Recv {
597                                suffix: suffix.clone(),
598                                data: buf[..n].to_vec(),
599                            });
600                        }
601                        Err(_) => break,
602                    }
603                }
604            }
605        }
606    }
607    Ok(())
608}
609
610pub(crate) fn value_to_event(
611    value: &Value,
612    suffix: &str,
613    use_cas: bool,
614) -> Result<Option<ServiceEventKind>, String> {
615    match value {
616        Value::Nothing { .. } => Ok(None),
617        Value::Record { .. } if !use_cas => Ok(Some(ServiceEventKind::RecvMeta {
618            suffix: suffix.to_string(),
619            meta: value_to_json(value),
620        })),
621        _ if use_cas => {
622            let data = match value {
623                Value::String { val, .. } => val.as_bytes().to_vec(),
624                Value::Binary { val, .. } => val.clone(),
625                _ => value_to_json(value).to_string().into_bytes(),
626            };
627            Ok(Some(ServiceEventKind::Recv {
628                suffix: suffix.to_string(),
629                data,
630            }))
631        }
632        _ => Err(format!(
633            "Service output must be a record when target is not \"cas\"; got {}. \
634             Set return_options.target to \"cas\" for non-record output.",
635            value.get_type()
636        )),
637    }
638}