use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use crate::observability::{CountersFile, COUNTERS_FILE_RESERVED_BYTES};
use std::ptr::NonNull;
/// Settings for the background counters aggregator thread.
#[derive(Clone, Debug)]
pub struct AggregatorConfig {
    /// How long the worker sleeps between two publish passes.
    pub interval: Duration,
    /// Text appended verbatim to every emitted metric name
    /// (`disruptor_mp_<label><suffix>`); `None` emits the bare name.
    pub metric_suffix: Option<String>,
}
impl Default for AggregatorConfig {
fn default() -> Self {
Self {
interval: Duration::from_millis(100),
metric_suffix: None,
}
}
}
/// Owner of the background aggregator thread.
///
/// Dropping the handle raises the stop flag and joins the worker thread.
pub struct AggregatorHandle {
    /// Shutdown flag shared with the worker; `true` asks the worker to exit.
    stop: Arc<Mutex<bool>>,
    /// Join handle of the worker thread; taken (left as `None`) on drop.
    join: Option<JoinHandle<()>>,
    /// Holds the counters file when the handle was created through
    /// `spawn_arc`; stays `None` for handles created through `spawn`.
    _file_keeper: Option<Arc<CountersFile>>,
}
impl AggregatorHandle {
    /// Starts the aggregator thread (named `disruptor-mp-aggregator`) for
    /// `file` and returns the handle that controls it.
    ///
    /// The returned handle does not keep `file` alive (`_file_keeper` is
    /// `None`); use [`AggregatorHandle::spawn_arc`] when the file is shared
    /// through an `Arc`.
    ///
    /// # Safety
    ///
    /// The worker thread receives only the numeric base address of `file` and
    /// re-attaches to that memory with `CountersFile::attach`; nothing ties
    /// the lifetime of `file` to the thread. The caller must keep the memory
    /// behind `file` valid until the worker has exited, i.e. until the
    /// returned handle has been dropped (dropping joins the worker). Any
    /// additional requirements of `CountersFile::attach` apply as well.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn the thread.
    pub unsafe fn spawn(file: &CountersFile, config: AggregatorConfig) -> Self {
        let stop = Arc::new(Mutex::new(false));
        let stop_for_worker = Arc::clone(&stop);
        // Only the integer address crosses the thread boundary.
        let addr = file.base_address();
        // `config` is owned, so its fields are moved out instead of cloned.
        let interval = config.interval;
        let suffix = config.metric_suffix;
        let join = std::thread::Builder::new()
            .name("disruptor-mp-aggregator".into())
            .spawn(move || worker_loop(addr, interval, suffix, stop_for_worker))
            .expect("spawn aggregator thread");
        Self {
            stop,
            join: Some(join),
            _file_keeper: None,
        }
    }

    /// Safe variant of [`AggregatorHandle::spawn`] that stores `file` inside
    /// the returned handle so the `CountersFile` outlives the worker thread.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn the thread.
    #[must_use]
    pub fn spawn_arc(file: Arc<CountersFile>, config: AggregatorConfig) -> Self {
        // SAFETY: the `Arc` is stored in the handle right below, and the
        // handle's `Drop` joins the worker before its fields are released, so
        // the `CountersFile` value outlives the thread.
        // NOTE(review): this assumes a live `CountersFile` implies its backing
        // memory is still valid — confirm against `CountersFile::init`/`attach`.
        let mut handle = unsafe { Self::spawn(&file, config) };
        handle._file_keeper = Some(file);
        handle
    }

    /// Asks the worker thread to stop; returns without waiting for it.
    ///
    /// The worker checks the flag once per pass, so it may take up to one
    /// configured interval before it notices. Calling this repeatedly is
    /// harmless. Dropping the handle calls this and then joins the thread.
    pub fn stop(&self) {
        // This runs from `Drop`, so it must never panic: a poisoned lock still
        // guards a plain `bool`, which is always safe to overwrite.
        let mut flag = self
            .stop
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *flag = true;
    }
}
impl Drop for AggregatorHandle {
    /// Requests shutdown of the worker thread and blocks until it has exited.
    fn drop(&mut self) {
        self.stop();

        // `take()` leaves `None` behind, so there is nothing left to join if
        // the join handle was already consumed.
        let worker = match self.join.take() {
            Some(worker) => worker,
            None => return,
        };

        // A panic inside the worker surfaces here as `Err`; it is deliberately
        // discarded.
        let _ = worker.join();
    }
}
/// Body of the aggregator thread: repeatedly copies every counter of the
/// counters file into the `metrics` registry until a stop is requested.
///
/// * `addr` – base address of the counters file (see
///   `CountersFile::base_address`); the file is re-attached from it here.
/// * `interval` – sleep between two publish passes. A stop request is only
///   observed after the current sleep, so shutdown can lag by up to this long.
/// * `suffix` – optional text appended to each metric name, which is built as
///   `disruptor_mp_<label><suffix>`.
/// * `stop` – shared flag; once it reads `true` the loop ends, one final
///   publish pass runs, and the function returns.
///
/// Returns immediately, publishing nothing, when `addr` is null or
/// `CountersFile::attach` reports an error.
fn worker_loop(addr: usize, interval: Duration, suffix: Option<String>, stop: Arc<Mutex<bool>>) {
    let ptr = match NonNull::new(addr as *mut u8) {
        Some(p) => p,
        None => return,
    };
    // SAFETY: NOTE(review) — relies on the caller of `AggregatorHandle::spawn`
    // keeping the memory at `addr` valid while this thread runs; confirm
    // against the documented contract of `CountersFile::attach`.
    let file = match unsafe { CountersFile::attach(ptr) } {
        Ok(f) => f,
        Err(_) => return,
    };

    // One publish pass: each counter in the snapshot is written as an absolute
    // value under its prefixed (and optionally suffixed) name.
    let publish = || {
        for counter in file.snapshot() {
            let name = match &suffix {
                Some(s) => format!("disruptor_mp_{}{}", counter.label, s),
                None => format!("disruptor_mp_{}", counter.label),
            };
            metrics::counter!(name).absolute(counter.value);
        }
    };

    loop {
        // The guard is a temporary released at the end of this statement, so
        // the lock is never held while publishing or sleeping. A poisoned lock
        // still guards a plain `bool`, so its value is read instead of
        // panicking and skipping the final publish.
        let stop_requested = *stop
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if stop_requested {
            break;
        }
        publish();
        std::thread::sleep(interval);
    }

    // Final pass so increments made during the last sleep are still reported.
    publish();

    // No-op read of the constant; presumably keeps the top-level import in use
    // outside test builds — confirm before removing.
    let _ = COUNTERS_FILE_RESERVED_BYTES;
}
impl CountersFile {
    /// Returns the file's base pointer as a plain integer address.
    ///
    /// `AggregatorHandle::spawn` moves this integer into the worker thread,
    /// which turns it back into a pointer before attaching.
    pub(crate) fn base_address(&self) -> usize {
        let base_ptr = self.base.as_ptr();
        base_ptr as usize
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::{
        ids, AggregatorConfig, AggregatorHandle, CountersFile, COUNTERS_FILE_RESERVED_BYTES,
        COUNTER_FLAG_PRODUCER,
    };

    /// 64-byte-aligned backing storage sized for one counters file.
    #[repr(C, align(64))]
    struct AlignedBuf([u8; COUNTERS_FILE_RESERVED_BYTES]);

    /// Runs the aggregator against an `Arc`-shared file for a few intervals
    /// and checks the counter is still readable with its value afterwards.
    #[test]
    fn spawn_arc_keeps_file_alive_for_worker() {
        // Leaked on purpose: the storage must stay valid for as long as any
        // thread may still be attached to it.
        let storage: &'static mut AlignedBuf =
            Box::leak(Box::new(AlignedBuf([0u8; COUNTERS_FILE_RESERVED_BYTES])));
        let base = std::ptr::NonNull::new(storage.0.as_mut_ptr()).expect("non-null leaked ptr");
        let shared = Arc::new(unsafe { CountersFile::init(base) });

        let counter = shared
            .register(ids::EVENTS_PUBLISHED, COUNTER_FLAG_PRODUCER, "test_arc")
            .expect("first slot");
        for _ in 0..3 {
            counter.inc();
        }

        let config = AggregatorConfig {
            interval: Duration::from_millis(5),
            metric_suffix: None,
        };
        let aggregator = AggregatorHandle::spawn_arc(Arc::clone(&shared), config);

        // Give the worker time for several publish passes, then shut it down.
        std::thread::sleep(Duration::from_millis(30));
        drop(aggregator);

        let snapshot = shared.snapshot();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0].value, 3);
        assert_eq!(snapshot[0].label, "test_arc");
    }
}