use failure::Fallible;
use nix::errno::Errno;
use nix::unistd;
use nix::sys::{self, signal};
use signal_hook;
use std::cmp;
use std::fs;
use std::io::{self, Read};
use std::os::unix::io as unix_io;
use std::path::{Path, PathBuf};
use std::process;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::time;
use std::thread;
fn nix_to_io_error(err: nix::Error) -> io::Error {
match err {
nix::Error::Sys(errno) => io::Error::from_raw_os_error(errno as i32),
e => panic!("Did not expect to get an error without an errno from a nix call: {:?}", e),
}
}
/// Owner of a file descriptor that can be read through any number of
/// `ShareableFileReader`s.
///
/// The descriptor is closed when this value is dropped; at that point every
/// reader created via `reader` is notified through its pipe.
pub struct ShareableFile {
/// Raw descriptor taken from the `fs::File` given to `from`.
fd: unix_io::RawFd,
/// One pipe end per reader handed out so far; written to and closed on drop.
watchers: Vec<unix_io::RawFd>,
/// Flag shared with every reader; set to `true` when this value is dropped.
closed: Arc<AtomicBool>,
}
impl ShareableFile {
    /// Wraps `file`, taking over ownership of its underlying descriptor.
    ///
    /// The descriptor is extracted with `into_raw_fd`, so from this point on it
    /// is only closed when the returned `ShareableFile` is dropped.
    pub fn from(file: fs::File) -> ShareableFile {
        use std::os::unix::io::IntoRawFd;

        let fd = file.into_raw_fd();
        ShareableFile {
            fd,
            watchers: Vec::new(),
            closed: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Creates a new reader that shares this file's descriptor.
    ///
    /// A fresh pipe is created for the reader: one end is remembered in
    /// `watchers` so that dropping this file can wake the reader up, and the
    /// other end is handed to the reader as its `notifier`.
    ///
    /// # Errors
    ///
    /// Returns the error from `pipe` (converted to `io::Error`) if the pipe
    /// cannot be created.
    pub fn reader(&mut self) -> io::Result<ShareableFileReader> {
        let (read_end, write_end) = unistd::pipe().map_err(nix_to_io_error)?;
        self.watchers.push(write_end);

        let reader = ShareableFileReader {
            fd: self.fd,
            notifier: read_end,
            closed: Arc::clone(&self.closed),
        };
        Ok(reader)
    }
}
impl Drop for ShareableFile {
    /// Marks the file as closed, wakes up every reader and closes all the
    /// descriptors owned by this value.
    ///
    /// Every failure is logged rather than propagated: cleanup is best-effort
    /// and this destructor does not panic on any error returned by the system
    /// calls it makes.
    fn drop(&mut self) {
        debug!("Closing ShareableFile with fd {}", self.fd);

        // Publish the closed state before waking the readers so that a reader
        // whose read fails because of the close below can report EOF instead.
        self.closed.store(true, Ordering::SeqCst);

        for watcher in &self.watchers {
            if let Err(e) = unistd::write(*watcher, &[0]) {
                // EPIPE is expected when the reader at the other end of the pipe
                // has already been dropped.  Compare against `Some(..)` so that an
                // error without an errno is logged instead of panicking inside
                // `drop` (which would abort the process during unwinding).
                if e.as_errno() != Some(Errno::EPIPE) {
                    warn!("Failed to tell ShareableFileReader with handle {} of close: {}",
                        *watcher, e);
                }
            }
            if let Err(e) = unistd::close(*watcher) {
                warn!("Failed to close pipe write end with handle {}: {}", *watcher, e);
            }
        }

        if let Err(e) = unistd::close(self.fd) {
            warn!("Failed to close fd {}: {}", self.fd, e);
        }
    }
}
/// `Read` handle onto the descriptor owned by a `ShareableFile`.
pub struct ShareableFileReader {
/// Descriptor to read from; a copy of the owning `ShareableFile`'s `fd`.
/// This type never closes it.
fd: unix_io::RawFd,
/// Pipe end watched alongside `fd` in `read`; closed when this reader is dropped.
notifier: unix_io::RawFd,
/// Flag shared with the owning `ShareableFile`; consulted when a read fails.
closed: Arc<AtomicBool>,
}
impl Drop for ShareableFileReader {
    /// Closes this reader's end of the notification pipe.
    ///
    /// The shared file descriptor is left untouched.  A failure to close the
    /// pipe end is logged and otherwise ignored.
    fn drop(&mut self) {
        let notifier = self.notifier;
        match unistd::close(notifier) {
            Ok(()) => {}
            Err(e) => warn!("Failed to close fd {}: {}", notifier, e),
        }
    }
}
impl Read for ShareableFileReader {
    /// Reads from the shared descriptor, returning early if the file is closed.
    ///
    /// Waits with `select` until either the shared descriptor or the
    /// notification pipe becomes readable.  If the notification pipe is
    /// readable the call returns `Ok(0)`; otherwise the data is read from the
    /// shared descriptor into `buf`.
    ///
    /// # Errors
    ///
    /// Any error from `select` or `read` is returned as an `io::Error`, unless
    /// the shared `closed` flag is set, in which case the error is replaced by
    /// `Ok(0)`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut ready = sys::select::FdSet::new();
        ready.insert(self.fd);
        ready.insert(self.notifier);

        let selected = sys::select::select(None, Some(&mut ready), None, None, None);
        let outcome = match selected {
            Ok(ready_count) => {
                debug_assert!(ready_count <= 2);
                // The notifier takes priority over any pending data.
                if ready.contains(self.notifier) {
                    Ok(0)
                } else {
                    assert!(ready.contains(self.fd));
                    unistd::read(self.fd, buf)
                }
            }
            Err(e) => Err(e),
        };

        match outcome.map_err(nix_to_io_error) {
            // A failure observed after the file was closed is reported as EOF.
            Err(_) if self.closed.load(Ordering::SeqCst) => Ok(0),
            other => other,
        }
    }
}
/// Signals that `SignalsInstaller::prepare` blocks and that
/// `SignalsInstaller::install` registers with `signal_hook`.
static CAPTURED_SIGNALS: [signal::Signal; 4] = [
signal::Signal::SIGHUP,
signal::Signal::SIGTERM,
signal::Signal::SIGINT,
signal::Signal::SIGQUIT,
];
/// Holds the captured signals blocked between `prepare` and the moment this
/// value is dropped, at which point the previous signal mask is restored.
pub struct SignalsInstaller {
/// Signal mask that was in effect before `prepare` blocked the captured signals.
old_sigset: signal::SigSet,
}
impl SignalsInstaller {
pub fn prepare() -> SignalsInstaller {
let mut old_sigset = signal::SigSet::empty();
let mut sigset = signal::SigSet::empty();
for signal in CAPTURED_SIGNALS.iter() {
sigset.add(*signal);
}
signal::pthread_sigmask(
signal::SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_sigset))
.expect("pthread_sigmask is not expected to fail");
SignalsInstaller { old_sigset }
}
pub fn install(self, mount_point: PathBuf) -> Fallible<SignalsHandler> {
let (signal_sender, signal_receiver) = mpsc::channel();
let mut signums = vec!();
for signal in CAPTURED_SIGNALS.iter() {
signums.push(*signal as i32);
}
let signals = signal_hook::iterator::Signals::new(&signums)?;
std::thread::spawn(move || SignalsHandler::handler(&signals, mount_point, &signal_sender));
Ok(SignalsHandler { signal_receiver })
}
}
impl Drop for SignalsInstaller {
    /// Restores the signal mask that was in effect before `prepare` ran.
    ///
    /// # Panics
    ///
    /// Panics if `pthread_sigmask` fails, as the previous mask cannot be
    /// reinstated in that case.
    fn drop(&mut self) {
        let restored = signal::pthread_sigmask(
            signal::SigmaskHow::SIG_SETMASK, Some(&self.old_sigset), None);
        restored.expect("pthread_sigmask is not expected to fail and we cannot correctly clean up");
    }
}
/// Unmounts the file system mounted at `path` by running an external command.
///
/// On Linux this runs `fusermount -u <path>`; on every other platform it runs
/// `umount <path>`.
///
/// # Errors
///
/// Returns an error if the command cannot be executed, or if it exits
/// unsuccessfully, in which case the error message carries the trimmed stdout
/// and stderr of the command.
fn unmount(path: &Path) -> Fallible<()> {
    #[cfg(target_os = "linux")]
    fn run_unmount(path: &Path) -> io::Result<process::Output> {
        let mut command = process::Command::new("fusermount");
        command.arg("-u").arg(path);
        command.output()
    }

    #[cfg(not(target_os = "linux"))]
    fn run_unmount(path: &Path) -> io::Result<process::Output> {
        let mut command = process::Command::new("umount");
        command.arg(path);
        command.output()
    }

    let output = run_unmount(path)?;
    if !output.status.success() {
        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format_err!("stdout: {}, stderr: {}", stdout.trim(), stderr.trim()));
    }
    Ok(())
}
/// Keeps trying to unmount `mount_point` until the operation succeeds.
///
/// The delay between attempts starts at 10 milliseconds and doubles after
/// every failure until it reaches one second, where it stays.  Failures are
/// only logged once the delay has reached that one-second ceiling, so the
/// initial quick retries are silent.
fn retry_unmount<P: AsRef<Path>>(mount_point: P) {
    let ceiling = time::Duration::from_secs(1);
    let mut delay = time::Duration::from_millis(10);

    while let Err(e) = unmount(mount_point.as_ref()) {
        let at_ceiling = delay >= ceiling;
        if at_ceiling {
            warn!("Unmounting file system failed with '{}'; will retry in {:?}",
                e, delay);
        }

        thread::sleep(delay);

        if !at_ceiling {
            delay = cmp::min(ceiling, delay * 2);
        }
    }
}
/// Handle through which the caller can find out whether the handler thread
/// spawned by `SignalsInstaller::install` has caught a signal.
pub struct SignalsHandler {
/// Receiving side of the channel on which the handler thread sends the
/// number of the signal it caught.
signal_receiver: mpsc::Receiver<i32>,
}
impl SignalsHandler {
    /// Returns the number of the caught signal if the handler thread has
    /// reported one, or `None` otherwise.
    ///
    /// This never blocks.  Any failure to receive from the channel is treated
    /// the same as "no signal yet".
    pub fn caught(&self) -> Option<i32> {
        self.signal_receiver.try_recv().ok()
    }

    /// Body of the signal-handling thread.
    ///
    /// Waits for the first signal delivered through `signals`, forwards its
    /// number through `signal_sender` and then unmounts `mount_point`, retrying
    /// until the unmount succeeds.  Only that first signal is handled.
    fn handler(signals: &signal_hook::iterator::Signals, mount_point: PathBuf,
               signal_sender: &mpsc::Sender<i32>) {
        let signo = signals.forever().next().unwrap();

        if let Some(e) = signal_sender.send(signo).err() {
            warn!("Failed to propagate signal to main thread; will get stuck exiting: {}", e);
        }

        info!("Caught signal {}; unmounting {}", signo, mount_point.display());
        retry_unmount(mount_point);
    }
}
#[cfg(test)]
mod tests {
use nix::sys;
use std::io::{Read, Write};
use std::thread;
use super::*;
use tempfile;
// Checks that two readers obtained from the same `ShareableFile` consume
// consecutive bytes of the file, that dropping one reader leaves the other
// usable, and that dropping the `ShareableFile` makes later reads return 0.
#[test]
fn test_shareable_file_clones_share_descriptor_and_only_one_owns() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("file");
fs::File::create(&path).unwrap().write_all(b"ABCDEFG").unwrap();
// Reads exactly one byte from `input`, panicking on failure.
fn read_one_byte(input: &mut impl Read) -> u8 {
let mut buffer = [0];
input.read_exact(&mut buffer).unwrap();
buffer[0]
}
let mut file = ShareableFile::from(fs::File::open(&path).unwrap());
let mut reader1 = file.reader().unwrap();
let mut reader2 = file.reader().unwrap();
assert_eq!(b'A', read_one_byte(&mut reader1));
// Dropping a reader must not close the shared descriptor: the second reader
// continues from where the first one left off.
drop(reader1); assert_eq!(b'B', read_one_byte(&mut reader2));
// Dropping the owner closes the descriptor; the surviving reader must see a
// zero-byte read instead of an error.
drop(file); let mut buffer = [0];
assert_eq!(0, reader2.read(&mut buffer).expect("Expected 0 byte count as EOF after close"));
}
// One attempt at checking that dropping a `ShareableFile` while a reader
// thread is inside `read` makes that read return successfully.  Not a test by
// itself; it is run repeatedly by the test below.
fn try_shareable_file_close_unblocks_reads_without_error() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("pipe");
unistd::mkfifo(&path, sys::stat::Mode::S_IRUSR | sys::stat::Mode::S_IWUSR).unwrap();
// The FIFO is opened for writing on a separate thread; presumably because
// opening one end of a FIFO blocks until the other end is opened too.
let writer_handle = {
let path = path.clone();
thread::spawn(move || fs::File::create(path))
};
let mut file = ShareableFile::from(fs::File::open(&path).unwrap());
let (reader_ready_tx, reader_ready_rx) = mpsc::channel();
let reader_handle = {
let mut file = file.reader().unwrap();
thread::spawn(move || {
let mut buffer = [0];
// Tell the main thread that the read is about to start.
reader_ready_tx.send(()).unwrap();
file.read(&mut buffer)
})
};
reader_ready_rx.recv().unwrap();
// Best-effort pause to let the reader thread get into `read`.
// NOTE(review): this does not guarantee the thread is already blocked there.
thread::sleep(time::Duration::from_millis(1));
drop(file); reader_handle.join().unwrap().expect("Read didn't return success on EOF-like condition");
let writer = writer_handle.join().unwrap().expect("Write didn't finish successfully");
drop(writer);
}
// Runs the scenario above several times; the outcome depends on thread
// timing, so a single run might not exercise the blocked-read path.
#[test]
fn test_shareable_file_close_unblocks_reads_without_error() {
for _ in 0..10 {
try_shareable_file_close_unblocks_reads_without_error()
}
}
}