laburnum 1.17.3

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

//! Pluggable spawning of detached background tasks.
//!
//! Background pumps (the LSP client demux reader, the IPC server I/O pumps)
//! need somewhere to run concurrently with whoever drives them. Production
//! processes own a [`Runtime`] — a laburnum-owned [`async_executor::Executor`]
//! backed by a pool of driver threads — and spawn through its
//! [`spawner`](Runtime::spawner). This replaces smol's process-global
//! executor, which is single-threaded by default and lazily initialised
//! outside our control. Tests inject an [`ExecutorSpawner`] wrapping a single
//! per-test executor, so an entire test — client reader, server scheduler, and
//! the test body — runs cooperatively on one thread instead of oversubscribing
//! the machine with several threads per test process.

use std::{future::Future, io, pin::Pin, sync::Arc, thread};

/// Default driver-thread count for a client-side connection's I/O runtime. A
/// client talks to a single daemon, so its pumps are a handful of mostly-parked
/// tasks; a small pool keeps them off a single thread without overcommitting.
pub const DEFAULT_CLIENT_IO_THREADS: usize = 2;

/// Default driver-thread count for the daemon's I/O runtime, which carries the
/// per-client IPC pumps for every connected client.
pub fn default_daemon_io_threads() -> usize {
  num_cpus::get().max(2)
}

/// A boxed, sendable future with no output — the unit of detached background
/// work handed to a [`Spawn`].
pub type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// A boxed, sendable future yielding an [`io::Result`] — an I/O pump handed to
/// [`Spawn::spawn_io`].
pub type BoxIoFuture = Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'static>>;

/// A spawned I/O pump. Awaiting it joins the task; dropping it cancels the
/// task (matching `smol::Task`/`async_executor::Task` ownership semantics).
pub type IoTask = Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'static>>;

/// Spawns background tasks onto some executor.
pub trait Spawn: Send + Sync {
  /// Spawn a fire-and-forget task. It runs until completion regardless of
  /// whether the caller keeps a handle.
  fn spawn_detached(&self, future: BoxFuture);

  /// Spawn an I/O pump, returning a handle that can be awaited to join or
  /// dropped to cancel.
  fn spawn_io(&self, future: BoxIoFuture) -> IoTask;
}

/// A shared, type-erased [`Spawn`] handle.
pub type Spawner = Arc<dyn Spawn>;

/// A laburnum-owned async executor backed by a pool of driver threads.
///
/// This is the production home for background pumps: long-lived, mostly-parked
/// I/O tasks (the IPC server's per-client read/write loops, the LSP client's
/// demux reader, the client-side socket pumps). Owning the executor — rather
/// than leaning on smol's lazily-initialised, single-threaded process-global —
/// gives an explicit thread count, a deterministic shutdown (threads are
/// joined on drop), and no reliance on a process-wide singleton.
///
/// The lane [`Scheduler`](crate::scheduler) remains the separate executor for
/// CPU-bound compilation work; this runtime carries only I/O.
pub struct Runtime {
  executor: Arc<async_executor::Executor<'static>>,
  shutdown: Arc<event_listener::Event>,
  threads:  Vec<thread::JoinHandle<()>>,
}

impl Runtime {
  /// Creates a runtime with `threads` driver threads (clamped to at least 1).
  /// Each thread drives the shared executor until the runtime is dropped.
  pub fn new(threads: usize) -> Self {
    let executor = Arc::new(async_executor::Executor::new());
    let shutdown = Arc::new(event_listener::Event::new());

    let threads = (0..threads.max(1))
      .map(|i| {
        let executor = executor.clone();
        let shutdown = shutdown.clone();
        thread::Builder::new()
          .name(format!("laburnum-rt-{i}"))
          .spawn(move || {
            // Run the executor until shutdown is signalled on drop. Driving
            // via async-io's `block_on` services the I/O reactor inline.
            async_io::block_on(executor.run(shutdown.listen()));
          })
          .expect("spawn laburnum runtime thread")
      })
      .collect();

    Self {
      executor,
      shutdown,
      threads,
    }
  }

  /// A [`Spawner`] handle for this runtime. Cheap to clone; keeps the
  /// executor alive for as long as any handle (or spawned task) lives.
  pub fn spawner(&self) -> Spawner {
    Arc::new(ExecutorSpawner::new(self.executor.clone()))
  }

  /// Spawns a task on this runtime, returning a joinable handle. Await the
  /// handle to join the task, or `.detach()` it to let it run independently;
  /// dropping it cancels the task. This is the laburnum replacement for
  /// `smol::spawn` in tests and tooling.
  pub fn spawn<T: Send + 'static>(
    &self,
    future: impl Future<Output = T> + Send + 'static,
  ) -> async_executor::Task<T> {
    self.executor.spawn(future)
  }
}

impl Drop for Runtime {
  fn drop(&mut self) {
    self.shutdown.notify(usize::MAX);
    for handle in self.threads.drain(..) {
      let _ = handle.join();
    }
  }
}

/// Spawns onto a specific [`async_executor::Executor`]. Backs both [`Runtime`]
/// (a thread-pool-driven executor) and tests (a single per-test executor).
pub struct ExecutorSpawner {
  executor: Arc<async_executor::Executor<'static>>,
}

impl ExecutorSpawner {
  pub fn new(executor: Arc<async_executor::Executor<'static>>) -> Self {
    Self { executor }
  }
}

impl Spawn for ExecutorSpawner {
  fn spawn_detached(&self, future: BoxFuture) {
    self.executor.spawn(future).detach();
  }

  fn spawn_io(&self, future: BoxIoFuture) -> IoTask {
    Box::pin(self.executor.spawn(future))
  }
}

/// A spawner over a fresh, never-driven executor. Spawned tasks are held but
/// never polled, and are dropped when the spawner (and thus the client that
/// owns it) drops — no threads are created and teardown is clean. For tests
/// that construct a client whose background reader is never exercised (e.g.
/// MCP protocol tests that don't round-trip to a language server).
#[cfg(feature = "test")]
pub fn inert_spawner() -> Spawner {
  Arc::new(ExecutorSpawner::new(Arc::new(async_executor::Executor::new())))
}