1use std::io::{BufRead, BufReader, Write};
2use std::sync::atomic::Ordering;
3use std::{
4 process::{Child, ChildStdin, ChildStdout, Command, Stdio},
5 sync::{
6 Mutex, OnceLock,
7 atomic::{AtomicU64, AtomicUsize},
8 },
9};
10
11use serde_json::{Value, json};
12
13use crate::engine::config::EngineConfig;
14
15pub struct Sidecar {
16 stdin: ChildStdin,
17 stdout: BufReader<ChildStdout>,
18 _child: Child,
19}
20
21static POOL: OnceLock<Vec<Mutex<Option<Sidecar>>>> = OnceLock::new();
22static REQ_ID: AtomicU64 = AtomicU64::new(0);
23static NEXT_SLOT: AtomicUsize = AtomicUsize::new(0);
24
25fn pool_size() -> usize {
26 std::env::var("DMC_SIDECAR_POOL_SIZE")
27 .ok()
28 .and_then(|s| s.parse().ok())
29 .unwrap_or_else(|| std::thread::available_parallelism().map(|p| p.get().min(4)).unwrap_or(2))
30}
31
32fn pool() -> &'static Vec<Mutex<Option<Sidecar>>> {
33 POOL.get_or_init(|| (0..pool_size()).map(|_| Mutex::new(None)).collect())
34}
35
36impl Sidecar {
37 pub fn new() -> Option<Self> {
38 let entry = std::env::var("dmc_SIDECAR").ok().unwrap_or_else(|| "dmc-sidecar/index.mjs".into());
39 let mut child = Command::new("node")
40 .arg(&entry)
41 .stdin(Stdio::piped())
42 .stdout(Stdio::piped())
43 .stderr(Stdio::null())
44 .spawn()
45 .ok()?;
46 let stdin = child.stdin.take().unwrap();
47 let stdout = BufReader::new(child.stdout.take().unwrap());
48 Some(Self { stdin, stdout, _child: child })
49 }
50}
51
52pub fn run_sidecar(markdown: &str, cfg: &EngineConfig) -> Option<String> {
53 let pool = pool();
54 let n = pool.len();
55
56 let mut guard = None;
58 for _ in 0..n {
59 let idx = NEXT_SLOT.fetch_add(1, Ordering::Relaxed) % n;
60 if let Ok(g) = pool[idx].try_lock() {
61 guard = Some(g);
62 break;
63 }
64 }
65 let mut guard = match guard {
66 Some(g) => g,
67 None => {
68 let idx = NEXT_SLOT.fetch_add(1, Ordering::Relaxed) % n;
69 pool[idx].lock().ok()?
70 },
71 };
72
73 if guard.is_none() {
74 *guard = Some(Sidecar::new()?);
75 }
76 let child = guard.as_mut().unwrap();
77
78 let id = REQ_ID.fetch_add(1, Ordering::Relaxed);
79 let merge = |a: &Vec<Value>, b: &Vec<Value>| -> Value { Value::Array(a.iter().chain(b.iter()).cloned().collect()) };
80 let remark_md = cfg.compile.effective_markdown_remark_plugins();
81 let remark_mdx = cfg.compile.effective_mdx_remark_plugins();
82 let rehype_md = cfg.compile.effective_markdown_rehype_plugins();
83 let rehype_mdx = cfg.compile.effective_mdx_rehype_plugins();
84 let req = json!({
85 "id": id,
86 "markdown": markdown,
87 "remarkPlugins": merge(&remark_md, &remark_mdx),
88 "rehypePlugins": merge(&rehype_md, &rehype_mdx),
89 });
90
91 writeln!(child.stdin, "{}", req).ok()?;
92 child.stdin.flush().ok()?;
93 let mut line = String::new();
94 child.stdout.read_line(&mut line).ok()?;
95 let parsed: Value = serde_json::from_str(&line).ok()?;
96 parsed.get("html").and_then(|v| v.as_str()).map(String::from)
97}
98
99pub fn shutdown_pool() {
102 if let Some(pool) = POOL.get() {
103 for slot in pool {
104 if let Ok(mut g) = slot.lock() {
105 *g = None;
106 }
107 }
108 }
109}