use crossbeam_channel::Sender;
use crossbeam_utils::sync::{Parker, Unparker};
use crossbeam_utils::Backoff;
use log::{debug, error, info, warn};
use std::fs;
use std::io::{Error, ErrorKind};
use std::path::Path;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
/// Reads `filename` from the directory `path`, polling until the file shows up.
///
/// The file is checked for existence up to `iters` times (100 when `None`),
/// sleeping 10 ms between checks. As soon as it exists its full contents are
/// returned via [`fs::read`].
///
/// # Errors
///
/// * [`ErrorKind::NotFound`] if `path` itself does not exist after a wait
///   interval (the job directory was removed while waiting).
/// * [`ErrorKind::NotFound`] if the file still does not exist once all polls
///   are used up.
/// * Any error produced by [`fs::read`] once the file exists.
pub fn read_file(path: &Path, filename: &Path, iters: Option<u32>) -> Result<Vec<u8>, Error> {
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    let fpath = path.join(filename);
    let max_polls = iters.unwrap_or(100);

    for _ in 0..max_polls {
        if fpath.exists() {
            break;
        }
        debug!("Waiting for {:?}", &fpath);
        sleep(POLL_INTERVAL);
        // Give up early when the containing directory vanished: the file can
        // no longer appear in it.
        if !path.exists() {
            debug!("Job directory {:?} no longer exists", &path);
            return Err(Error::new(
                ErrorKind::NotFound,
                format!("Job directory {:?} no longer exists", &path),
            ));
        }
    }

    // Decide on the file's actual presence rather than on the remaining poll
    // count: the file may already exist with a zero budget, or may have
    // appeared during the final sleep.
    if !fpath.exists() {
        warn!("Timeout waiting for {:?} to appear", &fpath);
        return Err(Error::new(
            ErrorKind::NotFound,
            format!(
                "File {:?} did not appear after waiting {:?}",
                &fpath,
                POLL_INTERVAL * max_polls
            ),
        ));
    }

    fs::read(&fpath)
}
pub fn register_signal_handler(signal: i32, unparker: &Unparker, notification: &Arc<AtomicBool>) {
info!("Registering signal handler for signal {}", signal);
let u1 = unparker.clone();
let n1 = Arc::clone(¬ification);
unsafe {
if let Err(e) = signal_hook::low_level::register(signal, move || {
info!("Received signal {}", signal);
n1.store(true, SeqCst);
u1.unpark()
}) {
error!("Cannot register signal {}: {:?}", signal, e);
exit(1);
}
};
}
/// Blocks until `sig` becomes `true`, then pushes 20 `true` messages into `sender`.
///
/// While the flag is unset the function first backs off with
/// `Backoff::snooze`; once the backoff reports completion it parks on `p`
/// and re-checks the flag every time it is woken.
///
/// # Panics
///
/// Panics if any `send` on `sender` fails.
pub fn signal_handler_atomic(sender: &Sender<bool>, sig: Arc<AtomicBool>, p: &Parker) {
    let spinner = Backoff::new();
    loop {
        if sig.load(SeqCst) {
            break;
        }
        // Spin/yield briefly first; fall back to parking once backoff is exhausted.
        if spinner.is_completed() {
            p.park();
        } else {
            spinner.snooze();
        }
    }

    (0..20).for_each(|_| sender.send(true).unwrap());
    info!("Sent 20 notifications");
}