ts_bridge/rpc/
mod.rs

1//! =============================================================================
2//! RPC Bridge
3//! =============================================================================
4//!
5//! This layer glues Neovim’s LSP transport to the tsserver processes.
6//! * request routing (syntax vs semantic)
7//! * request queue/priorities/cancellation
8//! * handler dispatch into the protocol module tree
9
10mod queue;
11pub use queue::{Priority, Request, RequestQueue};
12
13use crossbeam_channel::{Receiver, TryRecvError};
14use serde_json::Value;
15
16use crate::config::Config;
17use crate::process::{ServerKind, TsserverProcess};
18use crate::provider::Provider;
19/// Public facade invoked by Neovim (or any embedding host).  Eventually this
20/// type will implement whatever trait the chosen RPC runtime expects.
21pub struct Service {
22    config: Config,
23    provider: Provider,
24    syntax: Option<TsserverProcess>,
25    semantic: Option<TsserverProcess>,
26    syntax_rx: Option<Receiver<Value>>,
27    semantic_rx: Option<Receiver<Value>>,
28    syntax_queue: RequestQueue,
29    semantic_queue: RequestQueue,
30}
31
32impl Service {
33    pub fn new(config: Config, provider: Provider) -> Self {
34        Self {
35            config,
36            provider,
37            syntax: None,
38            semantic: None,
39            syntax_rx: None,
40            semantic_rx: None,
41            syntax_queue: RequestQueue::default(),
42            semantic_queue: RequestQueue::default(),
43        }
44    }
45
46    /// Bootstraps tsserver processes once
47    pub fn start(&mut self) -> Result<(), ServiceError> {
48        let binary = self.provider.resolve().map_err(ServiceError::Provider)?;
49        let launch = self.config.plugin().tsserver.clone();
50        let mut syntax = TsserverProcess::new(ServerKind::Syntax, binary.clone(), launch.clone());
51        syntax.start().map_err(ServiceError::Process)?;
52        self.syntax_rx = syntax.response_rx();
53        self.syntax = Some(syntax);
54
55        if self.config.plugin().separate_diagnostic_server {
56            let mut semantic = TsserverProcess::new(ServerKind::Semantic, binary, launch);
57            semantic.start().map_err(ServiceError::Process)?;
58            self.semantic_rx = semantic.response_rx();
59            self.semantic = Some(semantic);
60        }
61
62        Ok(())
63    }
64
65    fn syntax_mut(&mut self) -> Result<&mut TsserverProcess, ServiceError> {
66        if self.syntax.is_none() {
67            self.start()?;
68        }
69        self.syntax.as_mut().ok_or(ServiceError::ProcessNotStarted)
70    }
71
72    fn semantic_mut(&mut self) -> Option<&mut TsserverProcess> {
73        self.semantic.as_mut()
74    }
75
76    /// Queues a request for the given route and returns the syntax seq (when applicable).
77    pub fn dispatch_request(
78        &mut self,
79        route: Route,
80        payload: Value,
81        priority: Priority,
82    ) -> Result<Vec<DispatchReceipt>, ServiceError> {
83        let mut receipts = Vec::new();
84        match route {
85            Route::Syntax => {
86                let seq = self.syntax_queue.enqueue(payload, priority);
87                self.flush_queue(ServerKind::Syntax)?;
88                receipts.push(DispatchReceipt {
89                    server: ServerKind::Syntax,
90                    seq,
91                });
92            }
93            Route::Semantic => {
94                if self.semantic.is_some() {
95                    let seq = self.semantic_queue.enqueue(payload, priority);
96                    self.flush_queue(ServerKind::Semantic)?;
97                    receipts.push(DispatchReceipt {
98                        server: ServerKind::Semantic,
99                        seq,
100                    });
101                }
102            }
103            Route::Both => {
104                let seq = self.syntax_queue.enqueue(payload.clone(), priority);
105                self.flush_queue(ServerKind::Syntax)?;
106                receipts.push(DispatchReceipt {
107                    server: ServerKind::Syntax,
108                    seq,
109                });
110                if self.semantic.is_some() {
111                    let semantic_seq = self.semantic_queue.enqueue(payload, priority);
112                    self.flush_queue(ServerKind::Semantic)?;
113                    receipts.push(DispatchReceipt {
114                        server: ServerKind::Semantic,
115                        seq: semantic_seq,
116                    });
117                }
118            }
119        }
120
121        Ok(receipts)
122    }
123
124    /// Cancels a pending request on both servers.
125    pub fn cancel(&self, seq: u64) -> Result<(), ServiceError> {
126        if let Some(server) = &self.syntax {
127            server.cancel(seq).map_err(ServiceError::Process)?;
128        }
129        if let Some(server) = &self.semantic {
130            server.cancel(seq).map_err(ServiceError::Process)?;
131        }
132        Ok(())
133    }
134
135    /// Drains any ready responses from syntax/semantic readers without blocking.
136    pub fn poll_responses(&self) -> Vec<ServerEvent> {
137        let mut events = Vec::new();
138        if let Some(rx) = &self.syntax_rx {
139            collect_events(ServerKind::Syntax, rx, &mut events);
140        }
141        if let Some(rx) = &self.semantic_rx {
142            collect_events(ServerKind::Semantic, rx, &mut events);
143        }
144        events
145    }
146
147    pub fn workspace_root(&self) -> &std::path::Path {
148        self.provider.workspace_root()
149    }
150
151    fn flush_queue(&mut self, kind: ServerKind) -> Result<(), ServiceError> {
152        match kind {
153            ServerKind::Syntax => {
154                while let Some(request) = self.syntax_queue.dequeue() {
155                    let server = self.syntax_mut()?;
156                    server
157                        .write(&request.payload)
158                        .map_err(ServiceError::Process)?;
159                }
160            }
161            ServerKind::Semantic => {
162                while let Some(request) = self.semantic_queue.dequeue() {
163                    if let Some(server) = self.semantic_mut() {
164                        server
165                            .write(&request.payload)
166                            .map_err(ServiceError::Process)?;
167                    }
168                }
169            }
170        }
171        Ok(())
172    }
173
174    pub fn update_config(&mut self, new_config: Config) {
175        self.config = new_config;
176    }
177
178    pub fn restart(
179        &mut self,
180        restart_syntax: bool,
181        restart_semantic: bool,
182    ) -> Result<(), ServiceError> {
183        if restart_syntax {
184            self.syntax = None;
185            self.syntax_rx = None;
186            self.syntax_queue.reset();
187        }
188        if restart_semantic {
189            self.semantic = None;
190            self.semantic_rx = None;
191            self.semantic_queue.reset();
192        }
193        Ok(())
194    }
195
196    pub fn config(&self) -> &Config {
197        &self.config
198    }
199
200    pub fn config_mut(&mut self) -> &mut Config {
201        &mut self.config
202    }
203
204    pub fn tsserver_status(&self) -> TsserverStatus {
205        TsserverStatus {
206            syntax_pid: self.syntax.as_ref().and_then(|process| process.pid()),
207            semantic_pid: self.semantic.as_ref().and_then(|process| process.pid()),
208        }
209    }
210}
211
212#[derive(Debug, Clone, Copy)]
213pub struct TsserverStatus {
214    pub syntax_pid: Option<u32>,
215    pub semantic_pid: Option<u32>,
216}
217
218#[derive(thiserror::Error, Debug)]
219pub enum ServiceError {
220    #[error(transparent)]
221    Provider(#[from] crate::provider::ProviderError),
222    #[error("failed interaction with tsserver process: {0}")]
223    Process(#[from] crate::process::ProcessError),
224    #[error("syntax process not started yet")]
225    ProcessNotStarted,
226}
227
228#[derive(Debug, Clone)]
229pub struct ServerEvent {
230    pub server: ServerKind,
231    pub payload: Value,
232}
233
234#[derive(Debug, Clone, Copy)]
235pub struct DispatchReceipt {
236    pub server: ServerKind,
237    pub seq: u64,
238}
239
240#[derive(Debug, Clone, Copy, PartialEq, Eq)]
241pub enum Route {
242    Syntax,
243    Semantic,
244    Both,
245}
246
247fn collect_events(kind: ServerKind, rx: &Receiver<Value>, out: &mut Vec<ServerEvent>) {
248    loop {
249        match rx.try_recv() {
250            Ok(payload) => out.push(ServerEvent {
251                server: kind,
252                payload,
253            }),
254            Err(TryRecvError::Empty) => break,
255            Err(TryRecvError::Disconnected) => break,
256        }
257    }
258}