use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
const MAX_TOTAL_INDEX_BYTES: usize = 16 * 1024 * 1024 * 1024;
use crate::builder::BlockIndexBuilder;
use crate::error::Result;
use crate::index::BlockIndex;
#[derive(Debug, Clone)]
pub struct WatchConfig {
pub block_size: usize,
pub bloom_bits: usize,
pub poll_interval: Duration,
pub max_file_size: u64,
}
impl Default for WatchConfig {
fn default() -> Self {
Self {
block_size: 4096,
bloom_bits: 1024,
poll_interval: Duration::from_secs(5),
max_file_size: 256 * 1024 * 1024,
}
}
}
pub struct IncrementalWatch {
root: PathBuf,
config: WatchConfig,
known_files: HashSet<PathBuf>,
last_poll: Instant,
}
#[derive(Debug, Clone, Default)]
pub struct ChangeSet {
pub modified: Vec<PathBuf>,
pub removed: Vec<PathBuf>,
}
impl ChangeSet {
#[must_use]
pub fn is_empty(&self) -> bool {
self.modified.is_empty() && self.removed.is_empty()
}
#[must_use]
pub fn len(&self) -> usize {
self.modified.len() + self.removed.len()
}
}
impl IncrementalWatch {
pub fn new(root: impl Into<PathBuf>, config: WatchConfig) -> Self {
Self {
root: root.into(),
config,
known_files: HashSet::new(),
last_poll: Instant::now(),
}
}
pub fn poll(&mut self) -> ChangeSet {
let mut current_files = HashSet::new();
let mut changes = ChangeSet::default();
if let Ok(entries) = walk_dir(&self.root, self.config.max_file_size) {
for path in entries {
current_files.insert(path.clone());
if !self.known_files.contains(&path) {
changes.modified.push(path);
}
}
}
for known in &self.known_files {
if !current_files.contains(known) {
changes.removed.push(known.clone());
}
}
self.known_files = current_files;
self.last_poll = Instant::now();
changes
}
pub fn index_changes(&self, changes: &ChangeSet) -> Result<Option<BlockIndex>> {
if changes.modified.is_empty() {
return Ok(None);
}
let mut all_bytes = Vec::new();
for path in &changes.modified {
if let Ok(data) = std::fs::read(path) {
if all_bytes.len().saturating_add(data.len()) > MAX_TOTAL_INDEX_BYTES {
return Err(crate::error::Error::DataTooLarge);
}
all_bytes.extend_from_slice(&data);
}
}
if all_bytes.is_empty() {
return Ok(None);
}
let index = BlockIndexBuilder::new()
.block_size(self.config.block_size)
.bloom_bits(self.config.bloom_bits)
.build(&all_bytes)?;
Ok(Some(index))
}
#[must_use]
pub fn root(&self) -> &Path {
&self.root
}
#[must_use]
pub fn since_last_poll(&self) -> Duration {
self.last_poll.elapsed()
}
}
fn walk_dir(root: &Path, max_size: u64) -> std::io::Result<Vec<PathBuf>> {
let mut files = Vec::new();
walk_dir_inner(root, max_size, &mut files)?;
Ok(files)
}
fn walk_dir_inner(dir: &Path, max_size: u64, out: &mut Vec<PathBuf>) -> std::io::Result<()> {
let entries = std::fs::read_dir(dir)?;
for entry in entries {
let entry = entry?;
let path = entry.path();
let ft = entry.file_type()?;
if ft.is_dir() {
let _ = walk_dir_inner(&path, max_size, out);
} else if ft.is_file() {
if let Ok(meta) = entry.metadata() {
if meta.len() <= max_size {
out.push(path);
}
}
}
}
Ok(())
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
use super::*;
use std::fs;
#[test]
fn detects_new_file() {
let dir = tempfile::tempdir().unwrap();
let mut watcher = IncrementalWatch::new(dir.path(), WatchConfig::default());
let changes = watcher.poll();
assert!(changes.is_empty());
fs::write(dir.path().join("test.txt"), b"hello world").unwrap();
let changes = watcher.poll();
assert_eq!(changes.modified.len(), 1);
assert!(changes.removed.is_empty());
}
#[test]
fn detects_removed_file() {
let dir = tempfile::tempdir().unwrap();
fs::write(dir.path().join("test.txt"), b"hello").unwrap();
let mut watcher = IncrementalWatch::new(dir.path(), WatchConfig::default());
let _ = watcher.poll();
fs::remove_file(dir.path().join("test.txt")).unwrap();
let changes = watcher.poll();
assert_eq!(changes.removed.len(), 1);
}
#[test]
fn no_changes_on_second_poll() {
let dir = tempfile::tempdir().unwrap();
fs::write(dir.path().join("test.txt"), b"hello").unwrap();
let mut watcher = IncrementalWatch::new(dir.path(), WatchConfig::default());
let _ = watcher.poll();
let changes = watcher.poll();
assert!(changes.is_empty());
}
#[test]
fn index_changes_builds_index() {
let dir = tempfile::tempdir().unwrap();
fs::write(dir.path().join("data.bin"), vec![0u8; 8192]).unwrap();
let mut watcher = IncrementalWatch::new(dir.path(), WatchConfig::default());
let changes = watcher.poll();
assert!(!changes.is_empty());
let index = watcher.index_changes(&changes).unwrap();
assert!(index.is_some());
}
}