/// Category of a raw watch notification before normalization.
///
/// `WatchBatchNormalizer::drain` dispatches on this value: `Create`,
/// `Remove` and `Modify` are expanded into one normalized event per entry of
/// the raw event's `paths`, while `Rename` goes through rename pairing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RawWatchKind {
    /// One or more paths were created.
    Create,
    /// One or more paths were removed.
    Remove,
    /// One or more paths were modified.
    Modify,
    /// A rename, possibly reported as only one half (source or destination).
    Rename,
}
11
/// Tells the normalizer how to read `paths` on a raw rename event.
///
/// The hint is only consulted by `resolve_rename_paths` when the raw event
/// carries neither an explicit `rename_from` nor an explicit `rename_to`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RenameHint {
    /// The first entry of `paths` is the rename source.
    From,
    /// The first entry of `paths` is the rename destination.
    To,
    /// `paths` holds source then destination; resolved only when at least two
    /// entries are present.
    Both,
    /// Handled exactly like [`RenameHint::Both`] by `resolve_rename_paths`.
    Any,
}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct RawWatchEvent {
24 pub kind: RawWatchKind,
25 pub paths: Vec<String>,
26 pub rename_from: Option<String>,
27 pub rename_to: Option<String>,
28 pub rename_hint: Option<RenameHint>,
29 pub generation: u64,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct NormalizedWatchEvent {
35 pub generation: u64,
36 pub kind: NormalizedWatchKind,
37}
38
/// Payload of a [`NormalizedWatchEvent`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NormalizedWatchKind {
    /// `path` was created.
    Create {
        path: String,
    },
    /// `path` was removed.
    Remove {
        path: String,
    },
    /// `path` was modified.
    Modify {
        path: String,
    },
    /// A rename for which both endpoints are known.
    Rename {
        from: String,
        to: String,
    },
    /// A rename for which only one endpoint could be determined: either a
    /// source that was never paired with a destination (`to` is `None`), or a
    /// destination that had no matching pending source (`from` is `None`).
    PartialRename {
        from: Option<String>,
        to: Option<String>,
    },
}
60
61#[derive(Debug, Clone, PartialEq, Eq)]
63pub struct FlushResult {
64 pub events: Vec<NormalizedWatchEvent>,
65 pub discarded_stale: usize,
66}
67
68#[derive(Debug, Default, Clone)]
70pub struct WatchBatchNormalizer {
71 pending: Vec<RawWatchEvent>,
72}
73
74impl WatchBatchNormalizer {
75 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn push(&mut self, event: RawWatchEvent) {
82 self.pending.push(event);
83 }
84
85 pub fn drain(&mut self, current_generation: u64) -> FlushResult {
87 let pending = core::mem::take(&mut self.pending);
88 let mut discarded_stale = 0usize;
89 let mut normalized = Vec::new();
90 let mut pending_rename_from: Option<(u64, String)> = None;
91
92 for event in pending {
93 if event.generation < current_generation {
94 discarded_stale += 1;
95 continue;
96 }
97
98 match event.kind {
99 RawWatchKind::Create => {
100 for path in event.paths {
101 normalized.push(NormalizedWatchEvent {
102 generation: event.generation,
103 kind: NormalizedWatchKind::Create { path },
104 });
105 }
106 }
107 RawWatchKind::Remove => {
108 for path in event.paths {
109 normalized.push(NormalizedWatchEvent {
110 generation: event.generation,
111 kind: NormalizedWatchKind::Remove { path },
112 });
113 }
114 }
115 RawWatchKind::Modify => {
116 for path in event.paths {
117 normalized.push(NormalizedWatchEvent {
118 generation: event.generation,
119 kind: NormalizedWatchKind::Modify { path },
120 });
121 }
122 }
123 RawWatchKind::Rename => {
124 normalize_rename_event(event, &mut pending_rename_from, &mut normalized);
125 }
126 }
127 }
128
129 if let Some((generation, from)) = pending_rename_from.take() {
130 normalized.push(NormalizedWatchEvent {
131 generation,
132 kind: NormalizedWatchKind::PartialRename {
133 from: Some(from),
134 to: None,
135 },
136 });
137 }
138
139 FlushResult {
140 events: coalesce_events(normalized),
141 discarded_stale,
142 }
143 }
144}
145
146fn normalize_rename_event(
147 event: RawWatchEvent,
148 pending_rename_from: &mut Option<(u64, String)>,
149 normalized: &mut Vec<NormalizedWatchEvent>,
150) {
151 let generation = event.generation;
152 let (rename_from, rename_to) = resolve_rename_paths(&event);
153
154 match (rename_from, rename_to) {
155 (Some(from), Some(to)) => {
156 if let Some((pending_generation, pending_from)) = pending_rename_from.take() {
157 normalized.push(NormalizedWatchEvent {
158 generation: pending_generation,
159 kind: NormalizedWatchKind::PartialRename {
160 from: Some(pending_from),
161 to: None,
162 },
163 });
164 }
165 normalized.push(NormalizedWatchEvent {
166 generation,
167 kind: NormalizedWatchKind::Rename { from, to },
168 });
169 }
170 (Some(from), None) => {
171 if let Some((pending_generation, pending_from)) =
172 pending_rename_from.replace((generation, from))
173 {
174 normalized.push(NormalizedWatchEvent {
175 generation: pending_generation,
176 kind: NormalizedWatchKind::PartialRename {
177 from: Some(pending_from),
178 to: None,
179 },
180 });
181 }
182 }
183 (None, Some(to)) => {
184 if let Some((pending_generation, from)) = pending_rename_from.take() {
185 if pending_generation == generation {
186 normalized.push(NormalizedWatchEvent {
187 generation,
188 kind: NormalizedWatchKind::Rename { from, to },
189 });
190 } else {
191 normalized.push(NormalizedWatchEvent {
192 generation: pending_generation,
193 kind: NormalizedWatchKind::PartialRename {
194 from: Some(from),
195 to: None,
196 },
197 });
198 normalized.push(NormalizedWatchEvent {
199 generation,
200 kind: NormalizedWatchKind::PartialRename {
201 from: None,
202 to: Some(to),
203 },
204 });
205 }
206 } else {
207 normalized.push(NormalizedWatchEvent {
208 generation,
209 kind: NormalizedWatchKind::PartialRename {
210 from: None,
211 to: Some(to),
212 },
213 });
214 }
215 }
216 (None, None) => {}
217 }
218}
219
220fn resolve_rename_paths(event: &RawWatchEvent) -> (Option<String>, Option<String>) {
221 let explicit_from = event.rename_from.clone();
222 let explicit_to = event.rename_to.clone();
223 if explicit_from.is_some() || explicit_to.is_some() {
224 return (explicit_from, explicit_to);
225 }
226
227 match event.rename_hint {
228 Some(RenameHint::From) => (event.paths.first().cloned(), None),
229 Some(RenameHint::To) => (None, event.paths.first().cloned()),
230 Some(RenameHint::Both) | Some(RenameHint::Any) | None => {
231 if event.paths.len() > 1 {
232 (event.paths.first().cloned(), event.paths.get(1).cloned())
233 } else {
234 (None, None)
235 }
236 }
237 }
238}
239
240fn coalesce_events(events: Vec<NormalizedWatchEvent>) -> Vec<NormalizedWatchEvent> {
241 let mut coalesced = Vec::with_capacity(events.len());
242
243 for event in events {
244 match &event.kind {
245 NormalizedWatchKind::Modify { path } => {
246 if last_is_modify_for_path(&coalesced, path) {
247 continue;
248 }
249 if last_is_create_for_path(&coalesced, path) {
250 continue;
251 }
252 coalesced.push(event);
253 }
254 NormalizedWatchKind::Remove { path } => {
255 coalesced.retain(|item| {
256 !matches!(&item.kind, NormalizedWatchKind::Modify { path: last_path } if last_path == path)
257 });
258 coalesced.push(event);
259 }
260 _ => coalesced.push(event),
261 }
262 }
263
264 coalesced
265}
266
267fn last_is_modify_for_path(events: &[NormalizedWatchEvent], path: &str) -> bool {
268 matches!(
269 events.last(),
270 Some(NormalizedWatchEvent {
271 kind: NormalizedWatchKind::Modify { path: last_path },
272 ..
273 }) if last_path == path
274 )
275}
276
277fn last_is_create_for_path(events: &[NormalizedWatchEvent], path: &str) -> bool {
278 matches!(
279 events.last(),
280 Some(NormalizedWatchEvent {
281 kind: NormalizedWatchKind::Create { path: last_path },
282 ..
283 }) if last_path == path
284 )
285}
286
#[cfg(test)]
mod tests {
    use super::{
        FlushResult, NormalizedWatchEvent, NormalizedWatchKind, RawWatchEvent, RawWatchKind,
        RenameHint, WatchBatchNormalizer,
    };

    #[test]
    fn rename_both_normalizes_into_single_rename() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(raw_rename(
            4,
            vec!["/old.txt", "/new.txt"],
            None,
            None,
            Some(RenameHint::Both),
        ));

        let result = normalizer.drain(4);

        assert_eq!(result.events, vec![full_rename(4, "/old.txt", "/new.txt")]);
        assert_eq!(result.discarded_stale, 0);
    }

    #[test]
    fn rename_from_and_to_are_joined_within_one_drain() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(rename_source(7, "/from.txt"));
        normalizer.push(rename_destination(7, "/to.txt"));

        let result = normalizer.drain(7);

        assert_eq!(result.events, vec![full_rename(7, "/from.txt", "/to.txt")]);
    }

    #[test]
    fn unresolved_rename_from_becomes_partial_rename() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(rename_source(2, "/ghost.txt"));

        let result = normalizer.drain(2);

        assert_eq!(result.events, vec![partial_source(2, "/ghost.txt")]);
    }

    #[test]
    fn rename_to_without_pending_becomes_partial_rename() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(rename_destination(3, "/arrived.txt"));

        let result = normalizer.drain(3);

        assert_eq!(result.events, vec![partial_destination(3, "/arrived.txt")]);
    }

    #[test]
    fn stale_generations_are_discarded() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(raw_modify(4, "/stale.txt"));
        normalizer.push(raw_modify(5, "/fresh.txt"));

        let result = normalizer.drain(5);

        assert_eq!(
            result,
            FlushResult {
                events: vec![modified(5, "/fresh.txt")],
                discarded_stale: 1,
            }
        );
    }

    #[test]
    fn consecutive_modify_events_coalesce_per_path() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(raw_modify(8, "/same.txt"));
        normalizer.push(raw_modify(8, "/same.txt"));
        normalizer.push(raw_modify(8, "/other.txt"));

        let result = normalizer.drain(8);

        assert_eq!(
            result.events,
            vec![modified(8, "/same.txt"), modified(8, "/other.txt")]
        );
    }

    #[test]
    fn create_absorbs_immediate_modify_for_same_path() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(raw_single(RawWatchKind::Create, 9, "/fresh.txt"));
        normalizer.push(raw_modify(9, "/fresh.txt"));

        let result = normalizer.drain(9);

        assert_eq!(
            result.events,
            vec![normalized(
                9,
                NormalizedWatchKind::Create {
                    path: "/fresh.txt".into(),
                },
            )]
        );
    }

    #[test]
    fn remove_drops_pending_modify_for_same_path() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(raw_modify(10, "/deleted.txt"));
        normalizer.push(raw_single(RawWatchKind::Remove, 10, "/deleted.txt"));

        let result = normalizer.drain(10);

        assert_eq!(result.events, vec![removed(10, "/deleted.txt")]);
    }

    #[test]
    fn remove_drops_non_adjacent_modifies_for_same_path() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(raw_modify(10, "/deleted.txt"));
        normalizer.push(raw_modify(10, "/kept.txt"));
        normalizer.push(raw_modify(10, "/deleted.txt"));
        normalizer.push(raw_single(RawWatchKind::Remove, 10, "/deleted.txt"));

        let result = normalizer.drain(10);

        assert_eq!(
            result.events,
            vec![modified(10, "/kept.txt"), removed(10, "/deleted.txt")]
        );
    }

    #[test]
    fn multi_path_event_expands_to_one_event_per_path() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(RawWatchEvent {
            kind: RawWatchKind::Remove,
            paths: vec!["/a.txt".into(), "/b.txt".into()],
            rename_from: None,
            rename_to: None,
            rename_hint: None,
            generation: 6,
        });

        let result = normalizer.drain(6);

        assert_eq!(
            result.events,
            vec![removed(6, "/a.txt"), removed(6, "/b.txt")]
        );
    }

    #[test]
    fn pending_rename_from_does_not_cross_drain_boundaries() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(rename_source(11, "/from.txt"));

        let first = normalizer.drain(11);
        assert_eq!(first.events, vec![partial_source(11, "/from.txt")]);

        normalizer.push(rename_destination(11, "/to.txt"));

        let second = normalizer.drain(11);
        assert_eq!(second.events, vec![partial_destination(11, "/to.txt")]);
    }

    #[test]
    fn pending_rename_from_does_not_join_to_different_generation_within_one_drain() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(rename_source(20, "/from.txt"));
        normalizer.push(rename_destination(21, "/to.txt"));

        let result = normalizer.drain(20);

        assert_eq!(
            result.events,
            vec![
                partial_source(20, "/from.txt"),
                partial_destination(21, "/to.txt"),
            ]
        );
    }

    #[test]
    fn completed_rename_does_not_drop_older_pending_rename_from() {
        let mut normalizer = WatchBatchNormalizer::new();
        normalizer.push(rename_source(12, "/stale-from.txt"));
        normalizer.push(raw_rename(
            12,
            vec!["/from.txt", "/to.txt"],
            Some("/from.txt"),
            Some("/to.txt"),
            Some(RenameHint::Both),
        ));

        let result = normalizer.drain(12);

        assert_eq!(
            result.events,
            vec![
                partial_source(12, "/stale-from.txt"),
                full_rename(12, "/from.txt", "/to.txt"),
            ]
        );
    }

    /// Builds a raw rename event from borrowed test data.
    fn raw_rename(
        generation: u64,
        paths: Vec<&str>,
        rename_from: Option<&str>,
        rename_to: Option<&str>,
        rename_hint: Option<RenameHint>,
    ) -> RawWatchEvent {
        RawWatchEvent {
            kind: RawWatchKind::Rename,
            paths: paths.into_iter().map(str::to_owned).collect(),
            rename_from: rename_from.map(str::to_owned),
            rename_to: rename_to.map(str::to_owned),
            rename_hint,
            generation,
        }
    }

    /// Source-only rename half: path given both in `paths` and explicitly.
    fn rename_source(generation: u64, path: &str) -> RawWatchEvent {
        raw_rename(
            generation,
            vec![path],
            Some(path),
            None,
            Some(RenameHint::From),
        )
    }

    /// Destination-only rename half: path given both in `paths` and explicitly.
    fn rename_destination(generation: u64, path: &str) -> RawWatchEvent {
        raw_rename(
            generation,
            vec![path],
            None,
            Some(path),
            Some(RenameHint::To),
        )
    }

    /// Single-path, non-rename raw event of the given kind.
    fn raw_single(kind: RawWatchKind, generation: u64, path: &str) -> RawWatchEvent {
        RawWatchEvent {
            kind,
            paths: vec![path.into()],
            rename_from: None,
            rename_to: None,
            rename_hint: None,
            generation,
        }
    }

    fn raw_modify(generation: u64, path: &str) -> RawWatchEvent {
        raw_single(RawWatchKind::Modify, generation, path)
    }

    fn normalized(generation: u64, kind: NormalizedWatchKind) -> NormalizedWatchEvent {
        NormalizedWatchEvent { generation, kind }
    }

    fn modified(generation: u64, path: &str) -> NormalizedWatchEvent {
        normalized(generation, NormalizedWatchKind::Modify { path: path.into() })
    }

    fn removed(generation: u64, path: &str) -> NormalizedWatchEvent {
        normalized(generation, NormalizedWatchKind::Remove { path: path.into() })
    }

    fn full_rename(generation: u64, from: &str, to: &str) -> NormalizedWatchEvent {
        normalized(
            generation,
            NormalizedWatchKind::Rename {
                from: from.into(),
                to: to.into(),
            },
        )
    }

    fn partial_source(generation: u64, from: &str) -> NormalizedWatchEvent {
        normalized(
            generation,
            NormalizedWatchKind::PartialRename {
                from: Some(from.into()),
                to: None,
            },
        )
    }

    fn partial_destination(generation: u64, to: &str) -> NormalizedWatchEvent {
        normalized(
            generation,
            NormalizedWatchKind::PartialRename {
                from: None,
                to: Some(to.into()),
            },
        )
    }
}