1use std::io::{BufRead, BufReader, Write};
2use std::sync::atomic::Ordering;
3use std::{
4 process::{Child, ChildStdin, ChildStdout, Command, Stdio},
5 sync::{
6 Mutex, OnceLock,
7 atomic::{AtomicU64, AtomicUsize},
8 },
9};
10
11use serde_json::{Value, json};
12
13use crate::engine::config::EngineConfig;
14
15pub struct Sidecar {
16 stdin: ChildStdin,
17 stdout: BufReader<ChildStdout>,
18 _child: Child,
19}
20
21static POOL: OnceLock<Vec<Mutex<Option<Sidecar>>>> = OnceLock::new();
22static REQ_ID: AtomicU64 = AtomicU64::new(0);
23static NEXT_SLOT: AtomicUsize = AtomicUsize::new(0);
24
25fn pool_size() -> usize {
26 std::env::var("DMC_SIDECAR_POOL_SIZE")
27 .ok()
28 .and_then(|s| s.parse().ok())
29 .unwrap_or_else(|| std::thread::available_parallelism().map(|p| p.get().min(4)).unwrap_or(2))
30}
31
32fn pool() -> &'static Vec<Mutex<Option<Sidecar>>> {
33 POOL.get_or_init(|| (0..pool_size()).map(|_| Mutex::new(None)).collect())
34}
35
36impl Sidecar {
37 pub fn new() -> Option<Self> {
38 let entry = std::env::var("dmc_SIDECAR").ok().unwrap_or_else(|| "dmc-sidecar/index.mjs".into());
39 let mut child = Command::new("node")
40 .arg(&entry)
41 .stdin(Stdio::piped())
42 .stdout(Stdio::piped())
43 .stderr(Stdio::null())
44 .spawn()
45 .ok()?;
46 let stdin = child.stdin.take().unwrap();
47 let stdout = BufReader::new(child.stdout.take().unwrap());
48 Some(Self { stdin, stdout, _child: child })
49 }
50}
51
52pub fn run_sidecar(markdown: &str, cfg: &EngineConfig) -> Option<String> {
53 let pool = pool();
54 let n = pool.len();
55
56 let mut guard = None;
58 for _ in 0..n {
59 let idx = NEXT_SLOT.fetch_add(1, Ordering::Relaxed) % n;
60 if let Ok(g) = pool[idx].try_lock() {
61 guard = Some(g);
62 break;
63 }
64 }
65 let mut guard = match guard {
67 Some(g) => g,
68 None => {
69 let idx = NEXT_SLOT.fetch_add(1, Ordering::Relaxed) % n;
70 pool[idx].lock().ok()?
71 },
72 };
73
74 if guard.is_none() {
75 *guard = Some(Sidecar::new()?);
76 }
77 let child = guard.as_mut().unwrap();
78
79 let id = REQ_ID.fetch_add(1, Ordering::Relaxed);
80 let merge = |a: &Vec<Value>, b: &Vec<Value>| -> Value { Value::Array(a.iter().chain(b.iter()).cloned().collect()) };
81 let remark_md = cfg.compile.effective_markdown_remark_plugins();
82 let remark_mdx = cfg.compile.effective_mdx_remark_plugins();
83 let rehype_md = cfg.compile.effective_markdown_rehype_plugins();
84 let rehype_mdx = cfg.compile.effective_mdx_rehype_plugins();
85 let req = json!({
86 "id": id,
87 "markdown": markdown,
88 "remarkPlugins": merge(&remark_md, &remark_mdx),
89 "rehypePlugins": merge(&rehype_md, &rehype_mdx),
90 });
91
92 writeln!(child.stdin, "{}", req).ok()?;
93 child.stdin.flush().ok()?;
94 let mut line = String::new();
95 child.stdout.read_line(&mut line).ok()?;
96 let parsed: Value = serde_json::from_str(&line).ok()?;
97 parsed.get("html").and_then(|v| v.as_str()).map(String::from)
98}
99
100pub fn shutdown_pool() {
103 if let Some(pool) = POOL.get() {
104 for slot in pool {
105 if let Ok(mut g) = slot.lock() {
106 *g = None;
109 }
110 }
111 }
112}