#[cfg(feature = "dap")]
mod dap_srv;
#[cfg(feature = "lsp")]
mod lsp_srv;
use core::fmt;
use std::any::Any;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::AtomicI32;
#[cfg(feature = "web")]
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Weak};
use futures::future::MaybeDone;
use parking_lot::Mutex;
use serde::Serialize;
use serde_json::{Value as JsonValue, from_value};
use tinymist_std::time::Time;
use crate::msg::*;
use crate::req_queue;
use crate::*;
/// An immutable path shared through an [`Arc`].
type ImmutPath = Arc<Path>;
/// A response that is either already available or a boxed, pinned, `Send`
/// future that yields `T`.
pub type ResponseFuture<T> = MaybeDone<Pin<Box<dyn std::future::Future<Output = T> + Send>>>;
/// A [`ResponseFuture`] wrapped in an [`LspResult`], so producing the future
/// can itself fail.
pub type LspResponseFuture<T> = LspResult<ResponseFuture<T>>;
/// An [`LspResponseFuture`] whose output is an `LspResult<T>`.
pub type SchedulableResponse<T> = LspResponseFuture<LspResult<T>>;
/// A [`SchedulableResponse`] carrying a JSON value.
pub type AnySchedulableResponse = SchedulableResponse<JsonValue>;
/// Alias of [`AnySchedulableResponse`].
pub type ScheduleResult = AnySchedulableResponse;
/// An `LspResult` carrying an optional unit value.
// NOTE(review): the meaning of `Some(())` vs `None` is decided by code outside this view.
pub type ScheduledResult = LspResult<Option<()>>;
/// The sending half of a connection, specialized to [`Message`].
pub type ConnectionTx = TConnectionTx<Message>;
/// The receiving half of a connection, specialized to [`Message`].
pub type ConnectionRx = TConnectionRx<Message>;
/// The sending half of a connection.
///
/// `M` is only a type-level marker; both channels carry concrete types.
#[derive(Debug, Clone)]
pub struct TConnectionTx<M> {
/// The channel used to send [`Event`]s.
pub event: crossbeam_channel::Sender<Event>,
/// The channel used to send protocol [`Message`]s.
pub lsp: crossbeam_channel::Sender<Message>,
/// Ties the message kind `M` to this value without storing one.
pub(crate) marker: std::marker::PhantomData<M>,
}
/// The receiving half of a connection.
///
/// `M` is the message type that [`TConnectionRx::recv`] converts received
/// [`Message`]s into.
#[derive(Debug, Clone)]
pub struct TConnectionRx<M> {
/// The channel used to receive [`Event`]s.
pub event: crossbeam_channel::Receiver<Event>,
/// The channel used to receive protocol [`Message`]s.
pub lsp: crossbeam_channel::Receiver<Message>,
/// Ties the message kind `M` to this value without storing one.
pub(crate) marker: std::marker::PhantomData<M>,
}
impl<M: TryFrom<Message, Error = anyhow::Error>> TConnectionRx<M> {
/// Receives the next protocol message or event.
///
/// Messages from the `lsp` channel are converted into `M` via `TryFrom`;
/// events from the `event` channel are passed through unchanged.
///
/// # Errors
///
/// Returns an error if the selected channel fails to receive, or if the
/// received message cannot be converted into `M`.
pub fn recv(&self) -> anyhow::Result<EventOrMessage<M>> {
// The arm order matters: `select_biased!` prefers the arm nearest to the
// front when several are ready, so `lsp` wins over `event`.
crossbeam_channel::select_biased! {
recv(self.lsp) -> msg => Ok(EventOrMessage::Msg(msg?.try_into()?)),
recv(self.event) -> event => Ok(event.map(EventOrMessage::Evt)?),
}
}
}
/// The result of [`TConnectionRx::recv`]: either an event or a message.
pub enum EventOrMessage<M> {
/// An event received from the `event` channel.
Evt(Event),
/// A message received from the `lsp` channel, converted into `M`.
Msg(M),
}
/// Both halves of a connection for message kind `M`.
pub struct Connection<M> {
/// The sending half.
pub sender: TConnectionTx<M>,
/// The receiving half.
pub receiver: TConnectionRx<M>,
}
impl<M: TryFrom<Message, Error = anyhow::Error>> From<Connection<Message>> for Connection<M> {
fn from(conn: Connection<Message>) -> Self {
Self {
sender: TConnectionTx {
event: conn.sender.event,
lsp: conn.sender.lsp,
marker: std::marker::PhantomData,
},
receiver: TConnectionRx {
event: conn.receiver.event,
lsp: conn.receiver.lsp,
marker: std::marker::PhantomData,
},
}
}
}
impl<M: TryFrom<Message, Error = anyhow::Error>> From<TConnectionTx<M>> for ConnectionTx {
fn from(conn: TConnectionTx<M>) -> Self {
Self {
event: conn.event,
lsp: conn.lsp,
marker: std::marker::PhantomData,
}
}
}
/// A shared closure that turns a type-erased `&mut dyn Any` into `&mut S`.
type AnyCaster<S> = Arc<dyn Fn(&mut dyn Any) -> &mut S + Send + Sync>;
/// An [`LspClient`] paired with a caster to the service type `S`.
pub struct TypedLspClient<S> {
/// The underlying untyped client.
client: LspClient,
/// Recovers `&mut S` from a type-erased service reference.
caster: AnyCaster<S>,
}
impl<S> TypedLspClient<S> {
    /// Consumes the typed client and returns the untyped client inside it.
    pub fn to_untyped(self) -> LspClient {
        let Self { client, .. } = self;
        client
    }
}
impl<S: 'static> TypedLspClient<S> {
    /// Borrows the untyped client.
    pub fn untyped(&self) -> &LspClient {
        &self.client
    }

    /// Derives a client typed for `T` by projecting `&mut S` to `&mut T`
    /// with `f`.
    ///
    /// The new caster first applies this client's caster and then `f`.
    pub fn cast<T: 'static>(&self, f: fn(&mut S) -> &mut T) -> TypedLspClient<T> {
        let upstream = Arc::clone(&self.caster);
        TypedLspClient {
            caster: Arc::new(move |any| f(upstream(any))),
            client: self.client.clone(),
        }
    }

    /// Sends an event through the underlying client's transport.
    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
        self.client.sender.send_event(event);
    }
}
impl<S> Clone for TypedLspClient<S> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
caster: self.caster.clone(),
}
}
}
/// Lets a typed client be used wherever the methods of [`LspClient`] are
/// needed.
impl<S> std::ops::Deref for TypedLspClient<S> {
type Target = LspClient;
/// Borrows the underlying untyped client.
fn deref(&self) -> &Self::Target {
&self.client
}
}
/// The owner of the connection's sending half.
///
/// Clients handed out by [`LspClientRoot::weak`] are clones of `weak`.
#[derive(Debug, Clone)]
pub struct LspClientRoot {
/// The client that is cloned out to users.
weak: LspClient,
/// The strong reference to the sender. For the system transport the client
/// only holds a `Weak` to it, so this field is what keeps it alive.
_strong: Arc<ConnectionTx>,
}
impl LspClientRoot {
pub fn new<M: TryFrom<Message, Error = anyhow::Error> + GetMessageKind>(
handle: tokio::runtime::Handle,
sender: TConnectionTx<M>,
) -> Self {
let _strong = Arc::new(sender.into());
let weak = LspClient {
handle,
msg_kind: M::MESSAGE_KIND,
sender: TransportHost::System(SystemTransportSender {
sender: Arc::downgrade(&_strong),
}),
req_queue: Arc::new(Mutex::new(ReqQueue::default())),
hook: Arc::new(()),
};
Self { weak, _strong }
}
#[cfg(feature = "web")]
pub fn new_js(handle: tokio::runtime::Handle, sender: JsTransportSender) -> Self {
let dummy = dummy_transport::<LspMessage>();
let _strong = Arc::new(dummy.sender.into());
let weak = LspClient {
handle,
msg_kind: LspMessage::MESSAGE_KIND,
sender: TransportHost::Js {
event_id: Arc::new(AtomicU32::new(0)),
events: Arc::new(Mutex::new(HashMap::new())),
sender,
},
req_queue: Arc::new(Mutex::new(ReqQueue::default())),
hook: Arc::new(()),
};
Self { weak, _strong }
}
pub fn with_hook(mut self, hook: Arc<dyn LsHook>) -> Self {
self.weak.hook = hook;
self
}
pub fn weak(&self) -> LspClient {
self.weak.clone()
}
}
/// A one-shot callback run when the response to an outgoing request arrives;
/// it receives the type-erased service and the response.
type ReqHandler = Box<dyn for<'a> FnOnce(&'a mut dyn Any, LspOrDapResponse) + Send + Sync>;
/// The request queue: incoming requests store `(method, received_at)`,
/// outgoing requests store their [`ReqHandler`].
type ReqQueue = req_queue::ReqQueue<(String, Time), ReqHandler>;
/// The transport a client uses to deliver messages and events.
#[derive(Debug, Clone)]
pub enum TransportHost {
/// Delivers through the crossbeam channels of a [`ConnectionTx`].
System(SystemTransportSender),
/// Delivers through callbacks provided by a JS host.
#[cfg(feature = "web")]
Js {
/// The counter used to allocate ids for stored events.
event_id: Arc<AtomicU32>,
/// The events stored by id after being announced to the JS host.
// NOTE(review): the code that removes entries is outside this view.
events: Arc<Mutex<HashMap<u32, Event>>>,
/// The JS callbacks.
sender: JsTransportSender,
},
}
/// The system transport: a weak handle to the connection's sending half.
#[derive(Debug, Clone)]
pub struct SystemTransportSender {
/// Upgraded on every send; sending is skipped with a warning once the
/// strong reference is gone.
pub(crate) sender: Weak<ConnectionTx>,
}
/// The JS callbacks used by the web transport.
///
/// Deserialized from a JS object with camelCase keys; each function is kept
/// as-is via `serde_wasm_bindgen::preserve`.
#[cfg(feature = "web")]
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsTransportSender {
/// Called with the id of a stored event.
#[serde(with = "serde_wasm_bindgen::preserve")]
pub(crate) send_event: js_sys::Function,
/// Called with a serialized request.
#[serde(with = "serde_wasm_bindgen::preserve")]
pub(crate) send_request: js_sys::Function,
/// Called with a serialized LSP notification or DAP event.
#[serde(with = "serde_wasm_bindgen::preserve")]
pub(crate) send_notification: js_sys::Function,
/// NOTE(review): not used in this part of the file; presumably resolves
/// responses on the JS side — confirm against the callers.
#[serde(with = "serde_wasm_bindgen::preserve")]
pub resolve_fn: js_sys::Function,
}
// SAFETY: NOTE(review): no invariant is stated in the source. The `Js` variant
// stores `js_sys::Function` handles, so these impls assert that moving and
// sharing them across threads is sound. Presumably this relies on the web
// build running on a single thread — TODO confirm and record the invariant.
#[cfg(feature = "web")]
unsafe impl Send for TransportHost {}
// SAFETY: NOTE(review): see the note on the `Send` impl directly above.
#[cfg(feature = "web")]
unsafe impl Sync for TransportHost {}
impl TransportHost {
    /// Sends an event to the server itself.
    ///
    /// - `System`: the event is boxed and pushed onto the connection's `event`
    ///   channel. If the connection is already dropped, or the channel rejects
    ///   the event, a warning is logged and the event is discarded.
    /// - `Js` (feature `web`): the event is stored in `events` under a freshly
    ///   allocated id, and that id is passed to the JS `send_event` callback.
    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
        match self {
            TransportHost::System(host) => {
                let Some(sender) = host.sender.upgrade() else {
                    // This path delivers an event, so the log must not say "request".
                    log::warn!("failed to send event: connection closed");
                    return;
                };
                if let Err(res) = sender.event.send(Box::new(event)) {
                    log::warn!("failed to send event: {res:?}");
                }
            }
            #[cfg(feature = "web")]
            TransportHost::Js {
                event_id,
                sender,
                events,
            } => {
                let event_id = {
                    let event_id = event_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    // The lock guard is dropped at the end of this block,
                    // before calling into JS.
                    let mut lg = events.lock();
                    lg.insert(event_id, Box::new(event));
                    js_sys::Number::from(event_id)
                };
                if let Err(err) = sender
                    .send_event
                    .call1(&wasm_bindgen::JsValue::UNDEFINED, &event_id.into())
                {
                    log::error!("failed to send event: {err:?}");
                }
            }
        }
    }

    /// Sends a protocol message to the peer.
    ///
    /// - `System`: the message is pushed onto the connection's `lsp` channel;
    ///   failures are logged as warnings.
    /// - `Js` (feature `web`): requests go to the `send_request` callback, and
    ///   LSP notifications and DAP events go to `send_notification`.
    ///
    /// # Panics
    ///
    /// On the JS transport, panics if the message is a response, or if the
    /// message cannot be serialized to a JS value.
    pub fn send_message(&self, response: Message) {
        match self {
            TransportHost::System(host) => {
                let Some(sender) = host.sender.upgrade() else {
                    log::warn!("failed to send response: connection closed");
                    return;
                };
                if let Err(res) = sender.lsp.send(response) {
                    log::warn!("failed to send response: {res:?}");
                }
            }
            #[cfg(feature = "web")]
            TransportHost::Js { sender, .. } => match response {
                #[cfg(feature = "lsp")]
                Message::Lsp(lsp::Message::Request(req)) => {
                    let msg = to_js_value(&req).expect("failed to serialize request to js value");
                    if let Err(err) = sender
                        .send_request
                        .call1(&wasm_bindgen::JsValue::UNDEFINED, &msg)
                    {
                        log::error!("failed to send request: {err:?}");
                    }
                }
                #[cfg(feature = "lsp")]
                Message::Lsp(lsp::Message::Notification(notif)) => {
                    let msg = to_js_value(&notif)
                        .expect("failed to serialize notification to js value");
                    if let Err(err) = sender
                        .send_notification
                        .call1(&wasm_bindgen::JsValue::UNDEFINED, &msg)
                    {
                        log::error!("failed to send notification: {err:?}");
                    }
                }
                #[cfg(feature = "lsp")]
                Message::Lsp(lsp::Message::Response(req)) => {
                    panic!("unexpected response to js world: {req:?}");
                }
                #[cfg(feature = "dap")]
                Message::Dap(dap::Message::Request(req)) => {
                    let msg = to_js_value(&req).expect("failed to serialize request to js value");
                    if let Err(err) = sender
                        .send_request
                        .call1(&wasm_bindgen::JsValue::UNDEFINED, &msg)
                    {
                        log::error!("failed to send request: {err:?}");
                    }
                }
                #[cfg(feature = "dap")]
                Message::Dap(dap::Message::Event(evt)) => {
                    let msg = to_js_value(&evt).expect("failed to serialize event to js value");
                    // DAP events share the JS notification callback.
                    if let Err(err) = sender
                        .send_notification
                        .call1(&wasm_bindgen::JsValue::UNDEFINED, &msg)
                    {
                        log::error!("failed to send event: {err:?}");
                    }
                }
                #[cfg(feature = "dap")]
                Message::Dap(dap::Message::Response(req)) => {
                    panic!("unexpected response to js world: {req:?}");
                }
            },
        }
    }
}
/// Serializes `value` to a JS value, with maps emitted as plain JS objects.
///
/// # Errors
///
/// Returns the `serde_wasm_bindgen` error if serialization fails.
#[cfg(feature = "web")]
fn to_js_value<T: serde::Serialize>(
value: &T,
) -> Result<wasm_bindgen::JsValue, serde_wasm_bindgen::Error> {
value.serialize(&serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true))
}
/// The untyped client handle used to talk to the peer and to the server's own
/// event loop.
#[derive(Debug, Clone)]
pub struct LspClient {
/// The tokio runtime handle carried with the client.
pub handle: tokio::runtime::Handle,
/// The protocol kind, which selects how responses are encoded.
pub(crate) msg_kind: MessageKind,
/// The transport used to deliver messages and events.
pub sender: TransportHost,
/// The shared queue of pending incoming and outgoing requests.
pub(crate) req_queue: Arc<Mutex<ReqQueue>>,
/// The hook notified when a request is registered and when it completes.
pub(crate) hook: Arc<dyn LsHook>,
}
impl LspClient {
    /// Returns the untyped client, which is `self`.
    pub fn untyped(&self) -> &Self {
        self
    }

    /// Wraps a clone of this client in a [`TypedLspClient`] for service `S`.
    ///
    /// The caster of the returned client panics with "invalid cast" if it is
    /// given a value that is not an `S`.
    pub fn to_typed<S: Any>(&self) -> TypedLspClient<S> {
        TypedLspClient {
            client: self.clone(),
            caster: Arc::new(|s| s.downcast_mut().expect("invalid cast")),
        }
    }

    /// Checks whether any incoming request is still pending.
    pub fn has_pending_requests(&self) -> bool {
        self.req_queue.lock().incoming.has_pending()
    }

    /// Forwards to the request queue's `begin_panic`.
    pub fn begin_panic(&self) {
        self.req_queue.lock().begin_panic();
    }

    /// Sends an event through the transport.
    pub fn send_event<T: std::any::Any + Send + 'static>(&self, event: T) {
        self.sender.send_event(event);
    }

    /// Completes an outgoing LSP request by running its stored handler with
    /// `response`.
    ///
    /// Logs a warning and returns if no handler is registered for the id.
    #[cfg(feature = "lsp")]
    pub fn complete_lsp_request<S: Any>(&self, service: &mut S, response: lsp::Response) {
        let mut req_queue = self.req_queue.lock();
        let Some(handler) = req_queue.outgoing.complete(response.id.clone()) else {
            log::warn!("received response for unknown request");
            return;
        };
        // Release the lock before running the handler.
        drop(req_queue);
        handler(service, response.into())
    }

    /// Completes an outgoing DAP request by running its stored handler with
    /// `response`.
    ///
    /// Logs a warning and returns if no handler is registered for the id.
    #[cfg(feature = "dap")]
    pub fn complete_dap_request<S: Any>(&self, service: &mut S, response: dap::Response) {
        let mut req_queue = self.req_queue.lock();
        let Some(handler) = req_queue
            .outgoing
            .complete((response.request_seq as i32).into())
        else {
            log::warn!("received response for unknown request");
            return;
        };
        // Release the lock before running the handler.
        drop(req_queue);
        handler(service, response.into())
    }

    /// Registers an incoming request and notifies the hook that it started.
    pub fn register_request(&self, method: &str, id: &RequestId, received_at: Time) {
        let mut req_queue = self.req_queue.lock();
        self.hook.start_request(id, method);
        req_queue
            .incoming
            .register(id.clone(), (method.to_owned(), received_at));
    }

    /// Encodes `result` as a response for the client's protocol kind and sends
    /// it via [`LspClient::respond`].
    fn respond_result(&self, id: RequestId, result: LspResult<JsonValue>) {
        let req_id = id.clone();
        let msg: Message = match (self.msg_kind, result) {
            #[cfg(feature = "lsp")]
            (MessageKind::Lsp, res) => lsp::Response::new(id, res).into(),
            #[cfg(feature = "dap")]
            (MessageKind::Dap, Ok(resp)) => dap::Response::success(RequestId::dap(id), resp).into(),
            #[cfg(feature = "dap")]
            (MessageKind::Dap, Err(e)) => {
                dap::Response::error(RequestId::dap(id), Some(e.message), None).into()
            }
        };
        self.respond(req_id, msg);
    }

    /// Completes the incoming request `id` and sends `response` to the peer.
    ///
    /// Returns without sending if `id` is not a pending incoming request. If
    /// the request took more than ten seconds, a warning is logged with the
    /// queue state and the request that has been pending the longest.
    pub fn respond(&self, id: RequestId, response: Message) {
        let mut req_queue = self.req_queue.lock();
        let Some((method, received_at)) = req_queue.incoming.complete(&id) else {
            return;
        };
        self.hook.stop_request(&id, &method, received_at);
        let delay = tinymist_std::time::now().duration_since(received_at);
        match delay {
            Ok(delay) => {
                if delay.as_secs() > 10 {
                    // The longest-pending request is the one with the earliest
                    // receive time, so take the minimum timestamp.
                    let worst_incoming =
                        req_queue.incoming.pending().min_by_key(|(_, data)| data.1);
                    let worst_case = if let Some((id, (method, since))) = worst_incoming {
                        let duration = tinymist_std::time::now().duration_since(*since);
                        format!(", worst case: req({method:?}, {id:?}) - {duration:?}")
                    } else {
                        String::new()
                    };
                    log::warn!(
                        "request {id:?} is completed after {delay:?}, pending incoming requests: {:?}, pending outgoing requests: {:?}{worst_case}",
                        req_queue.incoming,
                        req_queue.outgoing
                    );
                }
            }
            Err(err) => {
                log::error!("failed to get delay for request {id:?}: {err:?}");
            }
        }
        self.sender.send_message(response);
    }
}
impl LspClient {
    /// Resolves a scheduled response and sends the result for `req_id`.
    ///
    /// A ready result is sent as-is and a future is awaited first. A result
    /// that was already taken is sent as an internal error, and a scheduling
    /// error is sent as that error.
    pub async fn schedule_tail(self, req_id: RequestId, resp: ScheduleResult) {
        let outcome = match resp {
            Ok(MaybeDone::Done(result)) => result,
            Ok(MaybeDone::Future(pending)) => pending.await,
            Ok(MaybeDone::Gone) => {
                log::warn!("response for request({req_id:?}) already taken");
                Err(internal_error("response already taken"))
            }
            Err(err) => Err(err),
        };
        self.respond_result(req_id, outcome);
    }
}
/// Callbacks for observing the handling of requests and notifications.
pub trait LsHook: fmt::Debug + Send + Sync {
/// Called when an incoming request is registered.
fn start_request(&self, req_id: &RequestId, method: &str);
/// Called when an incoming request is completed, with the time it was
/// received.
fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Time);
/// Called when handling of a notification starts.
// NOTE(review): the call sites are outside this view; `track_id` is presumably
// a per-notification counter — confirm against the callers.
fn start_notification(&self, track_id: i32, method: &str);
/// Called when handling of a notification ends, with its result.
fn stop_notification(
&self,
track_id: i32,
method: &str,
received_at: Time,
result: LspResult<()>,
);
}
/// The default hook: it only logs.
impl LsHook for () {
    /// Logs that a request is being handled.
    fn start_request(&self, req_id: &RequestId, method: &str) {
        log::info!("handling {method} - ({req_id})");
    }

    /// Logs that a request was handled, with the time elapsed since it was
    /// received.
    fn stop_request(&self, req_id: &RequestId, method: &str, received_at: Time) {
        let duration = received_at.elapsed();
        log::info!("handled {method} - ({req_id}) in {duration:0.2?}");
    }

    /// Logs that a notification is being handled.
    fn start_notification(&self, track_id: i32, method: &str) {
        log::info!("notifying ({track_id}) - {method}");
    }

    /// Logs the outcome of a notification: an error on failure, info on
    /// success.
    fn stop_notification(
        &self,
        track_id: i32,
        method: &str,
        received_at: Time,
        result: LspResult<()>,
    ) {
        let request_duration = received_at.elapsed();
        match result {
            Err(err) => {
                log::error!(
                    "notify ({track_id}) - {method} failed in {request_duration:0.2?}: {err:?}"
                );
            }
            Ok(()) => {
                log::info!("notify ({track_id}) - {method} succeeded in {request_duration:0.2?}");
            }
        }
    }
}
/// A plain function handler returning a typed schedulable response.
type AsyncHandler<S, T, R> = fn(srv: &mut S, args: T) -> SchedulableResponse<R>;
/// A plain function handler returning a JSON schedulable response.
type RawHandler<S, T> = fn(srv: &mut S, args: T) -> ScheduleResult;
/// A boxed handler that produces no response payload.
type BoxPureHandler<S, T> = Box<dyn Fn(&mut S, T) -> LspResult<()>>;
/// A boxed handler that produces a JSON schedulable response.
type BoxHandler<S, T> = Box<dyn Fn(&mut S, T) -> SchedulableResponse<JsonValue>>;
/// Handlers keyed by name, taking a list of JSON arguments.
type ExecuteCmdMap<S> = HashMap<&'static str, BoxHandler<S, Vec<JsonValue>>>;
/// Handlers keyed by name, taking a single JSON value.
type RegularCmdMap<S> = HashMap<&'static str, BoxHandler<S, JsonValue>>;
/// Handlers without a response payload, keyed by name.
type NotifyCmdMap<S> = HashMap<&'static str, BoxPureHandler<S, JsonValue>>;
/// Resource handlers keyed by resource path.
type ResourceMap<S> = HashMap<ImmutPath, BoxHandler<S, Vec<JsonValue>>>;
/// A boxed handler that receives a [`ServiceState`], so it can be given either
/// the uninitialized arguments or the ready service.
type MayInitBoxHandler<A, S, T> =
Box<dyn for<'a> Fn(ServiceState<'a, A, S>, &LspClient, T) -> anyhow::Result<()>>;
/// Event handlers keyed by the `TypeId` of the event type.
type EventMap<A, S> = HashMap<core::any::TypeId, MayInitBoxHandler<A, S, Event>>;
/// Turns initialization arguments into a service.
pub trait Initializer {
/// The deserializable parameters of the initialize request.
type I: for<'de> serde::Deserialize<'de>;
/// The service type produced by initialization.
type S;
/// Consumes the initializer and returns the service together with the
/// response to the initialize request.
fn initialize(self, req: Self::I) -> (Self::S, AnySchedulableResponse);
}
/// A builder for an LSP server.
#[cfg(feature = "lsp")]
pub type LspBuilder<Args> = LsBuilder<LspMessage, Args>;
/// A builder for a DAP server.
#[cfg(feature = "dap")]
pub type DapBuilder<Args> = LsBuilder<DapMessage, Args>;
/// Collects the handlers and arguments needed to build an [`LsDriver`].
pub struct LsBuilder<M, Args: Initializer> {
/// The arguments used to initialize the service.
pub args: Args,
/// The client handle given to the driver.
pub client: LspClient,
/// The event handlers, keyed by event type.
pub events: EventMap<Args, Args::S>,
/// Handlers taking a list of JSON arguments, keyed by name (becomes
/// `commands` on the driver).
pub command_handlers: ExecuteCmdMap<Args::S>,
/// The notification handlers, keyed by name.
pub notif_handlers: NotifyCmdMap<Args::S>,
/// The request handlers, keyed by name.
pub req_handlers: RegularCmdMap<Args::S>,
/// The resource handlers, keyed by resource path.
pub resource_handlers: ResourceMap<Args::S>,
/// Ties the message kind `M` to the builder without storing one.
_marker: std::marker::PhantomData<M>,
}
impl<M, Args: Initializer> LsBuilder<M, Args>
where
Args::S: 'static,
{
pub fn new(args: Args, client: LspClient) -> Self {
Self {
args,
client,
events: EventMap::new(),
command_handlers: ExecuteCmdMap::new(),
notif_handlers: NotifyCmdMap::new(),
req_handlers: RegularCmdMap::new(),
resource_handlers: ResourceMap::new(),
_marker: std::marker::PhantomData,
}
}
pub fn with_event<T: std::any::Any>(
mut self,
ins: &T,
handler: impl for<'a> Fn(ServiceState<'a, Args, Args::S>, T) -> anyhow::Result<()> + 'static,
) -> Self {
self.events.insert(
ins.type_id(),
Box::new(move |s, _client, req| handler(s, *req.downcast().unwrap())),
);
self
}
pub fn with_resource(
mut self,
path: &'static str,
handler: fn(&mut Args::S, Vec<JsonValue>) -> AnySchedulableResponse,
) -> Self {
self.resource_handlers
.insert(Path::new(path).into(), Box::new(handler));
self
}
pub fn build(self) -> LsDriver<M, Args> {
LsDriver {
state: State::Uninitialized(Some(Box::new(self.args))),
next_not_id: AtomicI32::new(1),
events: self.events,
client: self.client,
commands: self.command_handlers,
notifications: self.notif_handlers,
requests: self.req_handlers,
resources: self.resource_handlers,
_marker: std::marker::PhantomData,
}
}
}
/// A borrowed view of the server state passed to handlers that may run before
/// initialization.
pub enum ServiceState<'a, A, S> {
/// The server is not ready; the initialization arguments, if available.
Uninitialized(Option<&'a mut A>),
/// The server is ready; the service itself.
Ready(&'a mut S),
}
impl<A, S> ServiceState<'_, A, S> {
    /// Returns the service if the state is `Ready`, otherwise `None`.
    pub fn ready(&mut self) -> Option<&mut S> {
        if let ServiceState::Ready(s) = self {
            Some(s)
        } else {
            None
        }
    }
}
/// The lifecycle state of the driver.
#[derive(Debug, Clone, PartialEq, Eq)]
enum State<Args, S> {
/// Not yet initialized; holds the arguments until they are taken.
Uninitialized(Option<Box<Args>>),
/// The service exists but is not yet treated as ready.
// NOTE(review): the transitions into and out of this state are outside this view.
Initializing(S),
/// The service is ready.
Ready(S),
/// The server is shutting down.
ShuttingDown,
}
impl<Args, S> State<Args, S> {
    /// Borrows the service, but only in the `Ready` state.
    fn opt(&self) -> Option<&S> {
        if let State::Ready(s) = self {
            Some(s)
        } else {
            None
        }
    }

    /// Mutably borrows the service, but only in the `Ready` state.
    fn opt_mut(&mut self) -> Option<&mut S> {
        if let State::Ready(s) = self {
            Some(s)
        } else {
            None
        }
    }
}
/// The server driver: the lifecycle state plus all registered handlers.
pub struct LsDriver<M, Args: Initializer> {
/// The lifecycle state, holding the arguments or the service.
state: State<Args, Args::S>,
/// The client handle.
pub client: LspClient,
/// A counter that starts at 1.
// NOTE(review): presumably allocates notification tracking ids — confirm against its users.
pub next_not_id: AtomicI32,
/// The event handlers, keyed by event type.
pub events: EventMap<Args, Args::S>,
/// Handlers taking a list of JSON arguments, keyed by name.
pub commands: ExecuteCmdMap<Args::S>,
/// The notification handlers, keyed by name.
pub notifications: NotifyCmdMap<Args::S>,
/// The request handlers, keyed by name.
pub requests: RegularCmdMap<Args::S>,
/// The resource handlers, keyed by resource path.
pub resources: ResourceMap<Args::S>,
/// Ties the message kind `M` to the driver without storing one.
_marker: std::marker::PhantomData<M>,
}
impl<M, Args: Initializer> LsDriver<M, Args> {
    /// Borrows the service, or returns `None` unless the driver is ready.
    pub fn state(&self) -> Option<&Args::S> {
        self.state.opt()
    }

    /// Mutably borrows the service, or returns `None` unless the driver is
    /// ready.
    pub fn state_mut(&mut self) -> Option<&mut Args::S> {
        self.state.opt_mut()
    }

    /// Initializes the service with `params` and moves the driver to the ready
    /// state.
    ///
    /// Returns an "invalid request" error response if the driver is not in the
    /// uninitialized state.
    ///
    /// # Panics
    ///
    /// Panics if the driver is uninitialized but its arguments were already
    /// taken.
    pub fn ready(&mut self, params: Args::I) -> AnySchedulableResponse {
        let args = match &mut self.state {
            State::Uninitialized(args) => args,
            _ => return just_result(Err(invalid_request("server is already initialized"))),
        };
        let args = args.take().expect("already initialized");
        let (s, res) = args.initialize(params);
        self.state = State::Ready(s);
        res
    }

    /// Serves a resource through the handler registered for its path.
    ///
    /// The first argument must deserialize to the resource path. The whole
    /// argument list, path included, is passed on to the handler.
    ///
    /// # Errors
    ///
    /// - "not initialized" if the driver is not ready.
    /// - "invalid params" if the argument list is empty or the first argument
    ///   is not a path.
    /// - "method not found" if no handler is registered for the path.
    pub fn get_resources(&mut self, args: Vec<JsonValue>) -> ScheduleResult {
        let s = self.state.opt_mut().ok_or_else(not_initialized)?;
        // The arguments come from the client, so an empty list must be an
        // error rather than an out-of-bounds panic.
        let Some(first) = args.first() else {
            return Err(invalid_params(
                "expected the resource path as the first argument",
            ));
        };
        let path =
            from_value::<PathBuf>(first.clone()).map_err(|e| invalid_params(e.to_string()))?;
        let Some(handler) = self.resources.get(path.as_path()) else {
            log::error!("asked for unknown resource: {path:?}");
            return Err(method_not_found());
        };
        handler(s, args)
    }
}
pub fn just_ok<T, E>(res: T) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Done(Ok(res)))
}
pub fn just_result<T, E>(res: Result<T, E>) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Done(res))
}
pub fn just_future<T, E>(
fut: impl std::future::Future<Output = Result<T, E>> + Send + 'static,
) -> Result<ResponseFuture<Result<T, E>>, E> {
Ok(futures::future::MaybeDone::Future(Box::pin(fut)))
}
/// Creates an `InvalidParams` error with the given message.
pub fn invalid_params(msg: impl fmt::Display) -> ResponseError {
resp_err(ErrorCode::InvalidParams, msg)
}
/// Creates an `InternalError` error with the given message.
pub fn internal_error(msg: impl fmt::Display) -> ResponseError {
resp_err(ErrorCode::InternalError, msg)
}
/// Creates a `ServerNotInitialized` error with a fixed message.
pub fn not_initialized() -> ResponseError {
resp_err(ErrorCode::ServerNotInitialized, "not initialized yet")
}
/// Creates a `MethodNotFound` error with a fixed message.
pub fn method_not_found() -> ResponseError {
resp_err(ErrorCode::MethodNotFound, "method not found")
}
/// Creates an `InvalidRequest` error with the given message.
pub fn invalid_request(msg: impl fmt::Display) -> ResponseError {
resp_err(ErrorCode::InvalidRequest, msg)
}
/// Deserializes a JSON value into `T`, reporting failure as an
/// `InvalidRequest` error.
fn from_json<T: serde::de::DeserializeOwned>(json: JsonValue) -> LspResult<T> {
serde_json::from_value(json).map_err(invalid_request)
}
/// Erases the payload type of a schedulable response by serializing it to
/// JSON.
///
/// A scheduling error is returned unchanged. A serialization failure becomes
/// an internal error, as does a response that was already taken.
pub fn erased_response<T: Serialize + 'static>(resp: SchedulableResponse<T>) -> ScheduleResult {
    /// Serializes the success value of `result` to JSON.
    fn erase<T: Serialize>(result: LspResult<T>) -> LspResult<JsonValue> {
        match result {
            Ok(value) => serde_json::to_value(value).map_err(internal_error),
            Err(err) => Err(err),
        }
    }

    Ok(match resp? {
        MaybeDone::Done(result) => MaybeDone::Done(erase(result)),
        MaybeDone::Future(fut) => MaybeDone::Future(Box::pin(async move { erase(fut.await) })),
        MaybeDone::Gone => {
            log::warn!("response already taken");
            MaybeDone::Done(Err(internal_error("response already taken")))
        }
    })
}
/// Builds a response error from a code and a displayable message, with no
/// extra data.
fn resp_err(code: ErrorCode, msg: impl fmt::Display) -> ResponseError {
    let message = msg.to_string();
    let code = code as i32;
    ResponseError {
        code,
        message,
        data: None,
    }
}