Skip to main content

dampen_dev/
subscription.rs

1//! Iced subscription integration for file watching
2//!
3//! This module provides an Iced subscription that bridges file system events
4//! from the notify crate into Iced's async message system.
5
6use crate::watcher::{FileWatcher, FileWatcherConfig};
7use dampen_core::ir::DampenDocument;
8use dampen_core::parser;
9use dampen_core::parser::error::ParseError;
10
11use iced::Subscription;
12use iced::advanced::subscription::{EventStream, Hasher, Recipe};
13use std::hash::Hash;
14use std::path::PathBuf;
15use tokio::sync::mpsc;
16use tokio_stream::wrappers::ReceiverStream;
17
18/// Domain event for file watcher subscription output
19#[derive(Debug, Clone)]
20pub enum FileEvent {
21    /// File changed and parsed successfully
22    Success {
23        /// Path to the changed file
24        path: PathBuf,
25        /// Parsed document (boxed to reduce enum size)
26        document: Box<DampenDocument>,
27    },
28
29    /// Parse error (XML syntax or validation)
30    ParseError {
31        /// Path to the file with error
32        path: PathBuf,
33        /// Parse error details
34        error: ParseError,
35        /// File content for error overlay display
36        content: String,
37    },
38
39    /// File watcher error (permissions, deleted file, etc.)
40    WatcherError {
41        /// Path to the file
42        path: PathBuf,
43        /// Error description
44        error: String,
45    },
46}
47
48/// Recipe for creating file watching subscriptions
49///
50/// This struct implements `iced::subscription::Recipe` to bridge synchronous
51/// file system events from `notify` into Iced's async subscription system.
52///
53/// The recipe creates a unique subscription based on the watched paths and
54/// debounce configuration, ensuring that multiple subscriptions with the same
55/// configuration share the same underlying file watcher.
56#[derive(Debug, Clone)]
57pub struct FileWatcherRecipe {
58    /// Paths to watch for changes
59    pub paths: Vec<PathBuf>,
60
61    /// Debounce interval in milliseconds
62    pub debounce_ms: u64,
63
64    /// File extension filter (e.g., ".dampen")
65    pub extension_filter: String,
66
67    /// Whether to watch directories recursively
68    pub recursive: bool,
69}
70
71impl FileWatcherRecipe {
72    /// Create a new file watcher recipe
73    ///
74    /// # Arguments
75    /// * `paths` - Paths to watch (directories or specific files)
76    /// * `debounce_ms` - Debounce interval in milliseconds
77    ///
78    /// # Returns
79    /// A new FileWatcherRecipe with default settings
80    ///
81    /// # Example
82    /// ```no_run
83    /// use dampen_dev::subscription::FileWatcherRecipe;
84    /// use std::path::PathBuf;
85    ///
86    /// let recipe = FileWatcherRecipe::new(
87    ///     vec![PathBuf::from("src/ui")],
88    ///     100
89    /// );
90    /// ```
91    pub fn new(paths: Vec<PathBuf>, debounce_ms: u64) -> Self {
92        Self {
93            paths,
94            debounce_ms,
95            extension_filter: ".dampen".to_string(),
96            recursive: true,
97        }
98    }
99
100    /// Set the file extension filter
101    ///
102    /// # Arguments
103    /// * `extension` - File extension to watch (e.g., ".dampen", ".xml")
104    ///
105    /// # Returns
106    /// Self for method chaining
107    pub fn with_extension(mut self, extension: impl Into<String>) -> Self {
108        self.extension_filter = extension.into();
109        self
110    }
111
112    /// Set whether to watch directories recursively
113    ///
114    /// # Arguments
115    /// * `recursive` - If true, watches subdirectories
116    ///
117    /// # Returns
118    /// Self for method chaining
119    pub fn with_recursive(mut self, recursive: bool) -> Self {
120        self.recursive = recursive;
121        self
122    }
123}
124
125impl Recipe for FileWatcherRecipe {
126    type Output = FileEvent;
127
128    fn hash(&self, state: &mut Hasher) {
129        // Hash all configuration parameters to create a unique subscription identity
130        // This ensures that subscriptions with the same configuration are deduplicated
131
132        // Hash the type discriminant
133        std::any::TypeId::of::<Self>().hash(state);
134
135        // Hash paths (sorted to ensure order-independence)
136        let mut sorted_paths = self.paths.clone();
137        sorted_paths.sort();
138        for path in &sorted_paths {
139            path.hash(state);
140        }
141
142        // Hash configuration
143        self.debounce_ms.hash(state);
144        self.extension_filter.hash(state);
145        self.recursive.hash(state);
146    }
147
148    fn stream(
149        self: Box<Self>,
150        _input: EventStream,
151    ) -> futures::stream::BoxStream<'static, Self::Output> {
152        // Extract configuration from self
153        let paths = self.paths;
154        let debounce_ms = self.debounce_ms;
155        let extension_filter = self.extension_filter;
156        let recursive = self.recursive;
157
158        // Create async channel for bridging sync→async
159        // Buffer size of 1000 handles burst file changes better than 100
160        let (tx, rx) = mpsc::channel(1000);
161
162        // Spawn channel health monitoring task
163        let tx_monitor = tx.clone();
164        tokio::spawn(async move {
165            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
166            loop {
167                interval.tick().await;
168                let capacity = tx_monitor.max_capacity();
169                let available = tx_monitor.capacity();
170                let fill_percent = ((capacity - available) as f64 / capacity as f64) * 100.0;
171
172                if fill_percent > 80.0 {
173                    eprintln!(
174                        "[dampen-dev] Warning: File event channel {:.0}% full ({} of {} slots used)",
175                        fill_percent,
176                        capacity - available,
177                        capacity
178                    );
179                }
180            }
181        });
182
183        // Spawn blocking task to run the synchronous file watcher
184        // This bridges the sync crossbeam_channel to async tokio channel
185        tokio::task::spawn_blocking(move || {
186            // Create watcher configuration
187            let config = FileWatcherConfig {
188                watch_paths: paths.clone(),
189                debounce_ms,
190                extension_filter,
191                recursive,
192            };
193
194            eprintln!(
195                "[dampen-dev] Creating file watcher with config: paths={:?}, debounce={}ms",
196                paths, debounce_ms
197            );
198
199            // Create the file watcher
200            let mut watcher = match FileWatcher::new(config) {
201                Ok(w) => {
202                    eprintln!("[dampen-dev] File watcher created successfully");
203                    w
204                }
205                Err(e) => {
206                    eprintln!("[dampen-dev] Failed to create file watcher: {}", e);
207                    // Send initialization error and return
208                    let _ = tx.blocking_send(FileEvent::WatcherError {
209                        path: PathBuf::new(),
210                        error: format!("Failed to create file watcher: {}", e),
211                    });
212                    return;
213                }
214            };
215
216            // Start watching all configured paths
217            for path in &paths {
218                eprintln!("[dampen-dev] Attempting to watch: {}", path.display());
219                if let Err(e) = watcher.watch(path.clone()) {
220                    eprintln!("[dampen-dev] Failed to watch {}: {}", path.display(), e);
221                    let _ = tx.blocking_send(FileEvent::WatcherError {
222                        path: path.clone(),
223                        error: format!("Failed to watch path: {}", e),
224                    });
225                } else {
226                    eprintln!("[dampen-dev] Successfully watching: {}", path.display());
227                }
228            }
229
230            // Read events from the file watcher's channel
231            eprintln!("[dampen-dev] File watcher ready, waiting for events...");
232            let receiver = watcher.receiver();
233
234            // Use recv_timeout to allow graceful shutdown detection
235            // When the async channel (tx) is dropped, blocking_send will fail
236            loop {
237                match receiver.recv_timeout(std::time::Duration::from_millis(100)) {
238                    Ok(path) => {
239                        eprintln!("[dampen-dev] File changed: {}", path.display());
240                        // Read the file content
241                        let content = match std::fs::read_to_string(&path) {
242                            Ok(c) => c,
243                            Err(e) => {
244                                // File read error (permissions, deleted, etc.)
245                                if tx
246                                    .blocking_send(FileEvent::WatcherError {
247                                        path: path.clone(),
248                                        error: format!("Failed to read file: {}", e),
249                                    })
250                                    .is_err()
251                                {
252                                    // Channel closed, subscription dropped, exit gracefully
253                                    eprintln!("[dampen-dev] Channel closed, stopping file watcher");
254                                    break;
255                                }
256                                continue;
257                            }
258                        };
259
260                        // Parse the XML content
261                        let event = match parser::parse(&content) {
262                            Ok(document) => {
263                                // Success: send parsed document (boxed to reduce enum size)
264                                FileEvent::Success {
265                                    path: path.clone(),
266                                    document: Box::new(document),
267                                }
268                            }
269                            Err(error) => {
270                                // Parse error: send error with content for overlay
271                                FileEvent::ParseError {
272                                    path: path.clone(),
273                                    error,
274                                    content,
275                                }
276                            }
277                        };
278
279                        // Send the event; if channel is closed, stop watching
280                        if tx.blocking_send(event).is_err() {
281                            eprintln!("[dampen-dev] Channel closed, stopping file watcher");
282                            break;
283                        }
284                    }
285                    Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
286                        // No file events, check if async channel is still alive
287                        if tx.is_closed() {
288                            eprintln!("[dampen-dev] Channel closed, stopping file watcher");
289                            break;
290                        }
291                        // Continue waiting for file events
292                    }
293                    Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
294                        // File watcher channel closed (shouldn't happen normally)
295                        eprintln!("[dampen-dev] File watcher disconnected");
296                        break;
297                    }
298                }
299            }
300            eprintln!("[dampen-dev] File watcher task exiting gracefully");
301        });
302
303        // Convert the tokio receiver into a stream and return it
304        Box::pin(ReceiverStream::new(rx))
305    }
306}
307
308/// Create a subscription that watches files and emits FileEvents
309///
310/// This is the main public API for creating file watching subscriptions in Iced applications.
311/// It creates a `FileWatcherRecipe` that bridges synchronous file system events into Iced's
312/// async subscription system.
313///
314/// # Arguments
315/// * `paths` - Paths to watch (directories or files)
316/// * `debounce_ms` - Debounce interval in milliseconds (recommended: 100ms)
317///
318/// # Returns
319/// An Iced subscription that produces FileEvent messages
320///
321/// # Example
322/// ```no_run
323/// use dampen_dev::subscription::{watch_files, FileEvent};
324/// use std::path::PathBuf;
325///
326/// // Create a subscription that watches UI files
327/// let subscription = watch_files(vec![PathBuf::from("src/ui")], 100);
328/// // The subscription yields FileEvent values
329/// ```
330pub fn watch_files<P: AsRef<std::path::Path>>(
331    paths: Vec<P>,
332    debounce_ms: u64,
333) -> Subscription<FileEvent> {
334    // Convert paths to PathBuf
335    let path_bufs: Vec<PathBuf> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
336
337    // Create the recipe
338    let recipe = FileWatcherRecipe::new(path_bufs, debounce_ms);
339
340    // Use the advanced API to create a subscription from our Recipe
341    use iced::advanced::subscription::from_recipe;
342    from_recipe(recipe)
343}
344
345/// Create a subscription that monitors system theme changes.
346///
347/// This is a re-export from `dampen_iced` for backward compatibility.
348/// New code should use `dampen_iced::watch_system_theme()` directly, as it
349/// is available in both debug and release builds without the dampen-dev dependency.
350///
351/// Uses Iced's built-in system theme detection via `iced::system::theme_changes()`,
352/// which leverages winit's native theme detection.
353///
354/// # Returns
355///
356/// A subscription that yields `"light"` or `"dark"` strings when the system theme changes.
357///
358/// # Example
359///
360/// ```no_run
361/// use dampen_dev::subscription::watch_system_theme;
362///
363/// let subscription = watch_system_theme();
364/// // Maps to your message type, e.g.: subscription.map(Message::SystemThemeChanged)
365/// ```
366pub fn watch_system_theme() -> Subscription<String> {
367    // Re-export from dampen-iced for backward compatibility
368    dampen_iced::watch_system_theme()
369}