// xs/processor/service/service.rs

1use scru128::Scru128Id;
2use tokio::task::JoinHandle;
3
4use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Signals, Span, Value};
5use std::io::Read;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::io::AsyncReadExt;
10
11use crate::nu;
12use crate::nu::{value_to_json, ReturnOptions};
13use crate::store::{FollowOption, Frame, ReadOptions, Store};
14use serde_json::json;
15
16#[derive(Clone, Debug, serde::Deserialize, Default)]
17pub struct ServiceScriptOptions {
18    pub duplex: Option<bool>,
19    pub return_options: Option<ReturnOptions>,
20}
21
/// Identity of one service loop, shared by every frame it emits.
#[derive(Clone)]
pub struct ServiceLoop {
    /// Service name: the `<name>` part of `xs.service.<name>.create`. It
    /// prefixes output topics (`<name><suffix>`) and is embedded in lifecycle
    /// topics (`xs.service.<name>.*`).
    pub topic: String,
}
26
27#[derive(Clone)]
28pub struct Task {
29    pub id: Scru128Id,
30    pub run_closure: nu_protocol::engine::Closure,
31    pub return_options: Option<ReturnOptions>,
32    pub duplex: bool,
33    pub engine: nu::Engine,
34}
35
36#[cfg_attr(not(test), allow(dead_code))]
37#[derive(Debug, Clone)]
38pub enum ServiceEventKind {
39    Running,
40    /// output frame flushed; payload is raw bytes stored in CAS
41    Recv {
42        suffix: String,
43        data: Vec<u8>,
44    },
45    /// output frame flushed; payload is a JSON record stored as frame metadata
46    RecvMeta {
47        suffix: String,
48        meta: serde_json::Value,
49    },
50    Stopped(StopReason),
51    ParseError {
52        message: String,
53    },
54    Shutdown,
55}
56
57#[cfg_attr(not(test), allow(dead_code))]
58#[derive(Debug, Clone)]
59pub struct ServiceEvent {
60    pub kind: ServiceEventKind,
61    pub frame: Frame,
62}
63
64#[cfg_attr(not(test), allow(dead_code))]
65#[derive(Debug, Clone)]
66pub enum StopReason {
67    Finished,
68    Error { message: String },
69    Terminate,
70    Shutdown,
71    Update { update_id: Scru128Id },
72}
73
74pub(crate) fn emit_event(
75    store: &Store,
76    loop_ctx: &ServiceLoop,
77    source_id: Scru128Id,
78    return_opts: Option<&ReturnOptions>,
79    kind: ServiceEventKind,
80) -> Result<ServiceEvent, Box<dyn std::error::Error + Send + Sync>> {
81    let frame = match &kind {
82        ServiceEventKind::Running => store.append(
83            Frame::builder(format!("xs.service.{}.active", loop_ctx.topic))
84                .meta(json!({ "source_id": source_id.to_string() }))
85                .build(),
86        )?,
87
88        ServiceEventKind::Recv { suffix, data } => {
89            // Data topic: app namespace, unchanged.
90            let hash = store.cas_insert_bytes_sync(data)?;
91            store.append(
92                Frame::builder(format!(
93                    "{topic}{suffix}",
94                    topic = loop_ctx.topic,
95                    suffix = suffix
96                ))
97                .hash(hash)
98                .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
99                .meta(json!({ "source_id": source_id.to_string() }))
100                .build(),
101            )?
102        }
103
104        ServiceEventKind::RecvMeta { suffix, meta } => {
105            // Data topic: app namespace, unchanged.
106            let mut merged = meta.clone();
107            merged["source_id"] = json!(source_id.to_string());
108            store.append(
109                Frame::builder(format!(
110                    "{topic}{suffix}",
111                    topic = loop_ctx.topic,
112                    suffix = suffix
113                ))
114                .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
115                .meta(merged)
116                .build(),
117            )?
118        }
119
120        ServiceEventKind::Stopped(reason) => {
121            // Per ADR 0005:
122            //   Finished  -> .fin.ok       (terminal, won't restart)
123            //   Error     -> .fin.error    (terminal)
124            //   Terminate -> .fin.term     (terminal)
125            //   Update    -> .replaced     (transient; successor coming)
126            //   Shutdown  -> not a lifecycle frame on its own; the server-
127            //                shutdown ack is the separate ServiceEventKind::
128            //                Shutdown emission below. Skip here.
129            let event_suffix = match reason {
130                StopReason::Finished => "fin.ok",
131                StopReason::Error { .. } => "fin.error",
132                StopReason::Terminate => "fin.term",
133                StopReason::Update { .. } => "replaced",
134                StopReason::Shutdown => {
135                    // No frame: the run_loop emits ServiceEventKind::Shutdown
136                    // (which becomes the .stopped frame) for the xs-stopping
137                    // path; emitting here would double up.
138                    return Ok(ServiceEvent {
139                        kind,
140                        frame: Frame::builder("").build(),
141                    });
142                }
143            };
144            let mut meta = json!({
145                "source_id": source_id.to_string(),
146            });
147            if let StopReason::Update { update_id } = reason {
148                meta["update_id"] = json!(update_id.to_string());
149            }
150            if let StopReason::Error { message } = reason {
151                meta["message"] = json!(message);
152            }
153            store.append(
154                Frame::builder(format!("xs.service.{}.{event_suffix}", loop_ctx.topic))
155                    .meta(meta)
156                    .build(),
157            )?
158        }
159
160        ServiceEventKind::ParseError { message } => store.append(
161            Frame::builder(format!("xs.service.{}.invalid", loop_ctx.topic))
162                .meta(json!({
163                    "source_id": source_id.to_string(),
164                    "reason": message,
165                }))
166                .build(),
167        )?,
168
169        ServiceEventKind::Shutdown => store.append(
170            Frame::builder(format!("xs.service.{}.stopped", loop_ctx.topic))
171                .meta(json!({ "source_id": source_id.to_string() }))
172                .build(),
173        )?,
174    };
175
176    Ok(ServiceEvent { kind, frame })
177}
178
179pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> {
180    tokio::spawn(async move { run(store, spawn_frame).await })
181}
182
183/// Clone the prepared base engine and specialize it for a `<name>.create`
184/// frame: load the modules visible as of that frame and stamp `service_id`.
185/// The read and write builtins already live on `base`, so neither the
186/// initial spawn nor a hot-replace re-registers builtins.
187fn specialize(
188    base: &nu::Engine,
189    store: &Store,
190    create_id: Scru128Id,
191) -> Result<nu::Engine, Box<dyn std::error::Error + Send + Sync>> {
192    let mut engine = base.clone();
193    let modules = store.nu_modules_at(&create_id);
194    nu::load_modules(&mut engine.state, store, &modules)?;
195    engine.set_append_meta(&serde_json::json!({ "service_id": create_id.to_string() }));
196    Ok(engine)
197}
198
199async fn run(store: Store, spawn_frame: Frame) {
200    // Prepared once (nushell + stdlib + core + Stream reads + Direct `.append`);
201    // cloned for the initial run and every hot-replace.
202    let base = match nu::prepared_base(&store, nu::ReadMode::Stream, true) {
203        Ok(e) => e,
204        Err(_) => return,
205    };
206    let mut engine = match specialize(&base, &store, spawn_frame.id) {
207        Ok(e) => e,
208        Err(_) => return,
209    };
210
211    let hash = match spawn_frame.hash.clone() {
212        Some(h) => h,
213        None => return,
214    };
215    let mut reader = match store.cas_reader(hash).await {
216        Ok(r) => r,
217        Err(_) => return,
218    };
219    let mut script = String::new();
220    if reader.read_to_string(&mut script).await.is_err() {
221        return;
222    }
223
224    let loop_ctx = ServiceLoop {
225        // Topic is `xs.service.<name>.create`; strip both ends to get <name>.
226        topic: spawn_frame
227            .topic
228            .strip_prefix("xs.service.")
229            .and_then(|rest| rest.strip_suffix(".create"))
230            .unwrap_or(&spawn_frame.topic)
231            .to_string(),
232    };
233
234    let nu_config = match nu::parse_config(&mut engine, &script) {
235        Ok(cfg) => cfg,
236        Err(e) => {
237            let _ = emit_event(
238                &store,
239                &loop_ctx,
240                spawn_frame.id,
241                None,
242                ServiceEventKind::ParseError {
243                    message: e.to_string(),
244                },
245            );
246            return;
247        }
248    };
249    let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default();
250
251    // Create and set the interrupt signal on the engine state
252    let interrupt = Arc::new(AtomicBool::new(false));
253    engine.state.set_signals(Signals::new(interrupt.clone()));
254
255    let task = Task {
256        id: spawn_frame.id,
257        run_closure: nu_config.run_closure,
258        return_options: opts.return_options,
259        duplex: opts.duplex.unwrap_or(false),
260        engine,
261    };
262
263    run_loop(store, loop_ctx, task, base).await;
264}
265
266async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task, base: nu::Engine) {
267    // Create the first start frame and set up a persistent control subscription
268    let start_event = emit_event(
269        &store,
270        &loop_ctx,
271        task.id,
272        task.return_options.as_ref(),
273        ServiceEventKind::Running,
274    )
275    .expect("failed to emit running event");
276    let mut start_id = start_event.frame.id;
277
278    let control_rx_options = ReadOptions::builder()
279        .follow(FollowOption::On)
280        .after(start_id)
281        .build();
282
283    let mut control_rx = store.read(control_rx_options).await;
284
285    enum LoopOutcome {
286        Continue,
287        Update(Box<Task>, Scru128Id),
288        Terminate,
289        Shutdown,
290        Error(String),
291    }
292
293    impl core::fmt::Debug for LoopOutcome {
294        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
295            match self {
296                LoopOutcome::Continue => write!(f, "Continue"),
297                LoopOutcome::Update(_, id) => f.debug_tuple("Update").field(id).finish(),
298                LoopOutcome::Terminate => write!(f, "Terminate"),
299                LoopOutcome::Shutdown => write!(f, "Shutdown"),
300                LoopOutcome::Error(e) => f.debug_tuple("Error").field(e).finish(),
301            }
302        }
303    }
304
305    impl From<&LoopOutcome> for StopReason {
306        fn from(value: &LoopOutcome) -> Self {
307            match value {
308                LoopOutcome::Continue => StopReason::Finished,
309                LoopOutcome::Update(_, id) => StopReason::Update { update_id: *id },
310                LoopOutcome::Terminate => StopReason::Terminate,
311                LoopOutcome::Shutdown => StopReason::Shutdown,
312                LoopOutcome::Error(e) => StopReason::Error { message: e.clone() },
313            }
314        }
315    }
316
317    loop {
318        let input_pipeline = if task.duplex {
319            let options = ReadOptions::builder()
320                .follow(FollowOption::On)
321                .after(start_id)
322                .build();
323            let send_rx = store.read(options).await;
324            build_input_pipeline(store.clone(), &loop_ctx, &task, send_rx).await
325        } else {
326            PipelineData::empty()
327        };
328
329        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
330        spawn_thread(
331            store.clone(),
332            loop_ctx.clone(),
333            task.clone(),
334            input_pipeline,
335            done_tx,
336        );
337
338        let terminate_topic = format!("xs.service.{}.term", loop_ctx.topic);
339        let spawn_topic = format!("xs.service.{}.create", loop_ctx.topic);
340        tokio::pin!(done_rx);
341
342        let outcome = 'ctrl: loop {
343            tokio::select! {
344                biased;
345                maybe = control_rx.recv() => {
346                    match maybe {
347                        Some(frame) if frame.topic == terminate_topic => {
348                            task.engine.state.signals().trigger();
349                            task.engine.kill_job_by_name(&task.id.to_string());
350                            let _ = (&mut done_rx).await;
351                            break 'ctrl LoopOutcome::Terminate;
352                        }
353                        Some(frame) if frame.topic == "xs.stopping" => {
354                            task.engine.state.signals().trigger();
355                            task.engine.kill_job_by_name(&task.id.to_string());
356                            let _ = (&mut done_rx).await;
357                            break 'ctrl LoopOutcome::Shutdown;
358                        }
359                        Some(frame) if frame.topic == spawn_topic => {
360                            if let Some(hash) = frame.hash.clone() {
361                                if let Ok(mut reader) = store.cas_reader(hash).await {
362                                    let mut script = String::new();
363                                    if reader.read_to_string(&mut script).await.is_ok() {
364                                        // Clone the prepared base; the read and write builtins
365                                        // are already on it, so a hot-replace never re-registers
366                                        // builtins.
367                                        let mut new_engine = match specialize(&base, &store, frame.id) {
368                                            Ok(e) => e,
369                                            Err(_) => continue,
370                                        };
371                                        match nu::parse_config(&mut new_engine, &script) {
372                                            Ok(cfg) => {
373                                                let opts: ServiceScriptOptions = cfg.deserialize_options().unwrap_or_default();
374                                                let interrupt = Arc::new(AtomicBool::new(false));
375                                                new_engine.state.set_signals(Signals::new(interrupt.clone()));
376
377                                                task.engine.state.signals().trigger();
378                                                task.engine.kill_job_by_name(&task.id.to_string());
379                                                let _ = (&mut done_rx).await;
380
381                                                let new_task = Task {
382                                                    id: frame.id,
383                                                    run_closure: cfg.run_closure,
384                                                    return_options: opts.return_options,
385                                                    duplex: opts.duplex.unwrap_or(false),
386                                                    engine: new_engine,
387                                                };
388
389                                                break 'ctrl LoopOutcome::Update(Box::new(new_task), frame.id);
390                                            }
391                                            Err(e) => {
392                                                let _ = emit_event(
393                                                    &store,
394                                                    &loop_ctx,
395                                                    frame.id,
396                                                    None,
397                                                    ServiceEventKind::ParseError { message: e.to_string() },
398                                                );
399                                            }
400                                        }
401                                    }
402                                }
403                            }
404                        }
405                        Some(_) => {}
406                        None => break 'ctrl LoopOutcome::Error("control".into()),
407                    }
408                }
409                res = &mut done_rx => {
410                    break 'ctrl match res.unwrap_or(Err("thread failed".into())) {
411                        Ok(()) => LoopOutcome::Continue,
412                        Err(e) => LoopOutcome::Error(e),
413                    };
414                }
415            }
416        };
417
418        let reason: StopReason = (&outcome).into();
419        // Pre-emit a Stopped frame only when it maps to a real lifecycle
420        // topic:
421        //   Update    -> .replaced (transient, successor coming)
422        //   Terminate -> .fin.term
423        //   Error     -> .fin.error
424        //   Finished  -> no frame: Continue auto-restarts, this isn't a
425        //                terminal stop.
426        //   Shutdown  -> no frame: the post-loop ServiceEventKind::Shutdown
427        //                emits the single .stopped frame for the xs-stopping
428        //                path.
429        match &reason {
430            StopReason::Finished | StopReason::Shutdown => {}
431            _ => {
432                let _ = emit_event(
433                    &store,
434                    &loop_ctx,
435                    task.id,
436                    task.return_options.as_ref(),
437                    ServiceEventKind::Stopped(reason.clone()),
438                );
439            }
440        }
441
442        match outcome {
443            LoopOutcome::Continue => {
444                tokio::time::sleep(Duration::from_secs(1)).await;
445                if let Ok(event) = emit_event(
446                    &store,
447                    &loop_ctx,
448                    task.id,
449                    task.return_options.as_ref(),
450                    ServiceEventKind::Running,
451                ) {
452                    start_id = event.frame.id;
453                }
454            }
455            LoopOutcome::Update(new_task, _) => {
456                task = *new_task;
457                if let Ok(event) = emit_event(
458                    &store,
459                    &loop_ctx,
460                    task.id,
461                    task.return_options.as_ref(),
462                    ServiceEventKind::Running,
463                ) {
464                    start_id = event.frame.id;
465                }
466            }
467            LoopOutcome::Terminate | LoopOutcome::Error(_) => {
468                break;
469            }
470            LoopOutcome::Shutdown => {
471                let _ = emit_event(
472                    &store,
473                    &loop_ctx,
474                    task.id,
475                    task.return_options.as_ref(),
476                    ServiceEventKind::Shutdown,
477                );
478                break;
479            }
480        }
481    }
482}
483
484async fn build_input_pipeline(
485    store: Store,
486    loop_ctx: &ServiceLoop,
487    task: &Task,
488    rx: tokio::sync::mpsc::Receiver<Frame>,
489) -> PipelineData {
490    let topic = format!("{loop_topic}.send", loop_topic = loop_ctx.topic);
491    let signals = task.engine.state.signals().clone();
492    let mut rx = rx;
493    let iter = std::iter::from_fn(move || loop {
494        if signals.interrupted() {
495            return None;
496        }
497
498        match rx.try_recv() {
499            Ok(frame) => {
500                if frame.topic == topic {
501                    if let Some(hash) = frame.hash {
502                        if let Ok(bytes) = store.cas_read_sync(&hash) {
503                            if let Ok(content) = String::from_utf8(bytes) {
504                                return Some(content);
505                            }
506                        }
507                    }
508                }
509            }
510            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
511                std::thread::sleep(std::time::Duration::from_millis(10));
512                continue;
513            }
514            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
515                return None;
516            }
517        }
518    });
519
520    ByteStream::from_iter(
521        iter,
522        Span::unknown(),
523        task.engine.state.signals().clone(),
524        ByteStreamType::Unknown,
525    )
526    .into()
527}
528
529fn spawn_thread(
530    store: Store,
531    loop_ctx: ServiceLoop,
532    mut task: Task,
533    input_pipeline: PipelineData,
534    done_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
535) {
536    let handle = tokio::runtime::Handle::current();
537    std::thread::spawn(move || {
538        let res = run_pipeline(&handle, &store, &loop_ctx, &mut task, input_pipeline);
539        let _ = done_tx.send(res);
540    });
541}
542
543fn run_pipeline(
544    handle: &tokio::runtime::Handle,
545    store: &Store,
546    loop_ctx: &ServiceLoop,
547    task: &mut Task,
548    input_pipeline: PipelineData,
549) -> Result<(), String> {
550    let pipeline = task
551        .engine
552        .run_closure_in_job(
553            &task.run_closure,
554            vec![],
555            Some(input_pipeline),
556            task.id.to_string(),
557        )
558        .map_err(|e| {
559            let working_set = nu_protocol::engine::StateWorkingSet::new(&task.engine.state);
560            nu_protocol::format_cli_error(None, &working_set, &*e, None)
561        })?;
562
563    let suffix = task
564        .return_options
565        .as_ref()
566        .and_then(|o| o.suffix.clone())
567        .unwrap_or_else(|| ".recv".into());
568    let use_cas = task
569        .return_options
570        .as_ref()
571        .and_then(|o| o.target.as_deref())
572        .is_some_and(|t| t == "cas");
573
574    let emit = |event| {
575        handle.block_on(async {
576            let _ = emit_event(
577                store,
578                loop_ctx,
579                task.id,
580                task.return_options.as_ref(),
581                event,
582            );
583        });
584    };
585
586    match pipeline {
587        PipelineData::Empty => {}
588        PipelineData::Value(value, _) => {
589            if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
590                emit(event);
591            }
592        }
593        PipelineData::ListStream(mut stream, _) => {
594            while let Some(value) = stream.next_value() {
595                if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
596                    emit(event);
597                }
598            }
599        }
600        PipelineData::ByteStream(stream, _) => {
601            if let Some(mut reader) = stream.reader() {
602                let mut buf = [0u8; 8192];
603                loop {
604                    match reader.read(&mut buf) {
605                        Ok(0) => break,
606                        Ok(n) => {
607                            emit(ServiceEventKind::Recv {
608                                suffix: suffix.clone(),
609                                data: buf[..n].to_vec(),
610                            });
611                        }
612                        Err(_) => break,
613                    }
614                }
615            }
616        }
617    }
618    Ok(())
619}
620
621pub(crate) fn value_to_event(
622    value: &Value,
623    suffix: &str,
624    use_cas: bool,
625) -> Result<Option<ServiceEventKind>, String> {
626    match value {
627        Value::Nothing { .. } => Ok(None),
628        Value::Record { .. } if !use_cas => Ok(Some(ServiceEventKind::RecvMeta {
629            suffix: suffix.to_string(),
630            meta: value_to_json(value),
631        })),
632        _ if use_cas => {
633            let data = match value {
634                Value::String { val, .. } => val.as_bytes().to_vec(),
635                Value::Binary { val, .. } => val.clone(),
636                _ => value_to_json(value).to_string().into_bytes(),
637            };
638            Ok(Some(ServiceEventKind::Recv {
639                suffix: suffix.to_string(),
640                data,
641            }))
642        }
643        _ => Err(format!(
644            "Service output must be a record when target is not \"cas\"; got {}. \
645             Set return_options.target to \"cas\" for non-record output.",
646            value.get_type()
647        )),
648    }
649}