#![allow(clippy::borrow_interior_mutable_const, clippy::cast_possible_wrap)]
use std::fs;
use std::fs::File;
use std::os::unix::fs::MetadataExt;
use std::sync::atomic::Ordering;
use std::sync::mpsc::channel;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
#[cfg(feature = "async-std")]
use async_std1 as async_std;
use futures_util::stream::{FuturesUnordered, StreamExt};
use once_cell::sync::Lazy;
use tempfile::tempdir;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
use crate::ffi::{
kFSEventStreamCreateFlagFileEvents, kFSEventStreamCreateFlagNoDefer,
kFSEventStreamCreateFlagNone, kFSEventStreamCreateFlagUseCFTypes,
kFSEventStreamCreateFlagUseExtendedData, kFSEventStreamEventIdSinceNow,
FSEventStreamCreateFlags,
};
use crate::stream::{
create_event_stream, StreamContextInfo, StreamFlags, TEST_RUNNING_RUNLOOP_COUNT,
};
/// Async mutex acquired at the start of `must_abort_stream` and `must_receive_fs_events`
/// and held for the whole test body (tokio flavour).
// NOTE(review): presumably this serialises the tests because `must_abort_stream` asserts
// exact values of `TEST_RUNNING_RUNLOOP_COUNT` — confirm.
#[cfg(feature = "tokio")]
static TEST_PARALLEL_LOCK: Lazy<tokio::sync::Mutex<()>> = Lazy::new(|| tokio::sync::Mutex::new(()));
/// Async mutex acquired at the start of `must_abort_stream` and `must_receive_fs_events`
/// and held for the whole test body (async-std flavour).
#[cfg(feature = "async-std")]
static TEST_PARALLEL_LOCK: Lazy<async_std::sync::Mutex<()>> =
Lazy::new(|| async_std::sync::Mutex::new(()));
/// Compile-time check that `StreamContextInfo` is both `Send` and `Sync`.
#[test]
fn must_steam_context_info_send_and_sync() {
    // Instantiating this helper only type-checks when `T` satisfies both bounds, so the
    // test body has nothing to do at runtime.
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<StreamContextInfo>();
}
/// Runs the shared `must_abort_stream` test body under `#[tokio::test]`.
/// Only compiled when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn must_abort_stream_tokio() {
must_abort_stream().await;
}
/// Runs the shared `must_abort_stream` test body under `#[async_std::test]`.
/// Only compiled when the `async-std` feature is enabled.
#[cfg(feature = "async-std")]
#[async_std::test]
async fn must_abort_stream_async_std() {
must_abort_stream().await;
}
/// Shared body of the abort tests.
///
/// Creates a stream watching `"."`, aborts it from a separate thread and checks that the
/// flattened stream finishes within one second. `TEST_RUNNING_RUNLOOP_COUNT` is expected
/// to read `1` right after creation and `0` once the stream has finished.
///
/// # Panics
/// Panics if the stream cannot be created, does not finish within the timeout, the counter
/// holds an unexpected value, or the abort thread cannot be joined.
async fn must_abort_stream() {
    // Held until the end of the test body.
    let _guard = TEST_PARALLEL_LOCK.lock().await;

    let created = create_event_stream(
        ["."],
        kFSEventStreamEventIdSinceNow,
        Duration::ZERO,
        kFSEventStreamCreateFlagNone,
    );
    let (stream, mut handler) = created.expect("to be created");

    let running_after_create = TEST_RUNNING_RUNLOOP_COUNT.load(Ordering::SeqCst);
    assert_eq!(running_after_create, 1);

    // The abort is issued from another thread while this task drains the stream.
    let abort_thread = thread::spawn(move || {
        handler.abort();
    });

    // Futures are lazy: nothing is polled until the timeout wrapper is awaited below.
    let collect_all = stream.into_flatten().collect::<Vec<_>>();
    let limit = Duration::from_secs(1);
    #[cfg(feature = "tokio")]
    drop(
        tokio::time::timeout(limit, collect_all)
            .await
            .expect("to complete"),
    );
    #[cfg(feature = "async-std")]
    drop(
        async_std::future::timeout(limit, collect_all)
            .await
            .expect("to complete"),
    );

    let running_after_abort = TEST_RUNNING_RUNLOOP_COUNT.load(Ordering::SeqCst);
    assert_eq!(running_after_abort, 0);

    abort_thread.join().expect("to join");
}
/// Runs the shared `must_receive_fs_events` test body under `#[tokio::test]`.
/// Only compiled when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn must_receive_fs_events_tokio() {
must_receive_fs_events().await;
}
/// Runs the shared `must_receive_fs_events` test body under `#[async_std::test]`.
/// Only compiled when the `async-std` feature is enabled.
#[cfg(feature = "async-std")]
#[async_std::test]
async fn must_receive_fs_events_async_std() {
must_receive_fs_events().await;
}
/// Shared body of the event-delivery tests.
///
/// Drives `must_receive_fs_events_impl` concurrently for five combinations of stream
/// creation flags and waits until all five have finished. The stricter checks (inode and
/// per-file events) are requested only when the `CI` environment variable was not set at
/// compile time.
async fn must_receive_fs_events() {
    // Held until the end of the test body.
    let _guard = TEST_PARALLEL_LOCK.lock().await;

    // `option_env!` is expanded at compile time, so this reflects the build environment.
    let strict = option_env!("CI").is_none();

    // Each entry is (stream flags, verify_inode, verify_file_events).
    let cases = [
        (
            kFSEventStreamCreateFlagFileEvents
                | kFSEventStreamCreateFlagUseCFTypes
                | kFSEventStreamCreateFlagUseExtendedData,
            strict,
            strict,
        ),
        (
            kFSEventStreamCreateFlagFileEvents | kFSEventStreamCreateFlagUseCFTypes,
            false,
            strict,
        ),
        (kFSEventStreamCreateFlagFileEvents, false, strict),
        (
            kFSEventStreamCreateFlagUseCFTypes | kFSEventStreamCreateFlagUseExtendedData,
            false,
            false,
        ),
        (kFSEventStreamCreateFlagUseCFTypes, false, false),
    ];

    let pending: FuturesUnordered<_> = cases
        .into_iter()
        .map(|(flags, verify_inode, verify_file_events)| {
            must_receive_fs_events_impl(flags, verify_inode, verify_file_events)
        })
        .collect();

    let finished = pending.collect::<Vec<_>>().await;
    assert_eq!(finished.len(), 5);
}
/// Creates a stream over a fresh temporary directory, creates and removes a file inside
/// it, and checks the events collected until the stream is aborted.
///
/// The stream is created with `flags` plus `kFSEventStreamCreateFlagNoDefer`.
///
/// * `flags` - stream creation flags under test.
/// * `verify_inode` - also compare the inode of the two checked events with the inode of
///   the created file (only consulted when `verify_file_events` is set).
/// * `verify_file_events` - require 2 or 3 events whose last two are the creation and the
///   removal of the test file; otherwise only require that at least one event arrived.
///
/// # Panics
/// Panics if any setup step fails, the stream does not finish within six seconds, or the
/// collected events do not match the expectations above.
async fn must_receive_fs_events_impl(
    flags: FSEventStreamCreateFlags,
    verify_inode: bool,
    verify_file_events: bool,
) {
    let dir = tempdir().expect("to be created");
    // Event paths are compared against the canonicalised location of the file.
    let test_file = dir
        .path()
        .canonicalize()
        .expect("to succeed")
        .join("test_file");

    let (tx, rx) = channel();
    let (stream, mut handler) = create_event_stream(
        [dir.path()],
        kFSEventStreamEventIdSinceNow,
        Duration::ZERO,
        flags | kFSEventStreamCreateFlagNoDefer,
    )
    .expect("to be created");

    // Once signalled, wait one more second before aborting so events have time to arrive.
    let abort_thread = thread::spawn(move || {
        rx.recv().expect("to be signaled");
        sleep(Duration::from_secs(1));
        handler.abort();
    });

    let f = File::create(&test_file).expect("to be created");
    // Events carry the inode as `Option<i64>` (see the comparison below), hence the cast.
    let inode = f.metadata().expect("to be fetched").ino() as i64;
    f.sync_all().expect("to succeed");
    drop(f);
    fs::remove_file(&test_file).expect("to be removed");
    // SAFETY: `sync` takes no arguments and has no memory-safety preconditions.
    // NOTE(review): presumably called so the changes are flushed before the abort timer
    // starts — confirm.
    unsafe { libc::sync() };
    tx.send(()).expect("to signal");

    #[cfg(feature = "tokio")]
    let events: Vec<_> = tokio::time::timeout(
        Duration::from_secs(6),
        stream.into_flatten().collect::<Vec<_>>(),
    )
    .await
    .expect("to complete");
    #[cfg(feature = "async-std")]
    let events: Vec<_> = async_std::future::timeout(
        Duration::from_secs(6),
        stream.into_flatten().collect::<Vec<_>>(),
    )
    .await
    .expect("to complete");

    if verify_file_events {
        assert!(
            events.len() == 2 || events.len() == 3,
            "expected 2 or 3 events, got {}",
            events.len()
        );
        // Only the last two events are inspected; a leading third event is tolerated.
        let (event_fst, event_snd) = match events.as_slice() {
            [.., fst, snd] => (fst, snd),
            _ => unreachable!("event count asserted above"),
        };
        let expectations = [
            (
                "creation",
                event_fst,
                StreamFlags::ITEM_CREATED | StreamFlags::IS_FILE,
            ),
            (
                "removal",
                event_snd,
                StreamFlags::ITEM_REMOVED | StreamFlags::IS_FILE,
            ),
        ];
        for (what, event, expected_flags) in expectations {
            assert_eq!(
                event.path.as_path(),
                test_file.as_path(),
                "{} event has an unexpected path",
                what
            );
            if verify_inode {
                assert_eq!(
                    event.inode,
                    Some(inode),
                    "{} event has an unexpected inode",
                    what
                );
            }
            assert!(
                event.flags.contains(expected_flags),
                "{} event is missing expected flags",
                what
            );
        }
    } else {
        assert!(!events.is_empty(), "expected at least one event");
    }

    abort_thread.join().expect("to join");
}