Skip to main content

xs/processor/service/
service.rs

1use scru128::Scru128Id;
2use tokio::task::JoinHandle;
3
4use nu_protocol::{ByteStream, ByteStreamType, PipelineData, Signals, Span, Value};
5use std::io::Read;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::io::AsyncReadExt;
10
11use crate::nu;
12use crate::nu::{value_to_json, ReturnOptions};
13use crate::store::{FollowOption, Frame, ReadOptions, Store};
14use serde_json::json;
15
16#[derive(Clone, Debug, serde::Deserialize, Default)]
17pub struct ServiceScriptOptions {
18    pub duplex: Option<bool>,
19    pub return_options: Option<ReturnOptions>,
20}
21
/// Identity of a running service, shared by its control loop and worker thread.
#[derive(Clone)]
pub struct ServiceLoop {
    /// Base topic: the spawn frame's topic with a trailing `.spawn` removed.
    /// Every frame the service writes is named `<topic>.<event>`.
    pub topic: String,
}
26
27#[derive(Clone)]
28pub struct Task {
29    pub id: Scru128Id,
30    pub run_closure: nu_protocol::engine::Closure,
31    pub return_options: Option<ReturnOptions>,
32    pub duplex: bool,
33    pub engine: nu::Engine,
34}
35
36#[cfg_attr(not(test), allow(dead_code))]
37#[derive(Debug, Clone)]
38pub enum ServiceEventKind {
39    Running,
40    /// output frame flushed; payload is raw bytes stored in CAS
41    Recv {
42        suffix: String,
43        data: Vec<u8>,
44    },
45    /// output frame flushed; payload is a JSON record stored as frame metadata
46    RecvMeta {
47        suffix: String,
48        meta: serde_json::Value,
49    },
50    Stopped(StopReason),
51    ParseError {
52        message: String,
53    },
54    Shutdown,
55}
56
57#[cfg_attr(not(test), allow(dead_code))]
58#[derive(Debug, Clone)]
59pub struct ServiceEvent {
60    pub kind: ServiceEventKind,
61    pub frame: Frame,
62}
63
64#[cfg_attr(not(test), allow(dead_code))]
65#[derive(Debug, Clone)]
66pub enum StopReason {
67    Finished,
68    Error { message: String },
69    Terminate,
70    Shutdown,
71    Update { update_id: Scru128Id },
72}
73
74pub(crate) fn emit_event(
75    store: &Store,
76    loop_ctx: &ServiceLoop,
77    source_id: Scru128Id,
78    return_opts: Option<&ReturnOptions>,
79    kind: ServiceEventKind,
80) -> Result<ServiceEvent, Box<dyn std::error::Error + Send + Sync>> {
81    let frame = match &kind {
82        ServiceEventKind::Running => store.append(
83            Frame::builder(format!("{topic}.running", topic = loop_ctx.topic))
84                .meta(json!({ "source_id": source_id.to_string() }))
85                .build(),
86        )?,
87
88        ServiceEventKind::Recv { suffix, data } => {
89            let hash = store.cas_insert_bytes_sync(data)?;
90            store.append(
91                Frame::builder(format!(
92                    "{topic}.{suffix}",
93                    topic = loop_ctx.topic,
94                    suffix = suffix
95                ))
96                .hash(hash)
97                .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
98                .meta(json!({ "source_id": source_id.to_string() }))
99                .build(),
100            )?
101        }
102
103        ServiceEventKind::RecvMeta { suffix, meta } => {
104            let mut merged = meta.clone();
105            merged["source_id"] = json!(source_id.to_string());
106            store.append(
107                Frame::builder(format!(
108                    "{topic}.{suffix}",
109                    topic = loop_ctx.topic,
110                    suffix = suffix
111                ))
112                .maybe_ttl(return_opts.and_then(|o| o.ttl.clone()))
113                .meta(merged)
114                .build(),
115            )?
116        }
117
118        ServiceEventKind::Stopped(reason) => {
119            let mut meta = json!({
120                "source_id": source_id.to_string(),
121                "reason": stop_reason_str(reason),
122            });
123            if let StopReason::Update { update_id } = reason {
124                meta["update_id"] = json!(update_id.to_string());
125            }
126            if let StopReason::Error { message } = reason {
127                meta["message"] = json!(message);
128            }
129            store.append(
130                Frame::builder(format!("{topic}.stopped", topic = loop_ctx.topic))
131                    .meta(meta)
132                    .build(),
133            )?
134        }
135
136        ServiceEventKind::ParseError { message } => store.append(
137            Frame::builder(format!("{topic}.parse.error", topic = loop_ctx.topic))
138                .meta(json!({
139                    "source_id": source_id.to_string(),
140                    "reason": message,
141                }))
142                .build(),
143        )?,
144
145        ServiceEventKind::Shutdown => store.append(
146            Frame::builder(format!("{topic}.shutdown", topic = loop_ctx.topic))
147                .meta(json!({ "source_id": source_id.to_string() }))
148                .build(),
149        )?,
150    };
151
152    Ok(ServiceEvent { kind, frame })
153}
154
155fn stop_reason_str(r: &StopReason) -> &'static str {
156    match r {
157        StopReason::Finished => "finished",
158        StopReason::Error { .. } => "error",
159        StopReason::Terminate => "terminate",
160        StopReason::Shutdown => "shutdown",
161        StopReason::Update { .. } => "update",
162    }
163}
164
165pub fn spawn(store: Store, spawn_frame: Frame) -> JoinHandle<()> {
166    tokio::spawn(async move { run(store, spawn_frame).await })
167}
168
169async fn run(store: Store, spawn_frame: Frame) {
170    let mut engine = match crate::processor::build_engine(&store, &spawn_frame.id) {
171        Ok(e) => e,
172        Err(_) => return,
173    };
174
175    let hash = match spawn_frame.hash.clone() {
176        Some(h) => h,
177        None => return,
178    };
179    let mut reader = match store.cas_reader(hash).await {
180        Ok(r) => r,
181        Err(_) => return,
182    };
183    let mut script = String::new();
184    if reader.read_to_string(&mut script).await.is_err() {
185        return;
186    }
187
188    let loop_ctx = ServiceLoop {
189        topic: spawn_frame
190            .topic
191            .strip_suffix(".spawn")
192            .unwrap_or(&spawn_frame.topic)
193            .to_string(),
194    };
195
196    let nu_config = match nu::parse_config(&mut engine, &script) {
197        Ok(cfg) => cfg,
198        Err(e) => {
199            let _ = emit_event(
200                &store,
201                &loop_ctx,
202                spawn_frame.id,
203                None,
204                ServiceEventKind::ParseError {
205                    message: e.to_string(),
206                },
207            );
208            return;
209        }
210    };
211    let opts: ServiceScriptOptions = nu_config.deserialize_options().unwrap_or_default();
212
213    // Create and set the interrupt signal on the engine state
214    let interrupt = Arc::new(AtomicBool::new(false));
215    engine.state.set_signals(Signals::new(interrupt.clone()));
216
217    let task = Task {
218        id: spawn_frame.id,
219        run_closure: nu_config.run_closure,
220        return_options: opts.return_options,
221        duplex: opts.duplex.unwrap_or(false),
222        engine,
223    };
224
225    run_loop(store, loop_ctx, task).await;
226}
227
228async fn run_loop(store: Store, loop_ctx: ServiceLoop, mut task: Task) {
229    // Create the first start frame and set up a persistent control subscription
230    let start_event = emit_event(
231        &store,
232        &loop_ctx,
233        task.id,
234        task.return_options.as_ref(),
235        ServiceEventKind::Running,
236    )
237    .expect("failed to emit running event");
238    let mut start_id = start_event.frame.id;
239
240    let control_rx_options = ReadOptions::builder()
241        .follow(FollowOption::On)
242        .after(start_id)
243        .build();
244
245    let mut control_rx = store.read(control_rx_options).await;
246
247    enum LoopOutcome {
248        Continue,
249        Update(Box<Task>, Scru128Id),
250        Terminate,
251        Shutdown,
252        Error(String),
253    }
254
255    impl core::fmt::Debug for LoopOutcome {
256        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
257            match self {
258                LoopOutcome::Continue => write!(f, "Continue"),
259                LoopOutcome::Update(_, id) => f.debug_tuple("Update").field(id).finish(),
260                LoopOutcome::Terminate => write!(f, "Terminate"),
261                LoopOutcome::Shutdown => write!(f, "Shutdown"),
262                LoopOutcome::Error(e) => f.debug_tuple("Error").field(e).finish(),
263            }
264        }
265    }
266
267    impl From<&LoopOutcome> for StopReason {
268        fn from(value: &LoopOutcome) -> Self {
269            match value {
270                LoopOutcome::Continue => StopReason::Finished,
271                LoopOutcome::Update(_, id) => StopReason::Update { update_id: *id },
272                LoopOutcome::Terminate => StopReason::Terminate,
273                LoopOutcome::Shutdown => StopReason::Shutdown,
274                LoopOutcome::Error(e) => StopReason::Error { message: e.clone() },
275            }
276        }
277    }
278
279    loop {
280        let input_pipeline = if task.duplex {
281            let options = ReadOptions::builder()
282                .follow(FollowOption::On)
283                .after(start_id)
284                .build();
285            let send_rx = store.read(options).await;
286            build_input_pipeline(store.clone(), &loop_ctx, &task, send_rx).await
287        } else {
288            PipelineData::empty()
289        };
290
291        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
292        spawn_thread(
293            store.clone(),
294            loop_ctx.clone(),
295            task.clone(),
296            input_pipeline,
297            done_tx,
298        );
299
300        let terminate_topic = format!("{topic}.terminate", topic = loop_ctx.topic);
301        let spawn_topic = format!("{topic}.spawn", topic = loop_ctx.topic);
302        tokio::pin!(done_rx);
303
304        let outcome = 'ctrl: loop {
305            tokio::select! {
306                biased;
307                maybe = control_rx.recv() => {
308                    match maybe {
309                        Some(frame) if frame.topic == terminate_topic => {
310                            task.engine.state.signals().trigger();
311                            task.engine.kill_job_by_name(&task.id.to_string());
312                            let _ = (&mut done_rx).await;
313                            break 'ctrl LoopOutcome::Terminate;
314                        }
315                        Some(frame) if frame.topic == "xs.stopping" => {
316                            task.engine.state.signals().trigger();
317                            task.engine.kill_job_by_name(&task.id.to_string());
318                            let _ = (&mut done_rx).await;
319                            break 'ctrl LoopOutcome::Shutdown;
320                        }
321                        Some(frame) if frame.topic == spawn_topic => {
322                            if let Some(hash) = frame.hash.clone() {
323                                if let Ok(mut reader) = store.cas_reader(hash).await {
324                                    let mut script = String::new();
325                                    if reader.read_to_string(&mut script).await.is_ok() {
326                                        let mut new_engine = match crate::processor::build_engine(&store, &frame.id) {
327                                            Ok(e) => e,
328                                            Err(_) => continue,
329                                        };
330                                        match nu::parse_config(&mut new_engine, &script) {
331                                            Ok(cfg) => {
332                                                let opts: ServiceScriptOptions = cfg.deserialize_options().unwrap_or_default();
333                                                let interrupt = Arc::new(AtomicBool::new(false));
334                                                new_engine.state.set_signals(Signals::new(interrupt.clone()));
335
336                                                task.engine.state.signals().trigger();
337                                                task.engine.kill_job_by_name(&task.id.to_string());
338                                                let _ = (&mut done_rx).await;
339
340                                                let new_task = Task {
341                                                    id: frame.id,
342                                                    run_closure: cfg.run_closure,
343                                                    return_options: opts.return_options,
344                                                    duplex: opts.duplex.unwrap_or(false),
345                                                    engine: new_engine,
346                                                };
347
348                                                break 'ctrl LoopOutcome::Update(Box::new(new_task), frame.id);
349                                            }
350                                            Err(e) => {
351                                                let _ = emit_event(
352                                                    &store,
353                                                    &loop_ctx,
354                                                    frame.id,
355                                                    None,
356                                                    ServiceEventKind::ParseError { message: e.to_string() },
357                                                );
358                                            }
359                                        }
360                                    }
361                                }
362                            }
363                        }
364                        Some(_) => {}
365                        None => break 'ctrl LoopOutcome::Error("control".into()),
366                    }
367                }
368                res = &mut done_rx => {
369                    break 'ctrl match res.unwrap_or(Err("thread failed".into())) {
370                        Ok(()) => LoopOutcome::Continue,
371                        Err(e) => LoopOutcome::Error(e),
372                    };
373                }
374            }
375        };
376
377        let reason: StopReason = (&outcome).into();
378        let _ = emit_event(
379            &store,
380            &loop_ctx,
381            task.id,
382            task.return_options.as_ref(),
383            ServiceEventKind::Stopped(reason.clone()),
384        );
385
386        match outcome {
387            LoopOutcome::Continue => {
388                tokio::time::sleep(Duration::from_secs(1)).await;
389                if let Ok(event) = emit_event(
390                    &store,
391                    &loop_ctx,
392                    task.id,
393                    task.return_options.as_ref(),
394                    ServiceEventKind::Running,
395                ) {
396                    start_id = event.frame.id;
397                }
398            }
399            LoopOutcome::Update(new_task, _) => {
400                task = *new_task;
401                if let Ok(event) = emit_event(
402                    &store,
403                    &loop_ctx,
404                    task.id,
405                    task.return_options.as_ref(),
406                    ServiceEventKind::Running,
407                ) {
408                    start_id = event.frame.id;
409                }
410            }
411            LoopOutcome::Terminate | LoopOutcome::Shutdown | LoopOutcome::Error(_) => {
412                let _ = emit_event(
413                    &store,
414                    &loop_ctx,
415                    task.id,
416                    task.return_options.as_ref(),
417                    ServiceEventKind::Shutdown,
418                );
419                break;
420            }
421        }
422    }
423}
424
425async fn build_input_pipeline(
426    store: Store,
427    loop_ctx: &ServiceLoop,
428    task: &Task,
429    rx: tokio::sync::mpsc::Receiver<Frame>,
430) -> PipelineData {
431    let topic = format!("{loop_topic}.send", loop_topic = loop_ctx.topic);
432    let signals = task.engine.state.signals().clone();
433    let mut rx = rx;
434    let iter = std::iter::from_fn(move || loop {
435        if signals.interrupted() {
436            return None;
437        }
438
439        match rx.try_recv() {
440            Ok(frame) => {
441                if frame.topic == topic {
442                    if let Some(hash) = frame.hash {
443                        if let Ok(bytes) = store.cas_read_sync(&hash) {
444                            if let Ok(content) = String::from_utf8(bytes) {
445                                return Some(content);
446                            }
447                        }
448                    }
449                }
450            }
451            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
452                std::thread::sleep(std::time::Duration::from_millis(10));
453                continue;
454            }
455            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
456                return None;
457            }
458        }
459    });
460
461    ByteStream::from_iter(
462        iter,
463        Span::unknown(),
464        task.engine.state.signals().clone(),
465        ByteStreamType::Unknown,
466    )
467    .into()
468}
469
470fn spawn_thread(
471    store: Store,
472    loop_ctx: ServiceLoop,
473    mut task: Task,
474    input_pipeline: PipelineData,
475    done_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
476) {
477    let handle = tokio::runtime::Handle::current();
478    std::thread::spawn(move || {
479        let res = run_pipeline(&handle, &store, &loop_ctx, &mut task, input_pipeline);
480        let _ = done_tx.send(res);
481    });
482}
483
484fn run_pipeline(
485    handle: &tokio::runtime::Handle,
486    store: &Store,
487    loop_ctx: &ServiceLoop,
488    task: &mut Task,
489    input_pipeline: PipelineData,
490) -> Result<(), String> {
491    let pipeline = task
492        .engine
493        .run_closure_in_job(
494            &task.run_closure,
495            vec![],
496            Some(input_pipeline),
497            task.id.to_string(),
498        )
499        .map_err(|e| {
500            let working_set = nu_protocol::engine::StateWorkingSet::new(&task.engine.state);
501            nu_protocol::format_cli_error(None, &working_set, &*e, None)
502        })?;
503
504    let suffix = task
505        .return_options
506        .as_ref()
507        .and_then(|o| o.suffix.clone())
508        .unwrap_or_else(|| "recv".into());
509    let use_cas = task
510        .return_options
511        .as_ref()
512        .and_then(|o| o.target.as_deref())
513        .is_some_and(|t| t == "cas");
514
515    let emit = |event| {
516        handle.block_on(async {
517            let _ = emit_event(
518                store,
519                loop_ctx,
520                task.id,
521                task.return_options.as_ref(),
522                event,
523            );
524        });
525    };
526
527    match pipeline {
528        PipelineData::Empty => {}
529        PipelineData::Value(value, _) => {
530            if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
531                emit(event);
532            }
533        }
534        PipelineData::ListStream(mut stream, _) => {
535            while let Some(value) = stream.next_value() {
536                if let Some(event) = value_to_event(&value, &suffix, use_cas)? {
537                    emit(event);
538                }
539            }
540        }
541        PipelineData::ByteStream(stream, _) => {
542            if let Some(mut reader) = stream.reader() {
543                let mut buf = [0u8; 8192];
544                loop {
545                    match reader.read(&mut buf) {
546                        Ok(0) => break,
547                        Ok(n) => {
548                            emit(ServiceEventKind::Recv {
549                                suffix: suffix.clone(),
550                                data: buf[..n].to_vec(),
551                            });
552                        }
553                        Err(_) => break,
554                    }
555                }
556            }
557        }
558    }
559    Ok(())
560}
561
562fn value_to_event(
563    value: &Value,
564    suffix: &str,
565    use_cas: bool,
566) -> Result<Option<ServiceEventKind>, String> {
567    match value {
568        Value::Nothing { .. } => Ok(None),
569        Value::Record { .. } if !use_cas => Ok(Some(ServiceEventKind::RecvMeta {
570            suffix: suffix.to_string(),
571            meta: value_to_json(value),
572        })),
573        _ if use_cas => {
574            let data = match value {
575                Value::String { val, .. } => val.as_bytes().to_vec(),
576                Value::Binary { val, .. } => val.clone(),
577                _ => value_to_json(value).to_string().into_bytes(),
578            };
579            Ok(Some(ServiceEventKind::Recv {
580                suffix: suffix.to_string(),
581                data,
582            }))
583        }
584        _ => Err(format!(
585            "Service output must be a record when target is not \"cas\"; got {}. \
586             Set return_options.target to \"cas\" for non-record output.",
587            value.get_type()
588        )),
589    }
590}