use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use crate::environment::Environment;
pub struct LaxSingleProcessFsFlag<TEnvironment: Environment>(Option<LaxSingleProcessFsFlagInner<TEnvironment>>);
impl<TEnvironment: Environment> LaxSingleProcessFsFlag<TEnvironment> {
pub async fn lock(environment: &TEnvironment, file_path: PathBuf, long_wait_message: &str) -> Self {
if !environment.is_real() {
return Self(None);
}
log_debug!(environment, "Acquiring file lock at {}", file_path.display());
use fs3::FileExt;
let last_updated_path = file_path.with_extension("lock.poll");
let start_instant = std::time::Instant::now();
let open_result = std::fs::OpenOptions::new().read(true).write(true).truncate(true).create(true).open(&file_path);
match open_result {
Ok(fs_file) => {
let mut pb_update_guard = None;
let mut error_count = 0;
while error_count < 10 {
let lock_result = fs_file.try_lock_exclusive();
let poll_file_update_ms = 100;
match lock_result {
Ok(_) => {
log_debug!(environment, "Acquired file lock at {}", file_path.display());
#[allow(clippy::disallowed_methods)]
let _ignore = std::fs::write(&last_updated_path, "");
let token = Arc::new(tokio_util::sync::CancellationToken::new());
dprint_core::async_runtime::spawn_blocking({
let token = token.clone();
let last_updated_path = last_updated_path.clone();
move || {
let mut i = 0;
while !token.is_cancelled() {
i += 1;
#[allow(clippy::disallowed_methods)]
let _ignore = std::fs::write(&last_updated_path, i.to_string());
std::thread::sleep(Duration::from_millis(poll_file_update_ms));
}
}
});
return Self(Some(LaxSingleProcessFsFlagInner {
file_path,
fs_file,
finished_token: token,
environment: environment.clone(),
}));
}
Err(_) => {
if pb_update_guard.is_none() && start_instant.elapsed().as_millis() > 1_000 {
pb_update_guard = environment
.progress_bars()
.map(|pb| pb.add_progress(long_wait_message.to_string(), crate::utils::ProgressBarStyle::Action, 1));
}
tokio::time::sleep(Duration::from_millis(20)).await;
#[allow(clippy::disallowed_methods)]
match std::fs::metadata(&last_updated_path).and_then(|p| p.modified()) {
Ok(last_updated_time) => {
let current_time = std::time::SystemTime::now();
match current_time.duration_since(last_updated_time) {
Ok(duration) => {
if duration.as_millis() > (poll_file_update_ms * 2) as u128 {
return Self(None);
} else {
error_count = 0; }
}
Err(_) => {
error_count += 1;
}
}
}
Err(_) => {
error_count += 1;
}
}
}
}
}
drop(pb_update_guard); Self(None)
}
Err(err) => {
log_debug!(environment, "Failed to open file lock at {}. {:#}", file_path.display(), err);
Self(None) }
}
}
}
struct LaxSingleProcessFsFlagInner<TEnvironment: Environment> {
file_path: PathBuf,
fs_file: std::fs::File,
finished_token: Arc<tokio_util::sync::CancellationToken>,
environment: TEnvironment,
}
impl<TEnvironment: Environment> Drop for LaxSingleProcessFsFlagInner<TEnvironment> {
fn drop(&mut self) {
use fs3::FileExt;
self.finished_token.cancel();
if let Err(err) = FileExt::unlock(&self.fs_file) {
log_debug!(self.environment, "Failed releasing lock for {}. {:#}", self.file_path.display(), err);
}
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use dprint_core::async_runtime::future;
use dprint_core::async_runtime::FutureExt;
use parking_lot::Mutex;
use tempfile::TempDir;
use tokio::sync::Notify;
use crate::environment::RealEnvironment;
use super::*;
#[test]
fn lax_fs_lock() {
RealEnvironment::run_test_with_real_env(|env| {
async move {
let temp_dir = TempDir::new().unwrap();
let lock_path = temp_dir.path().join("file.lock");
let signal1 = Arc::new(Notify::new());
let signal2 = Arc::new(Notify::new());
let signal3 = Arc::new(Notify::new());
let signal4 = Arc::new(Notify::new());
tokio::spawn({
let lock_path = lock_path.clone();
let signal1 = signal1.clone();
let signal2 = signal2.clone();
let signal3 = signal3.clone();
let signal4 = signal4.clone();
let temp_dir_path = temp_dir.path().to_path_buf();
let env = env.clone();
async move {
let flag = LaxSingleProcessFsFlag::lock(&env, lock_path.to_path_buf(), "waiting").await;
signal1.notify_one();
signal2.notified().await;
tokio::time::sleep(Duration::from_millis(10)).await; std::fs::write(temp_dir_path.join("file.txt"), "update1").unwrap();
signal3.notify_one();
signal4.notified().await;
drop(flag);
}
});
let signal5 = Arc::new(Notify::new());
tokio::spawn({
let temp_dir_path = temp_dir.path().to_path_buf();
let signal5 = signal5.clone();
let env = env.clone();
async move {
signal1.notified().await;
signal2.notify_one();
let flag = LaxSingleProcessFsFlag::lock(&env, lock_path.to_path_buf(), "waiting").await;
std::fs::write(temp_dir_path.join("file.txt"), "update2").unwrap();
signal5.notify_one();
drop(flag);
}
});
signal3.notified().await;
assert_eq!(std::fs::read_to_string(temp_dir.path().join("file.txt")).unwrap(), "update1");
signal4.notify_one();
signal5.notified().await;
assert_eq!(std::fs::read_to_string(temp_dir.path().join("file.txt")).unwrap(), "update2");
}
.boxed_local()
});
}
#[test]
fn lax_fs_lock_ordered() {
RealEnvironment::run_test_with_real_env(|env| {
async move {
let temp_dir = TempDir::new().unwrap();
let lock_path = temp_dir.path().join("file.lock");
let output_path = temp_dir.path().join("output");
let expected_order = Arc::new(Mutex::new(Vec::new()));
let count = 10;
let mut tasks = Vec::with_capacity(count);
std::fs::write(&output_path, "").unwrap();
for i in 0..count {
let lock_path = lock_path.clone();
let output_path = output_path.clone();
let expected_order = expected_order.clone();
let env = env.clone();
tasks.push(tokio::spawn(async move {
let flag = LaxSingleProcessFsFlag::lock(&env, lock_path.to_path_buf(), "waiting").await;
expected_order.lock().push(i.to_string());
let mut output = std::fs::read_to_string(&output_path).unwrap();
if !output.is_empty() {
output.push('\n');
}
output.push_str(&i.to_string());
std::fs::write(&output_path, output).unwrap();
drop(flag);
}));
}
future::join_all(tasks).await;
let expected_output = expected_order.lock().join("\n");
assert_eq!(std::fs::read_to_string(output_path).unwrap(), expected_output);
}
.boxed_local()
})
}
}