use {
crate::{
Partitions,
TRACER,
protocol::lsp::LanguageServer,
scheduler::{
Scheduler,
lanes::{
Lane,
RPC_LANE_HIGH_IDX,
RPC_LANE_LOW_IDX,
is_rpc_lane,
lane_priority,
},
task::LaburnumTask,
},
},
std::{
sync::{
Arc,
atomic::Ordering,
},
task::Poll,
thread,
time::Instant,
},
};
/// A scheduler worker: owns the per-thread state used to pull tasks from the
/// shared [`Scheduler`] lane queues and poll them.
pub(crate) struct Worker<P: Partitions, T: LanguageServer<P>> {
/// Worker identifier; recorded as the `worker.id` attribute of the run span.
id: usize,
/// Shared scheduler that provides the lane queues and the shutdown flag.
scheduler: Arc<Scheduler<P, T>>,
/// Worker-local task stack, drained (last-in, first-out) before any shared
/// lane queue is consulted. Nothing in this section pushes onto it.
local_same_lane: Vec<Arc<LaburnumTask<P, T>>>,
/// Lane of the task most recently taken from the shared queues (either
/// stolen or picked up as preempting work). Written but not read in this
/// section.
current_lane: Option<Lane>,
/// Number of polls of tasks outside the sync lane; drives the periodic
/// check for higher-priority work.
poll_count: usize,
/// Time of the last RPC-lane scan or RPC-task poll; used to throttle how
/// often the RPC lanes are scanned.
last_rpc_check: Instant,
/// Trace context attached on the worker thread for the duration of `run`.
trace_context: crate::protocol::otel::TraceContext,
}
impl<P: Partitions, T: LanguageServer<P>> Worker<P, T> {
    /// Minimum time between two scans of the RPC lanes in `steal_work`.
    const RPC_POLL_INTERVAL: std::time::Duration =
        std::time::Duration::from_millis(1);

    /// A task outside the sync lane checks for higher-priority work once
    /// every this many polls.
    const SYNC_CHECK_PRIORITY_INTERVAL: usize = 16;

    /// Builds the state for worker `id` on top of the shared `scheduler`.
    ///
    /// `trace_context` is attached to the worker thread when `run` starts.
    pub(crate) fn new(
        id: usize,
        scheduler: Arc<Scheduler<P, T>>,
        trace_context: crate::protocol::otel::TraceContext,
    ) -> Self {
        // Back-date the last RPC check by one interval so the very first
        // `steal_work` call is allowed to scan the RPC lanes. `Instant - Duration`
        // panics when the result is not representable, so subtract with
        // `checked_sub` and fall back to "now" (which merely delays the first
        // RPC scan by one interval).
        let now = Instant::now();
        let last_rpc_check =
            now.checked_sub(Self::RPC_POLL_INTERVAL).unwrap_or(now);

        Self {
            id,
            scheduler,
            local_same_lane: Vec::new(),
            current_lane: None,
            poll_count: 0,
            last_rpc_check,
            trace_context,
        }
    }

    /// Starts an OS thread that constructs a worker and runs it until the
    /// scheduler's shutdown flag is observed.
    ///
    /// Returns the join handle of the spawned thread.
    pub(crate) fn spawn(
        id: usize,
        scheduler: Arc<Scheduler<P, T>>,
        trace_context: crate::protocol::otel::TraceContext,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let mut worker = Worker::new(id, scheduler, trace_context);
            worker.run();
        })
    }

    /// Main loop of the worker thread.
    ///
    /// Each iteration, in order: exit if shutdown was requested; poll a task
    /// from the worker-local stack; otherwise poll a task taken from the
    /// shared lane queues; otherwise park until woken or timed out.
    fn run(&mut self) {
        let _context_guard = self.trace_context.attach();
        otel::span!("laburnum.worker.run", "worker.id" = self.id as i64);
        loop {
            if self.scheduler.shutdown_flag.load(Ordering::Acquire) {
                return;
            }
            // Local work first: it needs no access to the shared queues.
            if let Some(task) = self.local_same_lane.pop() {
                self.poll_task(task);
                continue;
            }
            if let Some(task) = self.steal_work() {
                self.current_lane = Some(task.lane);
                self.poll_task(task);
                continue;
            }
            self.park();
        }
    }

    /// Polls `task` once, unless it is preempted by higher-priority work.
    ///
    /// Tasks on the sync lane are always polled directly. For any other lane,
    /// every `SYNC_CHECK_PRIORITY_INTERVAL`-th poll first looks for a task in
    /// a higher-priority lane; if one is found, `task` is handed back to the
    /// scheduler and the higher-priority task is polled instead.
    fn poll_task(&mut self, task: Arc<LaburnumTask<P, T>>) {
        if task.lane != super::lanes::SYNC_LANE {
            // Wrapping keeps debug builds from panicking on overflow. The
            // interval (16) divides 2^N, so the cadence survives the wrap.
            self.poll_count = self.poll_count.wrapping_add(1);
            let check_due = self
                .poll_count
                .is_multiple_of(Self::SYNC_CHECK_PRIORITY_INTERVAL);
            if check_due
                && let Some(higher) = self.has_higher_priority_work(task.lane)
            {
                // Give the preempted task back through the entry point that
                // matches its lane kind.
                if is_rpc_lane(task.lane) {
                    self.scheduler.queue_rpc_task(task);
                } else {
                    self.scheduler.queue_task(task);
                }
                self.current_lane = Some(higher.lane);
                // The nested call cannot preempt again: either `higher` is on
                // the sync lane, or `poll_count` advances to a value that is
                // not a multiple of the interval.
                self.poll_task(higher);
                return;
            }
        }

        match task.poll_once() {
            // Nothing to do in either case at this level.
            // NOTE(review): presumably a pending task is re-queued by its
            // waker — confirm in `LaburnumTask`.
            | Poll::Ready(()) => {},
            | Poll::Pending => {},
        }

        if is_rpc_lane(task.lane) {
            // Polling an RPC task restarts the RPC throttle window.
            self.last_rpc_check = Instant::now();
        }
    }

    /// Takes the next task from the shared lane queues, or `None` if every
    /// scanned queue was empty.
    ///
    /// Scan order: lanes below `RPC_LANE_HIGH_IDX`, then the RPC lanes (only
    /// if `RPC_POLL_INTERVAL` has elapsed since the last RPC check), then the
    /// lanes after `RPC_LANE_LOW_IDX`.
    fn steal_work(&mut self) -> Option<Arc<LaburnumTask<P, T>>> {
        // Index of the last lane queue that is scanned.
        // NOTE(review): assumed to be the final index of
        // `scheduler.lane_queues` — confirm against the scheduler.
        const LAST_LANE_IDX: usize = 30;

        if let Some(task) = self.pop_first_ready(0..RPC_LANE_HIGH_IDX) {
            return Some(task);
        }

        if self.last_rpc_check.elapsed() >= Self::RPC_POLL_INTERVAL {
            // Restart the throttle window whether or not a task is found.
            self.last_rpc_check = Instant::now();
            if let Some(task) = self.steal_rpc_work() {
                return Some(task);
            }
        }

        self.pop_first_ready((RPC_LANE_LOW_IDX + 1)..=LAST_LANE_IDX)
    }

    /// Takes a task from the first non-empty RPC lane
    /// (`RPC_LANE_HIGH_IDX..=RPC_LANE_LOW_IDX`), if any.
    fn steal_rpc_work(&self) -> Option<Arc<LaburnumTask<P, T>>> {
        self.pop_first_ready(RPC_LANE_HIGH_IDX..=RPC_LANE_LOW_IDX)
    }

    /// Takes a task from the first non-empty lane whose index is lower than
    /// the priority index of `current_lane`, if any.
    ///
    /// Despite the name this removes the task from its queue; the caller is
    /// expected to poll it. The scan includes any RPC lanes in that range
    /// regardless of the RPC throttle.
    fn has_higher_priority_work(
        &self,
        current_lane: Lane,
    ) -> Option<Arc<LaburnumTask<P, T>>> {
        let current_priority = lane_priority(current_lane);
        self.pop_first_ready(0..current_priority as usize)
    }

    /// Takes a task from the queue of `lane` itself, if one is available.
    // Not called anywhere in this section, hence the lint suppression.
    #[allow(dead_code)]
    fn steal_same_lane_work(
        &self,
        lane: Lane,
    ) -> Option<Arc<LaburnumTask<P, T>>> {
        let lane_idx = lane_priority(lane) as usize;
        self.scheduler.lane_queues[lane_idx].pop().ok()
    }

    /// Pops from the lane queues at the given indices, in order, and returns
    /// the first task obtained. Stops at the first successful pop.
    fn pop_first_ready(
        &self,
        mut lanes: impl Iterator<Item = usize>,
    ) -> Option<Arc<LaburnumTask<P, T>>> {
        lanes.find_map(|lane_idx| self.scheduler.lane_queues[lane_idx].pop().ok())
    }
}
impl<P: Partitions, T: LanguageServer<P>> Worker<P, T> {
    /// Blocks the worker thread until it is unparked or 100 ms elapse.
    ///
    /// Returns immediately, without parking, when the scheduler's shutdown
    /// flag is already set.
    fn park(&mut self) {
        let shutting_down =
            self.scheduler.shutdown_flag.load(Ordering::Acquire);
        if !shutting_down {
            // Bounded wait, so the caller's loop gets to re-check the
            // shutdown flag even if no unpark ever arrives.
            let idle_timeout = std::time::Duration::from_millis(100);
            thread::park_timeout(idle_timeout);
        }
    }
}