Skip to main content

neco_watchnorm/
lib.rs

1//! Host-agnostic file watcher event normalization and batch coalescing.
2
/// Kind of change reported by the host watcher, before any normalization.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RawWatchKind {
    /// Every path carried by the event is reported as created.
    Create,
    /// Every path carried by the event is reported as removed.
    Remove,
    /// Every path carried by the event is reported as modified.
    Modify,
    /// The event describes a rename; source and destination are resolved
    /// during normalization.
    Rename,
}
11
/// Hint describing which side(s) of a rename a host event's paths refer to.
///
/// The hint is only consulted when the event carries neither an explicit
/// rename source nor an explicit rename destination.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RenameHint {
    /// The first path is the rename source.
    From,
    /// The first path is the rename destination.
    To,
    /// With at least two paths, the first is the source and the second the
    /// destination.
    Both,
    /// Resolved exactly like [`RenameHint::Both`].
    Any,
}
20
21/// Host-agnostic watcher input.
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct RawWatchEvent {
24    pub kind: RawWatchKind,
25    pub paths: Vec<String>,
26    pub rename_from: Option<String>,
27    pub rename_to: Option<String>,
28    pub rename_hint: Option<RenameHint>,
29    pub generation: u64,
30}
31
32/// Consumer-facing normalized watcher event.
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct NormalizedWatchEvent {
35    pub generation: u64,
36    pub kind: NormalizedWatchKind,
37}
38
/// Normalized change kind handed to consumers; each variant owns its path(s).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum NormalizedWatchKind {
    /// `path` was created.
    Create { path: String },
    /// `path` was removed.
    Remove { path: String },
    /// `path` was modified.
    Modify { path: String },
    /// A rename whose source and destination are both known.
    Rename { from: String, to: String },
    /// A rename for which only one side could be determined.
    PartialRename {
        from: Option<String>,
        to: Option<String>,
    },
}
60
61/// Result returned from a batch drain.
62#[derive(Debug, Clone, PartialEq, Eq)]
63pub struct FlushResult {
64    pub events: Vec<NormalizedWatchEvent>,
65    pub discarded_stale: usize,
66}
67
68/// Stateful batch normalizer for watcher events.
69#[derive(Debug, Default, Clone)]
70pub struct WatchBatchNormalizer {
71    pending: Vec<RawWatchEvent>,
72}
73
74impl WatchBatchNormalizer {
75    /// Create an empty normalizer.
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Queue a raw host event for the next drain.
81    pub fn push(&mut self, event: RawWatchEvent) {
82        self.pending.push(event);
83    }
84
85    /// Normalize and coalesce queued events.
86    pub fn drain(&mut self, current_generation: u64) -> FlushResult {
87        let pending = core::mem::take(&mut self.pending);
88        let mut discarded_stale = 0usize;
89        let mut normalized = Vec::new();
90        let mut pending_rename_from: Option<(u64, String)> = None;
91
92        for event in pending {
93            if event.generation < current_generation {
94                discarded_stale += 1;
95                continue;
96            }
97
98            match event.kind {
99                RawWatchKind::Create => {
100                    for path in event.paths {
101                        normalized.push(NormalizedWatchEvent {
102                            generation: event.generation,
103                            kind: NormalizedWatchKind::Create { path },
104                        });
105                    }
106                }
107                RawWatchKind::Remove => {
108                    for path in event.paths {
109                        normalized.push(NormalizedWatchEvent {
110                            generation: event.generation,
111                            kind: NormalizedWatchKind::Remove { path },
112                        });
113                    }
114                }
115                RawWatchKind::Modify => {
116                    for path in event.paths {
117                        normalized.push(NormalizedWatchEvent {
118                            generation: event.generation,
119                            kind: NormalizedWatchKind::Modify { path },
120                        });
121                    }
122                }
123                RawWatchKind::Rename => {
124                    normalize_rename_event(event, &mut pending_rename_from, &mut normalized);
125                }
126            }
127        }
128
129        if let Some((generation, from)) = pending_rename_from.take() {
130            normalized.push(NormalizedWatchEvent {
131                generation,
132                kind: NormalizedWatchKind::PartialRename {
133                    from: Some(from),
134                    to: None,
135                },
136            });
137        }
138
139        FlushResult {
140            events: coalesce_events(normalized),
141            discarded_stale,
142        }
143    }
144}
145
146fn normalize_rename_event(
147    event: RawWatchEvent,
148    pending_rename_from: &mut Option<(u64, String)>,
149    normalized: &mut Vec<NormalizedWatchEvent>,
150) {
151    let generation = event.generation;
152    let (rename_from, rename_to) = resolve_rename_paths(&event);
153
154    match (rename_from, rename_to) {
155        (Some(from), Some(to)) => {
156            if let Some((pending_generation, pending_from)) = pending_rename_from.take() {
157                normalized.push(NormalizedWatchEvent {
158                    generation: pending_generation,
159                    kind: NormalizedWatchKind::PartialRename {
160                        from: Some(pending_from),
161                        to: None,
162                    },
163                });
164            }
165            normalized.push(NormalizedWatchEvent {
166                generation,
167                kind: NormalizedWatchKind::Rename { from, to },
168            });
169        }
170        (Some(from), None) => {
171            if let Some((pending_generation, pending_from)) =
172                pending_rename_from.replace((generation, from))
173            {
174                normalized.push(NormalizedWatchEvent {
175                    generation: pending_generation,
176                    kind: NormalizedWatchKind::PartialRename {
177                        from: Some(pending_from),
178                        to: None,
179                    },
180                });
181            }
182        }
183        (None, Some(to)) => {
184            if let Some((pending_generation, from)) = pending_rename_from.take() {
185                if pending_generation == generation {
186                    normalized.push(NormalizedWatchEvent {
187                        generation,
188                        kind: NormalizedWatchKind::Rename { from, to },
189                    });
190                } else {
191                    normalized.push(NormalizedWatchEvent {
192                        generation: pending_generation,
193                        kind: NormalizedWatchKind::PartialRename {
194                            from: Some(from),
195                            to: None,
196                        },
197                    });
198                    normalized.push(NormalizedWatchEvent {
199                        generation,
200                        kind: NormalizedWatchKind::PartialRename {
201                            from: None,
202                            to: Some(to),
203                        },
204                    });
205                }
206            } else {
207                normalized.push(NormalizedWatchEvent {
208                    generation,
209                    kind: NormalizedWatchKind::PartialRename {
210                        from: None,
211                        to: Some(to),
212                    },
213                });
214            }
215        }
216        (None, None) => {}
217    }
218}
219
220fn resolve_rename_paths(event: &RawWatchEvent) -> (Option<String>, Option<String>) {
221    let explicit_from = event.rename_from.clone();
222    let explicit_to = event.rename_to.clone();
223    if explicit_from.is_some() || explicit_to.is_some() {
224        return (explicit_from, explicit_to);
225    }
226
227    match event.rename_hint {
228        Some(RenameHint::From) => (event.paths.first().cloned(), None),
229        Some(RenameHint::To) => (None, event.paths.first().cloned()),
230        Some(RenameHint::Both) | Some(RenameHint::Any) | None => {
231            if event.paths.len() > 1 {
232                (event.paths.first().cloned(), event.paths.get(1).cloned())
233            } else {
234                (None, None)
235            }
236        }
237    }
238}
239
240fn coalesce_events(events: Vec<NormalizedWatchEvent>) -> Vec<NormalizedWatchEvent> {
241    let mut coalesced = Vec::with_capacity(events.len());
242
243    for event in events {
244        match &event.kind {
245            NormalizedWatchKind::Modify { path } => {
246                if last_is_modify_for_path(&coalesced, path) {
247                    continue;
248                }
249                if last_is_create_for_path(&coalesced, path) {
250                    continue;
251                }
252                coalesced.push(event);
253            }
254            NormalizedWatchKind::Remove { path } => {
255                coalesced.retain(|item| {
256                    !matches!(&item.kind, NormalizedWatchKind::Modify { path: last_path } if last_path == path)
257                });
258                coalesced.push(event);
259            }
260            _ => coalesced.push(event),
261        }
262    }
263
264    coalesced
265}
266
267fn last_is_modify_for_path(events: &[NormalizedWatchEvent], path: &str) -> bool {
268    matches!(
269        events.last(),
270        Some(NormalizedWatchEvent {
271            kind: NormalizedWatchKind::Modify { path: last_path },
272            ..
273        }) if last_path == path
274    )
275}
276
277fn last_is_create_for_path(events: &[NormalizedWatchEvent], path: &str) -> bool {
278    matches!(
279        events.last(),
280        Some(NormalizedWatchEvent {
281            kind: NormalizedWatchKind::Create { path: last_path },
282            ..
283        }) if last_path == path
284    )
285}
286
#[cfg(test)]
mod tests {
    use super::{
        FlushResult, NormalizedWatchEvent, NormalizedWatchKind, RawWatchEvent, RawWatchKind,
        RenameHint, WatchBatchNormalizer,
    };

    // ---- raw-event fixtures -------------------------------------------------

    /// Raw rename event with every rename-related field spelled out.
    fn rename_event(
        generation: u64,
        paths: &[&str],
        rename_from: Option<&str>,
        rename_to: Option<&str>,
        rename_hint: Option<RenameHint>,
    ) -> RawWatchEvent {
        RawWatchEvent {
            kind: RawWatchKind::Rename,
            paths: paths.iter().map(|path| String::from(*path)).collect(),
            rename_from: rename_from.map(String::from),
            rename_to: rename_to.map(String::from),
            rename_hint,
            generation,
        }
    }

    /// Rename event that only knows its source.
    fn source_only(generation: u64, path: &str) -> RawWatchEvent {
        rename_event(generation, &[path], Some(path), None, Some(RenameHint::From))
    }

    /// Rename event that only knows its destination.
    fn target_only(generation: u64, path: &str) -> RawWatchEvent {
        rename_event(generation, &[path], None, Some(path), Some(RenameHint::To))
    }

    /// Single-path event without any rename information.
    fn plain(kind: RawWatchKind, generation: u64, path: &str) -> RawWatchEvent {
        RawWatchEvent {
            kind,
            paths: vec![String::from(path)],
            rename_from: None,
            rename_to: None,
            rename_hint: None,
            generation,
        }
    }

    /// Push `events` into a fresh normalizer and drain it once.
    fn drain_all(events: Vec<RawWatchEvent>, current_generation: u64) -> FlushResult {
        let mut normalizer = WatchBatchNormalizer::new();
        for event in events {
            normalizer.push(event);
        }
        normalizer.drain(current_generation)
    }

    // ---- expected-output fixtures -------------------------------------------

    fn at(generation: u64, kind: NormalizedWatchKind) -> NormalizedWatchEvent {
        NormalizedWatchEvent { generation, kind }
    }

    fn created(path: &str) -> NormalizedWatchKind {
        NormalizedWatchKind::Create { path: path.into() }
    }

    fn removed(path: &str) -> NormalizedWatchKind {
        NormalizedWatchKind::Remove { path: path.into() }
    }

    fn modified(path: &str) -> NormalizedWatchKind {
        NormalizedWatchKind::Modify { path: path.into() }
    }

    fn renamed(from: &str, to: &str) -> NormalizedWatchKind {
        NormalizedWatchKind::Rename {
            from: from.into(),
            to: to.into(),
        }
    }

    fn lone_source(path: &str) -> NormalizedWatchKind {
        NormalizedWatchKind::PartialRename {
            from: Some(path.into()),
            to: None,
        }
    }

    fn lone_target(path: &str) -> NormalizedWatchKind {
        NormalizedWatchKind::PartialRename {
            from: None,
            to: Some(path.into()),
        }
    }

    // ---- rename resolution --------------------------------------------------

    #[test]
    fn rename_both_normalizes_into_single_rename() {
        let batch = vec![rename_event(
            4,
            &["/old.txt", "/new.txt"],
            None,
            None,
            Some(RenameHint::Both),
        )];

        let result = drain_all(batch, 4);

        assert_eq!(result.events, vec![at(4, renamed("/old.txt", "/new.txt"))]);
        assert_eq!(result.discarded_stale, 0);
    }

    #[test]
    fn rename_from_and_to_are_joined_within_one_drain() {
        let batch = vec![source_only(7, "/from.txt"), target_only(7, "/to.txt")];

        let result = drain_all(batch, 7);

        assert_eq!(result.events, vec![at(7, renamed("/from.txt", "/to.txt"))]);
    }

    #[test]
    fn unresolved_rename_from_becomes_partial_rename() {
        let result = drain_all(vec![source_only(2, "/ghost.txt")], 2);

        assert_eq!(result.events, vec![at(2, lone_source("/ghost.txt"))]);
    }

    #[test]
    fn rename_to_without_pending_becomes_partial_rename() {
        let result = drain_all(vec![target_only(3, "/arrived.txt")], 3);

        assert_eq!(result.events, vec![at(3, lone_target("/arrived.txt"))]);
    }

    // ---- generations --------------------------------------------------------

    #[test]
    fn stale_generations_are_discarded() {
        let batch = vec![
            plain(RawWatchKind::Modify, 4, "/stale.txt"),
            plain(RawWatchKind::Modify, 5, "/fresh.txt"),
        ];

        let result = drain_all(batch, 5);

        assert_eq!(
            result,
            FlushResult {
                events: vec![at(5, modified("/fresh.txt"))],
                discarded_stale: 1,
            }
        );
    }

    // ---- coalescing ---------------------------------------------------------

    #[test]
    fn consecutive_modify_events_coalesce_per_path() {
        let batch = vec![
            plain(RawWatchKind::Modify, 8, "/same.txt"),
            plain(RawWatchKind::Modify, 8, "/same.txt"),
            plain(RawWatchKind::Modify, 8, "/other.txt"),
        ];

        let result = drain_all(batch, 8);

        assert_eq!(
            result.events,
            vec![at(8, modified("/same.txt")), at(8, modified("/other.txt"))]
        );
    }

    #[test]
    fn create_absorbs_immediate_modify_for_same_path() {
        let batch = vec![
            plain(RawWatchKind::Create, 9, "/fresh.txt"),
            plain(RawWatchKind::Modify, 9, "/fresh.txt"),
        ];

        let result = drain_all(batch, 9);

        assert_eq!(result.events, vec![at(9, created("/fresh.txt"))]);
    }

    #[test]
    fn remove_drops_pending_modify_for_same_path() {
        let batch = vec![
            plain(RawWatchKind::Modify, 10, "/deleted.txt"),
            plain(RawWatchKind::Remove, 10, "/deleted.txt"),
        ];

        let result = drain_all(batch, 10);

        assert_eq!(result.events, vec![at(10, removed("/deleted.txt"))]);
    }

    // ---- pending rename source bookkeeping ----------------------------------

    #[test]
    fn pending_rename_from_does_not_cross_drain_boundaries() {
        let mut normalizer = WatchBatchNormalizer::new();

        normalizer.push(source_only(11, "/from.txt"));
        let first = normalizer.drain(11);
        assert_eq!(first.events, vec![at(11, lone_source("/from.txt"))]);

        normalizer.push(target_only(11, "/to.txt"));
        let second = normalizer.drain(11);
        assert_eq!(second.events, vec![at(11, lone_target("/to.txt"))]);
    }

    #[test]
    fn pending_rename_from_does_not_join_to_different_generation_within_one_drain() {
        let batch = vec![source_only(20, "/from.txt"), target_only(21, "/to.txt")];

        let result = drain_all(batch, 20);

        assert_eq!(
            result.events,
            vec![
                at(20, lone_source("/from.txt")),
                at(21, lone_target("/to.txt")),
            ]
        );
    }

    #[test]
    fn completed_rename_does_not_drop_older_pending_rename_from() {
        let batch = vec![
            source_only(12, "/stale-from.txt"),
            rename_event(
                12,
                &["/from.txt", "/to.txt"],
                Some("/from.txt"),
                Some("/to.txt"),
                Some(RenameHint::Both),
            ),
        ];

        let result = drain_all(batch, 12);

        assert_eq!(
            result.events,
            vec![
                at(12, lone_source("/stale-from.txt")),
                at(12, renamed("/from.txt", "/to.txt")),
            ]
        );
    }
}