use lsp_types::{notification::Notification as Notif, request::Request as Req, *};
use super::*;
/// Function-pointer type for a handler that receives mutable access to a
/// value of type `S` (named `srv`) together with an argument of type `T`,
/// and returns an `LspResult<()>`.
///
/// In this file it is the handler type accepted by the notification
/// registration methods of the builder.
type PureHandler<S, T> = fn(srv: &mut S, args: T) -> LspResult<()>;
impl<S: 'static> TypedLspClient<S> {
    /// Sends the LSP request `R` with `params` through the wrapped client.
    ///
    /// The wrapped client invokes its callback with a `&mut dyn Any`; a clone
    /// of `self.caster` is applied to that value so that `handler` is called
    /// with a `&mut S` and the response.
    pub fn send_lsp_request<R: Req>(
        &self,
        params: R::Params,
        handler: impl FnOnce(&mut S, lsp::Response) + Send + Sync + 'static,
    ) {
        let cast = self.caster.clone();
        self.client
            .send_lsp_request_::<R>(params, move |any_state, response| {
                let typed_state = cast(any_state);
                handler(typed_state, response)
            });
    }
}
impl LspClient {
    /// Registers an outgoing request for method `R` and sends it.
    ///
    /// The request is registered in `req_queue.outgoing` together with a boxed
    /// callback. That callback converts the value it is given into an
    /// `lsp::Response` (panicking through `unwrap` if the conversion fails)
    /// and then invokes `handler`.
    pub fn send_lsp_request_<R: Req>(
        &self,
        params: R::Params,
        handler: impl FnOnce(&mut dyn Any, lsp::Response) + Send + Sync + 'static,
    ) {
        // The value returned by `lock()` is kept until the function returns,
        // i.e. also while the message is being sent.
        let mut queue = self.req_queue.lock();
        let method = R::METHOD.to_owned();
        let outgoing = queue.outgoing.register(
            method,
            params,
            Box::new(|state, raw| {
                let response: lsp::Response = raw.try_into().unwrap();
                handler(state, response)
            }),
        );
        self.sender.send_message(outgoing.into());
    }

    /// Sends `response` by delegating to `self.respond`, keyed by a clone of
    /// the response's own id.
    pub fn respond_lsp(&self, response: lsp::Response) {
        let id = response.id.clone();
        self.respond(id, response.into());
    }

    /// Builds a notification for method `N` from `params` and sends it via
    /// `send_notification_`.
    pub fn send_notification<N: Notif>(&self, params: &N::Params) {
        let method = N::METHOD.to_owned();
        let notif = lsp::Notification::new(method, params);
        self.send_notification_(notif);
    }

    /// Sends an already constructed notification through `self.sender`.
    pub fn send_notification_(&self, notif: lsp::Notification) {
        self.sender.send_message(notif.into());
    }
}
impl<Args: Initializer> LsBuilder<LspMessage, Args>
where
    Args::S: 'static,
{
    /// Registers `handler` for the command `cmd` without any adaptation.
    ///
    /// The handler is boxed and stored in `command_handlers`; it receives the
    /// command arguments as a vector of JSON values.
    pub fn with_command_(
        mut self,
        cmd: &'static str,
        handler: RawHandler<Args::S, Vec<JsonValue>>,
    ) -> Self {
        let boxed = Box::new(handler);
        self.command_handlers.insert(cmd, boxed);
        self
    }

    /// Registers `handler` for the command `cmd`.
    ///
    /// The value returned by `handler` is passed through `erased_response`
    /// before being handed back to the caller of the stored handler.
    pub fn with_command<R: Serialize + 'static>(
        mut self,
        cmd: &'static str,
        handler: AsyncHandler<Args::S, Vec<JsonValue>, R>,
    ) -> Self {
        self.command_handlers.insert(
            cmd,
            Box::new(move |state, arguments| {
                let outcome = handler(state, arguments);
                erased_response(outcome)
            }),
        );
        self
    }

    /// Registers `handler` for the notification `R`; the handler receives the
    /// notification parameters as an undecoded JSON value.
    pub fn with_notification_<R: Notif>(
        mut self,
        handler: PureHandler<Args::S, JsonValue>,
    ) -> Self {
        let boxed = Box::new(handler);
        self.notif_handlers.insert(R::METHOD, boxed);
        self
    }

    /// Registers `handler` for the notification `R`.
    ///
    /// The JSON parameters are decoded with `from_json` first; a decoding
    /// error is returned from the stored handler instead of calling `handler`.
    pub fn with_notification<R: Notif>(mut self, handler: PureHandler<Args::S, R::Params>) -> Self {
        self.notif_handlers.insert(
            R::METHOD,
            Box::new(move |state, raw_params| {
                let params: R::Params = from_json(raw_params)?;
                handler(state, params)
            }),
        );
        self
    }

    /// Registers `handler` for the request `R`; the handler receives the
    /// request parameters as an undecoded JSON value.
    pub fn with_raw_request<R: Req>(mut self, handler: RawHandler<Args::S, JsonValue>) -> Self {
        let boxed = Box::new(handler);
        self.req_handlers.insert(R::METHOD, boxed);
        self
    }

    /// Registers `handler` for the request `R`.
    ///
    /// The JSON parameters are decoded with `from_json` first; a decoding
    /// error is returned from the stored handler instead of calling `handler`.
    /// The handler's `ScheduleResult` is returned unchanged.
    pub fn with_request_<R: Req>(
        mut self,
        handler: fn(&mut Args::S, R::Params) -> ScheduleResult,
    ) -> Self {
        self.req_handlers.insert(
            R::METHOD,
            Box::new(move |state, raw_params| {
                let params: R::Params = from_json(raw_params)?;
                handler(state, params)
            }),
        );
        self
    }

    /// Registers `handler` for the request `R`.
    ///
    /// The JSON parameters are decoded with `from_json` first (a decoding
    /// error short-circuits), and the handler's return value is passed through
    /// `erased_response`.
    pub fn with_request<R: Req>(
        mut self,
        handler: AsyncHandler<Args::S, R::Params, R::Result>,
    ) -> Self {
        self.req_handlers.insert(
            R::METHOD,
            Box::new(move |state, raw_params| {
                let params: R::Params = from_json(raw_params)?;
                erased_response(handler(state, params))
            }),
        );
        self
    }
}
impl<Args: Initializer> LsDriver<LspMessage, Args>
where
Args::S: 'static,
{
/// Runs the message loop on `inbox` and, in replay mode, waits for pending
/// requests afterwards.
///
/// The loop itself is delegated to `start_`, whose result is what this
/// function returns. When `is_replay` is set, a helper thread is spawned and
/// joined after the loop ends (the join result is discarded). That thread
/// polls every 10 ms while the client reports pending requests. Once more
/// than `REPLAY_TIMEOUT` seconds have elapsed (read from the environment,
/// falling back to 60 when unset or unparsable), it logs an error and calls
/// `client.begin_panic()`.
#[cfg(feature = "system")]
pub fn start(
    &mut self,
    inbox: TConnectionRx<LspMessage>,
    is_replay: bool,
) -> anyhow::Result<()> {
    let res = self.start_(inbox);
    if !is_replay {
        return res;
    }

    let client = self.client.clone();
    let waiter = std::thread::spawn(move || {
        let since = tinymist_std::time::Instant::now();
        let timeout = std::env::var("REPLAY_TIMEOUT")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(60);
        client.handle.block_on(async {
            let poll_interval = std::time::Duration::from_millis(10);
            loop {
                if !client.has_pending_requests() {
                    break;
                }
                if since.elapsed().as_secs() > timeout {
                    log::error!("replay timeout reached, {timeout}s");
                    client.begin_panic();
                }
                tokio::time::sleep(poll_interval).await;
            }
        })
    });
    let _ = waiter.join();

    res
}
/// Runs the blocking main loop, dispatching everything received on `inbox`.
///
/// Each received item is handled according to its kind:
/// - **Events** are passed to the handler registered for the event's type
///   id. Unhandled events and events arriving while shutting down are logged
///   and skipped. An error returned by the handler is propagated with `?`.
/// - **Requests** are registered with the client, and the scheduled result of
///   `on_lsp_request` is spawned on `self.client.handle`.
/// - **Notifications** are handled inline by `on_notification`, bracketed by
///   the hook's `start_notification` / `stop_notification` calls. After the
///   exit notification has been processed the function returns `Ok(())`.
/// - **Responses** complete the matching outgoing request, but only while the
///   state is `Ready`; otherwise a warning is logged.
///
/// If `inbox.recv()` fails before an exit notification was seen, a warning is
/// logged and `Ok(())` is returned.
#[cfg(feature = "system")]
pub fn start_(&mut self, inbox: TConnectionRx<LspMessage>) -> anyhow::Result<()> {
    use EventOrMessage::*;

    while let Ok(msg) = inbox.recv() {
        const EXIT_METHOD: &str = notification::Exit::METHOD;
        // Timestamp taken when the message is picked up; passed on to the
        // request registration and to the notification stop hook.
        let loop_start = tinymist_std::time::now();
        match msg {
            Evt(event) => {
                let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
                    log::warn!("unhandled event: {:?}", event.as_ref().type_id());
                    continue;
                };
                let s = match &mut self.state {
                    State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
                    State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
                    State::ShuttingDown => {
                        log::warn!("server is shutting down");
                        continue;
                    }
                };
                event_handler(s, &self.client, event)?;
            }
            Msg(LspMessage::Request(req)) => {
                let client = self.client.clone();
                let req_id = req.id.clone();
                client.register_request(&req.method, &req_id, loop_start);
                let fut =
                    client.schedule_tail(req_id, self.on_lsp_request(&req.method, req.params));
                self.client.handle.spawn(fut);
            }
            Msg(LspMessage::Notification(not)) => {
                // Decide up front whether this is the exit notification, because
                // `not.params` is moved into the handler below.
                let is_exit = not.method == EXIT_METHOD;
                let track_id = self
                    .next_not_id
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                self.client.hook.start_notification(track_id, &not.method);
                let result = self.on_notification(&not.method, not.params);
                self.client
                    .hook
                    .stop_notification(track_id, &not.method, loop_start, result);
                if is_exit {
                    return Ok(());
                }
            }
            Msg(LspMessage::Response(resp)) => {
                let s = match &mut self.state {
                    State::Ready(s) => s,
                    _ => {
                        log::warn!("server is not ready yet");
                        continue;
                    }
                };
                self.client.clone().complete_lsp_request(s, resp)
            }
        }
    }

    log::warn!("client exited without proper shutdown sequence");
    Ok(())
}
/// Handles the server event stored under `event_id` in the JS transport.
///
/// The event is removed from the transport's `events` map; nothing happens
/// if no event is stored under that id. The event is then passed to the
/// handler registered for its type id. Unhandled events and events arriving
/// while shutting down are logged and dropped, and a handler error is logged
/// rather than returned.
///
/// # Panics
///
/// Panics if the client's transport is the system transport.
#[cfg(feature = "web")]
pub fn on_server_event(&mut self, event_id: u32) {
    let pending = match &self.client.sender {
        TransportHost::Js { events, .. } => events.lock().remove(&event_id),
        TransportHost::System(_) => panic!("cannot send server event in system transport"),
    };
    let Some(event) = pending else {
        return;
    };

    let Some(event_handler) = self.events.get(&event.as_ref().type_id()) else {
        log::warn!("unhandled event: {:?}", event.as_ref().type_id());
        return;
    };

    let service = match &mut self.state {
        State::Uninitialized(u) => ServiceState::Uninitialized(u.as_deref_mut()),
        State::Initializing(s) | State::Ready(s) => ServiceState::Ready(s),
        State::ShuttingDown => {
            log::warn!("server is shutting down");
            return;
        }
    };

    if let Err(err) = event_handler(service, &self.client, event) {
        log::error!("failed to handle server event {event_id}: {err}");
    }
}
/// Dispatches an incoming LSP request according to the current server state.
///
/// - `initialize` while uninitialized: decodes the parameters, runs the
///   initializer and moves the state to `Initializing`; undecodable
///   parameters yield an invalid-request error.
/// - Any other request before the state is `Ready`: not-initialized error.
/// - `initialize` in any later state: invalid-request error.
/// - `workspace/executeCommand`-style requests (`ExecuteCommand::METHOD`)
///   while ready: forwarded to `on_execute_command`.
/// - Any other request while ready: looked up in `self.requests`; unknown
///   methods yield a method-not-found error. After the handler of the
///   shutdown request has run, the state becomes `ShuttingDown`.
/// - Any request while shutting down: invalid-request error.
pub fn on_lsp_request(&mut self, method: &str, params: JsonValue) -> ScheduleResult {
    match (&mut self.state, method) {
        (State::Uninitialized(args), request::Initialize::METHOD) => {
            match serde_json::from_value::<Args::I>(params) {
                Ok(init_params) => {
                    let args = args.take().expect("already initialized");
                    let (service, response) = args.initialize(init_params);
                    self.state = State::Initializing(service);
                    response
                }
                Err(err) => just_result(Err(invalid_request(err))),
            }
        }
        (State::Uninitialized(..) | State::Initializing(..), _) => {
            just_result(Err(not_initialized()))
        }
        (_, request::Initialize::METHOD) => {
            just_result(Err(invalid_request("server is already initialized")))
        }
        (State::Ready(..), request::ExecuteCommand::METHOD) => self.on_execute_command(params),
        (State::Ready(service), method) => {
            let Some(handler) = self.requests.get(method) else {
                log::warn!("unhandled lsp request: {method}");
                return just_result(Err(method_not_found()));
            };
            let response = handler(service, params);
            // The state only changes after the shutdown handler itself has run.
            if method == request::Shutdown::METHOD {
                self.state = State::ShuttingDown;
            }
            response
        }
        (State::ShuttingDown, _) => {
            just_result(Err(invalid_request("server is shutting down")))
        }
    }
}
/// Executes the command described by the JSON-encoded `ExecuteCommandParams`.
///
/// Fails with a not-initialized error when `self.state.opt_mut()` yields
/// nothing, and with an invalid-params error when `params` cannot be decoded.
/// The command `tinymist.getResources` is served by `self.get_resources`;
/// every other command is looked up in `self.commands`, and an unknown
/// command is logged and answered with a method-not-found error.
fn on_execute_command(&mut self, params: JsonValue) -> ScheduleResult {
    let s = self.state.opt_mut().ok_or_else(not_initialized)?;
    let ExecuteCommandParams {
        command, arguments, ..
    } = from_value::<ExecuteCommandParams>(params)
        .map_err(|e| invalid_params(e.to_string()))?;

    if command == "tinymist.getResources" {
        return self.get_resources(arguments);
    }

    match self.commands.get(command.as_str()) {
        Some(handler) => handler(s, arguments),
        None => {
            log::error!("asked to execute unknown command: {command}");
            Err(method_not_found())
        }
    }
}
/// Dispatches an incoming LSP notification according to the current state.
///
/// The `initialized` notification promotes an `Initializing` state to
/// `Ready` (any other state is left as it was) and is then forwarded to its
/// registered handler if the state is `Ready`. All other notifications are
/// forwarded only while the state is `Ready`; otherwise a warning is logged
/// and `Ok(())` is returned. A notification without a registered handler is
/// logged and also answered with `Ok(())`.
pub fn on_notification(&mut self, method: &str, params: JsonValue) -> LspResult<()> {
    let dispatch = |s, method: &str, params: JsonValue| match self.notifications.get(method) {
        Some(handler) => handler(s, params),
        None => {
            log::warn!("unhandled notification: {method}");
            Ok(())
        }
    };

    match (&mut self.state, method) {
        (state, notification::Initialized::METHOD) => {
            // Take the state out temporarily so the service can be moved from
            // `Initializing` into `Ready`; any other state is put back as is.
            *state = match std::mem::replace(state, State::ShuttingDown) {
                State::Initializing(s) => State::Ready(s),
                other => other,
            };
            match state {
                State::Ready(s) => dispatch(s, method, params),
                _ => {
                    log::warn!("server is not ready yet");
                    Ok(())
                }
            }
        }
        (State::Ready(service), method) => dispatch(service, method, params),
        (State::Uninitialized(..) | State::Initializing(..), method) => {
            log::warn!("server is not ready yet, while received notification {method}");
            Ok(())
        }
        (State::ShuttingDown, method) => {
            log::warn!("server is shutting down, while received notification {method}");
            Ok(())
        }
    }
}
/// Completes the outgoing request that `resp` answers.
///
/// If `self.state_mut()` yields no state, the response is dropped and a
/// warning is logged instead.
pub fn on_lsp_response(&mut self, resp: lsp::Response) {
    // Cloned up front because `state_mut` borrows `self` mutably.
    let client = self.client.clone();
    match self.state_mut() {
        Some(state) => client.complete_lsp_request(state, resp),
        None => log::warn!("server is not ready yet, while received response"),
    }
}
}