1use itertools::{Either, Itertools};
2use notify_debouncer_full::{
3 DebouncedEvent, Debouncer, FileIdMap, new_debouncer,
4 notify::{
5 self, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
6 event::{DataChange, ModifyKind, RenameMode},
7 },
8};
9use nu_engine::{ClosureEval, command_prelude::*};
10use nu_protocol::{
11 DeprecationEntry, DeprecationType, ReportMode, Signals, engine::Closure, report_shell_error,
12 shell_error::io::IoError,
13};
14
15use std::{
16 borrow::Cow,
17 path::{Path, PathBuf},
18 sync::mpsc::{Receiver, RecvTimeoutError, channel},
19 time::Duration,
20};
21
/// Upper bound on how long a single wait for watcher events may block before the
/// interrupt signal is checked again.
const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
/// Debounce interval used when neither `--debounce` nor `--debounce-ms` is given.
const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
25
/// The `watch` command: runs a closure on file changes, or streams the change
/// events when no closure is supplied.
#[derive(Clone)]
pub struct Watch;
28
29impl Command for Watch {
30 fn name(&self) -> &str {
31 "watch"
32 }
33
34 fn description(&self) -> &str {
35 "Watch for file changes and execute Nu code when they happen."
36 }
37
38 fn extra_description(&self) -> &str {
39 "When run without a closure, `watch` returns a stream of events instead."
40 }
41
42 fn search_terms(&self) -> Vec<&str> {
43 vec!["watcher", "reload", "filesystem"]
44 }
45
46 fn deprecation_info(&self) -> Vec<DeprecationEntry> {
47 vec![DeprecationEntry {
48 ty: DeprecationType::Flag("debounce-ms".into()),
49 report_mode: ReportMode::FirstUse,
50 since: Some("0.107.0".into()),
51 expected_removal: Some("0.109.0".into()),
52 help: Some("`--debounce-ms` will be removed in favour of `--debounce`".into()),
53 }]
54 }
55
56 fn signature(&self) -> nu_protocol::Signature {
57 Signature::build("watch")
58 .input_output_types(vec![
59 (Type::Nothing, Type::Nothing),
60 (
61 Type::Nothing,
62 Type::Table(vec![
63 ("operation".into(), Type::String),
64 ("path".into(), Type::String),
65 ("new_path".into(), Type::String),
66 ].into_boxed_slice())
67 ),
68 ])
69 .required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.")
70 .optional(
71 "closure",
72 SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
73 "Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.",
74 )
75 .named(
76 "debounce-ms",
77 SyntaxShape::Int,
78 "Debounce changes for this many milliseconds (default: 100). Adjust if you find that single writes are reported as multiple events (deprecated)",
79 Some('d'),
80 )
81 .named(
82 "debounce",
83 SyntaxShape::Duration,
84 "Debounce changes for this duration (default: 100ms). Adjust if you find that single writes are reported as multiple events",
85 None,
86 )
87 .named(
88 "glob",
89 SyntaxShape::String, "Only report changes for files that match this glob pattern (default: all files)",
91 Some('g'),
92 )
93 .named(
94 "recursive",
95 SyntaxShape::Boolean,
96 "Watch all directories under `<path>` recursively. Will be ignored if `<path>` is a file (default: true)",
97 Some('r'),
98 )
99 .switch("quiet", "Hide the initial status message (default: false)", Some('q'))
100 .switch("verbose", "Operate in verbose mode (default: false)", Some('v'))
101 .category(Category::FileSystem)
102 }
103
104 fn run(
105 &self,
106 engine_state: &EngineState,
107 stack: &mut Stack,
108 call: &Call,
109 _input: PipelineData,
110 ) -> Result<PipelineData, ShellError> {
111 let head = call.head;
112 let cwd = engine_state.cwd_as_string(Some(stack))?;
113 let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
114
115 let path_no_whitespace = path_arg
116 .item
117 .trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
118
119 let path = nu_path::canonicalize_with(path_no_whitespace, cwd).map_err(|err| {
120 ShellError::Io(IoError::new(
121 err,
122 path_arg.span,
123 PathBuf::from(path_no_whitespace),
124 ))
125 })?;
126
127 let closure: Option<Closure> = call.opt(engine_state, stack, 1)?;
128 let verbose = call.has_flag(engine_state, stack, "verbose")?;
129 let quiet = call.has_flag(engine_state, stack, "quiet")?;
130 let debounce_duration: Duration = resolve_duration_arguments(
131 call.get_flag(engine_state, stack, "debounce-ms")?,
132 call.get_flag(engine_state, stack, "debounce")?,
133 )?;
134
135 let glob_flag: Option<Spanned<String>> = call.get_flag(engine_state, stack, "glob")?;
136 let glob_pattern = glob_flag
137 .map(|glob| {
138 let absolute_path = path.join(glob.item);
139 if verbose {
140 eprintln!("Absolute glob path: {absolute_path:?}");
141 }
142
143 nu_glob::Pattern::new(&absolute_path.to_string_lossy()).map_err(|_| {
144 ShellError::TypeMismatch {
145 err_message: "Glob pattern is invalid".to_string(),
146 span: glob.span,
147 }
148 })
149 })
150 .transpose()?;
151
152 let recursive_flag: Option<Spanned<bool>> =
153 call.get_flag(engine_state, stack, "recursive")?;
154 let recursive_mode = match recursive_flag {
155 Some(recursive) => {
156 if recursive.item {
157 RecursiveMode::Recursive
158 } else {
159 RecursiveMode::NonRecursive
160 }
161 }
162 None => RecursiveMode::Recursive,
163 };
164
165 let (tx, rx) = channel();
166
167 let mut debouncer =
168 new_debouncer(debounce_duration, None, tx).map_err(|err| ShellError::GenericError {
169 error: "Failed to create watcher".to_string(),
170 msg: err.to_string(),
171 span: Some(call.head),
172 help: None,
173 inner: vec![],
174 })?;
175
176 if let Err(err) = debouncer.watcher().watch(&path, recursive_mode) {
177 return Err(ShellError::GenericError {
178 error: "Failed to create watcher".to_string(),
179 msg: err.to_string(),
180 span: Some(call.head),
181 help: None,
182 inner: vec![],
183 });
184 }
185 debouncer.cache().add_root(&path, recursive_mode);
187
188 if !quiet {
189 eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
190 }
191
192 let iter = WatchIterator::new(debouncer, rx, engine_state.signals().clone());
193
194 if let Some(closure) = closure {
195 let mut closure = ClosureEval::new(engine_state, stack, closure);
196
197 for events in iter {
198 for event in events? {
199 let matches_glob = match &glob_pattern {
200 Some(glob) => glob.matches_path(&event.path),
201 None => true,
202 };
203 if verbose && glob_pattern.is_some() {
204 eprintln!("Matches glob: {matches_glob}");
205 }
206
207 if matches_glob {
208 let result = closure
209 .add_arg(event.operation.into_value(head))
210 .add_arg(event.path.to_string_lossy().into_value(head))
211 .add_arg(
212 event
213 .new_path
214 .as_deref()
215 .map(Path::to_string_lossy)
216 .into_value(head),
217 )
218 .run_with_input(PipelineData::empty());
219
220 match result {
221 Ok(val) => val.print_table(engine_state, stack, false, false)?,
222 Err(err) => report_shell_error(engine_state, &err),
223 };
224 }
225 }
226 }
227
228 Ok(PipelineData::empty())
229 } else {
230 fn glob_filter(glob: Option<&nu_glob::Pattern>, path: &Path) -> bool {
231 let Some(glob) = glob else { return true };
232 glob.matches_path(path)
233 }
234
235 let out = iter
236 .flat_map(|e| match e {
237 Ok(events) => Either::Right(events.into_iter().map(Ok)),
238 Err(err) => Either::Left(std::iter::once(Err(err))),
239 })
240 .filter_map(move |e| match e {
241 Ok(ev) => glob_filter(glob_pattern.as_ref(), &ev.path)
242 .then(|| WatchEventRecord::from(&ev).into_value(head)),
243 Err(err) => Some(Value::error(err, head)),
244 })
245 .into_pipeline_data(head, engine_state.signals().clone());
246 Ok(out)
247 }
248 }
249
250 fn examples(&self) -> Vec<Example> {
251 vec![
252 Example {
253 description: "Run `cargo test` whenever a Rust file changes",
254 example: r#"watch . --glob=**/*.rs {|| cargo test }"#,
255 result: None,
256 },
257 Example {
258 description: "Watch all changes in the current directory",
259 example: r#"watch . { |op, path, new_path| $"($op) ($path) ($new_path)"}"#,
260 result: None,
261 },
262 Example {
263 description: "`watch` (when run without a closure) can also emit a stream of events it detects.",
264 example: r#"watch /foo/bar
265 | where operation == Create
266 | first 5
267 | each {|e| $"New file!: ($e.path)" }
268 | to text
269 | save --append changes_in_bar.log"#,
270 result: None,
271 },
272 Example {
273 description: "Print file changes with a debounce time of 5 minutes",
274 example: r#"watch /foo/bar --debounce 5min { |op, path| $"Registered ($op) on ($path)" | print }"#,
275 result: None,
276 },
277 Example {
278 description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep",
279 example: r#"loop { command; sleep duration }"#,
280 result: None,
281 },
282 ]
283 }
284}
285
286fn resolve_duration_arguments(
287 debounce_duration_flag_ms: Option<Spanned<i64>>,
288 debounce_duration_flag: Option<Spanned<Duration>>,
289) -> Result<Duration, ShellError> {
290 match (debounce_duration_flag, debounce_duration_flag_ms) {
291 (None, None) => Ok(DEFAULT_WATCH_DEBOUNCE_DURATION),
292 (Some(l), Some(r)) => Err(ShellError::IncompatibleParameters {
293 left_message: "Here".to_string(),
294 left_span: l.span,
295 right_message: "and here".to_string(),
296 right_span: r.span,
297 }),
298 (None, Some(val)) => match u64::try_from(val.item) {
299 Ok(v) => Ok(Duration::from_millis(v)),
300 Err(_) => Err(ShellError::TypeMismatch {
301 err_message: "Debounce duration is invalid".to_string(),
302 span: val.span,
303 }),
304 },
305 (Some(v), None) => Ok(v.item),
306 }
307}
308
/// A single filesystem change in the form `watch` reports it.
struct WatchEvent {
    /// Kind of change; the conversion in this file produces "Create", "Remove",
    /// "Write" or "Rename".
    operation: &'static str,
    /// Path the change applies to.
    path: PathBuf,
    /// Second path of a rename; `None` for every other operation.
    new_path: Option<PathBuf>,
}
314
/// String-based view of a [`WatchEvent`], convertible into a value via `IntoValue`.
#[derive(IntoValue)]
struct WatchEventRecord<'a> {
    /// Same operation name as the source event.
    operation: &'static str,
    /// Lossy string form of the event's path.
    path: Cow<'a, str>,
    /// Lossy string form of the rename target, when there is one.
    new_path: Option<Cow<'a, str>>,
}
321
322impl<'a> From<&'a WatchEvent> for WatchEventRecord<'a> {
323 fn from(value: &'a WatchEvent) -> Self {
324 Self {
325 operation: value.operation,
326 path: value.path.to_string_lossy(),
327 new_path: value.new_path.as_deref().map(Path::to_string_lossy),
328 }
329 }
330}
331
332impl TryFrom<DebouncedEvent> for WatchEvent {
333 type Error = ();
334
335 fn try_from(mut ev: DebouncedEvent) -> Result<Self, Self::Error> {
336 match ev.event.kind {
338 EventKind::Create(_) => ev.paths.pop().map(|p| WatchEvent {
339 operation: "Create",
340 path: p,
341 new_path: None,
342 }),
343 EventKind::Remove(_) => ev.paths.pop().map(|p| WatchEvent {
344 operation: "Remove",
345 path: p,
346 new_path: None,
347 }),
348 EventKind::Modify(
349 ModifyKind::Data(DataChange::Content | DataChange::Any) | ModifyKind::Any,
350 ) => ev.paths.pop().map(|p| WatchEvent {
351 operation: "Write",
352 path: p,
353 new_path: None,
354 }),
355 EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => ev
356 .paths
357 .drain(..)
358 .rev()
359 .next_array()
360 .map(|[from, to]| WatchEvent {
361 operation: "Rename",
362 path: from,
363 new_path: Some(to),
364 }),
365 _ => None,
366 }
367 .ok_or(())
368 }
369}
370
/// Iterator over batches of [`WatchEvent`]s received from the debouncer's channel.
struct WatchIterator {
    /// Owned but never read in this file; presumably held so the watcher keeps
    /// running for as long as the iterator exists (TODO confirm against
    /// notify-debouncer-full).
    _debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
    /// Receiving end of the debouncer channel; set to `None` once iteration has ended.
    rx: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
    /// Interrupt signals polled between receive attempts.
    signals: Signals,
}
377
378impl WatchIterator {
379 fn new(
380 debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
381 rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
382 signals: Signals,
383 ) -> Self {
384 Self {
385 _debouncer: debouncer,
386 rx: Some(rx),
387 signals,
388 }
389 }
390}
391
392impl Iterator for WatchIterator {
393 type Item = Result<Vec<WatchEvent>, ShellError>;
394
395 fn next(&mut self) -> Option<Self::Item> {
396 let rx = self.rx.as_ref()?;
397 while !self.signals.interrupted() {
398 let x = match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
399 Ok(x) => x,
400 Err(RecvTimeoutError::Timeout) => continue,
401 Err(RecvTimeoutError::Disconnected) => {
402 self.rx = None;
403 return Some(Err(ShellError::GenericError {
404 error: "Disconnected".to_string(),
405 msg: "Unexpected disconnect from file watcher".into(),
406 span: None,
407 help: None,
408 inner: vec![],
409 }));
410 }
411 };
412
413 let Ok(events) = x else {
414 self.rx = None;
415 return Some(Err(ShellError::GenericError {
416 error: "Receiving events failed".to_string(),
417 msg: "Unexpected errors when receiving events".into(),
418 span: None,
419 help: None,
420 inner: vec![],
421 }));
422 };
423
424 let watch_events = events
425 .into_iter()
426 .filter_map(|ev| WatchEvent::try_from(ev).ok())
427 .collect::<Vec<_>>();
428
429 return Some(Ok(watch_events));
430 }
431 self.rx = None;
432 None
433 }
434}