dampen_dev/subscription.rs
1//! Iced subscription integration for file watching
2//!
3//! This module provides an Iced subscription that bridges file system events
4//! from the notify crate into Iced's async message system.
5
6use crate::watcher::{FileWatcher, FileWatcherConfig};
7use dampen_core::ir::DampenDocument;
8use dampen_core::parser;
9use dampen_core::parser::error::ParseError;
10
11use iced::Subscription;
12use iced::advanced::subscription::{EventStream, Hasher, Recipe};
13use std::hash::Hash;
14use std::path::PathBuf;
15use tokio::sync::mpsc;
16use tokio_stream::wrappers::ReceiverStream;
17
18/// Domain event for file watcher subscription output
19#[derive(Debug, Clone)]
20pub enum FileEvent {
21 /// File changed and parsed successfully
22 Success {
23 /// Path to the changed file
24 path: PathBuf,
25 /// Parsed document (boxed to reduce enum size)
26 document: Box<DampenDocument>,
27 },
28
29 /// Parse error (XML syntax or validation)
30 ParseError {
31 /// Path to the file with error
32 path: PathBuf,
33 /// Parse error details
34 error: ParseError,
35 /// File content for error overlay display
36 content: String,
37 },
38
39 /// File watcher error (permissions, deleted file, etc.)
40 WatcherError {
41 /// Path to the file
42 path: PathBuf,
43 /// Error description
44 error: String,
45 },
46}
47
48/// Recipe for creating file watching subscriptions
49///
50/// This struct implements `iced::subscription::Recipe` to bridge synchronous
51/// file system events from `notify` into Iced's async subscription system.
52///
53/// The recipe creates a unique subscription based on the watched paths and
54/// debounce configuration, ensuring that multiple subscriptions with the same
55/// configuration share the same underlying file watcher.
56#[derive(Debug, Clone)]
57pub struct FileWatcherRecipe {
58 /// Paths to watch for changes
59 pub paths: Vec<PathBuf>,
60
61 /// Debounce interval in milliseconds
62 pub debounce_ms: u64,
63
64 /// File extension filter (e.g., ".dampen")
65 pub extension_filter: String,
66
67 /// Whether to watch directories recursively
68 pub recursive: bool,
69}
70
71impl FileWatcherRecipe {
72 /// Create a new file watcher recipe
73 ///
74 /// # Arguments
75 /// * `paths` - Paths to watch (directories or specific files)
76 /// * `debounce_ms` - Debounce interval in milliseconds
77 ///
78 /// # Returns
79 /// A new FileWatcherRecipe with default settings
80 ///
81 /// # Example
82 /// ```no_run
83 /// use dampen_dev::subscription::FileWatcherRecipe;
84 /// use std::path::PathBuf;
85 ///
86 /// let recipe = FileWatcherRecipe::new(
87 /// vec![PathBuf::from("src/ui")],
88 /// 100
89 /// );
90 /// ```
91 pub fn new(paths: Vec<PathBuf>, debounce_ms: u64) -> Self {
92 Self {
93 paths,
94 debounce_ms,
95 extension_filter: ".dampen".to_string(),
96 recursive: true,
97 }
98 }
99
100 /// Set the file extension filter
101 ///
102 /// # Arguments
103 /// * `extension` - File extension to watch (e.g., ".dampen", ".xml")
104 ///
105 /// # Returns
106 /// Self for method chaining
107 pub fn with_extension(mut self, extension: impl Into<String>) -> Self {
108 self.extension_filter = extension.into();
109 self
110 }
111
112 /// Set whether to watch directories recursively
113 ///
114 /// # Arguments
115 /// * `recursive` - If true, watches subdirectories
116 ///
117 /// # Returns
118 /// Self for method chaining
119 pub fn with_recursive(mut self, recursive: bool) -> Self {
120 self.recursive = recursive;
121 self
122 }
123}
124
125impl Recipe for FileWatcherRecipe {
126 type Output = FileEvent;
127
128 fn hash(&self, state: &mut Hasher) {
129 // Hash all configuration parameters to create a unique subscription identity
130 // This ensures that subscriptions with the same configuration are deduplicated
131
132 // Hash the type discriminant
133 std::any::TypeId::of::<Self>().hash(state);
134
135 // Hash paths (sorted to ensure order-independence)
136 let mut sorted_paths = self.paths.clone();
137 sorted_paths.sort();
138 for path in &sorted_paths {
139 path.hash(state);
140 }
141
142 // Hash configuration
143 self.debounce_ms.hash(state);
144 self.extension_filter.hash(state);
145 self.recursive.hash(state);
146 }
147
148 fn stream(
149 self: Box<Self>,
150 _input: EventStream,
151 ) -> futures::stream::BoxStream<'static, Self::Output> {
152 // Extract configuration from self
153 let paths = self.paths;
154 let debounce_ms = self.debounce_ms;
155 let extension_filter = self.extension_filter;
156 let recursive = self.recursive;
157
158 // Create async channel for bridging sync→async
159 // Buffer size of 100 should handle burst file changes
160 let (tx, rx) = mpsc::channel(100);
161
162 // Spawn blocking task to run the synchronous file watcher
163 // This bridges the sync crossbeam_channel to async tokio channel
164 tokio::task::spawn_blocking(move || {
165 // Create watcher configuration
166 let config = FileWatcherConfig {
167 watch_paths: paths.clone(),
168 debounce_ms,
169 extension_filter,
170 recursive,
171 };
172
173 eprintln!(
174 "[dampen-dev] Creating file watcher with config: paths={:?}, debounce={}ms",
175 paths, debounce_ms
176 );
177
178 // Create the file watcher
179 let mut watcher = match FileWatcher::new(config) {
180 Ok(w) => {
181 eprintln!("[dampen-dev] File watcher created successfully");
182 w
183 }
184 Err(e) => {
185 eprintln!("[dampen-dev] Failed to create file watcher: {}", e);
186 // Send initialization error and return
187 let _ = tx.blocking_send(FileEvent::WatcherError {
188 path: PathBuf::new(),
189 error: format!("Failed to create file watcher: {}", e),
190 });
191 return;
192 }
193 };
194
195 // Start watching all configured paths
196 for path in &paths {
197 eprintln!("[dampen-dev] Attempting to watch: {}", path.display());
198 if let Err(e) = watcher.watch(path.clone()) {
199 eprintln!("[dampen-dev] Failed to watch {}: {}", path.display(), e);
200 let _ = tx.blocking_send(FileEvent::WatcherError {
201 path: path.clone(),
202 error: format!("Failed to watch path: {}", e),
203 });
204 } else {
205 eprintln!("[dampen-dev] Successfully watching: {}", path.display());
206 }
207 }
208
209 // Read events from the file watcher's channel
210 eprintln!("[dampen-dev] File watcher ready, waiting for events...");
211 let receiver = watcher.receiver();
212
213 // Use recv_timeout to allow graceful shutdown detection
214 // When the async channel (tx) is dropped, blocking_send will fail
215 loop {
216 match receiver.recv_timeout(std::time::Duration::from_millis(100)) {
217 Ok(path) => {
218 eprintln!("[dampen-dev] File changed: {}", path.display());
219 // Read the file content
220 let content = match std::fs::read_to_string(&path) {
221 Ok(c) => c,
222 Err(e) => {
223 // File read error (permissions, deleted, etc.)
224 if tx
225 .blocking_send(FileEvent::WatcherError {
226 path: path.clone(),
227 error: format!("Failed to read file: {}", e),
228 })
229 .is_err()
230 {
231 // Channel closed, subscription dropped, exit gracefully
232 eprintln!("[dampen-dev] Channel closed, stopping file watcher");
233 break;
234 }
235 continue;
236 }
237 };
238
239 // Parse the XML content
240 let event = match parser::parse(&content) {
241 Ok(document) => {
242 // Success: send parsed document (boxed to reduce enum size)
243 FileEvent::Success {
244 path: path.clone(),
245 document: Box::new(document),
246 }
247 }
248 Err(error) => {
249 // Parse error: send error with content for overlay
250 FileEvent::ParseError {
251 path: path.clone(),
252 error,
253 content,
254 }
255 }
256 };
257
258 // Send the event; if channel is closed, stop watching
259 if tx.blocking_send(event).is_err() {
260 eprintln!("[dampen-dev] Channel closed, stopping file watcher");
261 break;
262 }
263 }
264 Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
265 // No file events, check if async channel is still alive
266 if tx.is_closed() {
267 eprintln!("[dampen-dev] Channel closed, stopping file watcher");
268 break;
269 }
270 // Continue waiting for file events
271 }
272 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
273 // File watcher channel closed (shouldn't happen normally)
274 eprintln!("[dampen-dev] File watcher disconnected");
275 break;
276 }
277 }
278 }
279 eprintln!("[dampen-dev] File watcher task exiting gracefully");
280 });
281
282 // Convert the tokio receiver into a stream and return it
283 Box::pin(ReceiverStream::new(rx))
284 }
285}
286
287/// Create a subscription that watches files and emits FileEvents
288///
289/// This is the main public API for creating file watching subscriptions in Iced applications.
290/// It creates a `FileWatcherRecipe` that bridges synchronous file system events into Iced's
291/// async subscription system.
292///
293/// # Arguments
294/// * `paths` - Paths to watch (directories or files)
295/// * `debounce_ms` - Debounce interval in milliseconds (recommended: 100ms)
296///
297/// # Returns
298/// An Iced subscription that produces FileEvent messages
299///
300/// # Example
301/// ```no_run
302/// use dampen_dev::subscription::{watch_files, FileEvent};
303/// use std::path::PathBuf;
304///
305/// // Create a subscription that watches UI files
306/// let subscription = watch_files(vec![PathBuf::from("src/ui")], 100);
307/// // The subscription yields FileEvent values
308/// ```
309pub fn watch_files<P: AsRef<std::path::Path>>(
310 paths: Vec<P>,
311 debounce_ms: u64,
312) -> Subscription<FileEvent> {
313 // Convert paths to PathBuf
314 let path_bufs: Vec<PathBuf> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
315
316 // Create the recipe
317 let recipe = FileWatcherRecipe::new(path_bufs, debounce_ms);
318
319 // Use the advanced API to create a subscription from our Recipe
320 use iced::advanced::subscription::from_recipe;
321 from_recipe(recipe)
322}