1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
//! Watch folder automation for automatic transcoding of new files.
//!
//! This module provides [`TranscodeWatcher`] which monitors a directory for
//! newly-created media files and exposes them for downstream transcoding.
//! Stable-file detection prevents processing files that are still being written.
use std::collections::HashSet;
/// Settings describing a single watch folder.
#[derive(Debug, Clone)]
pub struct WatchConfig {
    /// Directory that is polled for incoming files.
    pub watch_dir: String,
    /// Directory that transcoded output is meant to be written to.
    pub output_dir: String,
    /// Name of the transcode preset to apply to discovered files.
    pub preset_name: String,
    /// Delay between two polls of the watch directory, in milliseconds.
    pub poll_interval_ms: u64,
}

impl WatchConfig {
    /// Builds a [`WatchConfig`] from the given directories, preset name and
    /// poll interval.
    ///
    /// The three textual arguments accept anything convertible into a
    /// `String`; each is converted once and stored as an owned value.
    #[must_use]
    pub fn new(
        watch_dir: impl Into<String>,
        output_dir: impl Into<String>,
        preset_name: impl Into<String>,
        poll_interval_ms: u64,
    ) -> Self {
        let watch_dir = watch_dir.into();
        let output_dir = output_dir.into();
        let preset_name = preset_name.into();
        Self {
            watch_dir,
            output_dir,
            preset_name,
            poll_interval_ms,
        }
    }
}
/// Record describing one file reported by the watcher.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatchedFile {
    /// Path of the discovered file, stored as a string.
    pub path: String,
    /// Size of the file in bytes, as observed at discovery time.
    pub size_bytes: u64,
    /// Timestamp, in milliseconds, recorded when the file was first seen.
    pub discovered_at_ms: u64,
}

impl WatchedFile {
    /// Builds a [`WatchedFile`] from a path, a size in bytes and a discovery
    /// timestamp in milliseconds.
    #[must_use]
    pub fn new(path: impl Into<String>, size_bytes: u64, discovered_at_ms: u64) -> Self {
        let path = path.into();
        Self {
            path,
            size_bytes,
            discovered_at_ms,
        }
    }

    /// Reports whether enough time has passed for the file to be processed
    /// safely.
    ///
    /// Returns `true` once `elapsed_since_discovery_ms` has reached
    /// `min_stable_ms` (the bound is inclusive). Only the two arguments are
    /// compared; the stored size and timestamp are not consulted, so callers
    /// should re-check the file size between polls and restart the clock
    /// whenever it changes.
    #[must_use]
    pub fn is_stable(&self, elapsed_since_discovery_ms: u64, min_stable_ms: u64) -> bool {
        min_stable_ms <= elapsed_since_discovery_ms
    }
}
/// Watch-folder automation engine.
///
/// [`TranscodeWatcher`] records every path it has already reported so that
/// each new file is emitted exactly once. The directory listing is performed
/// in [`scan_for_new_files`](TranscodeWatcher::scan_for_new_files) via
/// `std::fs::read_dir`; when the watch directory cannot be read the scan
/// yields an empty vec instead of an error.
#[derive(Debug)]
pub struct TranscodeWatcher {
    /// Watch folder configuration.
    pub config: WatchConfig,
    /// Paths already reported by a scan or registered through
    /// [`mark_processed`](TranscodeWatcher::mark_processed).
    pub seen_files: HashSet<String>,
}

impl TranscodeWatcher {
    /// Creates a new [`TranscodeWatcher`] with an empty seen-files set.
    #[must_use]
    pub fn new(config: WatchConfig) -> Self {
        Self {
            config,
            seen_files: HashSet::new(),
        }
    }

    /// Scans the watch directory and returns the regular files that have not
    /// been reported before.
    ///
    /// `now_ms` is the caller-supplied current time in milliseconds; it is
    /// stored as the discovery timestamp of every file returned. Each
    /// returned path is added to `seen_files`, so a later scan will not
    /// return it again.
    ///
    /// Only the direct children of the watch directory are listed. Entries
    /// that are not regular files are skipped; the entry's own file type is
    /// used, so symbolic links are not followed. Paths are converted with
    /// `to_string_lossy`, meaning non-UTF-8 path components are replaced.
    ///
    /// Errors are handled on a best-effort basis so that a polling loop can
    /// keep running: if the directory cannot be read an empty vec is
    /// returned, and individual entries whose type or metadata cannot be
    /// obtained are skipped (and stay eligible for a later scan).
    #[must_use]
    pub fn scan_for_new_files(&mut self, now_ms: u64) -> Vec<WatchedFile> {
        let entries = match std::fs::read_dir(&self.config.watch_dir) {
            Ok(entries) => entries,
            Err(_) => return Vec::new(),
        };

        let mut new_files = Vec::new();
        for entry in entries.filter_map(Result::ok) {
            // Filter on the entry's file type first: it avoids fetching full
            // metadata for directories and other non-file entries.
            if !entry.file_type().map_or(false, |kind| kind.is_file()) {
                continue;
            }

            let path_str = entry.path().to_string_lossy().into_owned();
            if self.seen_files.contains(&path_str) {
                continue;
            }

            // Metadata is needed only for the size, so it is fetched solely
            // for files that are actually new rather than on every poll.
            let size_bytes = match entry.metadata() {
                Ok(metadata) => metadata.len(),
                Err(_) => continue,
            };

            self.seen_files.insert(path_str.clone());
            new_files.push(WatchedFile::new(path_str, size_bytes, now_ms));
        }
        new_files
    }

    /// Marks a file path as processed so it is not returned again by
    /// [`scan_for_new_files`](TranscodeWatcher::scan_for_new_files).
    ///
    /// Matching is an exact string comparison, so `path` must equal the
    /// string form that a scan would produce for the file.
    pub fn mark_processed(&mut self, path: &str) {
        self.seen_files.insert(path.to_owned());
    }

    /// Returns `true` if the given path has already been seen by this watcher.
    #[must_use]
    pub fn is_known(&self, path: &str) -> bool {
        self.seen_files.contains(path)
    }

    /// Returns the configured poll interval in milliseconds.
    #[must_use]
    pub fn poll_interval_ms(&self) -> u64 {
        self.config.poll_interval_ms
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a representative configuration shared by several tests.
    fn make_config() -> WatchConfig {
        WatchConfig::new("/tmp/watch_in", "/tmp/watch_out", "youtube_1080p", 2000)
    }

    #[test]
    fn test_watcher_starts_empty() {
        let watcher = TranscodeWatcher::new(make_config());
        assert!(watcher.seen_files.is_empty());
    }

    #[test]
    fn test_mark_processed_and_is_known() {
        let mut watcher = TranscodeWatcher::new(make_config());
        assert!(!watcher.is_known("/tmp/watch_in/video.mp4"));
        watcher.mark_processed("/tmp/watch_in/video.mp4");
        assert!(watcher.is_known("/tmp/watch_in/video.mp4"));
    }

    #[test]
    fn test_scan_nonexistent_dir_returns_empty() {
        let config = WatchConfig::new(
            "/nonexistent_oximedia_watch_dir_xyz",
            "/tmp/out",
            "preset",
            1000,
        );
        let mut watcher = TranscodeWatcher::new(config);
        let found = watcher.scan_for_new_files(12345);
        assert!(found.is_empty());
    }

    #[test]
    fn test_scan_real_dir_does_not_duplicate() {
        // A per-process directory name keeps concurrent runs, and leftovers
        // from an aborted earlier run, from polluting this fixture.
        let watch_dir = std::env::temp_dir()
            .join(format!("oximedia_watcher_test_{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&watch_dir);
        assert!(
            std::fs::create_dir_all(&watch_dir).is_ok(),
            "failed to create the watch directory fixture"
        );

        // Create a dummy file.
        let file_path = watch_dir.join("test_video.mp4");
        assert!(
            std::fs::write(&file_path, b"fake mp4 data").is_ok(),
            "failed to write the fixture file"
        );

        let config = WatchConfig::new(
            watch_dir.to_string_lossy().as_ref(),
            "/tmp/out",
            "preset",
            1000,
        );
        let mut watcher = TranscodeWatcher::new(config);
        let first_scan = watcher.scan_for_new_files(1000);
        let second_scan = watcher.scan_for_new_files(2000);

        // Clean up before asserting so a failing assertion cannot leave the
        // fixture behind.
        let _ = std::fs::remove_dir_all(&watch_dir);

        assert_eq!(first_scan.len(), 1);
        assert_eq!(first_scan[0].size_bytes, 13);
        assert_eq!(first_scan[0].discovered_at_ms, 1000);
        // Second scan must not return the same file again.
        assert!(second_scan.is_empty());
    }

    #[test]
    fn test_watched_file_is_stable() {
        let f = WatchedFile::new("/tmp/video.mp4", 1024, 0);
        assert!(!f.is_stable(4999, 5000));
        assert!(f.is_stable(5000, 5000));
        assert!(f.is_stable(9999, 5000));
    }

    #[test]
    fn test_watched_file_fields() {
        let f = WatchedFile::new("/tmp/a.mkv", 999_000, 42_000);
        assert_eq!(f.path, "/tmp/a.mkv");
        assert_eq!(f.size_bytes, 999_000);
        assert_eq!(f.discovered_at_ms, 42_000);
    }

    #[test]
    fn test_config_fields() {
        let c = WatchConfig::new("/in", "/out", "vimeo_4k", 500);
        assert_eq!(c.watch_dir, "/in");
        assert_eq!(c.output_dir, "/out");
        assert_eq!(c.preset_name, "vimeo_4k");
        assert_eq!(c.poll_interval_ms, 500);
    }

    #[test]
    fn test_poll_interval_accessor() {
        let watcher = TranscodeWatcher::new(make_config());
        assert_eq!(watcher.poll_interval_ms(), 2000);
    }
}