Skip to main content

floxide_reactive/
lib.rs

1//! Support for reactive patterns in the Floxide framework.
2//!
3//! This crate provides the `ReactiveNode` trait and related implementations for
4//! handling reactive patterns that respond to changes in external data sources.
5
6use std::fmt::Debug;
7use std::marker::PhantomData;
8use std::path::Path;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use floxide_core::{ActionType, DefaultAction, FloxideError, Node, NodeId, NodeOutcome};
14use futures::future::{BoxFuture, Future};
15use futures::{Stream, StreamExt};
16use thiserror::Error;
17use tokio::fs::metadata;
18use tokio::sync::mpsc;
19use tokio::time::sleep;
20use tokio_stream::wrappers::ReceiverStream;
21use tracing::{debug, warn};
22
23/// Errors specific to reactive operations
24#[derive(Debug, Error)]
25pub enum ReactiveError {
26    #[error("Failed to watch resource: {0}")]
27    WatchError(String),
28
29    #[error("Stream closed unexpectedly")]
30    StreamClosed,
31
32    #[error("Failed to connect to data source: {0}")]
33    ConnectionError(String),
34
35    #[error("Resource not found: {0}")]
36    ResourceNotFound(String),
37}
38
39/// A handler function for file changes
40pub type ChangeHandlerFn<Context, Action> = Box<
41    dyn Fn(FileChange, &mut Context) -> BoxFuture<'static, Result<Action, FloxideError>>
42        + Send
43        + Sync,
44>;
45
46/// Trait for nodes that react to changes in external data sources
47#[async_trait]
48pub trait ReactiveNode<Change, Context, Action>: Send + Sync
49where
50    Change: Send + Sync + 'static,
51    Context: Send + Sync + 'static,
52    Action: ActionType + Send + Sync + 'static + Debug,
53{
54    /// Set up a stream of changes to watch
55    async fn watch(&self) -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError>;
56
57    /// React to a detected change
58    async fn react_to_change(
59        &self,
60        change: Change,
61        ctx: &mut Context,
62    ) -> Result<Action, FloxideError>;
63
64    /// Get the node's unique identifier
65    fn id(&self) -> NodeId;
66}
67
68/// An adapter that allows a ReactiveNode to be used as a standard Node.
69/// This adapter handles the streaming and change detection.
70pub struct ReactiveNodeAdapter<R, Change, Context, Action>
71where
72    Change: Send + Sync + 'static,
73    Context: Send + Sync + 'static,
74    Action: ActionType + Send + Sync + 'static + Debug,
75    R: ReactiveNode<Change, Context, Action> + Send + 'static,
76{
77    node: Arc<R>,
78    buffer_size: usize,
79    _phantom: PhantomData<(Change, Context, Action)>,
80}
81
82impl<R, Change, Context, Action> ReactiveNodeAdapter<R, Change, Context, Action>
83where
84    Change: Send + Sync + 'static,
85    Context: Send + Sync + 'static,
86    Action: ActionType + Send + Sync + 'static + Debug,
87    R: ReactiveNode<Change, Context, Action> + Send + 'static,
88{
89    /// Create a new adapter for a reactive node
90    pub fn new(node: R) -> Self {
91        Self {
92            node: Arc::new(node),
93            buffer_size: 100, // Default buffer size
94            _phantom: PhantomData,
95        }
96    }
97
98    /// Set the buffer size for the change stream
99    pub fn with_buffer_size(mut self, size: usize) -> Self {
100        self.buffer_size = size;
101        self
102    }
103
104    /// Start watching for changes and process them in the background
105    pub async fn start_watching(
106        &self,
107        mut ctx: Context,
108    ) -> Result<impl Stream<Item = Action> + Unpin, FloxideError> {
109        let (tx, rx) = mpsc::channel(self.buffer_size);
110        let node_clone = self.node.clone();
111
112        // Start a background task to watch for changes and process them
113        tokio::spawn(async move {
114            match node_clone.watch().await {
115                Ok(mut change_stream) => {
116                    while let Some(change) = change_stream.next().await {
117                        match node_clone.react_to_change(change, &mut ctx).await {
118                            Ok(action) => {
119                                if tx.send(action).await.is_err() {
120                                    warn!("Receiver dropped, stopping reactive node");
121                                    break;
122                                }
123                            }
124                            Err(e) => {
125                                warn!("Error processing change: {}", e);
126                                // Continue watching despite errors
127                            }
128                        }
129                    }
130                }
131                Err(e) => {
132                    warn!("Failed to set up watch stream: {}", e);
133                }
134            }
135        });
136
137        Ok(ReceiverStream::new(rx))
138    }
139}
140
141#[async_trait]
142impl<R, Change, Context, Action> Node<Context, Action>
143    for ReactiveNodeAdapter<R, Change, Context, Action>
144where
145    Change: Send + Sync + 'static,
146    Context: Clone + Send + Sync + 'static,
147    Action: ActionType + Send + Sync + 'static + Debug,
148    R: ReactiveNode<Change, Context, Action> + Send + 'static,
149{
150    type Output = Action;
151
152    async fn process(
153        &self,
154        ctx: &mut Context,
155    ) -> Result<NodeOutcome<Self::Output, Action>, FloxideError> {
156        // Create a clone of the context for the background task
157        let ctx_clone = ctx.clone();
158
159        // Start watching for changes
160        let mut action_stream = self.start_watching(ctx_clone).await?;
161
162        // Return the first action if available
163        if let Some(action) = action_stream.next().await {
164            Ok(NodeOutcome::RouteToAction(action))
165        } else {
166            Err(FloxideError::Other(
167                "Reactive stream closed without producing any actions".to_string(),
168            ))
169        }
170    }
171
172    fn id(&self) -> NodeId {
173        self.node.id()
174    }
175}
176
177/// A node that watches a file for changes
178pub struct FileWatcherNode<Context, Action>
179where
180    Context: Send + Sync + 'static,
181    Action: ActionType + Send + Sync + 'static + Debug,
182{
183    file_path: String,
184    poll_interval: Duration,
185    change_handler: Option<ChangeHandlerFn<Context, Action>>,
186}
187
188impl<Context, Action> FileWatcherNode<Context, Action>
189where
190    Context: Send + Sync + 'static,
191    Action: ActionType + Send + Sync + 'static + Debug,
192{
193    /// Create a new file watcher node
194    pub fn new<P: Into<String>>(file_path: P) -> Self {
195        Self {
196            file_path: file_path.into(),
197            poll_interval: Duration::from_secs(5), // Default 5 second interval
198            change_handler: None,
199        }
200    }
201
202    /// Create a new file watcher node with a specified ID
203    pub fn with_id<P: Into<String>>(file_path: P) -> Self {
204        Self {
205            file_path: file_path.into(),
206            poll_interval: Duration::from_secs(5),
207            change_handler: None,
208        }
209    }
210
211    /// Set the poll interval for the file watcher
212    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
213        self.poll_interval = interval;
214        self
215    }
216
217    /// Set the change handler for the file watcher node
218    pub fn with_change_handler<F, Fut>(mut self, handler: F) -> Self
219    where
220        F: Fn(FileChange, &mut Context) -> Fut + Send + Sync + 'static,
221        Fut: Future<Output = Result<Action, FloxideError>> + Send + 'static,
222    {
223        self.change_handler = Some(Box::new(move |change, ctx| Box::pin(handler(change, ctx))));
224        self
225    }
226}
227
/// Snapshot describing one detected change to a watched file.
///
/// Two snapshots compare equal when path, modification time and size all
/// match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileChange {
    /// Path of the changed file
    pub path: String,
    /// Last modified timestamp in seconds since the epoch
    pub modified_time: u64,
    /// Size of the file in bytes
    pub size: u64,
}
238
239/// Default implementations for FileWatcherNode with DefaultAction
240pub trait DefaultReactiveNode<Context>: ReactiveNode<FileChange, Context, DefaultAction>
241where
242    Context: Send + Sync + 'static,
243{
244    /// Default implementation for react_to_change that returns "change_detected" action
245    fn default_react_to_change(
246        &self,
247        change: FileChange,
248        ctx: &mut Context,
249    ) -> Result<DefaultAction, FloxideError>;
250}
251
252#[async_trait]
253impl<Context, Action> ReactiveNode<FileChange, Context, Action> for FileWatcherNode<Context, Action>
254where
255    Context: Send + Sync + 'static,
256    Action: ActionType + Send + Sync + 'static + Debug,
257{
258    async fn watch(
259        &self,
260    ) -> Result<Box<dyn Stream<Item = FileChange> + Send + Unpin>, FloxideError> {
261        let file_path = self.file_path.clone();
262        let poll_interval = self.poll_interval;
263
264        // Check if the file exists
265        if !Path::new(&file_path).exists() {
266            return Err(FloxideError::Other(format!(
267                "File not found: {}",
268                file_path
269            )));
270        }
271
272        let (tx, rx) = mpsc::channel(10);
273
274        // Start a background task to poll for file changes
275        tokio::spawn(async move {
276            let mut last_modified = 0;
277            let mut last_size = 0;
278
279            loop {
280                match metadata(&file_path).await {
281                    Ok(meta) => {
282                        let modified = meta
283                            .modified()
284                            .unwrap_or_else(|_| std::time::SystemTime::now())
285                            .duration_since(std::time::UNIX_EPOCH)
286                            .unwrap_or_default()
287                            .as_secs();
288                        let size = meta.len();
289
290                        // Detect change in modified time or size
291                        if modified > last_modified || size != last_size {
292                            let change = FileChange {
293                                path: file_path.clone(),
294                                modified_time: modified,
295                                size,
296                            };
297
298                            last_modified = modified;
299                            last_size = size;
300
301                            if tx.send(change).await.is_err() {
302                                // Receiver dropped, stop watching
303                                break;
304                            }
305                        }
306                    }
307                    Err(e) => {
308                        warn!("Error accessing file {}: {}", file_path, e);
309                        // If file becomes inaccessible, stop watching
310                        break;
311                    }
312                }
313
314                sleep(poll_interval).await;
315            }
316        });
317
318        Ok(Box::new(ReceiverStream::new(rx)))
319    }
320
321    /// Reacts to a file change and produces an action
322    async fn react_to_change(
323        &self,
324        change: FileChange,
325        context: &mut Context,
326    ) -> Result<Action, FloxideError> {
327        debug!("Reacting to file change: {:?}", change);
328
329        if let Some(callback) = &self.change_handler {
330            callback(change.clone(), context).await
331        } else {
332            // We can't create a generic Action directly
333            Err(FloxideError::Other(
334                "FileWatcherNode requires a change handler to create specific action types"
335                    .to_string(),
336            ))
337        }
338    }
339
340    fn id(&self) -> NodeId {
341        NodeId::new()
342    }
343}
344
345/// A custom reactive node that uses a provided closure to create the watch stream
346/// and react to changes.
347pub struct CustomReactiveNode<Change, Context, Action, WatchFn, ReactFn>
348where
349    Change: Send + Sync + 'static,
350    Context: Send + Sync + 'static,
351    Action: ActionType + Send + Sync + 'static + Debug,
352    WatchFn: Fn() -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError>
353        + Send
354        + Sync
355        + 'static,
356    ReactFn: Fn(Change, &mut Context) -> Result<Action, FloxideError> + Send + Sync + 'static,
357{
358    id: NodeId,
359    watch_fn: Arc<WatchFn>,
360    react_fn: Arc<ReactFn>,
361    _phantom: PhantomData<(Change, Context, Action)>,
362}
363
364impl<Change, Context, Action, WatchFn, ReactFn>
365    CustomReactiveNode<Change, Context, Action, WatchFn, ReactFn>
366where
367    Change: Send + Sync + 'static,
368    Context: Send + Sync + 'static,
369    Action: ActionType + Send + Sync + 'static + Debug,
370    WatchFn: Fn() -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError>
371        + Send
372        + Sync
373        + 'static,
374    ReactFn: Fn(Change, &mut Context) -> Result<Action, FloxideError> + Send + Sync + 'static,
375{
376    /// Create a new custom reactive node
377    pub fn new(watch_fn: WatchFn, react_fn: ReactFn) -> Self {
378        Self {
379            id: NodeId::new(),
380            watch_fn: Arc::new(watch_fn),
381            react_fn: Arc::new(react_fn),
382            _phantom: PhantomData,
383        }
384    }
385
386    /// Create a new custom reactive node with a specified ID
387    pub fn with_id(id: impl Into<NodeId>, watch_fn: WatchFn, react_fn: ReactFn) -> Self {
388        Self {
389            id: id.into(),
390            watch_fn: Arc::new(watch_fn),
391            react_fn: Arc::new(react_fn),
392            _phantom: PhantomData,
393        }
394    }
395}
396
397#[async_trait]
398impl<Change, Context, Action, WatchFn, ReactFn> ReactiveNode<Change, Context, Action>
399    for CustomReactiveNode<Change, Context, Action, WatchFn, ReactFn>
400where
401    Change: Send + Sync + 'static,
402    Context: Send + Sync + 'static,
403    Action: ActionType + Send + Sync + 'static + Debug,
404    WatchFn: Fn() -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError>
405        + Send
406        + Sync
407        + 'static,
408    ReactFn: Fn(Change, &mut Context) -> Result<Action, FloxideError> + Send + Sync + 'static,
409{
410    async fn watch(&self) -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError> {
411        (self.watch_fn)()
412    }
413
414    async fn react_to_change(
415        &self,
416        change: Change,
417        ctx: &mut Context,
418    ) -> Result<Action, FloxideError> {
419        (self.react_fn)(change, ctx)
420    }
421
422    fn id(&self) -> NodeId {
423        self.id.clone()
424    }
425}
426
427// Extension trait to add helper methods to DefaultAction
428pub trait ReactiveActionExt: ActionType {
429    /// Create an action indicating a change was detected
430    fn change_detected() -> Self;
431
432    /// Create an action indicating no change was detected
433    fn no_change() -> Self;
434
435    /// Check if this is a change_detected action
436    fn is_change_detected(&self) -> bool;
437
438    /// Check if this is a no_change action
439    fn is_no_change(&self) -> bool;
440}
441
442// Implement the extension trait for DefaultAction
443impl ReactiveActionExt for DefaultAction {
444    fn change_detected() -> Self {
445        DefaultAction::Custom("change_detected".to_string())
446    }
447
448    fn no_change() -> Self {
449        DefaultAction::Custom("no_change".to_string())
450    }
451
452    fn is_change_detected(&self) -> bool {
453        matches!(self, DefaultAction::Custom(s) if s == "change_detected")
454    }
455
456    fn is_no_change(&self) -> bool {
457        matches!(self, DefaultAction::Custom(s) if s == "no_change")
458    }
459}
460
461/// A helper function to create an action from a change
462pub fn action_from_change<Change, Action>(_change: &Change) -> Action
463where
464    Action: ActionType + ReactiveActionExt,
465    Change: Send + Sync + 'static,
466{
467    // Default implementation just returns a change_detected action
468    Action::change_detected()
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;

    /// Items the fake watch stream emits, in order.
    const CHANGES: [&str; 3] = ["change1", "change2", "change3"];

    /// Context that records every change it has seen.
    struct TestContext {
        values: Vec<String>,
    }

    /// Feeds a fixed list of changes through a `CustomReactiveNode` and
    /// checks that the react closure saw each of them, in order.
    #[tokio::test]
    async fn test_custom_reactive_node() {
        let node = CustomReactiveNode::<_, _, _, _, _>::new(
            || {
                let boxed: Box<dyn Stream<Item = &'static str> + Send + Unpin> =
                    Box::new(stream::iter(CHANGES.to_vec()));
                Ok(boxed)
            },
            |change: &str, ctx: &mut TestContext| {
                ctx.values.push(change.to_string());
                Ok(DefaultAction::change_detected())
            },
        );

        let mut ctx = TestContext { values: Vec::new() };

        // Drive the node by hand: open the stream, then react to each item.
        let mut incoming = node.watch().await.unwrap();
        while let Some(change) = incoming.next().await {
            node.react_to_change(change, &mut ctx).await.unwrap();
        }

        assert_eq!(ctx.values, CHANGES.to_vec());
    }
}