use crossbeam_channel::Sender;
use crossbeam_utils::sync::{Parker, Unparker};
use crossbeam_utils::Backoff;
use log::{debug, error, info, warn};
use std::fs;
use std::io::{Error, ErrorKind};
use std::path::Path;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
/// Reads `filename` from the directory `path`, waiting for the file to appear.
///
/// The file is polled every 10 ms for at most `iters` rounds (100 when `iters`
/// is `None`, i.e. roughly one second). After every wait the directory itself
/// is re-checked so that a vanished directory aborts the wait immediately
/// instead of burning the whole polling budget.
///
/// The outcome is decided by whether the file exists, not by how many rounds
/// are left: a file that shows up during the very last round (or that already
/// exists when `iters` is `Some(0)`) is read normally.
///
/// # Errors
///
/// * [`ErrorKind::NotFound`] if `path` no longer exists while waiting.
/// * [`ErrorKind::NotFound`] if the file is still missing once the polling
///   budget is exhausted.
/// * Any error produced by [`fs::read`] on the final path.
pub fn read_file(path: &Path, filename: &Path, iters: Option<u32>) -> Result<Vec<u8>, Error> {
    let fpath = path.join(filename);
    let poll_interval = Duration::from_millis(10);
    let mut remaining = iters.unwrap_or(100);

    while !fpath.exists() {
        if remaining == 0 {
            warn!("Timeout waiting for {:?} to appear", &fpath);
            // The wording (including "1s", the default budget) is kept fixed
            // because the tests in this file compare against the exact text.
            return Err(Error::new(
                ErrorKind::NotFound,
                format!("File {:?} did not appear after waiting 1s", &fpath),
            ));
        }

        debug!("Waiting for {:?}", &fpath);
        sleep(poll_interval);

        // No point in waiting for a file inside a directory that is gone.
        if !path.exists() {
            debug!("Job directory {:?} no longer exists", &path);
            return Err(Error::new(
                ErrorKind::NotFound,
                format!("Job directory {:?} no longer exists", &path),
            ));
        }
        remaining -= 1;
    }

    fs::read(&fpath)
}
/// Installs a handler for `signal` that raises `notification` and wakes the
/// thread parked on the `Parker` belonging to `unparker`.
///
/// Both `unparker` and `notification` are cloned, so the caller keeps its own
/// handles. If the handler cannot be registered the error is logged and the
/// process exits with status 1.
pub fn register_signal_handler(signal: i32, unparker: &Unparker, notification: &Arc<AtomicBool>) {
    info!("Registering signal handler for signal {}", signal);

    let waker = unparker.clone();
    let flag = Arc::clone(notification);
    let on_signal = move || {
        info!("Received signal {}", signal);
        flag.store(true, SeqCst);
        waker.unpark()
    };

    // SAFETY: `register` is unsafe because the closure runs in signal-handler
    // context. NOTE(review): the logging call and `unpark` are not obviously
    // async-signal-safe — confirm this is acceptable for the deployment.
    let registration = unsafe { signal_hook::low_level::register(signal, on_signal) };

    if let Err(e) = registration {
        error!("Cannot register signal {}: {:?}", signal, e);
        exit(1);
    }
}
pub fn signal_handler_atomic(sender: &Sender<bool>, sig: Arc<AtomicBool>, p: &Parker) {
let backoff = Backoff::new();
while !sig.load(SeqCst) {
if backoff.is_completed() {
p.park();
} else {
backoff.snooze();
}
}
for _ in 0..20 {
sender.send(true).unwrap();
}
info!("Sent 20 notifications");
}
#[cfg(test)]
mod tests {
    use crossbeam_channel::bounded;
    use crossbeam_utils::sync::Parker;
    use std::fs;
    use std::io::ErrorKind;
    use std::path::Path;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use tempfile::tempdir;

    use super::*;

    /// An existing file is returned immediately with its exact contents.
    #[test]
    fn test_read_file_existing_file() {
        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let file_path = temp_dir.path().join("test_file.txt");
        fs::write(&file_path, b"test contents").expect("Failed to write to test file");

        let result = read_file(temp_dir.path(), Path::new("test_file.txt"), None);

        assert_eq!(
            result.expect("existing file should be readable"),
            b"test contents"
        );
    }

    /// A file that never appears yields the timeout error once the polling
    /// budget is used up.
    #[test]
    fn test_read_file_nonexistent_file() {
        let temp_dir = tempdir().expect("Failed to create temporary directory");

        let result = read_file(temp_dir.path(), Path::new("nonexistent_file.txt"), Some(1));

        let err = result.expect_err("missing file should time out");
        assert_eq!(err.kind(), ErrorKind::NotFound);
        // Build the expectation with the same `{:?}` path formatting the
        // function uses, so the check does not depend on the path separator.
        assert_eq!(
            err.to_string(),
            format!(
                "File {:?} did not appear after waiting 1s",
                temp_dir.path().join("nonexistent_file.txt")
            )
        );
    }

    /// Waiting is aborted as soon as the job directory itself is missing.
    #[test]
    fn test_read_file_missing_directory() {
        let temp_dir = tempdir().expect("Failed to create temporary directory");
        let missing_dir = temp_dir.path().join("missing_job_dir");

        let result = read_file(&missing_dir, Path::new("any_file.txt"), Some(5));

        let err = result.expect_err("missing directory should be reported");
        assert_eq!(err.kind(), ErrorKind::NotFound);
        assert_eq!(
            err.to_string(),
            format!("Job directory {:?} no longer exists", missing_dir)
        );
    }

    /// Raising the registered signal sets the notification flag.
    #[test]
    fn test_register_signal_handler() {
        let parker = Parker::new();
        let notification = Arc::new(AtomicBool::new(false));
        register_signal_handler(libc::SIGHUP, parker.unparker(), &notification);
        thread::sleep(Duration::from_millis(100));

        // SAFETY: `raise` has no memory-safety preconditions. A handler for
        // SIGHUP was installed just above, so the signal is handled by it.
        let rc = unsafe { libc::raise(libc::SIGHUP) };
        assert_eq!(rc, 0, "raise(SIGHUP) failed");

        thread::sleep(Duration::from_millis(100));
        assert!(notification.load(Ordering::SeqCst));
    }

    /// Once the flag is set and the waiter is unparked, exactly 20
    /// notifications arrive on the channel.
    #[test]
    fn test_signal_handler_atomic() {
        let (sender, receiver) = bounded::<bool>(20);
        let sig = Arc::new(AtomicBool::new(false));
        let parker = Parker::new();
        // Take an owned unparker before the parker moves into the thread.
        let unparker = parker.unparker().clone();

        let sig_clone = Arc::clone(&sig);
        let handle = thread::spawn(move || {
            signal_handler_atomic(&sender, sig_clone, &parker);
        });

        thread::sleep(Duration::from_millis(100));
        sig.store(true, Ordering::SeqCst);
        unparker.unpark();
        handle.join().unwrap();

        let count = receiver.try_iter().count();
        assert_eq!(count, 20, "Expected 20 messages to be sent");
    }
}