use {
super::DaemonConfig,
crate::{
Partitions,
connect::lsp::{
ClientId,
ClientKind,
},
protocol::{
jsonrpc::Notification,
lsp::LanguageServer,
},
scheduler::{
Scheduler,
lanes::RPC_LANE_0,
task::LaburnumTask,
},
server::IpcServer,
},
dashmap::DashMap,
std::{
io,
sync::{
Arc,
atomic::{
AtomicBool,
Ordering,
},
},
time::Duration,
},
};
/// Why the daemon left its accept loop; selects the payload of the shutdown
/// notification sent to clients.
enum ShutdownReason {
  /// A shutdown was requested while the idle-trigger flag was set.
  IdleTimeout,
  /// A shutdown was requested and the idle-trigger flag was not set.
  Requested,
  /// The accept loop stopped without picking one of the reasons above
  /// (this is the loop's initial value).
  External,
}

impl ShutdownReason {
  /// Returns the `(reason, message)` pair for this shutdown reason: a short
  /// machine-readable code followed by a human-readable sentence.
  fn to_notification(&self) -> (&'static str, &'static str) {
    let reason = match self {
      | Self::IdleTimeout => "idle_timeout",
      | Self::Requested => "shutdown_requested",
      | Self::External => "external",
    };
    let message = match self {
      | Self::IdleTimeout => "Server shutting down due to idle timeout",
      | Self::Requested => "Shutdown requested via command",
      | Self::External => "Server shutting down",
    };
    (reason, message)
  }
}
/// Daemon-side task that accepts IPC clients and coordinates shutdown.
///
/// Instances are built inside [`DaemonTask::create`], which wraps the accept
/// loop in a [`LaburnumTask`] queued on `RPC_LANE_0`.
pub(crate) struct DaemonTask<P: Partitions, T: LanguageServer<P>> {
  /// Server whose `accept_client` yields the ids of newly connected clients.
  ipc_server: IpcServer,
  /// Scheduler giving access to the client registry, progress tracker,
  /// source cache and the global shutdown flag.
  scheduler: Arc<Scheduler<P, T>>,
  /// Daemon settings; read here for `shutdown_timeout` and for the socket,
  /// pid-file and lock-file paths removed during cleanup.
  config: DaemonConfig,
  /// Per-client shutdown flags keyed by client id.
  ///
  /// Held in an `Arc` because the IPC disconnect callback must operate on
  /// this very map: cloning a `DashMap` copies its entries into an
  /// independent map, which would leave the callback unable to see any flag
  /// inserted by the accept loop.
  client_shutdown_flags: Arc<DashMap<ClientId, Arc<AtomicBool>>>,
  /// Read when a shutdown has been requested, to tell an idle timeout apart
  /// from an explicit request.
  idle_triggered: Arc<AtomicBool>,
}

impl<P: Partitions, T: LanguageServer<P>> DaemonTask<P, T> {
  /// Builds the daemon task.
  ///
  /// Installs a disconnect callback on `ipc_server` that, for the
  /// disconnecting client, raises and removes its shutdown flag, unregisters
  /// it from the progress tracker and notifies the source cache. Returns a
  /// [`LaburnumTask`] whose body runs the accept loop until shutdown.
  pub(crate) fn create(
    scheduler: Arc<Scheduler<P, T>>,
    mut ipc_server: IpcServer,
    config: DaemonConfig,
    idle_triggered: Arc<AtomicBool>,
  ) -> Arc<LaburnumTask<P, T>> {
    // One shared map: the callback removes entries that the accept loop
    // inserts, so both sides must hold the same `DashMap` instance.
    let client_shutdown_flags: Arc<DashMap<ClientId, Arc<AtomicBool>>> =
      Arc::new(DashMap::new());
    let disconnect_flags = Arc::clone(&client_shutdown_flags);
    let disconnect_scheduler = Arc::clone(&scheduler);
    ipc_server.set_disconnect_callback(Arc::new(move |client_id| {
      // Tell the client's RPC task to stop and forget the flag.
      if let Some((_, shutdown_flag)) = disconnect_flags.remove(&client_id) {
        shutdown_flag.store(true, Ordering::Release);
      }
      disconnect_scheduler
        .progress_tracker()
        .unregister_client(client_id);
      {
        // Scoped so the write guard is released before the event is emitted.
        let mut source_cache = disconnect_scheduler.source_cache().write();
        source_cache.on_client_disconnect(client_id);
      }
      let client_id_val = client_id.as_u64() as i64;
      otel::event!("daemon_client_disconnected", "client_id" = client_id_val);
    }));
    LaburnumTask::new_with_parent(
      Arc::clone(&scheduler),
      move |_ctx| {
        Box::pin(async move {
          let mut task = DaemonTask {
            ipc_server,
            scheduler,
            config,
            client_shutdown_flags,
            idle_triggered,
          };
          task.run_accept_loop().await;
          None
        })
      },
      RPC_LANE_0,
      None,
      ClientId::INTERNAL,
    )
  }

  /// Returns how long to pause after the given number of consecutive accept
  /// failures, or `None` while the count is below the backoff threshold.
  ///
  /// The pause starts at 100 ms once the threshold (10 failures) is reached
  /// and doubles with each further failure up to 6400 ms.
  fn accept_backoff(consecutive_errors: u32) -> Option<Duration> {
    const THRESHOLD: u32 = 10;
    const BASE_MS: u64 = 100;
    const MAX_SHIFT: u32 = 6;
    // Count the exponent from the threshold; using the raw error count would
    // always hit the cap because the count is already >= THRESHOLD here.
    let over_threshold = consecutive_errors.checked_sub(THRESHOLD)?;
    Some(Duration::from_millis(BASE_MS << over_threshold.min(MAX_SHIFT)))
  }

  /// Accepts clients until shutdown, then performs the graceful shutdown.
  ///
  /// Each iteration races one accept attempt against a one-second timer; the
  /// timer branch is where shutdown requests are checked.
  async fn run_accept_loop(&mut self) {
    /// Outcome of one loop iteration's race.
    enum Event {
      Accept(io::Result<ClientId>),
      Timeout,
    }

    let mut consecutive_errors = 0u32;
    // Stays `External` when the loop exits because the scheduler's shutdown
    // flag was already set rather than via `check_shutdown_conditions`.
    let mut shutdown_reason = ShutdownReason::External;
    loop {
      if self.scheduler.shutdown_flag.load(Ordering::Acquire) {
        break;
      }
      let event = futures_lite::future::or(
        async { Event::Accept(self.ipc_server.accept_client().await) },
        async {
          smol::Timer::after(Duration::from_secs(1)).await;
          Event::Timeout
        },
      )
      .await;
      match event {
        | Event::Accept(Ok(client_id)) => {
          consecutive_errors = 0;
          self.handle_client_connected(client_id);
        },
        | Event::Accept(Err(e)) => {
          consecutive_errors = consecutive_errors.saturating_add(1);
          let err_msg = e.to_string();
          otel::error!(
            "daemon_accept_error",
            err_msg,
            "consecutive_errors" = consecutive_errors as i64
          );
          if let Some(backoff) = Self::accept_backoff(consecutive_errors) {
            smol::Timer::after(backoff).await;
          }
        },
        | Event::Timeout => {
          if let Some(reason) = self.check_shutdown_conditions() {
            shutdown_reason = reason;
            break;
          }
        },
      }
    }
    let (reason, message) = shutdown_reason.to_notification();
    self.graceful_shutdown(reason, message).await;
  }

  /// Wires up a freshly accepted client.
  ///
  /// Looks the client up in the registry (emitting `daemon_client_not_found`
  /// and returning if it is absent), records a shutdown flag for it,
  /// registers IDE clients with the progress tracker and queues the client's
  /// RPC task.
  fn handle_client_connected(&self, client_id: ClientId) {
    let registry = self.scheduler.registry();
    let Some(client) = registry.get(client_id) else {
      otel::event!(
        "daemon_client_not_found",
        "client_id" = client_id.as_u64() as i64
      );
      return;
    };
    let client_kind = client.kind();
    let connection = client.connection().clone();
    // Release the registry entry before touching other shared state.
    drop(client);

    // NOTE(review): if the disconnect callback can run for this client before
    // this insert, the entry stays in the map until shutdown — confirm the
    // ordering guaranteed by `IpcServer`.
    let client_shutdown = Arc::new(AtomicBool::new(false));
    self
      .client_shutdown_flags
      .insert(client_id, client_shutdown.clone());
    if client_kind == ClientKind::Ide {
      self
        .scheduler
        .progress_tracker()
        .register_client(client_id, connection.sender.clone());
    }
    self
      .scheduler
      .queue_client_rpc_task(connection, client_id, client_shutdown);
    let client_id_val = client_id.as_u64() as i64;
    let kind_str = client_kind.to_string();
    otel::event!(
      "daemon_client_connected",
      "client_id" = client_id_val,
      "client_kind" = kind_str
    );
  }

  /// Checks whether a shutdown has been requested.
  ///
  /// Returns `None` when no shutdown is requested. Otherwise sets the
  /// scheduler's shutdown flag and returns `IdleTimeout` if `idle_triggered`
  /// is set, `Requested` if not.
  fn check_shutdown_conditions(&self) -> Option<ShutdownReason> {
    if !self.scheduler.is_shutdown_requested() {
      return None;
    }
    let reason = if self.idle_triggered.load(Ordering::Acquire) {
      otel::event!("daemon_idle_shutdown");
      ShutdownReason::IdleTimeout
    } else {
      otel::event!("daemon_shutdown_requested");
      ShutdownReason::Requested
    };
    self.scheduler.shutdown_flag.store(true, Ordering::Release);
    Some(reason)
  }

  /// Broadcasts a `$/serverShutdown` notification carrying `reason` and
  /// `message` to every client in the registry.
  async fn notify_shutdown(&self, reason: &str, message: &str) {
    let notification = Notification::build("$/serverShutdown")
      .params(serde_json::json!({
          "reason": reason,
          "message": message
      }))
      .finish();
    self.scheduler.registry().broadcast_all(notification).await;
  }

  /// Shuts the daemon down.
  ///
  /// Sets the scheduler's shutdown flag, notifies clients, then polls every
  /// 100 ms until every remaining client flag is set or
  /// `config.shutdown_timeout` elapses. Afterwards all remaining flags are
  /// forced on and, on Unix, the socket, pid and lock files are removed on a
  /// best-effort basis.
  async fn graceful_shutdown(&self, reason: &str, message: &str) {
    self.scheduler.shutdown_flag.store(true, Ordering::Release);
    self.notify_shutdown(reason, message).await;

    let deadline = std::time::Instant::now() + self.config.shutdown_timeout;
    while std::time::Instant::now() < deadline {
      // The map iterator is dropped at the end of this statement, so no map
      // guard is held across the timer await below.
      let all_stopped = self
        .client_shutdown_flags
        .iter()
        .all(|entry| entry.value().load(Ordering::Acquire));
      if all_stopped {
        break;
      }
      smol::Timer::after(Duration::from_millis(100)).await;
    }

    // Timeout or not, make sure every remaining client task is told to stop.
    for entry in self.client_shutdown_flags.iter() {
      entry.value().store(true, Ordering::Release);
    }

    #[cfg(unix)]
    {
      // Removal errors are deliberately ignored: the files may already be
      // gone, and shutdown must not fail because of cleanup.
      let _ = std::fs::remove_file(self.config.socket_path());
      let _ = std::fs::remove_file(self.config.pid_file_path());
      let _ = std::fs::remove_file(self.config.lock_file_path());
      otel::event!(
        "daemon_cleanup_complete",
        "socket_path" = self.config.socket_path().display().to_string()
      );
    }
  }
}