use lazy_static::lazy_static;
use notify::RecommendedWatcher;
use rand::Rng;
use std::fs::*;
use std::future::Future;
use std::io::{self, Read, Seek, Write};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use crate::watcher::{file_removal_watcher, file_watcher, removal_watcher};
lazy_static! {
pub(crate) static ref UNIQUE_PROCESS_TOKEN: u64 = rand::thread_rng().gen();
}
pub fn render_lock() -> String {
format!(
"pid={}\ntoken={}",
std::process::id(),
*UNIQUE_PROCESS_TOKEN
)
}
pub struct FileGuard {
path: PathBuf,
ignore: bool,
}
impl Drop for FileGuard {
fn drop(&mut self) {
if let Err(err) = remove_file(&self.path) {
if !self.ignore {
log::error!("unable to drop file lock: {}", err);
return;
}
}
log::trace!("file guard on `{:?}` dropped", self.path);
}
}
impl FileGuard {
pub(crate) fn ignore(&mut self) {
self.ignore = true;
}
pub fn try_lock<P: AsRef<Path>>(path: P) -> io::Result<Option<FileGuard>> {
match OpenOptions::new().write(true).create_new(true).open(&path) {
Ok(mut file) => {
writeln!(file, "{}", render_lock())?;
Ok(Some(FileGuard {
path: path.as_ref().to_path_buf(),
ignore: false,
}))
}
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => Ok(None),
Err(err) => Err(err),
}
}
pub async fn lock<P: AsRef<Path>>(path: P) -> io::Result<FileGuard> {
let waker = Arc::new(Mutex::new(None));
let _watcher = file_removal_watcher(path.as_ref(), waker.clone());
Lock { path, waker }.await
}
}
struct Lock<P: AsRef<Path>> {
path: P,
waker: Arc<Mutex<Option<Waker>>>,
}
impl<P: AsRef<Path>> Future for Lock<P> {
type Output = io::Result<FileGuard>;
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let mut lock = self.waker.lock().expect("waker mutex poisoned");
*lock = Some(context.waker().clone());
match FileGuard::try_lock(self.path.as_ref()) {
Ok(Some(file_guard)) => Poll::Ready(Ok(file_guard)),
Ok(None) => Poll::Pending,
Err(err) => Poll::Ready(Err(err)),
}
}
}
fn open_new<P: AsRef<Path>>(path: P) -> io::Result<File> {
let maybe_new = OpenOptions::new().create_new(true).append(true).open(&path);
if maybe_new.is_ok() {
log::debug!("file `{:?}` didn't exist. Created new", path.as_ref());
}
File::open(&path)
}
pub struct TailFollower {
file: io::BufReader<File>,
read_and_unused: usize,
_watcher: RecommendedWatcher,
waker: Arc<Mutex<Option<Waker>>>,
}
impl TailFollower {
fn new(path: &Path, file: File) -> TailFollower {
let waker = Arc::new(Mutex::new(None));
let watcher = file_watcher(path, waker.clone());
TailFollower {
file: io::BufReader::new(file),
read_and_unused: 0,
_watcher: watcher,
waker,
}
}
pub fn open(path: &Path) -> io::Result<TailFollower> {
let file = open_new(&path)?;
Ok(TailFollower::new(path, file))
}
pub fn seek(&mut self, seek: io::SeekFrom) -> io::Result<()> {
self.file.seek(seek).map(|_| ())
}
#[must_use = "futures do nothing until polled"]
pub fn read_exact<'a>(&'a mut self, buffer: &'a mut [u8]) -> ReadExact<'a> {
if self.read_and_unused != 0 {
log::trace!("found {} bytes read but unused", self.read_and_unused);
self.seek(io::SeekFrom::Current(-(self.read_and_unused as i64)))
.expect("could not seek back read and unused bytes");
self.read_and_unused = 0;
}
ReadExact {
file: &mut self.file,
buffer,
waker: &self.waker,
read_and_unused: &mut self.read_and_unused,
was_polled: false,
}
}
}
pub struct ReadExact<'a> {
file: &'a mut io::BufReader<File>,
buffer: &'a mut [u8],
waker: &'a Mutex<Option<Waker>>,
read_and_unused: &'a mut usize,
was_polled: bool,
}
impl<'a> ReadExact<'a> {
fn read_until_you_drain(&mut self) -> Poll<io::Result<()>> {
log::trace!("reading until drained");
loop {
break match self.file.read(&mut self.buffer[*self.read_and_unused..]) {
Ok(0) => {
log::trace!("will have to wait for more");
Poll::Pending
}
Ok(i) => {
log::trace!("read {} bytes", i);
*self.read_and_unused += i;
if *self.read_and_unused == self.buffer.len() {
log::trace!("enough! Done reading");
*self.read_and_unused = 0;
Poll::Ready(Ok(()))
} else {
log::trace!("can read more");
continue;
}
}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {
log::trace!("got interrupted by eof");
Poll::Pending
}
Err(err) => {
log::trace!("oops! error");
Poll::Ready(Err(err))
}
};
}
}
}
impl<'a> Future for ReadExact<'a> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
self.was_polled = true;
let outcome = self.read_until_you_drain();
if outcome.is_pending() {
let mut lock = self.waker.lock().expect("waker mutex poisoned");
*lock = Some(context.waker().clone());
self.read_until_you_drain()
} else {
outcome
}
}
}
impl<'a> Drop for ReadExact<'a> {
fn drop(&mut self) {
if !self.was_polled {
log::warn!("read_exact future never polled");
}
}
}
pub struct DeletionEvent {
waker: Arc<Mutex<Option<Waker>>>,
_watcher: RecommendedWatcher,
}
impl Future for DeletionEvent {
type Output = ();
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let mut lock = self.waker.lock().expect("waker mutex poisoned");
*lock = Some(context.waker().clone());
Poll::Ready(())
}
}
impl DeletionEvent {
pub fn new(base: &Path) -> DeletionEvent {
let waker = Arc::new(Mutex::new(None));
let watcher = removal_watcher(base, Arc::clone(&waker));
DeletionEvent {
waker,
_watcher: watcher,
}
}
}
pub struct SyncFollower {
file: io::BufReader<File>,
}
impl SyncFollower {
pub fn open<P>(path: P) -> io::Result<SyncFollower>
where
P: 'static + AsRef<Path> + Send + Sync,
{
let file = io::BufReader::new(open_new(&path)?);
Ok(SyncFollower { file })
}
pub fn seek(&mut self, seek: io::SeekFrom) -> io::Result<()> {
self.file.seek(seek).map(|_| ())
}
pub fn read_exact(&mut self, buffer: &mut [u8]) -> io::Result<()> {
self.file.read_exact(buffer)
}
}