use std::collections::HashMap;
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use lex_extension::{
schema::HandlerSpec, CodeAction, Completion, Diagnostic, Format, HandlerError, Hover, LabelCtx,
LexHandler, RenderOut, WireNode,
};
use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::sync::mpsc;
use super::jsonrpc::{
codes, encode_frame, parse_headers, FrameError, IncomingFrame, JsonRpcError,
OutgoingNotification, OutgoingRequest,
};
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(2000);
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(5);
const SHUTDOWN_GRACE: Duration = Duration::from_millis(500);
#[derive(Debug, Clone, Default)]
pub struct SpawnEnv {
pub workspace_root: Option<String>,
pub lex_cache: Option<String>,
pub handler_config: Option<String>,
}
impl SpawnEnv {
fn expand(&self, raw: &str) -> Result<String, SpawnError> {
let mut out = String::with_capacity(raw.len());
let mut rest = raw;
while let Some(start) = rest.find("${") {
out.push_str(&rest[..start]);
let after_brace = &rest[start + 2..];
let close = after_brace
.find('}')
.ok_or_else(|| SpawnError::UnclosedVariable {
fragment: raw.to_string(),
})?;
let name = &after_brace[..close];
let value = match name {
"WORKSPACE_ROOT" => self.workspace_root.as_deref(),
"LEX_CACHE" => self.lex_cache.as_deref(),
"HANDLER_CONFIG" => self.handler_config.as_deref(),
other => {
return Err(SpawnError::UnknownVariable {
name: other.to_string(),
});
}
};
out.push_str(value.unwrap_or(""));
rest = &after_brace[close + 1..];
}
out.push_str(rest);
Ok(out)
}
}
#[derive(Debug)]
pub enum SpawnError {
EmptyCommand,
UnknownVariable { name: String },
UnclosedVariable { fragment: String },
Spawn(std::io::Error),
ChildStreamMissing,
Sandbox(String),
Initialize(InitializeError),
}
#[derive(Debug)]
pub enum InitializeError {
Timeout,
Transport(String),
HandlerError {
code: i32,
message: String,
},
VersionMismatch {
handler: u32,
host: u32,
},
BadResponse(String),
}
impl std::fmt::Display for SpawnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SpawnError::EmptyCommand => f.write_str("subprocess handler: empty command array"),
SpawnError::UnknownVariable { name } => write!(
f,
"subprocess handler: unknown environment variable `${{{name}}}` in command array"
),
SpawnError::UnclosedVariable { fragment } => write!(
f,
"subprocess handler: unclosed `${{...}}` in command array entry: {fragment:?}"
),
SpawnError::Spawn(e) => write!(f, "subprocess handler: failed to spawn child: {e}"),
SpawnError::ChildStreamMissing => {
f.write_str("subprocess handler: child closed stdio before initialize")
}
SpawnError::Sandbox(message) => {
write!(
f,
"subprocess handler: sandbox policy install failed: {message}"
)
}
SpawnError::Initialize(InitializeError::Timeout) => {
f.write_str("subprocess handler: initialize handshake timed out")
}
SpawnError::Initialize(InitializeError::Transport(m)) => {
write!(
f,
"subprocess handler: transport error during initialize: {m}"
)
}
SpawnError::Initialize(InitializeError::HandlerError { code, message }) => write!(
f,
"subprocess handler: initialize returned error {code}: {message}",
),
SpawnError::Initialize(InitializeError::VersionMismatch { handler, host }) => write!(
f,
"subprocess handler: wire_version mismatch (handler={handler}, host={host})"
),
SpawnError::Initialize(InitializeError::BadResponse(m)) => {
write!(f, "subprocess handler: malformed initialize response: {m}")
}
}
}
}
impl std::error::Error for SpawnError {}
#[derive(serde::Serialize)]
struct InitializeParams<'a> {
wire_version: u32,
lex_version: &'a str,
namespace: &'a str,
labels: &'a [String],
capabilities: lex_extension::schema::Capabilities,
workspace: Option<&'a str>,
}
#[derive(Deserialize, Debug)]
struct InitializeResult {
wire_version: u32,
#[serde(default)]
implements: Vec<String>,
}
struct PendingReply {
tx: std::sync::mpsc::Sender<Result<serde_json::Value, JsonRpcError>>,
}
enum WorkerCmd {
Call {
id: u64,
method: &'static str,
params: serde_json::Value,
reply: std::sync::mpsc::Sender<Result<serde_json::Value, JsonRpcError>>,
},
Notify {
method: &'static str,
params: serde_json::Value,
},
CancelPending {
id: u64,
},
Shutdown,
}
#[derive(Debug)]
pub struct SubprocessHandler {
cmd_tx: mpsc::UnboundedSender<WorkerCmd>,
timeout: Duration,
next_id: AtomicU64,
implements: std::collections::HashSet<String>,
disabled: Arc<AtomicBool>,
worker: Option<thread::JoinHandle<()>>,
}
impl SubprocessHandler {
pub fn spawn(
spec: &HandlerSpec,
namespace: &str,
labels: &[String],
capabilities: lex_extension::schema::Capabilities,
lex_version: &str,
env: &SpawnEnv,
) -> Result<Self, SpawnError> {
Self::spawn_with_sandbox(
spec,
namespace,
labels,
capabilities,
lex_version,
env,
std::sync::Arc::new(crate::sandbox::NullSandbox),
)
}
pub fn spawn_with_sandbox(
spec: &HandlerSpec,
namespace: &str,
labels: &[String],
capabilities: lex_extension::schema::Capabilities,
lex_version: &str,
env: &SpawnEnv,
sandbox: std::sync::Arc<dyn crate::sandbox::Sandbox>,
) -> Result<Self, SpawnError> {
if spec.command.is_empty() {
return Err(SpawnError::EmptyCommand);
}
let expanded: Vec<String> = spec
.command
.iter()
.map(|raw| env.expand(raw))
.collect::<Result<_, _>>()?;
let timeout = spec
.timeout_ms
.map(|ms| Duration::from_millis(ms as u64))
.unwrap_or(DEFAULT_TIMEOUT);
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::<WorkerCmd>();
let (init_tx, init_rx) = std::sync::mpsc::channel::<Result<InitializeResult, SpawnError>>();
let disabled = Arc::new(AtomicBool::new(false));
let disabled_for_worker = disabled.clone();
let init_params = serde_json::to_value(InitializeParams {
wire_version: lex_extension::WIRE_VERSION,
lex_version,
namespace,
labels,
capabilities,
workspace: env.workspace_root.as_deref(),
})
.expect("InitializeParams serialises");
let worker_sandbox = sandbox;
let worker_caps = capabilities;
let worker = thread::spawn(move || {
run_worker(
expanded,
init_params,
init_tx,
cmd_rx,
disabled_for_worker,
worker_sandbox,
worker_caps,
);
});
let init = init_rx
.recv_timeout(INITIALIZE_TIMEOUT)
.map_err(|_| SpawnError::Initialize(InitializeError::Timeout))??;
if init.wire_version != lex_extension::WIRE_VERSION {
let _ = cmd_tx.send(WorkerCmd::Shutdown);
let _ = worker.join();
return Err(SpawnError::Initialize(InitializeError::VersionMismatch {
handler: init.wire_version,
host: lex_extension::WIRE_VERSION,
}));
}
Ok(Self {
cmd_tx,
timeout,
next_id: AtomicU64::new(100),
implements: init.implements.into_iter().collect(),
disabled,
worker: Some(worker),
})
}
pub fn implements(&self, method: &str) -> bool {
self.implements.contains(method)
}
fn call(
&self,
method: &'static str,
params: serde_json::Value,
) -> Result<serde_json::Value, HandlerError> {
if self.disabled.load(Ordering::SeqCst) {
return Err(HandlerError::internal("subprocess handler disabled"));
}
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let (tx, rx) = std::sync::mpsc::channel();
self.cmd_tx
.send(WorkerCmd::Call {
id,
method,
params,
reply: tx,
})
.map_err(|_| {
self.disabled.store(true, Ordering::SeqCst);
HandlerError::internal("subprocess handler worker has stopped")
})?;
match rx.recv_timeout(self.timeout) {
Ok(Ok(v)) => Ok(v),
Ok(Err(err)) => Err(handler_error_from_jsonrpc(err)),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
let _ = self.cmd_tx.send(WorkerCmd::CancelPending { id });
Err(HandlerError::internal(format!(
"subprocess handler timed out after {} ms on `{method}`",
self.timeout.as_millis()
)))
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
self.disabled.store(true, Ordering::SeqCst);
Err(HandlerError::internal(
"subprocess handler worker dropped reply channel",
))
}
}
}
}
impl Drop for SubprocessHandler {
fn drop(&mut self) {
let _ = self.cmd_tx.send(WorkerCmd::Shutdown);
if let Some(handle) = self.worker.take() {
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while !handle.is_finished() && std::time::Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(10));
}
if handle.is_finished() {
let _ = handle.join();
}
}
}
}
fn handler_error_from_jsonrpc(err: JsonRpcError) -> HandlerError {
match err.code {
codes::METHOD_NOT_FOUND => HandlerError::Unsupported {
detail: err.message,
},
codes::INTERNAL => HandlerError::Internal {
message: err.message,
},
code if (-32099..=-32000).contains(&code) => HandlerError::Custom {
code,
message: err.message,
data: err.data,
},
_ => HandlerError::Internal {
message: format!(
"handler returned reserved error code {}: {}",
err.code, err.message
),
},
}
}
impl SubprocessHandler {
fn advertised(&self, method: &str) -> bool {
self.implements.is_empty() || self.implements.contains(method)
}
}
impl LexHandler for SubprocessHandler {
fn on_label(&self, ctx: &LabelCtx) {
if self.disabled.load(Ordering::SeqCst) || !self.advertised("on_label") {
return;
}
let _ = self.cmd_tx.send(WorkerCmd::Notify {
method: "on_label",
params: serde_json::to_value(ctx).expect("LabelCtx"),
});
}
fn on_validate(&self, ctx: &LabelCtx) -> Result<Vec<Diagnostic>, HandlerError> {
if !self.advertised("on_validate") {
return Ok(Vec::new());
}
#[derive(Deserialize)]
struct ValidateResult {
#[serde(default)]
diagnostics: Vec<Diagnostic>,
}
let v = self.call("on_validate", serde_json::to_value(ctx).expect("LabelCtx"))?;
let result: ValidateResult = serde_json::from_value(v)
.map_err(|e| HandlerError::internal(format!("on_validate response decode: {e}")))?;
Ok(result.diagnostics)
}
fn on_resolve(&self, ctx: &LabelCtx) -> Result<Option<WireNode>, HandlerError> {
if !self.advertised("on_resolve") {
return Ok(None);
}
#[derive(Deserialize)]
struct ResolveResult {
#[serde(default)]
replacement: Option<WireNode>,
}
let v = self.call("on_resolve", serde_json::to_value(ctx).expect("LabelCtx"))?;
let result: ResolveResult = serde_json::from_value(v)
.map_err(|e| HandlerError::internal(format!("on_resolve response decode: {e}")))?;
Ok(result.replacement)
}
fn on_ir_build(&self, ctx: &LabelCtx) -> Result<Option<WireNode>, HandlerError> {
if !self.advertised("on_ir_build") {
return Ok(None);
}
#[derive(Deserialize)]
struct IrBuildResult {
#[serde(default)]
node: Option<WireNode>,
}
let v = self.call("on_ir_build", serde_json::to_value(ctx).expect("LabelCtx"))?;
let result: IrBuildResult = serde_json::from_value(v)
.map_err(|e| HandlerError::internal(format!("on_ir_build response decode: {e}")))?;
Ok(result.node)
}
fn on_render(&self, ctx: &LabelCtx, format: Format) -> Result<Option<RenderOut>, HandlerError> {
if !self.advertised("on_render") {
return Ok(None);
}
#[derive(Deserialize)]
struct RenderResult {
#[serde(default)]
output: Option<RenderOut>,
}
let mut params = serde_json::to_value(ctx).expect("LabelCtx");
let obj = params
.as_object_mut()
.expect("LabelCtx serialises as object");
obj.insert(
"format".into(),
serde_json::to_value(&format).expect("Format"),
);
obj.insert(
"format_options".into(),
serde_json::Value::Object(Default::default()),
);
let v = self.call("on_render", params)?;
let result: RenderResult = serde_json::from_value(v)
.map_err(|e| HandlerError::internal(format!("on_render response decode: {e}")))?;
Ok(result.output)
}
fn on_hover(&self, ctx: &LabelCtx) -> Result<Option<Hover>, HandlerError> {
if !self.advertised("on_hover") {
return Ok(None);
}
#[derive(Deserialize)]
struct HoverResult {
#[serde(default)]
hover: Option<Hover>,
}
let v = self.call("on_hover", serde_json::to_value(ctx).expect("LabelCtx"))?;
let result: HoverResult = serde_json::from_value(v)
.map_err(|e| HandlerError::internal(format!("on_hover response decode: {e}")))?;
Ok(result.hover)
}
fn on_completion(&self, ctx: &LabelCtx) -> Result<Vec<Completion>, HandlerError> {
if !self.advertised("on_completion") {
return Ok(Vec::new());
}
#[derive(Deserialize)]
struct CompletionResult {
#[serde(default)]
items: Vec<Completion>,
}
let v = self.call(
"on_completion",
serde_json::to_value(ctx).expect("LabelCtx"),
)?;
let result: CompletionResult = serde_json::from_value(v)
.map_err(|e| HandlerError::internal(format!("on_completion response decode: {e}")))?;
Ok(result.items)
}
fn on_code_action(&self, ctx: &LabelCtx) -> Result<Vec<CodeAction>, HandlerError> {
if !self.advertised("on_code_action") {
return Ok(Vec::new());
}
#[derive(Deserialize)]
struct CodeActionResult {
#[serde(default)]
actions: Vec<CodeAction>,
}
let v = self.call(
"on_code_action",
serde_json::to_value(ctx).expect("LabelCtx"),
)?;
let result: CodeActionResult = serde_json::from_value(v)
.map_err(|e| HandlerError::internal(format!("on_code_action response decode: {e}")))?;
Ok(result.actions)
}
}
#[allow(clippy::too_many_arguments)]
fn run_worker(
argv: Vec<String>,
init_params: serde_json::Value,
init_tx: std::sync::mpsc::Sender<Result<InitializeResult, SpawnError>>,
cmd_rx: mpsc::UnboundedReceiver<WorkerCmd>,
disabled: Arc<AtomicBool>,
sandbox: std::sync::Arc<dyn crate::sandbox::Sandbox>,
capabilities: lex_extension::schema::Capabilities,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build current_thread runtime");
runtime.block_on(async move {
worker_main(
argv,
init_params,
init_tx,
cmd_rx,
disabled,
sandbox,
capabilities,
)
.await;
});
}
#[allow(clippy::too_many_arguments)]
async fn worker_main(
argv: Vec<String>,
init_params: serde_json::Value,
init_tx: std::sync::mpsc::Sender<Result<InitializeResult, SpawnError>>,
mut cmd_rx: mpsc::UnboundedReceiver<WorkerCmd>,
disabled: Arc<AtomicBool>,
sandbox: std::sync::Arc<dyn crate::sandbox::Sandbox>,
capabilities: lex_extension::schema::Capabilities,
) {
let mut cmd = Command::new(&argv[0]);
cmd.args(&argv[1..])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
if let Err(e) = sandbox.apply_to(cmd.as_std_mut(), capabilities) {
let _ = init_tx.send(Err(SpawnError::Sandbox(format!("{e}"))));
return;
}
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
let _ = init_tx.send(Err(SpawnError::Spawn(e)));
return;
}
};
let stdin = match child.stdin.take() {
Some(s) => s,
None => {
let _ = init_tx.send(Err(SpawnError::ChildStreamMissing));
return;
}
};
let stdout = match child.stdout.take() {
Some(s) => s,
None => {
let _ = init_tx.send(Err(SpawnError::ChildStreamMissing));
return;
}
};
if let Some(stderr) = child.stderr.take() {
tokio::spawn(forward_stderr(stderr));
}
let mut io = HandlerIo {
stdin,
reader: FrameReader::new(stdout),
};
let init_result = match do_initialize(&mut io, &init_params).await {
Ok(r) => r,
Err(e) => {
disabled.store(true, Ordering::SeqCst);
let _ = init_tx.send(Err(e));
let _ = child.kill().await;
return;
}
};
let _ = init_tx.send(Ok(init_result));
let HandlerIo {
mut stdin,
mut reader,
} = io;
let (write_tx, mut write_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let mut writer_handle = tokio::spawn(async move {
while let Some(bytes) = write_rx.recv().await {
if stdin.write_all(&bytes).await.is_err() {
break;
}
}
let _ = stdin.shutdown().await;
});
let mut pending: HashMap<u64, PendingReply> = HashMap::new();
loop {
tokio::select! {
cmd = cmd_rx.recv() => {
match cmd {
Some(WorkerCmd::Call { id, method, params, reply }) => {
let req = OutgoingRequest {
jsonrpc: "2.0",
id,
method,
params,
};
let bytes = encode_frame(&serde_json::to_value(&req).expect("OutgoingRequest"));
if write_tx.send(bytes).is_err() {
let _ = reply.send(Err(JsonRpcError {
code: codes::INTERNAL,
message: "subprocess writer task ended".into(),
data: None,
}));
disabled.store(true, Ordering::SeqCst);
break;
}
pending.insert(id, PendingReply { tx: reply });
}
Some(WorkerCmd::Notify { method, params }) => {
let note = OutgoingNotification {
jsonrpc: "2.0",
method,
params,
};
let bytes = encode_frame(
&serde_json::to_value(¬e).expect("OutgoingNotification"),
);
if write_tx.send(bytes).is_err() {
disabled.store(true, Ordering::SeqCst);
break;
}
}
Some(WorkerCmd::CancelPending { id }) => {
pending.remove(&id);
}
Some(WorkerCmd::Shutdown) | None => {
break;
}
}
}
frame = reader.read_frame() => {
match frame {
Ok(IncomingFrame::Response { id, result, error, .. }) => {
if let Some(pending_reply) = pending.remove(&id) {
let payload = match (result, error) {
(Some(v), None) => Ok(v),
(None, Some(err)) => Err(err),
(Some(_), Some(err)) => {
Err(err)
}
(None, None) => Err(JsonRpcError {
code: codes::INTERNAL,
message: "handler response carried neither result nor error".into(),
data: None,
}),
};
let _ = pending_reply.tx.send(payload);
}
}
Ok(IncomingFrame::Notification { method, .. }) => {
eprintln!("[lex-extension-host] subprocess notification: {method}");
}
Err(FrameError::UnexpectedEof) => {
disabled.store(true, Ordering::SeqCst);
fail_all_pending(&mut pending, "subprocess handler closed stdout");
break;
}
Err(e) => {
disabled.store(true, Ordering::SeqCst);
fail_all_pending(&mut pending, &format!("framing error: {e}"));
break;
}
}
}
}
}
let shutdown = OutgoingNotification {
jsonrpc: "2.0",
method: "shutdown",
params: serde_json::Value::Null,
};
let _ = write_tx.send(encode_frame(
&serde_json::to_value(&shutdown).expect("OutgoingNotification"),
));
drop(write_tx);
if tokio::time::timeout(SHUTDOWN_GRACE, &mut writer_handle)
.await
.is_err()
{
writer_handle.abort();
}
let _ = tokio::time::timeout(SHUTDOWN_GRACE, child.wait()).await;
fail_all_pending(&mut pending, "subprocess handler shutting down");
}
fn fail_all_pending(pending: &mut HashMap<u64, PendingReply>, message: &str) {
for (_, reply) in pending.drain() {
let _ = reply.tx.send(Err(JsonRpcError {
code: codes::INTERNAL,
message: message.to_string(),
data: None,
}));
}
}
async fn do_initialize(
io: &mut HandlerIo,
params: &serde_json::Value,
) -> Result<InitializeResult, SpawnError> {
let req = OutgoingRequest {
jsonrpc: "2.0",
id: 1,
method: "initialize",
params: params.clone(),
};
let bytes = encode_frame(&serde_json::to_value(&req).expect("OutgoingRequest"));
io.stdin
.write_all(&bytes)
.await
.map_err(|e| SpawnError::Initialize(InitializeError::Transport(e.to_string())))?;
let deadline = tokio::time::Instant::now() + INITIALIZE_TIMEOUT;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(SpawnError::Initialize(InitializeError::Timeout));
}
let frame = tokio::time::timeout(remaining, io.reader.read_frame())
.await
.map_err(|_| SpawnError::Initialize(InitializeError::Timeout))?
.map_err(|e| SpawnError::Initialize(InitializeError::Transport(e.to_string())))?;
match frame {
IncomingFrame::Response {
id: 1,
result: Some(r),
error: None,
..
} => {
return serde_json::from_value::<InitializeResult>(r).map_err(|e| {
SpawnError::Initialize(InitializeError::BadResponse(e.to_string()))
});
}
IncomingFrame::Response {
id: 1,
error: Some(err),
..
} => {
return Err(SpawnError::Initialize(InitializeError::HandlerError {
code: err.code,
message: err.message,
}));
}
IncomingFrame::Response { id, .. } => {
return Err(SpawnError::Initialize(InitializeError::BadResponse(
format!("initialize response id={id}, expected 1"),
)));
}
IncomingFrame::Notification { method, .. } => {
eprintln!(
"[lex-extension-host] subprocess notification during initialize: {method}"
);
continue;
}
}
}
}
struct HandlerIo {
stdin: ChildStdin,
reader: FrameReader,
}
struct FrameReader {
stdout: ChildStdout,
buf: Vec<u8>,
}
impl FrameReader {
fn new(stdout: ChildStdout) -> Self {
Self {
stdout,
buf: Vec::with_capacity(4096),
}
}
async fn read_frame(&mut self) -> Result<IncomingFrame, FrameError> {
const MAX_HEADER_BYTES: usize = 8 * 1024;
let header_end = loop {
if let Some(pos) = find_header_end(&self.buf) {
break pos;
}
if self.buf.len() >= MAX_HEADER_BYTES {
return Err(FrameError::MalformedHeader(format!(
"no header separator after {} bytes",
self.buf.len()
)));
}
let mut chunk = [0u8; 4096];
let n = self.stdout.read(&mut chunk).await?;
if n == 0 {
return Err(FrameError::UnexpectedEof);
}
self.buf.extend_from_slice(&chunk[..n]);
};
let header_bytes = &self.buf[..header_end];
let header_str = std::str::from_utf8(header_bytes)
.map_err(|_| FrameError::MalformedHeader("non-UTF-8 bytes in headers".to_string()))?;
let body_start = header_end + 4;
let body_len = parse_headers(header_str)?;
while self.buf.len() < body_start + body_len {
let mut chunk = [0u8; 4096];
let n = self.stdout.read(&mut chunk).await?;
if n == 0 {
return Err(FrameError::UnexpectedEof);
}
self.buf.extend_from_slice(&chunk[..n]);
}
let body = self.buf[body_start..body_start + body_len].to_vec();
self.buf.drain(..body_start + body_len);
let frame: IncomingFrame = serde_json::from_slice(&body)?;
Ok(frame)
}
}
fn find_header_end(buf: &[u8]) -> Option<usize> {
buf.windows(4).position(|w| w == b"\r\n\r\n")
}
async fn forward_stderr(mut stderr: ChildStderr) {
let mut buf = [0u8; 4096];
loop {
match stderr.read(&mut buf).await {
Ok(0) | Err(_) => return,
Ok(n) => {
let chunk = String::from_utf8_lossy(&buf[..n]);
for line in chunk.lines() {
if !line.is_empty() {
eprintln!("[lex-extension-host:handler] {line}");
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn spawn_env_expands_known_variables() {
let env = SpawnEnv {
workspace_root: Some("/ws".into()),
lex_cache: Some("/cache".into()),
handler_config: Some("/conf.toml".into()),
};
assert_eq!(env.expand("--ws=${WORKSPACE_ROOT}").unwrap(), "--ws=/ws");
assert_eq!(env.expand("${LEX_CACHE}/x").unwrap(), "/cache/x");
assert_eq!(env.expand("${HANDLER_CONFIG}").unwrap(), "/conf.toml");
assert_eq!(
env.expand("${WORKSPACE_ROOT}:${LEX_CACHE}").unwrap(),
"/ws:/cache"
);
assert_eq!(env.expand("plain").unwrap(), "plain");
}
#[test]
fn spawn_env_unknown_variable_rejected() {
let env = SpawnEnv::default();
let err = env.expand("--x=${MYSTERY}").unwrap_err();
match err {
SpawnError::UnknownVariable { name } => assert_eq!(name, "MYSTERY"),
other => panic!("expected UnknownVariable, got {other}"),
}
}
#[test]
fn spawn_env_unclosed_variable_rejected() {
let env = SpawnEnv::default();
let err = env.expand("--x=${WORKSPACE_ROOT").unwrap_err();
assert!(matches!(err, SpawnError::UnclosedVariable { .. }));
}
#[test]
fn spawn_env_known_but_unset_substitutes_empty() {
let env = SpawnEnv::default();
assert_eq!(env.expand("--cache=${LEX_CACHE}").unwrap(), "--cache=");
}
#[test]
fn handler_error_from_jsonrpc_maps_reserved_codes() {
let err = handler_error_from_jsonrpc(JsonRpcError {
code: codes::METHOD_NOT_FOUND,
message: "no such".into(),
data: None,
});
assert!(matches!(err, HandlerError::Unsupported { .. }));
let err = handler_error_from_jsonrpc(JsonRpcError {
code: codes::INTERNAL,
message: "oops".into(),
data: None,
});
assert!(matches!(err, HandlerError::Internal { .. }));
let err = handler_error_from_jsonrpc(JsonRpcError {
code: -32050,
message: "custom".into(),
data: Some(serde_json::json!(1)),
});
match err {
HandlerError::Custom { code, .. } => assert_eq!(code, -32050),
other => panic!("expected Custom, got {other:?}"),
}
}
#[test]
fn find_header_end_empty_buffer() {
assert!(find_header_end(b"").is_none());
assert!(find_header_end(b"Content-Length: 1\r\n").is_none());
let pos = find_header_end(b"Content-Length: 1\r\n\r\nx").unwrap();
assert_eq!(pos, 17);
}
}