kanban-persistence 0.7.0

Persistence traits and shared types for the kanban project management tool
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
use crate::traits::{ChangeDetector, ChangeEvent};
use crate::PersistenceResult;
use chrono::Utc;
use notify::{RecursiveMode, Watcher};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::Mutex as TokioMutex;

/// File system watcher for detecting changes to the persistence file
/// Uses the `notify` crate for cross-platform file watching
/// Spawns the watcher in a tokio task to handle the Send requirement
///
/// # Future Directory Format Support
///
/// This implementation is currently designed for single-file JSON persistence
/// but can be extended to support directory-based formats by:
///
/// 1. Adding a `WatchTarget` enum to distinguish between `File(path)` and `Directory(path, pattern)`
/// 2. For directory watching, use `RecursiveMode::Recursive` instead of `NonRecursive`
/// 3. Add glob pattern filtering to the event handler to match specific file extensions
/// 4. Implement event debouncing (e.g., 100ms window) to batch rapid file changes
/// 5. The OS-native backends (inotify, FSEvents, ReadDirectoryChangesW) efficiently
///    handle watching directories with hundreds of files, incurring negligible overhead
///
/// Example future usage:
/// ```ignore
/// let watcher = FileWatcher::new();
/// watcher.start_watching(WatchTarget::Directory("./data".into(), "*.json")).await?;
/// // Efficiently watches all JSON files in directory and subdirectories
/// ```
#[derive(Clone)]
pub struct FileWatcher {
    tx: broadcast::Sender<ChangeEvent>,
    task_handle: Arc<TokioMutex<Option<tokio::task::JoinHandle<()>>>>,
    suppress_remaining: Arc<AtomicUsize>,
}

impl FileWatcher {
    /// Create a new file watcher
    /// The broadcast channel has a buffer size of 10
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(10);
        Self {
            tx,
            task_handle: Arc::new(TokioMutex::new(None)),
            suppress_remaining: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Returns the number of events that will still be suppressed.
    ///
    /// Intended for tests only; not part of the stable API.
    #[doc(hidden)]
    pub fn suppress_remaining(&self) -> usize {
        self.suppress_remaining.load(Ordering::SeqCst)
    }

    /// Suppress the next 2 own-write events.
    ///
    /// Call immediately before each atomic rename. Each OS event from that
    /// rename decrements the counter; when the counter reaches 0, subsequent
    /// events are delivered normally. Using 2 is conservative — Linux fires
    /// only 1 event per rename on the target path (after `has_our_file`
    /// filtering), so the counter is typically fully consumed by 1 event.
    pub fn suppress_next_event(&self) {
        self.suppress_remaining.store(2, Ordering::SeqCst);
        tracing::debug!("File watcher suppress counter set to 2");
    }
}

impl Default for FileWatcher {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait::async_trait]
impl ChangeDetector for FileWatcher {
    async fn start_watching(&self, path: PathBuf) -> PersistenceResult<()> {
        let tx = self.tx.clone();
        let task_handle = self.task_handle.clone();
        let suppress_remaining = self.suppress_remaining.clone();

        // Canonicalize to absolute path so it matches OS event paths
        let canonical_path = tokio::fs::canonicalize(&path).await?;

        // Spawn file watching in a background task
        let handle = tokio::spawn(async move {
            let parent = canonical_path
                .parent()
                .expect("Canonicalized path should always have parent")
                .to_path_buf();
            let watch_path = canonical_path.clone();
            let suppress_remaining_clone = suppress_remaining.clone();

            match notify::recommended_watcher(move |res: notify::Result<notify::Event>| match res {
                Ok(event) => {
                    let is_relevant_event = matches!(
                        event.kind,
                        notify::EventKind::Modify(notify::event::ModifyKind::Data(
                            notify::event::DataChange::Content,
                        )) | notify::EventKind::Modify(notify::event::ModifyKind::Name(_),)
                            | notify::EventKind::Create(_)
                            | notify::EventKind::Remove(_)
                    );

                    let has_our_file = event.paths.iter().any(|p| p == &watch_path);

                    if is_relevant_event {
                        tracing::debug!(
                            "File system event detected: kind={:?}, paths={:?}, has_our_file={}",
                            event.kind,
                            event.paths,
                            has_our_file
                        );
                    }

                    if is_relevant_event && has_our_file {
                        let suppressed = suppress_remaining_clone
                            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
                            .is_ok();
                        if suppressed {
                            tracing::debug!(
                                "Own-write event suppressed (counter): kind={:?}, path={}",
                                event.kind,
                                watch_path.display()
                            );
                            return;
                        }

                        tracing::debug!(
                            "File event detected: kind={:?}, path={}, our_file_exists={}",
                            event.kind,
                            watch_path.display(),
                            watch_path.exists()
                        );
                        let change = ChangeEvent {
                            path: watch_path.clone(),
                            detected_at: Utc::now(),
                        };
                        match tx.send(change) {
                            Ok(receiver_count) => {
                                tracing::debug!(
                                    "File change event sent to {} receivers",
                                    receiver_count
                                );
                            }
                            Err(e) => {
                                tracing::warn!("Failed to send file change event: {}", e);
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("File watcher error: {}", e);
                }
            }) {
                Ok(mut watcher) => {
                    // Watch parent directory first (better for detecting atomic writes on macOS FSEvents)
                    let watch_result = watcher.watch(&parent, RecursiveMode::NonRecursive);

                    if watch_result.is_err() {
                        // Fallback to watching the file directly if parent watch fails
                        if let Err(e) = watcher.watch(&canonical_path, RecursiveMode::NonRecursive)
                        {
                            tracing::error!("Failed to watch file or parent directory: {}", e);
                            return;
                        }
                        tracing::info!("Watching file: {}", canonical_path.display());
                    } else {
                        tracing::info!("Watching parent directory: {}", parent.display());
                    }

                    // Keep watcher alive
                    std::future::pending::<()>().await;
                }
                Err(e) => {
                    tracing::error!("Failed to create watcher: {}", e);
                }
            }
        });

        let mut guard = task_handle.lock().await;
        *guard = Some(handle);

        Ok(())
    }

    async fn stop_watching(&self) -> PersistenceResult<()> {
        let mut guard = self.task_handle.lock().await;
        if let Some(handle) = guard.take() {
            handle.abort();
            tracing::info!("Stopped file watching");
        }
        Ok(())
    }

    fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
        self.tx.subscribe()
    }

    fn is_watching(&self) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use tokio::time::{sleep, Duration};

    #[tokio::test]
    async fn test_file_watcher_detects_direct_writes() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.json");

        // Create initial file
        tokio::fs::write(&file_path, b"initial content")
            .await
            .unwrap();

        let watcher = FileWatcher::new();
        let mut rx = watcher.subscribe();

        watcher.start_watching(file_path.clone()).await.unwrap();

        // Give watcher time to start
        sleep(Duration::from_millis(100)).await;

        // Modify the file with direct write
        tokio::fs::write(&file_path, b"modified content")
            .await
            .unwrap();

        // Wait for change event (with timeout)
        let result = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;

        watcher.stop_watching().await.unwrap();

        // We got an event (timing is platform-dependent, so this might be flaky)
        if let Ok(Ok(event)) = result {
            // Canonicalize both paths to handle platform differences (e.g., macOS /var -> /private/var)
            let expected_path = tokio::fs::canonicalize(&file_path)
                .await
                .unwrap_or(file_path.clone());
            let event_path = tokio::fs::canonicalize(&event.path)
                .await
                .unwrap_or(event.path.clone());
            assert_eq!(event_path, expected_path);
        }
    }

    #[tokio::test]
    async fn test_file_watcher_detects_atomic_writes() {
        use std::fs;
        use tempfile::NamedTempFile;

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.json");

        // Create initial file
        tokio::fs::write(&file_path, b"initial content")
            .await
            .unwrap();

        let watcher = FileWatcher::new();
        let mut rx = watcher.subscribe();

        watcher.start_watching(file_path.clone()).await.unwrap();

        // Give watcher time to start
        sleep(Duration::from_millis(100)).await;

        // Modify file with atomic write pattern (temp → rename)
        let temp_file = NamedTempFile::new_in(dir.path()).unwrap();
        let temp_path = temp_file.path().to_path_buf();
        std::fs::write(&temp_path, b"atomic write content").unwrap();
        fs::rename(&temp_path, &file_path).unwrap();

        // Wait for change event (with timeout)
        let result = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;

        watcher.stop_watching().await.unwrap();

        // We got an event from the atomic write
        if let Ok(Ok(event)) = result {
            let expected_path = tokio::fs::canonicalize(&file_path)
                .await
                .unwrap_or(file_path.clone());
            let event_path = tokio::fs::canonicalize(&event.path)
                .await
                .unwrap_or(event.path.clone());
            assert_eq!(event_path, expected_path);
        }
    }

    /// Pure unit test: `suppress_next_event` loads the counter to 2.
    /// No I/O, no async, no timing.
    #[test]
    fn test_suppress_next_event_sets_counter() {
        let watcher = FileWatcher::new();
        assert_eq!(watcher.suppress_remaining(), 0, "counter must start at 0");
        watcher.suppress_next_event();
        assert_eq!(
            watcher.suppress_remaining(),
            2,
            "suppress_next_event must set counter to 2"
        );
    }

    /// After our own atomic rename, the counter is decremented and no event
    /// reaches the channel.  Replaces the 500 ms timeout test.
    #[tokio::test]
    async fn test_own_write_decrements_suppress_counter() {
        use std::fs;
        use tempfile::NamedTempFile;

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("own.json");
        tokio::fs::write(&file_path, b"initial").await.unwrap();

        let watcher = FileWatcher::new();
        let mut rx = watcher.subscribe();
        watcher.start_watching(file_path.clone()).await.unwrap();
        sleep(Duration::from_millis(100)).await;

        watcher.suppress_next_event();
        assert_eq!(watcher.suppress_remaining(), 2);

        let temp = NamedTempFile::new_in(dir.path()).unwrap();
        std::fs::write(temp.path(), b"own write").unwrap();
        fs::rename(temp.path(), &file_path).unwrap();

        // Give the OS time to deliver the event to the handler.
        sleep(Duration::from_millis(150)).await;

        assert!(
            watcher.suppress_remaining() < 2,
            "counter must have been decremented by the OS event"
        );
        // No event must have been forwarded to subscribers.
        let result = rx.try_recv();
        assert!(
            result.is_err(),
            "no event should reach the channel for an own write; got: {:?}",
            result
        );

        watcher.stop_watching().await.unwrap();
    }

    /// After the counter is exhausted, a subsequent external write IS delivered.
    /// Guards against a "counter stuck at MAX" regression.
    #[tokio::test]
    async fn test_external_write_delivered_after_counter_exhausted() {
        use std::fs;
        use tempfile::NamedTempFile;

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("external.json");
        tokio::fs::write(&file_path, b"initial").await.unwrap();

        let watcher = FileWatcher::new();
        let mut rx = watcher.subscribe();
        watcher.start_watching(file_path.clone()).await.unwrap();
        sleep(Duration::from_millis(100)).await;

        // Own write — suppress counter counts it down.
        watcher.suppress_next_event();
        let temp = NamedTempFile::new_in(dir.path()).unwrap();
        std::fs::write(temp.path(), b"own write").unwrap();
        fs::rename(temp.path(), &file_path).unwrap();
        sleep(Duration::from_millis(150)).await;

        // Second rename simulates an external write after counter is exhausted.
        let temp2 = NamedTempFile::new_in(dir.path()).unwrap();
        std::fs::write(temp2.path(), b"external write").unwrap();
        fs::rename(temp2.path(), &file_path).unwrap();

        let result = tokio::time::timeout(Duration::from_millis(300), rx.recv()).await;
        watcher.stop_watching().await.unwrap();

        assert!(
            result.is_ok(),
            "external write after counter is exhausted must fire an event, got: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_file_watcher_does_not_fire_for_unrelated_temp_file() {
        use tempfile::NamedTempFile;

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.json");

        // Create the watched file
        tokio::fs::write(&file_path, b"initial content")
            .await
            .unwrap();

        let watcher = FileWatcher::new();
        let mut rx = watcher.subscribe();

        watcher.start_watching(file_path.clone()).await.unwrap();

        // Give watcher time to start
        sleep(Duration::from_millis(100)).await;

        // Create a temp file in the SAME directory but do NOT rename it to test.json
        let temp_file = NamedTempFile::new_in(dir.path()).unwrap();
        std::fs::write(temp_file.path(), b"unrelated content").unwrap();

        // No event should be emitted — the temp file is not our watched path
        let result = tokio::time::timeout(Duration::from_millis(500), rx.recv()).await;

        watcher.stop_watching().await.unwrap();

        assert!(
            result.is_err(),
            "Expected timeout (no event), but got: {:?}",
            result
        );
    }
}