Skip to main content

dmc/engine/
sidecar.rs

1use std::io::{BufRead, BufReader, Write};
2use std::sync::atomic::Ordering;
3use std::{
4  process::{Child, ChildStdin, ChildStdout, Command, Stdio},
5  sync::{
6    Mutex, OnceLock,
7    atomic::{AtomicU64, AtomicUsize},
8  },
9};
10
11use serde_json::{Value, json};
12
13use crate::engine::config::EngineConfig;
14
/// A running `node` sidecar process together with the two pipes used to talk to it.
///
/// Requests are written to `stdin` as one JSON document per line and responses are
/// read back from `stdout` one line at a time.
///
/// Field order is significant: Rust drops fields in declaration order, so `stdin`
/// is closed first when a `Sidecar` is dropped.
#[derive(Debug)]
pub struct Sidecar {
  /// Write end of the child's standard input (request channel).
  stdin: ChildStdin,
  /// Buffered read end of the child's standard output (response channel).
  stdout: BufReader<ChildStdout>,
  /// Process handle, kept so it lives exactly as long as the pipes do.
  _child: Child,
}
20
21static POOL: OnceLock<Vec<Mutex<Option<Sidecar>>>> = OnceLock::new();
22static REQ_ID: AtomicU64 = AtomicU64::new(0);
23static NEXT_SLOT: AtomicUsize = AtomicUsize::new(0);
24
/// Returns how many sidecar slots the pool should have; the result is never zero.
///
/// `DMC_SIDECAR_POOL_SIZE` is used when it is set to a positive integer
/// (surrounding whitespace is ignored). A missing, unparsable, or zero value
/// falls back to the available parallelism capped at 4, or to 2 when the
/// parallelism cannot be determined.
fn pool_size() -> usize {
  std::env::var("DMC_SIDECAR_POOL_SIZE")
    .ok()
    .and_then(|raw| raw.trim().parse::<usize>().ok())
    // A zero-sized pool would leave callers with no slot to pick (and a `% 0`).
    .filter(|&size| size > 0)
    .unwrap_or_else(|| std::thread::available_parallelism().map_or(2, |p| p.get().min(4)))
}
31
32fn pool() -> &'static Vec<Mutex<Option<Sidecar>>> {
33  POOL.get_or_init(|| (0..pool_size()).map(|_| Mutex::new(None)).collect())
34}
35
36impl Sidecar {
37  pub fn new() -> Option<Self> {
38    let entry = std::env::var("dmc_SIDECAR").ok().unwrap_or_else(|| "dmc-sidecar/index.mjs".into());
39    let mut child = Command::new("node")
40      .arg(&entry)
41      .stdin(Stdio::piped())
42      .stdout(Stdio::piped())
43      .stderr(Stdio::null())
44      .spawn()
45      .ok()?;
46    let stdin = child.stdin.take().unwrap();
47    let stdout = BufReader::new(child.stdout.take().unwrap());
48    Some(Self { stdin, stdout, _child: child })
49  }
50}
51
52pub fn run_sidecar(markdown: &str, cfg: &EngineConfig) -> Option<String> {
53  let pool = pool();
54  let n = pool.len();
55
56  // Grab any idle slot; if all busy, block on round-robin pick.
57  let mut guard = None;
58  for _ in 0..n {
59    let idx = NEXT_SLOT.fetch_add(1, Ordering::Relaxed) % n;
60    if let Ok(g) = pool[idx].try_lock() {
61      guard = Some(g);
62      break;
63    }
64  }
65  let mut guard = match guard {
66    Some(g) => g,
67    None => {
68      let idx = NEXT_SLOT.fetch_add(1, Ordering::Relaxed) % n;
69      pool[idx].lock().ok()?
70    },
71  };
72
73  if guard.is_none() {
74    *guard = Some(Sidecar::new()?);
75  }
76  let child = guard.as_mut().unwrap();
77
78  let id = REQ_ID.fetch_add(1, Ordering::Relaxed);
79  let merge = |a: &Vec<Value>, b: &Vec<Value>| -> Value { Value::Array(a.iter().chain(b.iter()).cloned().collect()) };
80  let remark_md = cfg.compile.effective_markdown_remark_plugins();
81  let remark_mdx = cfg.compile.effective_mdx_remark_plugins();
82  let rehype_md = cfg.compile.effective_markdown_rehype_plugins();
83  let rehype_mdx = cfg.compile.effective_mdx_rehype_plugins();
84  let req = json!({
85    "id": id,
86    "markdown": markdown,
87    "remarkPlugins": merge(&remark_md, &remark_mdx),
88    "rehypePlugins": merge(&rehype_md, &rehype_mdx),
89  });
90
91  writeln!(child.stdin, "{}", req).ok()?;
92  child.stdin.flush().ok()?;
93  let mut line = String::new();
94  child.stdout.read_line(&mut line).ok()?;
95  let parsed: Value = serde_json::from_str(&line).ok()?;
96  parsed.get("html").and_then(|v| v.as_str()).map(String::from)
97}
98
99/// Bench-only: drop every live child; next `run_sidecar` re-spawns.
100/// Dropping closes `ChildStdin`, which makes node exit on readline 'close'.
101pub fn shutdown_pool() {
102  if let Some(pool) = POOL.get() {
103    for slot in pool {
104      if let Ok(mut g) = slot.lock() {
105        *g = None;
106      }
107    }
108  }
109}