use {
crate::{
connect::lsp::{ClientId, ClientRegistry},
daemon::{
DaemonConfig, daemon_task::DaemonTask, idle_monitor::idle_monitor_task,
},
server::IpcServer,
},
concurrent_queue::ConcurrentQueue,
std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
},
};
mod gc;
pub mod key_watcher;
pub mod lanes;
pub mod task;
mod worker;
use {
crate::{
Partitions, TRACER,
connect::ipc::Connection,
database::{
Database, GenerationEpoch, chunk::RecordWriter, gc::GarbageCollector,
reaper::Reaper,
},
progress::ProgressTracker,
protocol::{lsp::LanguageServer, task::RpcTask},
scheduler::{
key_watcher::dispatch_builtin_watcher,
lanes::{Lane, lane_priority},
task::{LaburnumTask, TaskContext},
},
},
std::{
collections::BTreeMap,
future::Future,
},
};
/// Tunables for a `Scheduler` instance.
#[derive(Debug, Clone)]
pub struct SchedulerConfiguration {
  /// Forwarded to `RpcTask::create` as the response capacity of every RPC
  /// task the scheduler builds.
  pub rpc_response_capacity: usize,
  /// When `true`, the scheduler queues the periodic garbage-collection task
  /// on the idle lane when it starts running.
  pub enable_periodic_gc: bool,
}
impl Default for SchedulerConfiguration {
  /// RPC response capacity of 100. Periodic GC is enabled unless the crate is
  /// built with the `test` feature, in which case it is disabled.
  fn default() -> Self {
    let periodic_gc_enabled = !cfg!(feature = "test");
    Self {
      enable_periodic_gc: periodic_gc_enabled,
      rpc_response_capacity: 100,
    }
  }
}
/// Task scheduler: owns the database, a fixed array of per-lane task queues,
/// and the worker threads started by `spawn_workers`.
pub struct Scheduler<P: Partitions, T: LanguageServer<P>> {
/// Database owned by the scheduler; `new_inner` also builds the reaper from
/// its CAS stores.
db: Database<P>,
/// Connection served by the internal RPC task queued in `add_initial_tasks`.
/// In daemon mode (`new_daemon`) this is a placeholder channel.
pub(crate) connection: Connection,
filesystems: Arc<parking_lot::RwLock<Vec<crate::fs::FS>>>,
/// Shared source cache; `new_inner` installs a weak back-reference to this
/// scheduler in it.
source_cache: Arc<parking_lot::RwLock<crate::SourceCache<P, T>>>,
/// One unbounded queue per lane, indexed by `lane_priority(lane)`.
/// `queue_task` falls back toward index 0 when a push fails.
lane_queues: [ConcurrentQueue<Arc<LaburnumTask<P, T>>>; 31],
/// Serializes the RPC lane rotation performed by `queue_rpc_task`.
rpc_rotation_lock: parking_lot::Mutex<()>,
/// Join handles of spawned workers. `notify_workers` unparks these threads;
/// `run`/`run_daemon` take and join them on shutdown.
worker_threads: parking_lot::RwLock<Vec<std::thread::JoinHandle<()>>>,
/// Number of worker threads `spawn_workers` starts.
worker_count: usize,
pub server: Arc<T>,
/// Polled by `run`/`run_daemon`; once true they wake and join the workers.
/// Also handed to the RPC, idle-monitor and periodic-GC tasks.
pub shutdown_flag: Arc<AtomicBool>,
config: SchedulerConfiguration,
/// Set by `request_shutdown` and read by `is_shutdown_requested`.
/// NOTE(review): distinct from `shutdown_flag`; setting this does not stop
/// the run loops shown here.
shutdown_requested: Arc<AtomicBool>,
pub(crate) progress_tracker: Arc<ProgressTracker>,
pub(crate) registry: Arc<ClientRegistry>,
pub(crate) reaper: Reaper<P>,
pub(crate) gc: GarbageCollector,
/// Use counts of generation epochs currently registered, keyed by epoch;
/// `oldest_running_epoch` reports the smallest key.
pub(crate) active_epochs:
parking_lot::Mutex<BTreeMap<GenerationEpoch, usize>>,
}
impl<P: Partitions, T: LanguageServer<P>> Scheduler<P, T> {
/// Creates a scheduler using the default [`SchedulerConfiguration`] and a
/// worker count of one less than the number of CPUs, but never fewer than one.
pub fn new(
  connection: Connection,
  server: Arc<T>,
  filesystems: Arc<parking_lot::RwLock<Vec<crate::fs::FS>>>,
  source_cache: Arc<parking_lot::RwLock<crate::SourceCache<P, T>>>,
) -> Arc<Self> {
  let cpus = num_cpus::get();
  // `saturating_sub` avoids underflow on a single-CPU report; clamp to >= 1.
  let workers = std::cmp::max(cpus.saturating_sub(1), 1);
  let defaults = SchedulerConfiguration::default();
  Self::new_with_config(
    connection,
    server,
    filesystems,
    source_cache,
    workers,
    defaults,
  )
}
/// Same as [`Scheduler::new`], but with an explicit number of worker threads.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn new_with_worker_count(
  connection: Connection,
  server: Arc<T>,
  filesystems: Arc<parking_lot::RwLock<Vec<crate::fs::FS>>>,
  source_cache: Arc<parking_lot::RwLock<crate::SourceCache<P, T>>>,
  worker_count: usize,
) -> Arc<Self> {
  let defaults = SchedulerConfiguration::default();
  Self::new_with_config(
    connection,
    server,
    filesystems,
    source_cache,
    worker_count,
    defaults,
  )
}
/// Creates a scheduler with an explicit worker count and configuration,
/// backed by a newly constructed [`ClientRegistry`].
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn new_with_config(
  connection: Connection,
  server: Arc<T>,
  filesystems: Arc<parking_lot::RwLock<Vec<crate::fs::FS>>>,
  source_cache: Arc<parking_lot::RwLock<crate::SourceCache<P, T>>>,
  worker_count: usize,
  config: SchedulerConfiguration,
) -> Arc<Self>
where
  T: crate::hooks::LaburnumHooks<P, T>,
{
  let fresh_registry = Arc::new(ClientRegistry::new());
  Self::new_inner(
    connection,
    server,
    filesystems,
    source_cache,
    worker_count,
    config,
    fresh_registry,
  )
}
/// Creates a scheduler for daemon mode that shares the caller's `registry`.
///
/// The scheduler's own `connection` is a placeholder built from a fresh
/// unbounded `async_channel`; both the sender and the receiver end up in
/// that single `Connection`.
pub fn new_daemon(
  server: Arc<T>,
  filesystems: Arc<parking_lot::RwLock<Vec<crate::fs::FS>>>,
  source_cache: Arc<parking_lot::RwLock<crate::SourceCache<P, T>>>,
  worker_count: usize,
  config: SchedulerConfiguration,
  registry: Arc<ClientRegistry>,
) -> Arc<Self>
where
  T: crate::hooks::LaburnumHooks<P, T>,
{
  let placeholder = {
    let (sender, receiver) = async_channel::unbounded();
    Connection { sender, receiver }
  };
  Self::new_inner(
    placeholder,
    server,
    filesystems,
    source_cache,
    worker_count,
    config,
    registry,
  )
}
fn new_inner(
connection: Connection,
server: Arc<T>,
filesystems: Arc<parking_lot::RwLock<Vec<crate::fs::FS>>>,
source_cache: Arc<parking_lot::RwLock<crate::SourceCache<P, T>>>,
worker_count: usize,
config: SchedulerConfiguration,
registry: Arc<ClientRegistry>,
) -> Arc<Self>
where
T: crate::hooks::LaburnumHooks<P, T>,
{
otel::span!("laburnum.scheduler.new");
let shutdown_flag = Arc::new(AtomicBool::new(false));
let progress_tracker = Arc::new(ProgressTracker::new_disconnected());
progress_tracker
.register_client(ClientId::INTERNAL, connection.sender.clone());
let db = Database::new();
let reaper = Reaper::new(db.cas.stores_arc());
let gc = GarbageCollector::new();
let s = Arc::new(Self {
db,
connection: connection.clone(),
filesystems,
source_cache,
lane_queues: std::array::from_fn(|_| ConcurrentQueue::unbounded()),
rpc_rotation_lock: parking_lot::Mutex::new(()),
worker_threads: parking_lot::RwLock::new(Vec::new()),
worker_count,
server: server.clone(),
shutdown_flag: shutdown_flag.clone(),
config: config.clone(),
shutdown_requested: Arc::new(AtomicBool::new(false)),
progress_tracker,
registry,
reaper,
gc,
active_epochs: parking_lot::Mutex::new(BTreeMap::new()),
});
s.source_cache.write().set_scheduler(Arc::downgrade(&s));
s
}
pub fn registry(&self) -> &Arc<ClientRegistry> {
&self.registry
}
pub(crate) fn source_cache(
&self,
) -> &Arc<parking_lot::RwLock<crate::SourceCache<P, T>>> {
&self.source_cache
}
pub fn request_shutdown(&self) {
self.shutdown_requested.store(true, Ordering::Release);
}
pub fn is_shutdown_requested(&self) -> bool {
self.shutdown_requested.load(Ordering::Acquire)
}
/// Builds, but does not queue, an RPC task serving `client_id` over
/// `connection`. It uses this scheduler's server and configured response
/// capacity.
pub(crate) fn create_rpc_task_for_client(
  self: &Arc<Self>,
  connection: Connection,
  client_id: ClientId,
  shutdown_flag: Arc<AtomicBool>,
) -> Arc<LaburnumTask<P, T>> {
  let scheduler = Arc::clone(self);
  let server = Arc::clone(&self.server);
  let capacity = self.config.rpc_response_capacity;
  RpcTask::create(
    scheduler,
    connection,
    client_id,
    server,
    shutdown_flag,
    capacity,
  )
}
/// Creates an RPC task for `client_id` and queues it through the RPC lane
/// rotation (`queue_rpc_task`).
pub fn queue_client_rpc_task(
  self: &Arc<Self>,
  connection: Connection,
  client_id: ClientId,
  shutdown_flag: Arc<AtomicBool>,
) {
  self.queue_rpc_task(self.create_rpc_task_for_client(
    connection,
    client_id,
    shutdown_flag,
  ));
}
pub fn progress_tracker(&self) -> &Arc<ProgressTracker> {
&self.progress_tracker
}
/// Runs the scheduler in daemon mode, blocking until `shutdown_flag` is set.
///
/// Before spawning workers it queues, in this order:
/// 1. an idle-monitor task on `lanes::IDLE_LANE`, only when
///    `config.idle_timeout` is `Some`;
/// 2. the [`DaemonTask`] created from `ipc_server` and `config`;
/// 3. the periodic GC task on `lanes::IDLE_LANE`, only when the scheduler
///    configuration enables periodic GC.
///
/// The idle monitor and the daemon task share one `idle_triggered` flag.
/// Once `shutdown_flag` is observed (checked at least every 100 ms), the
/// workers are unparked and joined. Any worker whose join fails is logged.
pub fn run_daemon(
  self: &Arc<Self>,
  ipc_server: IpcServer,
  config: DaemonConfig,
) where
  T: crate::hooks::LaburnumHooks<P, T>,
{
  otel::span!("laburnum.scheduler.run_daemon");
  let idle_triggered = Arc::new(AtomicBool::new(false));

  if let Some(idle_timeout) = config.idle_timeout {
    let monitor = LaburnumTask::new_with_parent(
      Arc::clone(self),
      idle_monitor_task(
        Arc::clone(&self.shutdown_flag),
        Arc::clone(&idle_triggered),
        idle_timeout,
      ),
      lanes::IDLE_LANE,
      None,
      ClientId::INTERNAL,
    );
    self.queue_task(monitor);
  }

  let daemon =
    DaemonTask::create(Arc::clone(self), ipc_server, config, idle_triggered);
  self.queue_task(daemon);

  if self.config.enable_periodic_gc {
    let periodic_gc = LaburnumTask::new_with_parent(
      Arc::clone(self),
      gc::periodic_gc_task(Arc::clone(&self.shutdown_flag)),
      lanes::IDLE_LANE,
      None,
      ClientId::INTERNAL,
    );
    self.queue_task(periodic_gc);
  }

  self.spawn_workers();
  // Park with a timeout so the flag is re-checked even without an unpark.
  loop {
    if self.shutdown_flag.load(Ordering::Acquire) {
      break;
    }
    std::thread::park_timeout(Duration::from_millis(100));
  }

  // Wake parked workers, then join every recorded handle.
  self.notify_workers();
  let handles = std::mem::take(&mut *self.worker_threads.write());
  for joined in handles.into_iter().map(std::thread::JoinHandle::join) {
    if let Err(e) = joined {
      otel::error!(
        "worker_thread_join_failed",
        format!("Failed to join worker thread: {:?}", e)
      );
    }
  }
}
/// Spawns `worker_count` worker threads and records their join handles.
///
/// New handles are appended to any already recorded rather than replacing
/// them. If this is called more than once, `notify_workers` and the
/// shutdown join in `run`/`run_daemon` therefore still reach every spawned
/// thread instead of silently detaching the earlier ones.
pub fn spawn_workers(self: &Arc<Self>) {
  // Captured once from the current span and handed to every worker.
  let trace_context =
    crate::protocol::otel::TraceContext::from_current_span();
  // Spawn before taking the write lock so it is held only for the append.
  let spawned: Vec<_> = (0..self.worker_count)
    .map(|id| worker::Worker::spawn(id, Arc::clone(self), trace_context.clone()))
    .collect();
  self.worker_threads.write().extend(spawned);
}
/// Wraps `task_fn` in a [`LaburnumTask`] on `lane` and queues it.
///
/// The task is owned by [`ClientId::INTERNAL`].
pub fn queue<F, Fut>(self: &Arc<Self>, task_fn: F, lane: Lane)
where
  F: FnOnce(TaskContext<P, T>) -> Fut + Send + 'static,
  Fut: Future<Output = Option<RecordWriter<P>>> + Send + 'static,
{
  let task =
    LaburnumTask::new(Arc::clone(self), task_fn, lane, ClientId::INTERNAL);
  self.queue_task(task);
}
/// Pushes `task` onto the queue for its lane and wakes the workers.
///
/// The starting queue index is `lane_priority(task.lane)`. An index with no
/// matching queue is logged and the task is dropped. If a push fails, the
/// task falls through to the next lower index, down to queue 0. Every failed
/// push is logged.
pub(crate) fn queue_task(&self, task: Arc<LaburnumTask<P, T>>) {
  let start_idx = lane_priority(task.lane) as usize;
  // Bound by the real array length: the previous hard-coded `> 31` check let
  // index 31 through even though `lane_queues` only has indices 0..=30.
  if start_idx >= self.lane_queues.len() {
    otel::error!(
      "scheduler.lane_out_of_bounds",
      "unable to push task onto queue: lane out of bounds",
      "lane_idx" = start_idx as i64
    );
    return;
  }
  // Requested lane first, then each lower lane down to 1. Lane 0 is the last
  // resort and reports its failure under its own event below.
  for lane_idx in (1..=start_idx).rev() {
    if self.lane_queues[lane_idx].push(task.clone()).is_ok() {
      self.notify_workers();
      return;
    }
    otel::error!(
      "scheduler.lane_push_failed",
      "lane queue is full",
      "lane_idx" = lane_idx as i64
    );
  }
  if self.lane_queues[0].push(task).is_err() {
    otel::error!("scheduler.lowest_lane_push_failed", "lane queue is full");
  }
  self.notify_workers();
}
/// Queues an RPC task after shifting every RPC lane's contents down one index.
///
/// While `rpc_rotation_lock` is held, the queue at each index in
/// `RPC_LANE_HIGH_IDX + 1 ..= RPC_LANE_LOW_IDX` is drained into the queue
/// one index below it. Processing starts at `RPC_LANE_HIGH_IDX`, so no task
/// moves more than one step. The new task then goes onto `RPC_LANE_LOW_IDX`
/// and the workers are woken, still under the lock. Push failures are
/// ignored.
pub(crate) fn queue_rpc_task(&self, task: Arc<LaburnumTask<P, T>>) {
  use lanes::{RPC_LANE_HIGH_IDX, RPC_LANE_LOW_IDX};
  let _rotation = self.rpc_rotation_lock.lock();
  (RPC_LANE_HIGH_IDX..RPC_LANE_LOW_IDX).for_each(|dst_idx| {
    let src = &self.lane_queues[dst_idx + 1];
    let dst = &self.lane_queues[dst_idx];
    while let Ok(moved) = src.pop() {
      let _ = dst.push(moved);
    }
  });
  let _ = self.lane_queues[RPC_LANE_LOW_IDX].push(task);
  self.notify_workers();
}
/// Queues the start-up tasks used by `run`.
///
/// These are an RPC task for [`ClientId::INTERNAL`] on the scheduler's own
/// `connection`, queued via `queue_task` rather than the RPC rotation. If
/// periodic GC is enabled, the periodic GC task is also queued on
/// `lanes::IDLE_LANE`.
pub(crate) fn add_initial_tasks(self: &Arc<Self>)
where
  T: crate::hooks::LaburnumHooks<P, T>,
{
  let internal_rpc = RpcTask::create(
    Arc::clone(self),
    self.connection.clone(),
    ClientId::INTERNAL,
    Arc::clone(&self.server),
    Arc::clone(&self.shutdown_flag),
    self.config.rpc_response_capacity,
  );
  self.queue_task(internal_rpc);

  if !self.config.enable_periodic_gc {
    return;
  }
  let periodic_gc = LaburnumTask::new_with_parent(
    Arc::clone(self),
    gc::periodic_gc_task(Arc::clone(&self.shutdown_flag)),
    lanes::IDLE_LANE,
    None,
    ClientId::INTERNAL,
  );
  self.queue_task(periodic_gc);
}
/// Runs the scheduler in non-daemon mode, blocking until `shutdown_flag` is
/// set.
///
/// Queues the start-up tasks and spawns the workers. It then waits for
/// `shutdown_flag` (checked at least every 100 ms) and finally unparks and
/// joins every worker thread. Any worker whose join fails is logged.
pub fn run(self: &Arc<Self>)
where
  T: crate::hooks::LaburnumHooks<P, T>,
{
  otel::span!("laburnum.scheduler.run");
  self.add_initial_tasks();
  self.spawn_workers();

  // Park with a timeout so the flag is re-checked even without an unpark.
  loop {
    if self.shutdown_flag.load(Ordering::Acquire) {
      break;
    }
    std::thread::park_timeout(Duration::from_millis(100));
  }

  // Wake parked workers, then join every recorded handle.
  self.notify_workers();
  let handles = std::mem::take(&mut *self.worker_threads.write());
  for joined in handles.into_iter().map(std::thread::JoinHandle::join) {
    if let Err(e) = joined {
      otel::error!(
        "worker_thread_join_failed",
        format!("Failed to join worker thread: {:?}", e)
      );
    }
  }
}
}
/// Builds one `RecordKey` per sort key in `sks`, all under partition key `pk`,
/// preserving the input order.
fn sort_keys_to_record_keys(
  pk: crate::Ident,
  sks: &[String],
) -> Vec<crate::database::RecordKey> {
  let mut record_keys = Vec::with_capacity(sks.len());
  for sort_key in sks {
    record_keys.push(crate::database::RecordKey::new(pk, sort_key.clone()));
  }
  record_keys
}
impl<P: Partitions, T: LanguageServer<P>> Scheduler<P, T> {
/// Returns a new strong handle to the language server.
pub fn server(&self) -> Arc<T> {
  Arc::clone(&self.server)
}
/// Queues watcher tasks in response to a committed chunk.
///
/// For every partition key in `result.affected_partition_keys()`, the sort
/// keys of that partition's inserted and deleted records are collected and
/// offered to two dispatchers:
/// - `dispatch_builtin_watcher`: each match is queued on
///   `lanes::DEFAULT_LANE` as a child of `task_id`;
/// - `T::dispatch_watcher`: each match is queued through `Self::queue` on
///   `lanes::DEFAULT_LANE` with no parent task.
///
/// Every queued watcher task does the following:
/// - stores the matched keys on its context;
/// - runs the handler against a fresh `RecordWriter` for the partition;
/// - queues each returned follow-up on that follow-up's own lane, as an
///   unparented task with its own fresh writer;
/// - yields `Some(writer)`.
///
/// NOTE(review): the two dispatcher closures below are near-duplicates that
/// differ only in how the watcher task is queued. Whether the missing parent
/// on the `T::dispatch_watcher` path is intentional is not visible here.
pub(crate) fn on_new_chunk(
self: &Arc<Self>,
task_id: crate::Ident,
result: crate::database::CommitResult,
) {
// Totals across all partitions, attached to the span below.
let inserted_count: usize =
result.inserted_keys.values().map(|v| v.len()).sum();
let deleted_count: usize =
result.deleted_keys.values().map(|v| v.len()).sum();
otel::span!(
"laburnum.scheduler.on_new_chunk",
"inserted_keys.count" = inserted_count as i64,
"deleted_keys.count" = deleted_count as i64
);
for pk in result.affected_partition_keys() {
// Sort keys of this partition's inserted / deleted records (empty if the
// partition has none of that kind).
let updated: Vec<String> = result
.inserted_keys
.get(&pk)
.map(|keys| keys.iter().map(|k| k.sort_key().to_owned()).collect())
.unwrap_or_default();
let deleted: Vec<String> = result
.deleted_keys
.get(&pk)
.map(|keys| keys.iter().map(|k| k.sort_key().to_owned()).collect())
.unwrap_or_default();
// Owned handle that is moved into the `T::dispatch_watcher` closure; the
// builtin closure below captures its own clone of it.
let scheduler = self.clone();
// `updated`/`deleted` are cloned because they are passed again, by value,
// to `T::dispatch_watcher` afterwards.
dispatch_builtin_watcher(pk, updated.clone(), deleted.clone(), {
let scheduler = scheduler.clone();
move |task_pk, filtered_updated, filtered_deleted, handler_fn| {
let scheduler_inner = scheduler.clone();
scheduler.queue_task(LaburnumTask::new_with_parent(
// NOTE(review): this uses the captured `self` reference rather than
// the owned `scheduler` handle.
self.clone(),
{
move |mut ctx| async move {
ctx.set_matched_keys(
sort_keys_to_record_keys(task_pk, &filtered_updated),
sort_keys_to_record_keys(task_pk, &filtered_deleted),
);
let mut writer = RecordWriter::new(task_pk);
let mut writer_ctx =
crate::database::PartitionWriteContextRef::new_for_watcher(
&mut writer,
task_pk,
);
let result = handler_fn(&mut ctx, &mut writer_ctx).await;
// Each follow-up becomes an independent, unparented task with a
// fresh writer for the same partition.
for follow_up in result.follow_ups {
let sched = scheduler_inner.clone();
scheduler_inner.queue_task(LaburnumTask::new(
sched.clone(),
move |mut ctx| async move {
let mut writer = RecordWriter::new(task_pk);
let mut writer_ctx =
crate::database::PartitionWriteContextRef::new_for_watcher(
&mut writer,
task_pk,
);
(follow_up.task_fn)(&mut ctx, &mut writer_ctx).await;
Some(writer)
},
follow_up.lane,
ClientId::INTERNAL,
));
}
Some(writer)
}
},
lanes::DEFAULT_LANE,
Some(task_id),
ClientId::INTERNAL,
))
}
});
// Same task body as above, but queued via `Self::queue` (no parent task).
T::dispatch_watcher(pk, updated, deleted, {
move |task_pk, filtered_updated, filtered_deleted, handler_fn| {
let scheduler_inner = scheduler.clone();
scheduler.queue(
move |mut ctx| async move {
ctx.set_matched_keys(
sort_keys_to_record_keys(task_pk, &filtered_updated),
sort_keys_to_record_keys(task_pk, &filtered_deleted),
);
let mut writer = RecordWriter::new(task_pk);
let mut writer_ctx =
crate::database::PartitionWriteContextRef::new_for_watcher(
&mut writer,
task_pk,
);
let result = handler_fn(&mut ctx, &mut writer_ctx).await;
for follow_up in result.follow_ups {
let sched = scheduler_inner.clone();
scheduler_inner.queue_task(LaburnumTask::new(
sched.clone(),
move |mut ctx| async move {
let mut writer = RecordWriter::new(task_pk);
let mut writer_ctx =
crate::database::PartitionWriteContextRef::new_for_watcher(
&mut writer,
task_pk,
);
(follow_up.task_fn)(&mut ctx, &mut writer_ctx).await;
Some(writer)
},
follow_up.lane,
ClientId::INTERNAL,
));
}
Some(writer)
},
lanes::DEFAULT_LANE,
);
}
});
}
}
/// Records one more user of `epoch` in `active_epochs`.
pub(crate) fn register_active_epoch(&self, epoch: GenerationEpoch) {
  self
    .active_epochs
    .lock()
    .entry(epoch)
    .and_modify(|uses| *uses += 1)
    .or_insert(1);
}
/// Releases one use of `epoch`, removing its entry once the count reaches
/// zero. An epoch that is not registered is ignored.
pub(crate) fn deregister_active_epoch(&self, epoch: GenerationEpoch) {
  let mut registered = self.active_epochs.lock();
  let Some(uses) = registered.get_mut(&epoch) else {
    return;
  };
  *uses -= 1;
  if *uses == 0 {
    registered.remove(&epoch);
  }
}
/// Returns the smallest epoch currently registered in `active_epochs`.
///
/// Falls back to the database's current epoch when none is registered.
/// The `active_epochs` lock stays held while the fallback is read.
pub(crate) fn oldest_running_epoch(&self) -> GenerationEpoch {
  let registered = self.active_epochs.lock();
  match registered.first_key_value() {
    Some((&epoch, _)) => epoch,
    None => self.db.get_current_epoch(),
  }
}
/// Unparks every worker thread whose handle is recorded in `worker_threads`.
fn notify_workers(&self) {
  for worker in self.worker_threads.read().iter() {
    worker.thread().unpark();
  }
}
}
#[cfg(test)]
pub mod tests;