use fs4::{FileExt, TryLockError};
use std::{
fs::{File, OpenOptions},
path::{Path, PathBuf},
thread::sleep,
time::{Duration, Instant},
};
use crate::*;
/// Extension of the lock file that accompanies a storage file. The lock path
/// is derived from the storage path by replacing its extension with this one.
pub const LOCK_EXT: &str = "lock";
/// Default pause, in milliseconds, between two attempts to acquire the lock
/// when the caller does not supply an interval.
pub const WAIT_INTERVAL_MS: u64 = 50;
/// Builder-style options describing how a lock-protected storage file is opened.
///
/// The derives make the options loggable (`Debug`), reusable for several
/// `open` calls (`Clone`) and comparable in tests (`PartialEq`/`Eq`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileStorageOptions {
    /// Pause between two attempts to acquire the lock.
    interval: Duration,
    /// Maximum time to wait for the lock; `None` means no waiting at all.
    timeout: Option<Duration>,
    /// Path of the storage file to open.
    filename: PathBuf,
}
impl FileStorageOptions {
pub fn new<P: AsRef<Path>>(filename: P) -> Self {
Self {
interval: Duration::from_millis(WAIT_INTERVAL_MS),
timeout: None,
filename: filename.as_ref().to_path_buf(),
}
}
pub fn interval(mut self, interval: Duration) -> Self {
self.interval = interval;
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn open<B: BlockDef, PL: PayloadDef<Inner>, Inner, O>(
self,
opt: O,
) -> Result<FileWriterDef<B, PL, Inner, O>, Error>
where
Inner: PayloadInnerDef,
for<'a> Inner: ProtocolSchema<Context<'a> = O>,
{
FileWriterDef::<B, PL, Inner, O>::with_opt(
self.filename,
self.timeout,
Some(self.interval),
opt,
)
}
}
/// Storage writer backed by a file and guarded by a separate lock file.
///
/// Field order matters: struct fields are dropped in declaration order, so
/// `_filelock` is declared last. That way the lock file handle is dropped
/// only after `inner` (and the storage file it owns) has been dropped, and
/// anything the writer does while being dropped still happens under the lock.
pub struct FileWriterDef<B: BlockDef, PL: PayloadDef<Inner>, Inner: PayloadInnerDef, O> {
    /// Writer operating on the opened storage file.
    inner: WriterDef<File, B, PL, Inner>,
    /// Caller-provided value handed to the inner writer on every insert.
    opt: O,
    /// Handle of the locked lock file; never read, only kept open for as long
    /// as the writer exists.
    _filelock: File,
}
impl<B: BlockDef, PL: PayloadDef<Inner>, Inner: PayloadInnerDef, O> FileWriterDef<B, PL, Inner, O>
where
for<'a> Inner: ProtocolSchema<Context<'a> = O>,
{
pub fn with_opt<P: AsRef<Path>>(
filename: P,
timeout: Option<Duration>,
interval: Option<Duration>,
opt: O,
) -> Result<Self, Error> {
let filename = filename.as_ref().to_path_buf();
let filename_str = filename.to_string_lossy().to_string();
if filename.exists() && !filename.is_file() {
return Err(Error::PathIsNotFile(filename_str));
}
let lock_file = filename.with_extension(LOCK_EXT);
let started = Instant::now();
let interval = interval.unwrap_or(Duration::from_millis(WAIT_INTERVAL_MS));
let wait_or_fail = || {
if let Some(timeout) = timeout {
if started.elapsed() >= timeout {
return Err(Error::TimeoutToWaitLockedFile(filename_str.clone()));
}
sleep(interval);
Ok(())
} else {
Err(Error::FileIsLocked(filename_str.clone()))
}
};
let filelock = loop {
let file = OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(&lock_file)
.map_err(Error::FailToLockFile)?;
match FileExt::try_lock(&file) {
Ok(()) => break file,
Err(TryLockError::WouldBlock) => wait_or_fail()?,
Err(err) => {
return Err(Error::FailToLockFile(err.into()));
}
};
};
let storage_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(filename)?;
Ok(Self {
_filelock: filelock,
inner: WriterDef::new(storage_file)?,
opt,
})
}
pub fn new<P: AsRef<Path>>(
filename: P,
timeout: Option<Duration>,
interval: Option<Duration>,
) -> Result<Self, Error>
where
O: Default,
{
Self::with_opt(filename, timeout, interval, O::default())
}
pub fn insert(&mut self, packet: PacketDef<B, PL, Inner>) -> Result<(), Error> {
self.inner.insert(packet, &mut self.opt)
}
}
#[cfg(test)]
mod tests {
    use crate::*;
    use crate::{storage::writer::FileStorageOptions, tests::*};
    use std::{
        env::temp_dir,
        fs,
        sync::mpsc::{Receiver, Sender, channel},
        thread::{sleep, spawn},
        time::Duration,
    };

    /// Writer type shared by every test in this module.
    type LockedWriter = FileWriterDef<TestBlock, TestPayload, TestPayload, DefaultProtocolContext>;

    /// Builds a path inside the temp directory and removes a file left there
    /// by a previous run.
    fn fresh_path(name: &str) -> std::path::PathBuf {
        let path = temp_dir().join(name);
        if path.exists() {
            fs::remove_file(&path).expect("Test file has been removed");
        }
        path
    }

    /// Body of the "holder" thread: announces that the storage is open, keeps
    /// it for `hold_ms` milliseconds, then drops it.
    fn hold_then_release(storage: LockedWriter, ready: Sender<()>, hold_ms: u64) -> bool {
        ready.send(()).expect("Signal has been send");
        sleep(Duration::from_millis(hold_ms));
        drop(storage);
        true
    }

    /// The waiter's timeout (300 ms) outlasts the holder (100 ms), so both succeed.
    #[test]
    fn success() {
        let path = fresh_path("test_brec_filestorage_success_lock.bin");
        let holder_path = path.clone();
        let (tx, rx): (Sender<()>, Receiver<()>) = channel();
        let holder = spawn(move || {
            let storage = LockedWriter::with_opt(holder_path, None, None, ())
                .expect("Storage A has been created");
            hold_then_release(storage, tx, 100)
        });
        let waiter = spawn(move || {
            rx.recv().expect("Signal has been gotten");
            LockedWriter::with_opt(&path, Some(Duration::from_millis(300)), None, ()).is_ok()
        });
        let held = holder.join().expect("Storage A has been created");
        let acquired = waiter.join().expect("Storage B has been created");
        assert_eq!(held, acquired);
    }

    /// Same scenario as `success`, driven through `FileStorageOptions`.
    #[test]
    fn success_with_opt() {
        let path = fresh_path("test_brec_filestorage_success_lock_opt.bin");
        let holder_path = path.clone();
        let (tx, rx): (Sender<()>, Receiver<()>) = channel();
        let holder = spawn(move || {
            let storage = FileStorageOptions::new(holder_path)
                .open::<TestBlock, TestPayload, TestPayload, DefaultProtocolContext>(())
                .expect("Storage A has been created");
            hold_then_release(storage, tx, 100)
        });
        let waiter = spawn(move || {
            rx.recv().expect("Signal has been gotten");
            FileStorageOptions::new(path)
                .timeout(Duration::from_millis(300))
                .open::<TestBlock, TestPayload, TestPayload, DefaultProtocolContext>(())
                .is_ok()
        });
        let held = holder.join().expect("Storage A has been created");
        let acquired = waiter.join().expect("Storage B has been created");
        assert_eq!(held, acquired);
    }

    /// The holder keeps the storage for 500 ms while the waiter gives up after 100 ms.
    #[test]
    fn timeout() {
        let path = fresh_path("test_brec_filestorage_timeout.bin");
        let holder_path = path.clone();
        let (tx, rx): (Sender<()>, Receiver<()>) = channel();
        let holder = spawn(move || {
            let storage = LockedWriter::with_opt(holder_path, None, None, ())
                .expect("Storage A has been created");
            hold_then_release(storage, tx, 500)
        });
        let waiter = spawn(move || {
            rx.recv().expect("Signal has been gotten");
            LockedWriter::with_opt(&path, Some(Duration::from_millis(100)), None, ()).is_ok()
        });
        let held = holder.join().expect("Storage A has been created");
        let acquired = waiter.join().expect("Storage B has been created");
        assert!(held);
        assert!(!acquired);
    }

    /// Without a timeout the second opener must fail while the storage is held.
    #[test]
    fn fail() {
        let path = fresh_path("test_brec_filestorage_fail.bin");
        let holder_path = path.clone();
        let (tx, rx): (Sender<()>, Receiver<()>) = channel();
        let holder = spawn(move || {
            let storage = LockedWriter::with_opt(holder_path, None, None, ())
                .expect("Storage A has been created");
            hold_then_release(storage, tx, 500)
        });
        let waiter = spawn(move || {
            rx.recv().expect("Signal has been gotten");
            LockedWriter::with_opt(&path, None, None, ()).is_ok()
        });
        let held = holder.join().expect("Storage A has been created");
        let acquired = waiter.join().expect("Storage B has been created");
        assert!(held);
        assert!(!acquired);
    }
}