ts_bridge/process/
mod.rs

1//! =============================================================================
2//! Tsserver Process Management
3//! =============================================================================
4//!
5//! Tracks child Node processes, implements the `Content-Length` framed protocol,
6//! and exposes cancellation pipes
7
8use std::fs::{self, OpenOptions};
9use std::io::{BufRead, BufReader, Read, Write};
10use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
11use std::thread;
12
13use crossbeam_channel::{Receiver, Sender, unbounded};
14use serde_json::Value;
15use tempfile::TempDir;
16
17use crate::config::TsserverLaunchOptions;
18use crate::provider::TsserverBinary;
19
20/// Represents an owned tsserver instance (syntax or semantic).
21pub struct TsserverProcess {
22    kind: ServerKind,
23    binary: TsserverBinary,
24    launch: TsserverLaunchOptions,
25    child: Option<ChildHandles>,
26}
27
28impl TsserverProcess {
29    pub fn new(kind: ServerKind, binary: TsserverBinary, launch: TsserverLaunchOptions) -> Self {
30        Self {
31            kind,
32            binary,
33            launch,
34            child: None,
35        }
36    }
37
38    /// Spawns the tsserver child process and starts the reader thread.
39    pub fn start(&mut self) -> Result<(), ProcessError> {
40        if self.child.is_some() {
41            return Ok(());
42        }
43
44        let mut command = Command::new("node");
45        let server_label = match self.kind {
46            ServerKind::Syntax => "syntax",
47            ServerKind::Semantic => "semantic",
48        };
49        command.env("TS_LSP_RS_SERVER_KIND", server_label);
50        self.apply_node_args(&mut command);
51        command.arg(&self.binary.executable);
52        self.apply_tsserver_args(&mut command)?;
53        command.arg("--stdio");
54        command.stdin(Stdio::piped());
55        command.stdout(Stdio::piped());
56        command.stderr(Stdio::inherit());
57
58        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
59        let stdout = child.stdout.take().ok_or(ProcessError::MissingStdout)?;
60        let stdin = child.stdin.take().ok_or(ProcessError::MissingStdin)?;
61
62        let cancellation_dir = TempDir::new().map_err(ProcessError::CreateCancellationDir)?;
63        let (tx, rx) = unbounded();
64        let reader_handle = spawn_reader(stdout, tx);
65
66        self.child = Some(ChildHandles {
67            child,
68            stdin,
69            cancellation_dir,
70            response_rx: rx,
71            reader_handle: Some(reader_handle),
72        });
73
74        Ok(())
75    }
76
77    fn apply_node_args(&self, command: &mut Command) {
78        if let Some(limit) = self.launch.max_old_space_size {
79            command.arg(format!("--max-old-space-size={limit}"));
80        }
81    }
82
83    fn apply_tsserver_args(&self, command: &mut Command) -> Result<(), ProcessError> {
84        if let Some(locale) = &self.launch.locale {
85            command.arg("--locale");
86            command.arg(locale);
87        }
88
89        if let Some(log_file) = self.prepare_log_file()? {
90            command.arg("--logFile");
91            command.arg(log_file);
92            if let Some(verbosity) = self.launch.log_verbosity {
93                command.arg("--logVerbosity");
94                command.arg(verbosity.as_cli_flag());
95            }
96        } else if let Some(verbosity) = self.launch.log_verbosity {
97            log::warn!(
98                "tsserver log verbosity {:?} ignored (log_directory not configured)",
99                verbosity
100            );
101        }
102
103        let mut probe_locations = Vec::new();
104        if let Some(binary_probe) = &self.binary.plugin_probe {
105            probe_locations.push(binary_probe.clone());
106        }
107        probe_locations.extend(self.launch.plugin_probe_dirs.iter().cloned());
108        for location in probe_locations {
109            command.arg("--pluginProbeLocations");
110            command.arg(location);
111        }
112
113        for plugin in &self.launch.global_plugins {
114            command.arg("--globalPlugins");
115            command.arg(plugin);
116        }
117
118        for arg in &self.launch.extra_args {
119            command.arg(arg);
120        }
121
122        Ok(())
123    }
124
125    fn prepare_log_file(&self) -> Result<Option<std::path::PathBuf>, ProcessError> {
126        let Some(dir) = &self.launch.log_directory else {
127            return Ok(None);
128        };
129        fs::create_dir_all(dir).map_err(ProcessError::LogDirectory)?;
130        let mut path = dir.clone();
131        let suffix = match self.kind {
132            ServerKind::Syntax => "syntax",
133            ServerKind::Semantic => "semantic",
134        };
135        path.push(format!("tsserver.{suffix}.log"));
136        Ok(Some(path))
137    }
138
139    /// Sends a JSON payload to tsserver using the newline-delimited framing that
140    /// `tsserver --stdio` expects (it only *emits* Content-Length headers).
141    pub fn write(&mut self, payload: &Value) -> Result<(), ProcessError> {
142        let child = self.child.as_mut().ok_or(ProcessError::NotStarted)?;
143        let mut serialized = serde_json::to_string(payload).map_err(ProcessError::Serialize)?;
144        serialized.push('\n');
145        log::trace!("tsserver {:?} <= {}", self.kind, serialized.trim_end());
146        child
147            .stdin
148            .write_all(serialized.as_bytes())
149            .map_err(ProcessError::Write)?;
150        child.stdin.flush().map_err(ProcessError::Write)?;
151        Ok(())
152    }
153
154    /// Signals cancellation by touching `seq_{id}` inside the cancellation pipe directory.
155    pub fn cancel(&self, seq: u64) -> Result<(), ProcessError> {
156        let child = self.child.as_ref().ok_or(ProcessError::NotStarted)?;
157        let path = child.cancellation_dir.path().join(format!("seq_{}", seq));
158        OpenOptions::new()
159            .create(true)
160            .write(true)
161            .open(path)
162            .map(|_| ())
163            .map_err(ProcessError::Write)
164    }
165
166    pub fn response_rx(&self) -> Option<Receiver<Value>> {
167        self.child
168            .as_ref()
169            .map(|handles| handles.response_rx.clone())
170    }
171
172    pub fn pid(&self) -> Option<u32> {
173        self.child.as_ref().map(|handles| handles.child.id())
174    }
175}
176
177impl Drop for TsserverProcess {
178    fn drop(&mut self) {
179        if let Some(mut handles) = self.child.take() {
180            let _ = handles.child.kill();
181            let _ = handles.child.wait();
182        }
183    }
184}
185
186#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
187pub enum ServerKind {
188    Syntax,
189    Semantic,
190}
191
192struct ChildHandles {
193    child: Child,
194    stdin: ChildStdin,
195    cancellation_dir: TempDir,
196    response_rx: Receiver<Value>,
197    reader_handle: Option<thread::JoinHandle<()>>,
198}
199
200impl Drop for ChildHandles {
201    fn drop(&mut self) {
202        if let Some(handle) = self.reader_handle.take() {
203            let _ = handle.join();
204        }
205    }
206}
207
208fn spawn_reader(stdout: ChildStdout, tx: Sender<Value>) -> thread::JoinHandle<()> {
209    thread::spawn(move || {
210        let mut reader = BufReader::new(stdout);
211        loop {
212            match read_message(&mut reader) {
213                Ok(message) => {
214                    let _ = tx.send(message);
215                }
216                Err(ProcessError::Eof) => break,
217                Err(_) => continue,
218            }
219        }
220    })
221}
222
223fn read_message<T: Read>(reader: &mut BufReader<T>) -> Result<Value, ProcessError> {
224    let mut header = String::new();
225    loop {
226        header.clear();
227        let bytes = reader.read_line(&mut header).map_err(ProcessError::Read)?;
228        if bytes == 0 {
229            return Err(ProcessError::Eof);
230        }
231        if header == "\r\n" {
232            continue;
233        }
234        if header.to_ascii_lowercase().starts_with("content-length:") {
235            let len: usize = header["Content-Length:".len()..]
236                .trim()
237                .parse()
238                .map_err(|_| ProcessError::InvalidHeader)?;
239            // consume blank line
240            let mut blank = [0; 2];
241            reader.read_exact(&mut blank).map_err(ProcessError::Read)?;
242            let mut body = vec![0u8; len];
243            reader.read_exact(&mut body).map_err(ProcessError::Read)?;
244            return serde_json::from_slice(&body).map_err(ProcessError::Deserialize);
245        }
246    }
247}
248
249#[derive(thiserror::Error, Debug)]
250pub enum ProcessError {
251    #[error("process not started")]
252    NotStarted,
253    #[error("failed to spawn tsserver: {0}")]
254    Spawn(std::io::Error),
255    #[error("failed to create cancellation directory: {0}")]
256    CreateCancellationDir(std::io::Error),
257    #[error("tsserver stdout missing (stdio must be piped)")]
258    MissingStdout,
259    #[error("tsserver stdin missing (stdio must be piped)")]
260    MissingStdin,
261    #[error("failed to serialize payload: {0}")]
262    Serialize(serde_json::Error),
263    #[error("failed to write to tsserver stdin: {0}")]
264    Write(std::io::Error),
265    #[error("failed to parse response json: {0}")]
266    Deserialize(serde_json::Error),
267    #[error("unexpected EOF while reading tsserver output")]
268    Eof,
269    #[error("invalid Content-Length header")]
270    InvalidHeader,
271    #[error("io error while reading tsserver stdout: {0}")]
272    Read(std::io::Error),
273    #[error("failed to prepare tsserver log directory: {0}")]
274    LogDirectory(std::io::Error),
275}