use std::fs::*;
use std::io;
use std::path::Path;
use sysinfo::*;
use super::queue::{recv_lock_filename, send_lock_filename};
use super::state::{QueueState, QueueStatePersistence};
use super::sync::{FileGuard, UNIQUE_PROCESS_TOKEN};
pub fn unlock<P: AsRef<Path>>(lock_filename: P) -> io::Result<()> {
let contents = match read_to_string(&lock_filename) {
Ok(contents) => contents,
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
Err(err) => return Err(err),
};
let owner_pid = contents
.split("pid=")
.collect::<Vec<_>>()
.get(1)
.map(|token| {
token
.chars()
.take_while(|ch| ch.is_digit(10))
.collect::<String>()
.parse::<sysinfo::Pid>()
})
.expect("failed to parse recv lock file: no pid")
.expect("failed to parse recv lock file: bad pid");
let owner_token = contents
.split("token=")
.collect::<Vec<_>>()
.get(1)
.map(|token| {
token
.chars()
.take_while(|ch| ch.is_digit(10))
.collect::<String>()
.parse::<u64>()
})
.expect("failed to parse recv lock file: no token")
.expect("failed to parse recv lock file: bad token");
let system =
System::new_with_specifics(RefreshKind::new().with_processes(ProcessRefreshKind::new()));
let process_exists_and_is_not_me =
owner_pid.as_u32() != std::process::id() && system.process(owner_pid).is_some();
let lock_is_the_same_and_is_me =
owner_pid.as_u32() == std::process::id() && owner_token == *UNIQUE_PROCESS_TOKEN;
if process_exists_and_is_not_me {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"another process, of id {}, is still locking `{:?}`",
owner_pid,
lock_filename.as_ref()
),
));
} else if lock_is_the_same_and_is_me {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"current process is still locking `{:?}`",
lock_filename.as_ref()
),
));
} else {
remove_file(lock_filename)?;
Ok(())
}
}
pub fn unlock_for_sending<P: AsRef<Path>>(base: P) -> io::Result<()> {
unlock(send_lock_filename(base.as_ref()))
}
pub fn unlock_for_receiving<P: AsRef<Path>>(base: P) -> io::Result<()> {
unlock(recv_lock_filename(base.as_ref()))
}
pub fn unlock_queue<P: AsRef<Path>>(base: P) -> io::Result<()> {
unlock_for_sending(base.as_ref())?;
unlock_for_receiving(base.as_ref())?;
Ok(())
}
pub fn guess_recv_metadata<P: AsRef<Path>>(base: P) -> io::Result<()> {
let lock = FileGuard::try_lock(recv_lock_filename(base.as_ref()))?;
let mut min_segment = std::u64::MAX;
for maybe_entry in read_dir(base.as_ref())? {
let path = maybe_entry?.path();
if path.extension().map(|ext| ext == "q").unwrap_or(false) {
let segment = path
.file_stem()
.expect("has extension, therefore has stem")
.to_string_lossy()
.parse::<u64>()
.expect("failed to parse segment filename");
min_segment = u64::min(segment, min_segment);
}
}
let queue_state = QueueState {
segment: min_segment,
..QueueState::default()
};
let mut persistence = QueueStatePersistence::new();
let old_state = persistence.open(base.as_ref())?;
persistence.save(if queue_state > old_state {
&queue_state
} else {
&old_state
})?;
drop(lock);
Ok(())
}
pub fn guess_recv_metadata_with_loss<P: AsRef<Path>>(base: P) -> io::Result<()> {
let lock = FileGuard::try_lock(recv_lock_filename(base.as_ref()))?;
let mut min_segment = std::u64::MAX;
for maybe_entry in read_dir(base.as_ref())? {
let path = maybe_entry?.path();
if path.extension().map(|ext| ext == "q").unwrap_or(false) {
let segment = path
.file_stem()
.expect("has extension, therefore has stem")
.to_string_lossy()
.parse::<u64>()
.expect("failed to parse segment filename");
min_segment = u64::min(segment, min_segment);
}
}
remove_file(base.as_ref().join(format!("{}.q", min_segment)))?;
let queue_state = QueueState {
segment: min_segment + 1,
..QueueState::default()
};
let mut persistence = QueueStatePersistence::new();
let _ = persistence.open(base.as_ref())?;
persistence.save(&queue_state)?;
drop(lock);
Ok(())
}
pub fn recover<P: AsRef<Path>>(base: P) -> io::Result<()> {
unlock_queue(base.as_ref())?;
guess_recv_metadata(base.as_ref())?;
Ok(())
}
pub fn recover_with_loss<P: AsRef<Path>>(base: P) -> io::Result<()> {
unlock_queue(base.as_ref())?;
guess_recv_metadata_with_loss(base.as_ref())?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_unlock() {
let guard = FileGuard::try_lock("data/test-unlock.lock").unwrap();
std::mem::forget(guard);
assert_eq!(
io::ErrorKind::Other,
unlock("data/test-unlock.lock").unwrap_err().kind()
);
remove_file("data/test-unlock.lock").unwrap();
}
#[test]
fn test_unlock_inexistent() {
unlock("data/inexistent-lock.lock").unwrap();
}
#[test]
#[should_panic]
fn test_recover_while_open() {
let channel = crate::channel("data/recover-while-open").unwrap();
recover("data/recover-while-open").unwrap();
drop(channel);
}
#[test]
#[should_panic]
fn test_recover_inexistent() {
recover("data/recover-inexistent").unwrap();
}
}