#![allow(missing_docs)]
use crate::runtime::epoch_gc::{CleanupWork, EpochGC};
use crate::types::{RegionId, TaskId};
use std::sync::Arc;
use std::time::Duration;
/// Switches that decide which runtime subsystems route their cleanup through
/// the epoch GC, together with a few integration-wide settings.
#[derive(Debug, Clone)]
pub struct EpochGCIntegrationConfig {
    /// Route obligation cleanup through the epoch GC.
    pub enable_obligation_gc: bool,
    /// Route I/O waker cleanup through the epoch GC.
    pub enable_waker_gc: bool,
    /// Route region cleanup through the epoch GC.
    pub enable_region_gc: bool,
    /// Route timer cleanup through the epoch GC.
    pub enable_timer_gc: bool,
    /// Route channel cleanup through the epoch GC.
    pub enable_channel_gc: bool,
    /// Fallback timeout; every constructor here sets it to 100 ms.
    /// NOTE(review): no code visible in this module reads it — confirm the consumer.
    pub fallback_timeout: Duration,
    /// Emit integration log events; off unless [`Self::with_logging`] is applied.
    pub enable_integration_logging: bool,
    /// Metrics switch: on by default, off in [`Self::disabled`].
    /// NOTE(review): no code visible in this module reads it — confirm the consumer.
    pub enable_metrics: bool,
}

impl Default for EpochGCIntegrationConfig {
    /// Every subsystem switch and metrics on, logging off, 100 ms fallback timeout.
    fn default() -> Self {
        Self::uniform(true)
    }
}

impl EpochGCIntegrationConfig {
    /// Shared constructor: the five subsystem switches and the metrics switch
    /// all take `enabled`; logging starts off and the timeout is 100 ms.
    fn uniform(enabled: bool) -> Self {
        Self {
            enable_obligation_gc: enabled,
            enable_waker_gc: enabled,
            enable_region_gc: enabled,
            enable_timer_gc: enabled,
            enable_channel_gc: enabled,
            fallback_timeout: Duration::from_millis(100),
            enable_integration_logging: false,
            enable_metrics: enabled,
        }
    }

    /// Overwrites only the five subsystem switches, keeping every other field.
    fn with_subsystems(self, enabled: bool) -> Self {
        Self {
            enable_obligation_gc: enabled,
            enable_waker_gc: enabled,
            enable_region_gc: enabled,
            enable_timer_gc: enabled,
            enable_channel_gc: enabled,
            ..self
        }
    }

    /// Configuration with every subsystem switch and metrics turned off.
    #[must_use]
    pub fn disabled() -> Self {
        Self::uniform(false)
    }

    /// Turns all five subsystem switches on; timeout, logging and metrics are untouched.
    #[must_use]
    pub fn enable_all(self) -> Self {
        self.with_subsystems(true)
    }

    /// Turns all five subsystem switches off; timeout, logging and metrics are untouched.
    #[must_use]
    pub fn disable_all(self) -> Self {
        self.with_subsystems(false)
    }

    /// Turns integration logging on.
    #[must_use]
    pub fn with_logging(self) -> Self {
        Self {
            enable_integration_logging: true,
            ..self
        }
    }
}
/// Common surface of the per-subsystem epoch-GC wrappers in this module.
pub trait EpochCleanupIntegration {
/// Offers `work` to the epoch GC. An `Err` hands the same work back to the
/// caller so it can be cleaned up another way.
fn try_defer_cleanup(&self, work: CleanupWork) -> Result<(), CleanupWork>;
/// Handles `work` immediately instead of deferring it; the wrappers in this
/// module call it after `try_defer_cleanup` returns `Err`.
fn direct_cleanup_fallback(&self, work: CleanupWork);
/// Reports whether this wrapper will attempt deferral. The implementations in
/// this module require both the subsystem switch and an attached collector.
fn is_epoch_gc_enabled(&self) -> bool;
}
/// Obligation-cleanup front end: pairs an optional shared collector with the
/// integration configuration.
pub struct ObligationTableEpochGC {
/// Shared collector; when `None`, deferral always fails and the direct path is used.
epoch_gc: Option<Arc<EpochGC>>,
/// Integration switches; this wrapper reads `enable_obligation_gc` and
/// `enable_integration_logging`.
config: EpochGCIntegrationConfig,
}
impl ObligationTableEpochGC {
    /// Builds the wrapper from an optional shared collector and the integration settings.
    #[must_use]
    pub fn new(epoch_gc: Option<Arc<EpochGC>>, config: EpochGCIntegrationConfig) -> Self {
        Self { epoch_gc, config }
    }

    /// Cleans up one obligation, deferring to the epoch GC when permitted.
    ///
    /// With `enable_obligation_gc` off, the direct path runs immediately.
    /// Otherwise the work is offered to the collector and, if the collector
    /// hands it back, passed to the direct fallback.
    pub fn cleanup_obligation(&self, obligation_id: u64, metadata: Vec<u8>) {
        if !self.config.enable_obligation_gc {
            self.direct_cleanup_obligation(obligation_id, &metadata);
            return;
        }

        let offered = CleanupWork::Obligation {
            id: obligation_id,
            metadata,
        };
        let Err(rejected) = self.try_defer_cleanup(offered) else {
            // Deferral accepted: nothing left to do besides optional logging.
            #[cfg(feature = "tracing-integration")]
            if self.config.enable_integration_logging {
                tracing::debug!(
                    obligation_id = obligation_id,
                    "Deferred obligation cleanup to epoch GC"
                );
            }
            return;
        };

        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::warn!(
                obligation_id = obligation_id,
                "Failed to defer obligation cleanup, using direct cleanup"
            );
        }
        self.direct_cleanup_fallback(rejected);
    }

    /// Direct (non-deferred) path. As written it only emits a debug event when
    /// logging is enabled; without the `tracing-integration` feature it does nothing.
    fn direct_cleanup_obligation(&self, obligation_id: u64, metadata: &[u8]) {
        #[cfg(feature = "tracing-integration")]
        {
            if self.config.enable_integration_logging {
                tracing::debug!(
                    obligation_id = obligation_id,
                    metadata_size = metadata.len(),
                    "Direct obligation cleanup"
                );
            }
        }
        #[cfg(not(feature = "tracing-integration"))]
        {
            // Keeps the parameters "used" when the logging code is compiled out.
            let _ = (obligation_id, metadata);
        }
    }
}
impl EpochCleanupIntegration for ObligationTableEpochGC {
    /// Forwards `work` (of any variant) to the attached collector, or returns it
    /// unchanged as `Err` when no collector is attached.
    fn try_defer_cleanup(&self, work: CleanupWork) -> Result<(), CleanupWork> {
        match self.epoch_gc.as_ref() {
            Some(collector) => collector.defer_cleanup(work),
            None => Err(work),
        }
    }

    /// Runs the direct obligation path for `CleanupWork::Obligation`.
    ///
    /// Any other variant cannot be handled here and is dropped; that case is
    /// reported with a warning (when `tracing-integration` is compiled in)
    /// instead of vanishing silently.
    fn direct_cleanup_fallback(&self, work: CleanupWork) {
        let CleanupWork::Obligation { id, metadata } = work else {
            #[cfg(feature = "tracing-integration")]
            tracing::warn!("Unexpected cleanup work passed to obligation fallback; dropping it");
            return;
        };
        self.direct_cleanup_obligation(id, &metadata);
    }

    /// `true` only when the obligation switch is on and a collector is attached.
    fn is_epoch_gc_enabled(&self) -> bool {
        self.config.enable_obligation_gc && self.epoch_gc.is_some()
    }
}
/// I/O-driver waker cleanup front end: pairs an optional shared collector with
/// the integration configuration.
pub struct IODriverWakerEpochGC {
/// Shared collector; when `None`, deferral always fails and the direct path is used.
epoch_gc: Option<Arc<EpochGC>>,
/// Integration switches; this wrapper reads `enable_waker_gc` and
/// `enable_integration_logging`.
config: EpochGCIntegrationConfig,
}
impl IODriverWakerEpochGC {
    /// Builds the wrapper from an optional shared collector and the integration settings.
    #[must_use]
    pub fn new(epoch_gc: Option<Arc<EpochGC>>, config: EpochGCIntegrationConfig) -> Self {
        Self { epoch_gc, config }
    }

    /// Cleans up one waker, deferring to the epoch GC when permitted.
    ///
    /// With `enable_waker_gc` off, the direct path runs immediately. Otherwise
    /// the work is offered to the collector and, if handed back, passed to the
    /// direct fallback.
    pub fn cleanup_waker(&self, waker_id: u64, source: impl Into<String>) {
        // Convert once; both the direct and the deferred path need the owned name.
        let source: String = source.into();
        if !self.config.enable_waker_gc {
            self.direct_cleanup_waker(waker_id, &source);
            return;
        }

        let offered = CleanupWork::WakerCleanup { waker_id, source };
        let Err(rejected) = self.try_defer_cleanup(offered) else {
            #[cfg(feature = "tracing-integration")]
            if self.config.enable_integration_logging {
                tracing::debug!(waker_id = waker_id, "Deferred waker cleanup to epoch GC");
            }
            return;
        };

        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::warn!(
                waker_id = waker_id,
                "Failed to defer waker cleanup, using direct cleanup"
            );
        }
        self.direct_cleanup_fallback(rejected);
    }

    /// Direct (non-deferred) path. As written it only logs: a debug event when
    /// logging is enabled, and a warning for a source other than `epoll`,
    /// `kqueue` or `iocp` (that warning is not gated by the logging switch).
    fn direct_cleanup_waker(&self, waker_id: u64, source: &str) {
        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::debug!(waker_id = waker_id, source = source, "Direct waker cleanup");
        }
        #[cfg(not(feature = "tracing-integration"))]
        let _ = waker_id;

        match source {
            // Recognized sources: no source-specific work is performed here.
            "epoll" | "kqueue" | "iocp" => {}
            _ => {
                #[cfg(feature = "tracing-integration")]
                tracing::warn!(source = source, "Unknown waker source for direct cleanup");
            }
        }
    }
}
impl EpochCleanupIntegration for IODriverWakerEpochGC {
    /// Forwards `work` (of any variant) to the attached collector, or returns it
    /// unchanged as `Err` when no collector is attached.
    fn try_defer_cleanup(&self, work: CleanupWork) -> Result<(), CleanupWork> {
        match self.epoch_gc.as_ref() {
            Some(collector) => collector.defer_cleanup(work),
            None => Err(work),
        }
    }

    /// Runs the direct waker path for `CleanupWork::WakerCleanup`.
    ///
    /// Any other variant cannot be handled here and is dropped; that case is
    /// reported with a warning (when `tracing-integration` is compiled in)
    /// instead of vanishing silently.
    fn direct_cleanup_fallback(&self, work: CleanupWork) {
        let CleanupWork::WakerCleanup { waker_id, source } = work else {
            #[cfg(feature = "tracing-integration")]
            tracing::warn!("Unexpected cleanup work passed to waker fallback; dropping it");
            return;
        };
        self.direct_cleanup_waker(waker_id, &source);
    }

    /// `true` only when the waker switch is on and a collector is attached.
    fn is_epoch_gc_enabled(&self) -> bool {
        self.config.enable_waker_gc && self.epoch_gc.is_some()
    }
}
/// Region cleanup front end: pairs an optional shared collector with the
/// integration configuration.
pub struct RegionStateEpochGC {
/// Shared collector; when `None`, deferral always fails and the direct path is used.
epoch_gc: Option<Arc<EpochGC>>,
/// Integration switches; this wrapper reads `enable_region_gc` and
/// `enable_integration_logging`.
config: EpochGCIntegrationConfig,
}
impl RegionStateEpochGC {
    /// Builds the wrapper from an optional shared collector and the integration settings.
    #[must_use]
    pub fn new(epoch_gc: Option<Arc<EpochGC>>, config: EpochGCIntegrationConfig) -> Self {
        Self { epoch_gc, config }
    }

    /// Cleans up one region and its tasks, deferring to the epoch GC when permitted.
    ///
    /// With `enable_region_gc` off, the direct path runs immediately. Otherwise
    /// the work is offered to the collector and, if handed back, passed to the
    /// direct fallback.
    pub fn cleanup_region(&self, region_id: RegionId, task_ids: Vec<TaskId>) {
        if !self.config.enable_region_gc {
            self.direct_cleanup_region(region_id, &task_ids);
            return;
        }

        let offered = CleanupWork::RegionCleanup {
            region_id,
            task_ids,
        };
        let Err(rejected) = self.try_defer_cleanup(offered) else {
            #[cfg(feature = "tracing-integration")]
            if self.config.enable_integration_logging {
                tracing::debug!(
                    region_id = region_id.as_u64(),
                    "Deferred region cleanup to epoch GC"
                );
            }
            return;
        };

        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::warn!(
                region_id = region_id.as_u64(),
                "Failed to defer region cleanup, using direct cleanup"
            );
        }
        self.direct_cleanup_fallback(rejected);
    }

    /// Direct (non-deferred) path. As written it emits a debug event when
    /// logging is enabled and then walks the task ids without acting on them.
    fn direct_cleanup_region(&self, region_id: RegionId, task_ids: &[TaskId]) {
        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::debug!(
                region_id = region_id.as_u64(),
                task_count = task_ids.len(),
                "Direct region cleanup"
            );
        }
        #[cfg(not(feature = "tracing-integration"))]
        let _ = region_id;

        // Each task id is visited but no per-task work is performed.
        for task_id in task_ids.iter().copied() {
            let _ = task_id;
        }
    }
}
impl EpochCleanupIntegration for RegionStateEpochGC {
    /// Forwards `work` (of any variant) to the attached collector, or returns it
    /// unchanged as `Err` when no collector is attached.
    fn try_defer_cleanup(&self, work: CleanupWork) -> Result<(), CleanupWork> {
        match self.epoch_gc.as_ref() {
            Some(collector) => collector.defer_cleanup(work),
            None => Err(work),
        }
    }

    /// Runs the direct region path for `CleanupWork::RegionCleanup`.
    ///
    /// Any other variant cannot be handled here and is dropped; that case is
    /// reported with a warning (when `tracing-integration` is compiled in)
    /// instead of vanishing silently.
    fn direct_cleanup_fallback(&self, work: CleanupWork) {
        let CleanupWork::RegionCleanup {
            region_id,
            task_ids,
        } = work
        else {
            #[cfg(feature = "tracing-integration")]
            tracing::warn!("Unexpected cleanup work passed to region fallback; dropping it");
            return;
        };
        self.direct_cleanup_region(region_id, &task_ids);
    }

    /// `true` only when the region switch is on and a collector is attached.
    fn is_epoch_gc_enabled(&self) -> bool {
        self.config.enable_region_gc && self.epoch_gc.is_some()
    }
}
/// Timer cleanup front end: pairs an optional shared collector with the
/// integration configuration.
pub struct TimerEpochGC {
/// Shared collector; when `None`, deferral always fails and the direct path is used.
epoch_gc: Option<Arc<EpochGC>>,
/// Integration switches; this wrapper reads `enable_timer_gc` and
/// `enable_integration_logging`.
config: EpochGCIntegrationConfig,
}
impl TimerEpochGC {
    /// Builds the wrapper from an optional shared collector and the integration settings.
    #[must_use]
    pub fn new(epoch_gc: Option<Arc<EpochGC>>, config: EpochGCIntegrationConfig) -> Self {
        Self { epoch_gc, config }
    }

    /// Cleans up one timer, deferring to the epoch GC when permitted.
    ///
    /// With `enable_timer_gc` off, the direct path runs immediately. Otherwise
    /// the work is offered to the collector and, if handed back, passed to the
    /// direct fallback.
    pub fn cleanup_timer(&self, timer_id: u64, timer_type: impl Into<String>) {
        // Convert once; both the direct and the deferred path need the owned name.
        let timer_type: String = timer_type.into();
        if !self.config.enable_timer_gc {
            self.direct_cleanup_timer(timer_id, &timer_type);
            return;
        }

        let offered = CleanupWork::TimerCleanup {
            timer_id,
            timer_type,
        };
        let Err(rejected) = self.try_defer_cleanup(offered) else {
            #[cfg(feature = "tracing-integration")]
            if self.config.enable_integration_logging {
                tracing::debug!(timer_id = timer_id, "Deferred timer cleanup to epoch GC");
            }
            return;
        };

        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::warn!(
                timer_id = timer_id,
                "Failed to defer timer cleanup, using direct cleanup"
            );
        }
        self.direct_cleanup_fallback(rejected);
    }

    /// Direct (non-deferred) path. As written it only logs: a debug event when
    /// logging is enabled, and a warning for a type other than `sleep`,
    /// `timeout`, `interval` or `deadline` (that warning is not gated by the
    /// logging switch).
    fn direct_cleanup_timer(&self, timer_id: u64, timer_type: &str) {
        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::debug!(
                timer_id = timer_id,
                timer_type = timer_type,
                "Direct timer cleanup"
            );
        }
        #[cfg(not(feature = "tracing-integration"))]
        let _ = timer_id;

        match timer_type {
            // Recognized timer kinds: no kind-specific work is performed here.
            "sleep" | "timeout" | "interval" | "deadline" => {}
            _ => {
                #[cfg(feature = "tracing-integration")]
                tracing::warn!(
                    timer_type = timer_type,
                    "Unknown timer type for direct cleanup"
                );
            }
        }
    }
}
impl EpochCleanupIntegration for TimerEpochGC {
    /// Forwards `work` (of any variant) to the attached collector, or returns it
    /// unchanged as `Err` when no collector is attached.
    fn try_defer_cleanup(&self, work: CleanupWork) -> Result<(), CleanupWork> {
        match self.epoch_gc.as_ref() {
            Some(collector) => collector.defer_cleanup(work),
            None => Err(work),
        }
    }

    /// Runs the direct timer path for `CleanupWork::TimerCleanup`.
    ///
    /// Any other variant cannot be handled here and is dropped; that case is
    /// reported with a warning (when `tracing-integration` is compiled in)
    /// instead of vanishing silently.
    fn direct_cleanup_fallback(&self, work: CleanupWork) {
        let CleanupWork::TimerCleanup {
            timer_id,
            timer_type,
        } = work
        else {
            #[cfg(feature = "tracing-integration")]
            tracing::warn!("Unexpected cleanup work passed to timer fallback; dropping it");
            return;
        };
        self.direct_cleanup_timer(timer_id, &timer_type);
    }

    /// `true` only when the timer switch is on and a collector is attached.
    fn is_epoch_gc_enabled(&self) -> bool {
        self.config.enable_timer_gc && self.epoch_gc.is_some()
    }
}
/// Channel cleanup front end: pairs an optional shared collector with the
/// integration configuration.
pub struct ChannelEpochGC {
/// Shared collector; when `None`, deferral always fails and the direct path is used.
epoch_gc: Option<Arc<EpochGC>>,
/// Integration switches; this wrapper reads `enable_channel_gc` and
/// `enable_integration_logging`.
config: EpochGCIntegrationConfig,
}
impl ChannelEpochGC {
    /// Builds the wrapper from an optional shared collector and the integration settings.
    #[must_use]
    pub fn new(epoch_gc: Option<Arc<EpochGC>>, config: EpochGCIntegrationConfig) -> Self {
        Self { epoch_gc, config }
    }

    /// Cleans up channel state, deferring to the epoch GC when permitted.
    ///
    /// With `enable_channel_gc` off, the direct path runs immediately. Otherwise
    /// the work is offered to the collector and, if handed back, passed to the
    /// direct fallback.
    pub fn cleanup_channel(
        &self,
        channel_id: u64,
        cleanup_type: impl Into<String>,
        data: Vec<u8>,
    ) {
        // Convert once; both the direct and the deferred path need the owned name.
        let cleanup_type: String = cleanup_type.into();
        if !self.config.enable_channel_gc {
            self.direct_cleanup_channel(channel_id, &cleanup_type, &data);
            return;
        }

        let offered = CleanupWork::ChannelCleanup {
            channel_id,
            cleanup_type,
            data,
        };
        let Err(rejected) = self.try_defer_cleanup(offered) else {
            #[cfg(feature = "tracing-integration")]
            if self.config.enable_integration_logging {
                tracing::debug!(
                    channel_id = channel_id,
                    "Deferred channel cleanup to epoch GC"
                );
            }
            return;
        };

        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::warn!(
                channel_id = channel_id,
                "Failed to defer channel cleanup, using direct cleanup"
            );
        }
        self.direct_cleanup_fallback(rejected);
    }

    /// Direct (non-deferred) path. As written it only logs: a debug event when
    /// logging is enabled, and a warning for an unrecognized cleanup type (that
    /// warning is not gated by the logging switch).
    fn direct_cleanup_channel(&self, channel_id: u64, cleanup_type: &str, data: &[u8]) {
        #[cfg(feature = "tracing-integration")]
        if self.config.enable_integration_logging {
            tracing::debug!(
                channel_id = channel_id,
                cleanup_type = cleanup_type,
                data_size = data.len(),
                "Direct channel cleanup"
            );
        }
        #[cfg(not(feature = "tracing-integration"))]
        let _ = (channel_id, data);

        match cleanup_type {
            // Recognized cleanup kinds: no kind-specific work is performed here.
            "waker" | "buffer" | "mpsc_sender" | "mpsc_receiver" | "oneshot" | "broadcast"
            | "watch" | "session" => {}
            _ => {
                #[cfg(feature = "tracing-integration")]
                tracing::warn!(cleanup_type = cleanup_type, "Unknown channel cleanup type");
            }
        }
    }
}
impl EpochCleanupIntegration for ChannelEpochGC {
    /// Forwards `work` (of any variant) to the attached collector, or returns it
    /// unchanged as `Err` when no collector is attached.
    fn try_defer_cleanup(&self, work: CleanupWork) -> Result<(), CleanupWork> {
        match self.epoch_gc.as_ref() {
            Some(collector) => collector.defer_cleanup(work),
            None => Err(work),
        }
    }

    /// Runs the direct channel path for `CleanupWork::ChannelCleanup`.
    ///
    /// Any other variant cannot be handled here and is dropped; that case is
    /// reported with a warning (when `tracing-integration` is compiled in)
    /// instead of vanishing silently.
    fn direct_cleanup_fallback(&self, work: CleanupWork) {
        let CleanupWork::ChannelCleanup {
            channel_id,
            cleanup_type,
            data,
        } = work
        else {
            #[cfg(feature = "tracing-integration")]
            tracing::warn!("Unexpected cleanup work passed to channel fallback; dropping it");
            return;
        };
        self.direct_cleanup_channel(channel_id, &cleanup_type, &data);
    }

    /// `true` only when the channel switch is on and a collector is attached.
    fn is_epoch_gc_enabled(&self) -> bool {
        self.config.enable_channel_gc && self.epoch_gc.is_some()
    }
}
/// Bundles the five per-subsystem wrappers together with the collector handle
/// they were built from.
pub struct RuntimeEpochGCIntegration {
/// Obligation cleanup wrapper.
pub obligation_gc: ObligationTableEpochGC,
/// I/O waker cleanup wrapper.
pub waker_gc: IODriverWakerEpochGC,
/// Region cleanup wrapper.
pub region_gc: RegionStateEpochGC,
/// Timer cleanup wrapper.
pub timer_gc: TimerEpochGC,
/// Channel cleanup wrapper.
pub channel_gc: ChannelEpochGC,
/// Collector handle used for epoch advancement, stats and capacity queries;
/// `None` when the integration was built without a collector.
epoch_gc: Option<Arc<EpochGC>>,
}
impl RuntimeEpochGCIntegration {
#[must_use]
pub fn new(epoch_gc: Option<Arc<EpochGC>>, config: EpochGCIntegrationConfig) -> Self {
Self {
obligation_gc: ObligationTableEpochGC::new(epoch_gc.clone(), config.clone()),
waker_gc: IODriverWakerEpochGC::new(epoch_gc.clone(), config.clone()),
region_gc: RegionStateEpochGC::new(epoch_gc.clone(), config.clone()),
timer_gc: TimerEpochGC::new(epoch_gc.clone(), config.clone()),
channel_gc: ChannelEpochGC::new(epoch_gc.clone(), config),
epoch_gc,
}
}
#[must_use]
pub fn disabled() -> Self {
let config = EpochGCIntegrationConfig::disabled();
Self::new(None, config)
}
#[must_use]
pub fn try_advance_epoch(&self) -> usize {
if let Some(ref epoch_gc) = self.epoch_gc {
epoch_gc.try_advance_and_cleanup()
} else {
0
}
}
#[cfg(test)]
pub fn force_advance_epoch(&self) -> usize {
if let Some(ref epoch_gc) = self.epoch_gc {
epoch_gc.force_advance_and_cleanup()
} else {
0
}
}
#[must_use]
pub fn stats(&self) -> Option<&crate::runtime::epoch_gc::CleanupStats> {
self.epoch_gc.as_ref().map(|gc| gc.stats())
}
#[must_use]
pub fn is_enabled(&self) -> bool {
self.epoch_gc.is_some()
}
#[must_use]
pub fn is_near_capacity(&self) -> bool {
self.epoch_gc
.as_ref()
.is_some_and(|gc| gc.is_cleanup_queue_near_capacity())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration turns every subsystem switch on.
    #[test]
    fn test_integration_config_defaults() {
        let config = EpochGCIntegrationConfig::default();
        assert!(config.enable_obligation_gc);
        assert!(config.enable_waker_gc);
        assert!(config.enable_region_gc);
        assert!(config.enable_timer_gc);
        assert!(config.enable_channel_gc);
    }

    /// The builder methods flip only the subsystem switches (or only logging).
    #[test]
    fn test_integration_config_builders() {
        let off = EpochGCIntegrationConfig::default().with_logging().disable_all();
        assert!(!off.enable_obligation_gc);
        assert!(!off.enable_waker_gc);
        assert!(!off.enable_region_gc);
        assert!(!off.enable_timer_gc);
        assert!(!off.enable_channel_gc);
        assert!(off.enable_integration_logging);
        assert!(off.enable_metrics);

        let on = EpochGCIntegrationConfig::disabled().enable_all();
        assert!(on.enable_obligation_gc);
        assert!(on.enable_waker_gc);
        assert!(on.enable_region_gc);
        assert!(on.enable_timer_gc);
        assert!(on.enable_channel_gc);
        assert!(!on.enable_integration_logging);
        assert!(!on.enable_metrics);
    }

    /// A disabled integration reports "off" through every wrapper and query.
    #[test]
    fn test_disabled_integration() {
        let integration = RuntimeEpochGCIntegration::disabled();
        assert!(!integration.is_enabled());
        assert!(!integration.obligation_gc.is_epoch_gc_enabled());
        assert!(!integration.waker_gc.is_epoch_gc_enabled());
        assert!(!integration.region_gc.is_epoch_gc_enabled());
        assert!(!integration.timer_gc.is_epoch_gc_enabled());
        assert!(!integration.channel_gc.is_epoch_gc_enabled());
        assert_eq!(integration.try_advance_epoch(), 0);
        assert_eq!(integration.force_advance_epoch(), 0);
        assert!(integration.stats().is_none());
        assert!(!integration.is_near_capacity());
    }

    /// With a collector and the default config, all five wrappers are enabled.
    #[test]
    fn test_enabled_integration() {
        let epoch_gc = Arc::new(crate::runtime::epoch_gc::EpochGC::new());
        let config = EpochGCIntegrationConfig::default();
        let integration = RuntimeEpochGCIntegration::new(Some(epoch_gc), config);
        assert!(integration.is_enabled());
        assert!(integration.obligation_gc.is_epoch_gc_enabled());
        assert!(integration.waker_gc.is_epoch_gc_enabled());
        assert!(integration.region_gc.is_epoch_gc_enabled());
        assert!(integration.timer_gc.is_epoch_gc_enabled());
        assert!(integration.channel_gc.is_epoch_gc_enabled());
    }

    /// An attached collector does not enable a wrapper whose switch is off.
    #[test]
    fn test_switches_off_with_collector_attached() {
        let epoch_gc = Arc::new(crate::runtime::epoch_gc::EpochGC::new());
        let integration =
            RuntimeEpochGCIntegration::new(Some(epoch_gc), EpochGCIntegrationConfig::disabled());
        assert!(integration.is_enabled());
        assert!(!integration.obligation_gc.is_epoch_gc_enabled());
        assert!(!integration.waker_gc.is_epoch_gc_enabled());
        assert!(!integration.region_gc.is_epoch_gc_enabled());
        assert!(!integration.timer_gc.is_epoch_gc_enabled());
        assert!(!integration.channel_gc.is_epoch_gc_enabled());
    }

    #[test]
    fn test_obligation_cleanup_integration() {
        let epoch_gc = Arc::new(crate::runtime::epoch_gc::EpochGC::new());
        let config = EpochGCIntegrationConfig::default();
        let obligation_gc = ObligationTableEpochGC::new(Some(epoch_gc), config);
        obligation_gc.cleanup_obligation(123, vec![1, 2, 3]);
        assert!(obligation_gc.is_epoch_gc_enabled());
    }

    #[test]
    fn test_fallback_when_epoch_gc_disabled() {
        let config = EpochGCIntegrationConfig::default();
        let obligation_gc = ObligationTableEpochGC::new(None, config);
        obligation_gc.cleanup_obligation(456, vec![]);
        assert!(!obligation_gc.is_epoch_gc_enabled());
    }

    /// Without a collector, the waker/timer/channel wrappers take the fallback
    /// path and must complete without panicking.
    #[test]
    fn test_fallback_paths_for_other_wrappers() {
        let config = EpochGCIntegrationConfig::default();

        let waker_gc = IODriverWakerEpochGC::new(None, config.clone());
        waker_gc.cleanup_waker(1, "epoll");
        assert!(!waker_gc.is_epoch_gc_enabled());

        let timer_gc = TimerEpochGC::new(None, config.clone());
        timer_gc.cleanup_timer(2, "sleep");
        assert!(!timer_gc.is_epoch_gc_enabled());

        let channel_gc = ChannelEpochGC::new(None, config);
        channel_gc.cleanup_channel(3, "buffer", vec![0xAB]);
        assert!(!channel_gc.is_epoch_gc_enabled());
    }

    /// With the subsystem switch off, cleanup goes straight to the direct path.
    #[test]
    fn test_direct_paths_when_switches_off() {
        let config = EpochGCIntegrationConfig::disabled();
        ObligationTableEpochGC::new(None, config.clone()).cleanup_obligation(7, vec![9]);
        IODriverWakerEpochGC::new(None, config.clone()).cleanup_waker(8, String::from("kqueue"));
        TimerEpochGC::new(None, config.clone()).cleanup_timer(9, "interval");
        ChannelEpochGC::new(None, config).cleanup_channel(10, "oneshot", Vec::new());
    }
}