pub mod flush;
pub mod kick_buckets;
pub mod ping_validator;
pub mod update_statistics;
use super::*;
impl_veilid_log_facility!("rtab");
impl RoutingTable {
pub fn setup_tasks(&self) {
impl_setup_task_async!(self, Self, flush_task, flush_task_routine);
impl_setup_task!(
self,
Self,
rolling_transfers_task,
rolling_transfers_task_routine
);
impl_setup_task!(
self,
Self,
update_state_stats_task,
update_state_stats_task_routine
);
impl_setup_task!(
self,
Self,
rolling_answers_task,
rolling_answers_task_routine
);
impl_setup_task!(self, Self, kick_buckets_task, kick_buckets_task_routine);
}
pub async fn tick_event_handler(&self, _evt: Arc<TickEvent>) {
if let Err(e) = self.tick().await {
error!("Error in routing table tick: {}", e);
}
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", name = "RoutingTable::tick", skip_all, err, fields(__VEILID_LOG_KEY = self.log_key()))
)]
pub async fn tick(&self) -> EyreResult<()> {
let Ok(_startup_guard) = self.startup_context.startup_lock.enter() else {
return Ok(());
};
let Some(_tick_guard) = self.critical_sections.try_lock_tag(LOCK_TAG_TICK) else {
return Ok(());
};
self.flush_task.tick().await?;
self.rolling_transfers_task.tick().await?;
self.update_state_stats_task.tick().await?;
self.rolling_answers_task.tick().await?;
let kick_bucket_queue_count = self.kick_queue.lock().len();
if kick_bucket_queue_count > 0 {
self.kick_buckets_task.tick().await?;
}
self.refresh_summaries(RoutingDomainSet::empty());
for rdc in self.get_routing_domain_controllers(RoutingDomainSet::all()) {
rdc.tick().await?;
}
Ok(())
}
pub async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> {
self.critical_sections.lock_tag(LOCK_TAG_TICK).await
}
pub async fn cancel_tasks(&self) {
veilid_log!(self debug "stopping flush task");
if let Err(e) = self.flush_task.stop().await {
veilid_log!(self warn "flush_task not stopped: {}", e);
}
veilid_log!(self debug "stopping rolling transfers task");
if let Err(e) = self.rolling_transfers_task.stop().await {
veilid_log!(self warn "rolling_transfers_task not stopped: {}", e);
}
veilid_log!(self debug "stopping update state stats task");
if let Err(e) = self.update_state_stats_task.stop().await {
veilid_log!(self warn "update_state_stats_task not stopped: {}", e);
}
veilid_log!(self debug "stopping rolling answers task");
if let Err(e) = self.rolling_answers_task.stop().await {
veilid_log!(self warn "rolling_answers_task not stopped: {}", e);
}
veilid_log!(self debug "stopping kick buckets task");
if let Err(e) = self.kick_buckets_task.stop().await {
veilid_log!(self warn "kick_buckets_task not stopped: {}", e);
}
veilid_log!(self debug "stopping routing domain controller task");
for rdc in self.get_routing_domain_controllers(RoutingDomainSet::all()) {
rdc.cancel_tasks().await;
}
}
}