mod fragment;
use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
use crossbeam_channel::Receiver;
use kwik::file::FileWriter;
use log::error;
use parking_lot::RwLock;
pub use crate::worker::policy::trace::fragment::TraceFragment;
use crate::{
error::CacheError,
worker::{
Worker,
policy::event::{StackEvent, TraceEvent},
},
};
/// How long the trace worker sleeps at the end of each polling pass over its
/// event listener.
const POLL_DELAY: Duration = Duration::from_secs(1);
/// Worker that drains [`StackEvent`]s from a channel and writes the trace
/// events derived from them into the newest [`TraceFragment`].
pub struct TraceWorker {
    /// Receiving end of the channel from which pending stack events are drained.
    listener: Receiver<StackEvent>,
    /// Shared queue of trace fragments. This worker pops from the front,
    /// pushes to and writes through the back, and clears the whole queue on
    /// a wipe event.
    trace_fragments: Arc<RwLock<VecDeque<TraceFragment>>>,
}
impl Worker for TraceWorker {
fn run(&mut self) -> Result<(), CacheError> {
loop {
let events = self.listener.try_iter().collect::<Vec<_>>();
if !events.is_empty() {
self.refresh_fragments()?;
let mut should_flush = false;
for event in events {
if matches!(event, StackEvent::Wipe) {
self.trace_fragments.write().clear();
self.refresh_fragments()?;
}
if let Some(event) = TraceEvent::maybe_from_stack_event(&event) {
let fragments = self.trace_fragments.read();
let Some(fragment) = fragments.back() else {
error!("No trace fragment found");
return Err(CacheError::Internal);
};
let mut modifiers = fragment.lock();
let writer = &mut modifiers.1;
if let Err(err) = writer.write_chunk(&event) {
error!("Could not write to trace fragment: {err:?}");
return Err(CacheError::Internal);
}
should_flush = true;
}
}
if should_flush {
let fragments = self.trace_fragments.read();
let Some(fragment) = fragments.back() else {
error!("No trace fragment found");
return Err(CacheError::Internal);
};
let mut modifiers = fragment.lock();
let writer = &mut modifiers.1;
if let Err(err) = writer.flush() {
error!("Could not flush trace fragment: {err:?}");
return Err(CacheError::Internal);
}
}
}
thread::sleep(POLL_DELAY);
}
}
}
impl TraceWorker {
    /// Creates a worker that reads stack events from `listener` and maintains
    /// the shared `trace_fragments` queue.
    pub fn new(
        listener: Receiver<StackEvent>,
        trace_fragments: Arc<RwLock<VecDeque<TraceFragment>>>,
    ) -> Self {
        Self {
            listener,
            trace_fragments,
        }
    }

    /// Prunes the fragment queue and makes sure its back fragment is usable.
    ///
    /// Fragments are popped from the front for as long as the front one
    /// reports `is_expired()`. Afterwards, if the back fragment reports
    /// `is_valid()` nothing else happens; otherwise a new fragment is created
    /// and appended.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Internal`] when a new fragment cannot be created.
    fn refresh_fragments(&mut self) -> Result<(), CacheError> {
        loop {
            // The read guard is a temporary of this statement, so it is
            // released before the write lock below is requested.
            let front_expired = self
                .trace_fragments
                .read()
                .front()
                .is_some_and(|fragment| fragment.is_expired());

            if !front_expired {
                break;
            }

            self.trace_fragments.write().pop_front();
        }

        let tail_usable = self
            .trace_fragments
            .read()
            .back()
            .is_some_and(|fragment| fragment.is_valid());

        if tail_usable {
            return Ok(());
        }

        let fresh = TraceFragment::new().map_err(|err| {
            error!("Could not create trace fragment: {err:?}");
            CacheError::Internal
        })?;

        self.trace_fragments.write().push_back(fresh);

        Ok(())
    }
}
// SAFETY: NOTE(review): the invariant that makes this sound is not stated in
// this file. This impl promises that a `TraceWorker`, i.e. its
// `Receiver<StackEvent>` and its `Arc<RwLock<VecDeque<TraceFragment>>>`, may
// be moved to another thread. Presumably it is needed because `StackEvent` or
// `TraceFragment` is not automatically `Send`/`Sync` — TODO confirm and record
// the actual justification here.
unsafe impl Send for TraceWorker {}