laburnum 1.17.0

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  super::DaemonConfig,
  crate::{
    Partitions,
    connect::lsp::{
      ClientId,
      ClientKind,
    },
    protocol::{
      jsonrpc::Notification,
      lsp::LanguageServer,
    },
    scheduler::{
      Scheduler,
      lanes::RPC_LANE_0,
      task::LaburnumTask,
    },
    server::IpcServer,
  },
  dashmap::DashMap,
  std::{
    io,
    sync::{
      Arc,
      atomic::{
        AtomicBool,
        Ordering,
      },
    },
    time::Duration,
  },
};

enum ShutdownReason {
  IdleTimeout,
  Requested,
  External,
}

impl ShutdownReason {
  fn to_notification(&self) -> (&'static str, &'static str) {
    match self {
      | ShutdownReason::IdleTimeout => {
        ("idle_timeout", "Server shutting down due to idle timeout")
      },
      | ShutdownReason::Requested => {
        ("shutdown_requested", "Shutdown requested via command")
      },
      | ShutdownReason::External => ("external", "Server shutting down"),
    }
  }
}

pub(crate) struct DaemonTask<P: Partitions, T: LanguageServer<P>> {
  ipc_server:            IpcServer,
  scheduler:             Arc<Scheduler<P, T>>,
  config:                DaemonConfig,
  client_shutdown_flags: DashMap<ClientId, Arc<AtomicBool>>,
  idle_triggered:        Arc<AtomicBool>,
}

impl<P: Partitions, T: LanguageServer<P>> DaemonTask<P, T> {
  pub(crate) fn create(
    scheduler: Arc<Scheduler<P, T>>,
    mut ipc_server: IpcServer,
    config: DaemonConfig,
    idle_triggered: Arc<AtomicBool>,
  ) -> Arc<LaburnumTask<P, T>> {
    let client_shutdown_flags: DashMap<ClientId, Arc<AtomicBool>> =
      DashMap::new();

    let disconnect_flags = client_shutdown_flags.clone();
    let disconnect_scheduler = scheduler.clone();
    ipc_server.set_disconnect_callback(Arc::new(move |client_id| {
      if let Some((_, shutdown_flag)) = disconnect_flags.remove(&client_id) {
        shutdown_flag.store(true, Ordering::Release);
      }

      disconnect_scheduler
        .progress_tracker()
        .unregister_client(client_id);

      // Clean up source cache state for this client
      {
        let mut source_cache = disconnect_scheduler.source_cache().write();
        source_cache.on_client_disconnect(client_id);
      }

      let client_id_val = client_id.as_u64() as i64;
      otel::event!("daemon_client_disconnected", "client_id" = client_id_val);
    }));

    LaburnumTask::new_with_parent(
      scheduler.clone(),
      move |_ctx| {
        Box::pin(async move {
          let mut task = DaemonTask {
            ipc_server,
            scheduler: scheduler.clone(),
            config,
            client_shutdown_flags,
            idle_triggered,
          };

          task.run_accept_loop().await;

          None
        })
      },
      RPC_LANE_0,
      None,
      ClientId::INTERNAL,
    )
  }

  async fn run_accept_loop(&mut self) {
    let mut consecutive_errors = 0u32;
    let mut shutdown_reason = ShutdownReason::External;

    loop {
      if self.scheduler.shutdown_flag.load(Ordering::Acquire) {
        break;
      }

      enum Event {
        Accept(io::Result<ClientId>),
        Timeout,
      }

      // Race accept against a 1s poll so the loop notices
      // `is_shutdown_requested()` (set by IdleMonitor or external callers)
      // even when no client is trying to connect.
      let event = futures_lite::future::or(
        async { Event::Accept(self.ipc_server.accept_client().await) },
        async {
          smol::Timer::after(Duration::from_secs(1)).await;
          Event::Timeout
        },
      )
      .await;

      match event {
        | Event::Accept(Ok(client_id)) => {
          consecutive_errors = 0;
          self.handle_client_connected(client_id);
        },
        | Event::Accept(Err(e)) => {
          consecutive_errors = consecutive_errors.saturating_add(1);
          let err_msg = e.to_string();
          otel::error!(
            "daemon_accept_error",
            err_msg,
            "consecutive_errors" = consecutive_errors as i64
          );

          if consecutive_errors >= 10 {
            let backoff =
              Duration::from_millis(100 * (1 << consecutive_errors.min(6)));
            smol::Timer::after(backoff).await;
          }
        },
        | Event::Timeout => {
          if let Some(reason) = self.check_shutdown_conditions() {
            shutdown_reason = reason;
            break;
          }
        },
      }
    }

    let (reason, message) = shutdown_reason.to_notification();
    self.graceful_shutdown(reason, message).await;
  }

  fn handle_client_connected(&self, client_id: ClientId) {
    let registry = self.scheduler.registry();
    let Some(client) = registry.get(client_id) else {
      otel::event!(
        "daemon_client_not_found",
        "client_id" = client_id.as_u64() as i64
      );
      return;
    };

    let client_kind = client.kind();
    let connection = client.connection().clone();
    drop(client);

    let client_shutdown = Arc::new(AtomicBool::new(false));
    self
      .client_shutdown_flags
      .insert(client_id, client_shutdown.clone());

    if client_kind == ClientKind::Ide {
      self
        .scheduler
        .progress_tracker()
        .register_client(client_id, connection.sender.clone());
    }

    self
      .scheduler
      .queue_client_rpc_task(connection, client_id, client_shutdown);

    let client_id_val = client_id.as_u64() as i64;
    let kind_str = client_kind.to_string();
    otel::event!(
      "daemon_client_connected",
      "client_id" = client_id_val,
      "client_kind" = kind_str
    );
  }

  fn check_shutdown_conditions(&self) -> Option<ShutdownReason> {
    if !self.scheduler.is_shutdown_requested() {
      return None;
    }

    let reason = if self.idle_triggered.load(Ordering::Acquire) {
      otel::event!("daemon_idle_shutdown");
      ShutdownReason::IdleTimeout
    } else {
      otel::event!("daemon_shutdown_requested");
      ShutdownReason::Requested
    };

    self.scheduler.shutdown_flag.store(true, Ordering::Release);
    Some(reason)
  }

  async fn notify_shutdown(&self, reason: &str, message: &str) {
    let notification = Notification::build("$/serverShutdown")
      .params(serde_json::json!({
        "reason": reason,
        "message": message
      }))
      .finish();

    self.scheduler.registry().broadcast_all(notification).await;
  }

  async fn graceful_shutdown(&self, reason: &str, message: &str) {
    self.scheduler.shutdown_flag.store(true, Ordering::Release);

    self.notify_shutdown(reason, message).await;

    let deadline = std::time::Instant::now() + self.config.shutdown_timeout;
    while std::time::Instant::now() < deadline {
      let active = self
        .client_shutdown_flags
        .iter()
        .filter(|e| !e.value().load(Ordering::Acquire))
        .count();
      if active == 0 {
        break;
      }
      smol::Timer::after(Duration::from_millis(100)).await;
    }

    for entry in self.client_shutdown_flags.iter() {
      entry.value().store(true, Ordering::Release);
    }

    #[cfg(unix)]
    {
      let _ = std::fs::remove_file(self.config.socket_path());
      let _ = std::fs::remove_file(self.config.pid_file_path());
      let _ = std::fs::remove_file(self.config.lock_file_path());
      otel::event!(
        "daemon_cleanup_complete",
        "socket_path" = self.config.socket_path().display().to_string()
      );
    }
  }
}