Skip to main content

xs/processor/service/
service.rs

1use scru128::Scru128Id;
2use tokio::task::JoinHandle;
3
4use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Signals, Span, Value};
5use std::io::Read;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::io::AsyncReadExt;
10
11use crate::nu;
12use crate::nu::{value_to_json, ReturnOptions};
13use crate::store::{FollowOption, Frame, ReadOptions, Store};
14use serde_json::json;
15
16#[derive(Clone, Debug, serde::Deserialize, Default)]
17pub struct ServiceScriptOptions {
18    pub duplex: Option<bool>,
19    pub return_options: Option<ReturnOptions>,
20}
21
/// Identifies the service a supervision loop belongs to.
#[derive(Clone)]
pub struct ServiceLoop {
    /// Service name used to build topic names: lifecycle frames go to
    /// `xs.service.<topic>.*`, output frames to `<topic><suffix>`.
    pub topic: String,
}
26
27#[derive(Clone)]
28pub struct Task {
29    pub id: Scru128Id,
30    pub run_closure: nu_protocol::engine::Closure,
31    pub return_options: Option<ReturnOptions>,
32    pub duplex: bool,
33    pub engine: nu::Engine,
34}
35
36#[cfg_attr(not(test), allow(dead_code))]
37#[derive(Debug, Clone)]
38pub enum ServiceEventKind {
39    Running,
40    /// output frame flushed; payload is raw bytes stored in CAS
41    Recv {
42        suffix: String,
43        data: Vec<u8>,
44    },
45    /// output frame flushed; payload is a JSON record stored as frame metadata
46    RecvMeta {
47        suffix: String,
48        meta: serde_json::Value,
49    },
50    Stopped(StopReason),
51    ParseError {
52        message: String,
53    },
54    Shutdown,
55}
56
57#[cfg_attr(not(test), allow(dead_code))]
58#[derive(Debug, Clone)]
59pub struct ServiceEvent {
60    pub kind: ServiceEventKind,
61    pub frame: Frame,
62}
63
64#[cfg_attr(not(test), allow(dead_code))]
65#[derive(Debug, Clone)]
66pub enum StopReason {
67    Finished,
68    Error { message: String },
69    Terminate,
70    Shutdown,
71    Update { update_id: Scru128Id },
72}
73
74pub(crate) fn emit_event(
75    store: &Store,
76    loop_ctx: &ServiceLoop,
77    source_id: Scru128Id,
78    return_opts: Option<&ReturnOptions>,
79    kind: ServiceEventKind,
80) -> Result<ServiceEvent, Box<dyn std::error::Error + Send + Sync>> {
81    let frame = match &kind {
82        ServiceEventKind::Running => store.append(
83            Frame::builder(format!("xs.service.{}.active", loop_ctx.topic))
84                .meta(json!({ "source_id": source_id.to_string() }))
85                .build(),
86        )?,
87
88        ServiceEventKind::Recv { suffix, data } => {
89            // Data topic: app namespace, unchanged.
90            let hash = store.cas_insert_bytes_sync(data)?;
91            store.append(
92                Frame::builder(format!(
93                    "{topic}{suffix}",
94                    topic = loop_ctx.topic,
95                    suffix = suffix
96                ))
97                .hash(hash)
98                .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
99                .meta(json!({ "source_id": source_id.to_string() }))
100                .build(),
101            )?
102        }
103
104        ServiceEventKind::RecvMeta { suffix, meta } => {
105            // Data topic: app namespace, unchanged.
106            let mut merged = meta.clone();
107            merged["source_id"] = json!(source_id.to_string());
108            store.append(
109                Frame::builder(format!(
110                    "{topic}{suffix}",
111                    topic = loop_ctx.topic,
112                    suffix = suffix
113                ))
114                .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
115                .meta(merged)
116                .build(),
117            )?
118        }
119
120        ServiceEventKind::Stopped(reason) => {
121            // Per ADR 0005:
122            //   Finished  -> .fin.ok       (terminal, won't restart)
123            //   Error     -> .fin.error    (terminal)
124            //   Terminate -> .fin.term     (terminal)
125            //   Update    -> .replaced     (transient; successor coming)
126            //   Shutdown  -> not a lifecycle frame on its own; the server-
127            //                shutdown ack is the separate ServiceEventKind::
128            //                Shutdown emission below. Skip here.
129            let event_suffix = match reason {
130                StopReason::Finished => "fin.ok",
131                StopReason::Error { .. } => "fin.error",
132                StopReason::Terminate => "fin.term",
133                StopReason::Update { .. } => "replaced",
134                StopReason::Shutdown => {
135                    // No frame: the run_loop emits ServiceEventKind::Shutdown
136                    // (which becomes the .stopped frame) for the xs-stopping
137                    // path; emitting here would double up.
138                    return Ok(ServiceEvent {
139                        kind,
140                        frame: Frame::builder("").build(),
141                    });
142                }
143            };
144            let mut meta = json!({
145                "source_id": source_id.to_string(),
146            });
147            if let StopReason::Update { update_id } = reason {
148                meta["update_id"] = json!(update_id.to_string());
149            }
150            if let StopReason::Error { message } = reason {
151                meta["message"] = json!(message);
152            }
153            store.append(
154                Frame::builder(format!("xs.service.{}.{event_suffix}", loop_ctx.topic))
155                    .meta(meta)
156                    .build(),
157            )?
158        }
159
160        ServiceEventKind::ParseError { message } => store.append(
161            Frame::builder(format!("xs.service.{}.invalid", loop_ctx.topic))
162                .meta(json!({
163                    "source_id": source_id.to_string(),
164                    "reason": message,
165                }))
166                .build(),
167        )?,
168
169        ServiceEventKind::Shutdown => store.append(
170            Frame::builder(format!("xs.service.{}.stopped", loop_ctx.topic))
171                .meta(json!({ "source_id": source_id.to_string() }))
172                .build(),
173        )?,
174    };
175
176    Ok(ServiceEvent { kind, frame })
177}
178
179pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> {
180    tokio::spawn(async move { run(store, spawn_frame).await })
181}
182
183async fn run(store: Store, spawn_frame: Frame) {
184    let mut engine = match crate::processor::build_engine(&store, &spawn_frame.id) {
185        Ok(e) => e,
186        Err(_) => return,
187    };
188
189    let hash = match spawn_frame.hash.clone() {
190        Some(h) => h,
191        None => return,
192    };
193    let mut reader = match store.cas_reader(hash).await {
194        Ok(r) => r,
195        Err(_) => return,
196    };
197    let mut script = String::new();
198    if reader.read_to_string(&mut script).await.is_err() {
199        return;
200    }
201
202    let loop_ctx = ServiceLoop {
203        // Topic is `xs.service.<name>.create`; strip both ends to get <name>.
204        topic: spawn_frame
205            .topic
206            .strip_prefix("xs.service.")
207            .and_then(|rest| rest.strip_suffix(".create"))
208            .unwrap_or(&spawn_frame.topic)
209            .to_string(),
210    };
211
212    let nu_config = match nu::parse_config(&mut engine, &script) {
213        Ok(cfg) => cfg,
214        Err(e) => {
215            let _ = emit_event(
216                &store,
217                &loop_ctx,
218                spawn_frame.id,
219                None,
220                ServiceEventKind::ParseError {
221                    message: e.to_string(),
222                },
223            );
224            return;
225        }
226    };
227    let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default();
228
229    // Create and set the interrupt signal on the engine state
230    let interrupt = Arc::new(AtomicBool::new(false));
231    engine.state.set_signals(Signals::new(interrupt.clone()));
232
233    let task = Task {
234        id: spawn_frame.id,
235        run_closure: nu_config.run_closure,
236        return_options: opts.return_options,
237        duplex: opts.duplex.unwrap_or(false),
238        engine,
239    };
240
241    run_loop(store, loop_ctx, task).await;
242}
243
244async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task) {
245    // Create the first start frame and set up a persistent control subscription
246    let start_event = emit_event(
247        &store,
248        &loop_ctx,
249        task.id,
250        task.return_options.as_ref(),
251        ServiceEventKind::Running,
252    )
253    .expect("failed to emit running event");
254    let mut start_id = start_event.frame.id;
255
256    let control_rx_options = ReadOptions::builder()
257        .follow(FollowOption::On)
258        .after(start_id)
259        .build();
260
261    let mut control_rx = store.read(control_rx_options).await;
262
263    enum LoopOutcome {
264        Continue,
265        Update(Box<Task>, Scru128Id),
266        Terminate,
267        Shutdown,
268        Error(String),
269    }
270
271    impl core::fmt::Debug for LoopOutcome {
272        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
273            match self {
274                LoopOutcome::Continue => write!(f, "Continue"),
275                LoopOutcome::Update(_, id) => f.debug_tuple("Update").field(id).finish(),
276                LoopOutcome::Terminate => write!(f, "Terminate"),
277                LoopOutcome::Shutdown => write!(f, "Shutdown"),
278                LoopOutcome::Error(e) => f.debug_tuple("Error").field(e).finish(),
279            }
280        }
281    }
282
283    impl From<&LoopOutcome> for StopReason {
284        fn from(value: &LoopOutcome) -> Self {
285            match value {
286                LoopOutcome::Continue => StopReason::Finished,
287                LoopOutcome::Update(_, id) => StopReason::Update { update_id: *id },
288                LoopOutcome::Terminate => StopReason::Terminate,
289                LoopOutcome::Shutdown => StopReason::Shutdown,
290                LoopOutcome::Error(e) => StopReason::Error { message: e.clone() },
291            }
292        }
293    }
294
295    loop {
296        let input_pipeline = if task.duplex {
297            let options = ReadOptions::builder()
298                .follow(FollowOption::On)
299                .after(start_id)
300                .build();
301            let send_rx = store.read(options).await;
302            build_input_pipeline(store.clone(), &loop_ctx, &task, send_rx).await
303        } else {
304            PipelineData::empty()
305        };
306
307        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
308        spawn_thread(
309            store.clone(),
310            loop_ctx.clone(),
311            task.clone(),
312            input_pipeline,
313            done_tx,
314        );
315
316        let terminate_topic = format!("xs.service.{}.term", loop_ctx.topic);
317        let spawn_topic = format!("xs.service.{}.create", loop_ctx.topic);
318        tokio::pin!(done_rx);
319
320        let outcome = 'ctrl: loop {
321            tokio::select! {
322                biased;
323                maybe = control_rx.recv() => {
324                    match maybe {
325                        Some(frame) if frame.topic == terminate_topic => {
326                            task.engine.state.signals().trigger();
327                            task.engine.kill_job_by_name(&task.id.to_string());
328                            let _ = (&mut done_rx).await;
329                            break 'ctrl LoopOutcome::Terminate;
330                        }
331                        Some(frame) if frame.topic == "xs.stopping" => {
332                            task.engine.state.signals().trigger();
333                            task.engine.kill_job_by_name(&task.id.to_string());
334                            let _ = (&mut done_rx).await;
335                            break 'ctrl LoopOutcome::Shutdown;
336                        }
337                        Some(frame) if frame.topic == spawn_topic => {
338                            if let Some(hash) = frame.hash.clone() {
339                                if let Ok(mut reader) = store.cas_reader(hash).await {
340                                    let mut script = String::new();
341                                    if reader.read_to_string(&mut script).await.is_ok() {
342                                        let mut new_engine = match crate::processor::build_engine(&store, &frame.id) {
343                                            Ok(e) => e,
344                                            Err(_) => continue,
345                                        };
346                                        match nu::parse_config(&mut new_engine, &script) {
347                                            Ok(cfg) => {
348                                                let opts: ServiceScriptOptions = cfg.deserialize_options().unwrap_or_default();
349                                                let interrupt = Arc::new(AtomicBool::new(false));
350                                                new_engine.state.set_signals(Signals::new(interrupt.clone()));
351
352                                                task.engine.state.signals().trigger();
353                                                task.engine.kill_job_by_name(&task.id.to_string());
354                                                let _ = (&mut done_rx).await;
355
356                                                let new_task = Task {
357                                                    id: frame.id,
358                                                    run_closure: cfg.run_closure,
359                                                    return_options: opts.return_options,
360                                                    duplex: opts.duplex.unwrap_or(false),
361                                                    engine: new_engine,
362                                                };
363
364                                                break 'ctrl LoopOutcome::Update(Box::new(new_task), frame.id);
365                                            }
366                                            Err(e) => {
367                                                let _ = emit_event(
368                                                    &store,
369                                                    &loop_ctx,
370                                                    frame.id,
371                                                    None,
372                                                    ServiceEventKind::ParseError { message: e.to_string() },
373                                                );
374                                            }
375                                        }
376                                    }
377                                }
378                            }
379                        }
380                        Some(_) => {}
381                        None => break 'ctrl LoopOutcome::Error("control".into()),
382                    }
383                }
384                res = &mut done_rx => {
385                    break 'ctrl match res.unwrap_or(Err("thread failed".into())) {
386                        Ok(()) => LoopOutcome::Continue,
387                        Err(e) => LoopOutcome::Error(e),
388                    };
389                }
390            }
391        };
392
393        let reason: StopReason = (&outcome).into();
394        // Pre-emit a Stopped frame only when it maps to a real lifecycle
395        // topic:
396        //   Update    -> .replaced (transient, successor coming)
397        //   Terminate -> .fin.term
398        //   Error     -> .fin.error
399        //   Finished  -> no frame: Continue auto-restarts, this isn't a
400        //                terminal stop.
401        //   Shutdown  -> no frame: the post-loop ServiceEventKind::Shutdown
402        //                emits the single .stopped frame for the xs-stopping
403        //                path.
404        match &reason {
405            StopReason::Finished | StopReason::Shutdown => {}
406            _ => {
407                let _ = emit_event(
408                    &store,
409                    &loop_ctx,
410                    task.id,
411                    task.return_options.as_ref(),
412                    ServiceEventKind::Stopped(reason.clone()),
413                );
414            }
415        }
416
417        match outcome {
418            LoopOutcome::Continue => {
419                tokio::time::sleep(Duration::from_secs(1)).await;
420                if let Ok(event) = emit_event(
421                    &store,
422                    &loop_ctx,
423                    task.id,
424                    task.return_options.as_ref(),
425                    ServiceEventKind::Running,
426                ) {
427                    start_id = event.frame.id;
428                }
429            }
430            LoopOutcome::Update(new_task, _) => {
431                task = *new_task;
432                if let Ok(event) = emit_event(
433                    &store,
434                    &loop_ctx,
435                    task.id,
436                    task.return_options.as_ref(),
437                    ServiceEventKind::Running,
438                ) {
439                    start_id = event.frame.id;
440                }
441            }
442            LoopOutcome::Terminate | LoopOutcome::Error(_) => {
443                break;
444            }
445            LoopOutcome::Shutdown => {
446                let _ = emit_event(
447                    &store,
448                    &loop_ctx,
449                    task.id,
450                    task.return_options.as_ref(),
451                    ServiceEventKind::Shutdown,
452                );
453                break;
454            }
455        }
456    }
457}
458
459async fn build_input_pipeline(
460    store: Store,
461    loop_ctx: &ServiceLoop,
462    task: &Task,
463    rx: tokio::sync::mpsc::Receiver<Frame>,
464) -> PipelineData {
465    let topic = format!("{loop_topic}.send", loop_topic = loop_ctx.topic);
466    let signals = task.engine.state.signals().clone();
467    let mut rx = rx;
468    let iter = std::iter::from_fn(move || loop {
469        if signals.interrupted() {
470            return None;
471        }
472
473        match rx.try_recv() {
474            Ok(frame) => {
475                if frame.topic == topic {
476                    if let Some(hash) = frame.hash {
477                        if let Ok(bytes) = store.cas_read_sync(&hash) {
478                            if let Ok(content) = String::from_utf8(bytes) {
479                                return Some(content);
480                            }
481                        }
482                    }
483                }
484            }
485            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
486                std::thread::sleep(std::time::Duration::from_millis(10));
487                continue;
488            }
489            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
490                return None;
491            }
492        }
493    });
494
495    ByteStream::from_iter(
496        iter,
497        Span::unknown(),
498        task.engine.state.signals().clone(),
499        ByteStreamType::Unknown,
500    )
501    .into()
502}
503
504fn spawn_thread(
505    store: Store,
506    loop_ctx: ServiceLoop,
507    mut task: Task,
508    input_pipeline: PipelineData,
509    done_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
510) {
511    let handle = tokio::runtime::Handle::current();
512    std::thread::spawn(move || {
513        let res = run_pipeline(&handle, &store, &loop_ctx, &mut task, input_pipeline);
514        let _ = done_tx.send(res);
515    });
516}
517
518fn run_pipeline(
519    handle: &tokio::runtime::Handle,
520    store: &Store,
521    loop_ctx: &ServiceLoop,
522    task: &mut Task,
523    input_pipeline: PipelineData,
524) -> Result<(), String> {
525    let pipeline = task
526        .engine
527        .run_closure_in_job(
528            &task.run_closure,
529            vec![],
530            Some(input_pipeline),
531            task.id.to_string(),
532        )
533        .map_err(|e| {
534            let working_set = nu_protocol::engine::StateWorkingSet::new(&task.engine.state);
535            nu_protocol::format_cli_error(None, &working_set, &*e, None)
536        })?;
537
538    let suffix = task
539        .return_options
540        .as_ref()
541        .and_then(|o| o.suffix.clone())
542        .unwrap_or_else(|| ".recv".into());
543    let use_cas = task
544        .return_options
545        .as_ref()
546        .and_then(|o| o.target.as_deref())
547        .is_some_and(|t| t == "cas");
548
549    let emit = |event| {
550        handle.block_on(async {
551            let _ = emit_event(
552                store,
553                loop_ctx,
554                task.id,
555                task.return_options.as_ref(),
556                event,
557            );
558        });
559    };
560
561    match pipeline {
562        PipelineData::Empty => {}
563        PipelineData::Value(value, _) => {
564            if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
565                emit(event);
566            }
567        }
568        PipelineData::ListStream(mut stream, _) => {
569            while let Some(value) = stream.next_value() {
570                if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
571                    emit(event);
572                }
573            }
574        }
575        PipelineData::ByteStream(stream, _) => {
576            if let Some(mut reader) = stream.reader() {
577                let mut buf = [0u8; 8192];
578                loop {
579                    match reader.read(&mut buf) {
580                        Ok(0) => break,
581                        Ok(n) => {
582                            emit(ServiceEventKind::Recv {
583                                suffix: suffix.clone(),
584                                data: buf[..n].to_vec(),
585                            });
586                        }
587                        Err(_) => break,
588                    }
589                }
590            }
591        }
592    }
593    Ok(())
594}
595
596pub(crate) fn value_to_event(
597    value: &Value,
598    suffix: &str,
599    use_cas: bool,
600) -> Result<Option<ServiceEventKind>, String> {
601    match value {
602        Value::Nothing { .. } => Ok(None),
603        Value::Record { .. } if !use_cas => Ok(Some(ServiceEventKind::RecvMeta {
604            suffix: suffix.to_string(),
605            meta: value_to_json(value),
606        })),
607        _ if use_cas => {
608            let data = match value {
609                Value::String { val, .. } => val.as_bytes().to_vec(),
610                Value::Binary { val, .. } => val.clone(),
611                _ => value_to_json(value).to_string().into_bytes(),
612            };
613            Ok(Some(ServiceEventKind::Recv {
614                suffix: suffix.to_string(),
615                data,
616            }))
617        }
618        _ => Err(format!(
619            "Service output must be a record when target is not \"cas\"; got {}. \
620             Set return_options.target to \"cas\" for non-record output.",
621            value.get_type()
622        )),
623    }
624}