pub mod collect;
pub mod collector;
pub mod flush;
mod perf_table;
pub mod system_info;
use crate::cli::CollectionOptions;
use crate::collection::collect::WorkingBuffers;
use crate::collection::collector::Collector;
use crate::collection::flush::FlushLog;
use crate::collection::perf_table::TableMetadata;
use crate::shared::{CollectionEvent, CollectionMethod, IntervalWorkerContext};
use crate::shell::Shell;
use crate::timer::{Stoppable, Timer};
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::Path;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread;
const EVENT_BUFFER_LENGTH: usize = 8 * 1024;
struct CollectStatus {
terminating: bool,
collecting: bool,
}
type CollectorMap = Arc<Mutex<HashMap<String, RefCell<Collector>>>>;
#[allow(clippy::too_many_lines)]
pub fn run(
rx: &Receiver<CollectionEvent>,
context: IntervalWorkerContext,
options: &CollectionOptions,
) {
let location = &options.directory;
let buffer_size = usize::try_from(options.buffer_size.get_bytes()).unwrap();
context.shell.status(
"Beginning",
format!(
"statistics collection with {} interval",
humantime::Duration::from(context.interval)
),
);
let (timer, stop_handle) = Timer::new(context.interval, "collect");
let collectors: CollectorMap = Arc::new(Mutex::new(HashMap::new()));
let table_metadata = Arc::new(collect::get_table_metadata());
let flush_log = match &options.flush_log {
Some(log_path) => Some(Arc::new(Mutex::new(FlushLog::new(
log_path,
EVENT_BUFFER_LENGTH,
)))),
None => None,
};
let status_mutex = Arc::new(Mutex::new(CollectStatus {
terminating: false,
collecting: false,
}));
let collectors_c = Arc::clone(&collectors);
let status_mutex_c = Arc::clone(&status_mutex);
let shell_c = Arc::clone(&context.shell);
let flush_log_c = flush_log.as_ref().map(|c| Arc::clone(c));
let stop_handle_c = stop_handle.clone();
let mut term_rx = context.term_rx;
thread::Builder::new()
.name(String::from("collect-term"))
.spawn(move || {
term_rx.recv().unwrap();
let mut status = status_mutex_c.lock().unwrap();
match status.collecting {
true => {
status.terminating = true;
shell_c.verbose(|sh| {
sh.info(
"Currently collecting: stopping & flushing buffers at the end of the \
next collector tick",
)
});
},
false => {
shell_c.verbose(|sh| {
sh.info("Currently yielding: stopping & flushing buffers right now")
});
let collectors = collectors_c.lock().unwrap();
flush_buffers(&collectors, &shell_c, flush_log_c);
stop_handle_c.stop();
},
}
})
.unwrap();
let mut working_buffers = WorkingBuffers::new();
for _ in timer {
let mut status = status_mutex.lock().unwrap();
if status.terminating {
continue;
}
status.collecting = true;
drop(status);
let mut collectors = collectors.lock().unwrap();
let flush_log_ref = flush_log.as_ref().map(|r| Arc::clone(r));
for event in rx.try_iter() {
handle_event(
event,
&mut collectors,
location,
buffer_size,
&table_metadata,
&flush_log_ref,
&context.shell,
);
}
for (id, c) in collectors.iter() {
let mut collector = c.borrow_mut();
match collector.collect(&mut working_buffers) {
Ok(_) => (),
Err(err) => {
context.shell.error(format!(
"Could not run collector for target {}: {}",
id, err
));
},
};
}
let mut status = status_mutex.lock().unwrap();
if status.terminating {
context.shell.verbose(|sh| {
sh.info(
"Received termination flag from term handler thread; stopping and flushing \
buffers now",
)
});
let flush_log_ref = flush_log.map(|r| Arc::clone(&r));
flush_buffers(&collectors, &context.shell, flush_log_ref);
stop_handle.stop();
break;
} else {
status.collecting = false;
}
}
}
fn flush_buffers(
collectors: &HashMap<String, RefCell<Collector>>,
shell: &Arc<Shell>,
flush_log_option: Option<Arc<Mutex<FlushLog>>>,
) {
shell.status("Stopping", "collecting and flushing buffers");
for (id, c) in collectors.iter() {
let mut collector = c.borrow_mut();
if let Err(err) = collector.writer.flush() {
shell.warn(format!(
"Could not flush buffer on termination for target {}: {}",
id, err
));
}
}
if let Some(flush_log_lock) = flush_log_option {
let mut flush_log = flush_log_lock.lock().unwrap();
let path_str = flush_log
.path
.clone()
.into_os_string()
.into_string()
.ok()
.unwrap_or_else(|| String::from("~"));
match flush_log.write() {
Ok(count) => shell.info(format!(
"Wrote {} buffer flush events to {}",
count, path_str
)),
Err(err) => shell.warn(format!(
"Could not write buffer flush events to {}: {}",
path_str, err
)),
}
}
}
fn handle_event(
event: CollectionEvent,
collectors: &mut HashMap<String, RefCell<Collector>>,
logs_location: &Path,
buffer_capacity: usize,
table_metadata: &Arc<TableMetadata>,
flush_log: &Option<Arc<Mutex<FlushLog>>>,
shell: &Shell,
) {
match event {
CollectionEvent::Start { target, method } => {
shell.verbose(|sh| {
sh.info(format!(
"Received start event for target '{}' from the collection thread",
target.name
))
});
match method {
CollectionMethod::LinuxCgroups(path) => {
let id = target.id.clone();
let flush_log_c = flush_log.as_ref().map(|r| Arc::clone(r));
match Collector::create(
logs_location,
target,
&path,
buffer_capacity,
&Arc::clone(table_metadata),
flush_log_c,
) {
Ok(new_collector) => {
collectors.insert(id, RefCell::new(new_collector));
},
Err(err) => {
shell.error(format!(
"Could not initialize collector for target id {}: {}",
id, err
));
},
}
},
}
},
CollectionEvent::Stop(id) => {
shell.verbose(|sh| {
sh.info(format!(
"Received stop event for target '{}' from the collection thread",
collectors
.get(&id)
.map(|c| c.borrow().target.name.clone())
.as_ref()
.unwrap_or(&id)
))
});
let collector = collectors.remove(&id);
drop(collector);
},
}
}