laburnum 1.17.0

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

//! DaemonServer implementation for multi-client daemon mode.

use {
  super::{
    DaemonConfig,
    socket::{
      DaemonStatus,
      check_daemon_status,
      cleanup_stale_socket,
      write_pid_file,
    },
  },
  crate::{
    Partitions,
    SourceCache,
    connect::lsp::ClientRegistry,
    fs::FS,
    protocol::lsp::LanguageServer,
    scheduler::{
      Scheduler,
      SchedulerConfiguration,
    },
    server::IpcServer,
  },
  std::{
    io,
    sync::Arc,
  },
};

pub struct DaemonServer<P: Partitions, T: LanguageServer<P>> {
  scheduler: Arc<Scheduler<P, T>>,
  config:    DaemonConfig,
}

impl<P: Partitions, T: LanguageServer<P>> DaemonServer<P, T> {
  pub fn new(
    server: Arc<T>,
    config: DaemonConfig,
    filesystems: Arc<parking_lot::RwLock<Vec<FS>>>,
    source_cache: Arc<parking_lot::RwLock<SourceCache<P, T>>>,
    worker_count: usize,
    scheduler_config: SchedulerConfiguration,
  ) -> Self
  where
    T: crate::hooks::LaburnumHooks<P, T>,
  {
    let registry = Arc::new(ClientRegistry::new());

    let scheduler = Scheduler::new_daemon(
      server.clone(),
      filesystems,
      source_cache,
      worker_count,
      scheduler_config,
      registry,
    );

    Self { scheduler, config }
  }

  pub fn scheduler(&self) -> &Arc<Scheduler<P, T>> {
    &self.scheduler
  }

  pub fn registry(&self) -> &Arc<ClientRegistry> {
    self.scheduler.registry()
  }

  pub fn config(&self) -> &DaemonConfig {
    &self.config
  }

  pub fn master_shutdown(&self) -> &Arc<std::sync::atomic::AtomicBool> {
    &self.scheduler.shutdown_flag
  }

  pub fn run(&self) -> io::Result<()>
  where
    T: crate::hooks::LaburnumHooks<P, T>,
  {
    #[cfg(unix)]
    {
      let runtime_dir = self.config.runtime_dir();
      std::fs::create_dir_all(&runtime_dir)?;
      write_pid_file(&self.config)?;
    }

    let endpoint = self.config.endpoint();

    match smol::block_on(check_daemon_status(&self.config))? {
      | DaemonStatus::StaleSocket => {
        cleanup_stale_socket(&self.config)?;
      },
      | DaemonStatus::Running => {
        return Err(io::Error::new(
          io::ErrorKind::AddrInUse,
          "daemon already running",
        ));
      },
      | DaemonStatus::NotRunning => {},
    }

    let ipc_server = smol::block_on(IpcServer::bind(
      &endpoint,
      self.registry().clone(),
      env!("CARGO_PKG_VERSION"),
    ))?;

    let ws_id = self.config.workspace_id.clone();
    otel::event!("daemon_server_started", "workspace_id" = ws_id);

    self.scheduler.run_daemon(ipc_server, self.config.clone());

    Ok(())
  }
}

#[cfg(test)]
mod tests {
  use {
    super::*,
    crate::{
      connect::{ipc::Connection, lsp::ClientKind},
      protocol::jsonrpc::Notification,
    },
    std::{
      collections::HashMap,
      sync::Arc,
      time::Duration,
    },
  };

  #[test]
  fn test_daemon_config_creation() {
    let config = DaemonConfig::new("test-server", "ws-123");
    assert_eq!(config.server_name, "test-server");
    assert_eq!(config.workspace_id, "ws-123");
  }

  #[test]
  fn test_check_idle_timeout_returns_false_when_clients_connected() {
    let registry = Arc::new(ClientRegistry::new());
    let (conn, _) = Connection::memory();
    let _id = registry.register(ClientKind::Cli, conn, HashMap::new());

    assert!(registry.idle_duration().is_none());

    let result = registry
      .idle_duration()
      .map(|d| d >= Duration::from_secs(300))
      .unwrap_or(false);
    assert!(!result);
  }

  #[test]
  fn test_check_idle_timeout_returns_false_before_timeout() {
    let registry = Arc::new(ClientRegistry::new());
    let (conn, _) = Connection::memory();
    let id = registry.register(ClientKind::Cli, conn, HashMap::new());
    registry.unregister(id);

    let timeout = Duration::from_secs(300);
    let result = registry
      .idle_duration()
      .map(|d| d >= timeout)
      .unwrap_or(false);

    assert!(!result);
  }

  #[test]
  fn test_check_idle_timeout_logic_with_exceeded_timeout() {
    let idle_duration = Some(Duration::from_secs(600));
    let timeout = Duration::from_secs(300);

    let result = idle_duration.map(|d| d >= timeout).unwrap_or(false);

    assert!(result);
  }

  #[test]
  fn test_shutdown_notification_format() {
    let notification = Notification::build("$/serverShutdown")
      .params(serde_json::json!({
        "reason": "idle_timeout",
        "message": "Server shutting down due to idle timeout"
      }))
      .finish();

    let json = serde_json::to_value(&notification).unwrap();

    assert_eq!(json["method"], "$/serverShutdown");
    assert_eq!(json["params"]["reason"], "idle_timeout");
    assert!(
      json["params"]["message"]
        .as_str()
        .unwrap()
        .contains("idle timeout")
    );
  }

  #[test]
  fn test_cleanup_does_not_panic_on_missing_files() {
    let config = DaemonConfig::new("test-cleanup", "nonexistent-workspace");

    #[cfg(unix)]
    {
      let socket_path = config.socket_path();
      let pid_path = config.pid_file_path();
      let lock_path = config.lock_file_path();

      assert!(!socket_path.exists());
      assert!(!pid_path.exists());
      assert!(!lock_path.exists());

      let _ = std::fs::remove_file(&socket_path);
      let _ = std::fs::remove_file(&pid_path);
      let _ = std::fs::remove_file(&lock_path);
    }
  }

  #[test]
  fn test_shutdown_notification_actually_broadcasts() {
    smol::block_on(async {
      let registry = Arc::new(ClientRegistry::new());

      let (server_conn1, client_conn1) = Connection::memory();
      let (server_conn2, client_conn2) = Connection::memory();

      let _id1 = registry.register(ClientKind::Ide, server_conn1, HashMap::new());
      let _id2 = registry.register(ClientKind::Cli, server_conn2, HashMap::new());

      let notification = Notification::build("$/serverShutdown")
        .params(serde_json::json!({
          "reason": "test",
          "message": "Test shutdown"
        }))
        .finish();

      registry.broadcast_all(notification).await;

      let msg1 = client_conn1.receiver.try_recv();
      let msg2 = client_conn2.receiver.try_recv();

      assert!(msg1.is_ok(), "IDE should receive shutdown notification");
      assert!(msg2.is_ok(), "CLI should receive shutdown notification");

      if let Ok(crate::protocol::jsonrpc::Message::Notification(n)) = msg1 {
        assert_eq!(n.method(), "$/serverShutdown");
        let params = n.params().unwrap();
        assert_eq!(params["reason"], "test");
      } else {
        panic!("Expected shutdown notification");
      }
    });
  }
}