1use std::fmt::Debug;
7use std::marker::PhantomData;
8use std::path::Path;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use floxide_core::{ActionType, DefaultAction, FloxideError, Node, NodeId, NodeOutcome};
14use futures::future::{BoxFuture, Future};
15use futures::{Stream, StreamExt};
16use thiserror::Error;
17use tokio::fs::metadata;
18use tokio::sync::mpsc;
19use tokio::time::sleep;
20use tokio_stream::wrappers::ReceiverStream;
21use tracing::{debug, warn};
22
23#[derive(Debug, Error)]
25pub enum ReactiveError {
26 #[error("Failed to watch resource: {0}")]
27 WatchError(String),
28
29 #[error("Stream closed unexpectedly")]
30 StreamClosed,
31
32 #[error("Failed to connect to data source: {0}")]
33 ConnectionError(String),
34
35 #[error("Resource not found: {0}")]
36 ResourceNotFound(String),
37}
38
39pub type ChangeHandlerFn<Context, Action> = Box<
41 dyn Fn(FileChange, &mut Context) -> BoxFuture<'static, Result<Action, FloxideError>>
42 + Send
43 + Sync,
44>;
45
46#[async_trait]
48pub trait ReactiveNode<Change, Context, Action>: Send + Sync
49where
50 Change: Send + Sync + 'static,
51 Context: Send + Sync + 'static,
52 Action: ActionType + Send + Sync + 'static + Debug,
53{
54 async fn watch(&self) -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError>;
56
57 async fn react_to_change(
59 &self,
60 change: Change,
61 ctx: &mut Context,
62 ) -> Result<Action, FloxideError>;
63
64 fn id(&self) -> NodeId;
66}
67
68pub struct ReactiveNodeAdapter<R, Change, Context, Action>
71where
72 Change: Send + Sync + 'static,
73 Context: Send + Sync + 'static,
74 Action: ActionType + Send + Sync + 'static + Debug,
75 R: ReactiveNode<Change, Context, Action> + Send + 'static,
76{
77 node: Arc<R>,
78 buffer_size: usize,
79 _phantom: PhantomData<(Change, Context, Action)>,
80}
81
82impl<R, Change, Context, Action> ReactiveNodeAdapter<R, Change, Context, Action>
83where
84 Change: Send + Sync + 'static,
85 Context: Send + Sync + 'static,
86 Action: ActionType + Send + Sync + 'static + Debug,
87 R: ReactiveNode<Change, Context, Action> + Send + 'static,
88{
89 pub fn new(node: R) -> Self {
91 Self {
92 node: Arc::new(node),
93 buffer_size: 100, _phantom: PhantomData,
95 }
96 }
97
98 pub fn with_buffer_size(mut self, size: usize) -> Self {
100 self.buffer_size = size;
101 self
102 }
103
104 pub async fn start_watching(
106 &self,
107 mut ctx: Context,
108 ) -> Result<impl Stream<Item = Action> + Unpin, FloxideError> {
109 let (tx, rx) = mpsc::channel(self.buffer_size);
110 let node_clone = self.node.clone();
111
112 tokio::spawn(async move {
114 match node_clone.watch().await {
115 Ok(mut change_stream) => {
116 while let Some(change) = change_stream.next().await {
117 match node_clone.react_to_change(change, &mut ctx).await {
118 Ok(action) => {
119 if tx.send(action).await.is_err() {
120 warn!("Receiver dropped, stopping reactive node");
121 break;
122 }
123 }
124 Err(e) => {
125 warn!("Error processing change: {}", e);
126 }
128 }
129 }
130 }
131 Err(e) => {
132 warn!("Failed to set up watch stream: {}", e);
133 }
134 }
135 });
136
137 Ok(ReceiverStream::new(rx))
138 }
139}
140
141#[async_trait]
142impl<R, Change, Context, Action> Node<Context, Action>
143 for ReactiveNodeAdapter<R, Change, Context, Action>
144where
145 Change: Send + Sync + 'static,
146 Context: Clone + Send + Sync + 'static,
147 Action: ActionType + Send + Sync + 'static + Debug,
148 R: ReactiveNode<Change, Context, Action> + Send + 'static,
149{
150 type Output = Action;
151
152 async fn process(
153 &self,
154 ctx: &mut Context,
155 ) -> Result<NodeOutcome<Self::Output, Action>, FloxideError> {
156 let ctx_clone = ctx.clone();
158
159 let mut action_stream = self.start_watching(ctx_clone).await?;
161
162 if let Some(action) = action_stream.next().await {
164 Ok(NodeOutcome::RouteToAction(action))
165 } else {
166 Err(FloxideError::Other(
167 "Reactive stream closed without producing any actions".to_string(),
168 ))
169 }
170 }
171
172 fn id(&self) -> NodeId {
173 self.node.id()
174 }
175}
176
177pub struct FileWatcherNode<Context, Action>
179where
180 Context: Send + Sync + 'static,
181 Action: ActionType + Send + Sync + 'static + Debug,
182{
183 file_path: String,
184 poll_interval: Duration,
185 change_handler: Option<ChangeHandlerFn<Context, Action>>,
186}
187
188impl<Context, Action> FileWatcherNode<Context, Action>
189where
190 Context: Send + Sync + 'static,
191 Action: ActionType + Send + Sync + 'static + Debug,
192{
193 pub fn new<P: Into<String>>(file_path: P) -> Self {
195 Self {
196 file_path: file_path.into(),
197 poll_interval: Duration::from_secs(5), change_handler: None,
199 }
200 }
201
202 pub fn with_id<P: Into<String>>(file_path: P) -> Self {
204 Self {
205 file_path: file_path.into(),
206 poll_interval: Duration::from_secs(5),
207 change_handler: None,
208 }
209 }
210
211 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
213 self.poll_interval = interval;
214 self
215 }
216
217 pub fn with_change_handler<F, Fut>(mut self, handler: F) -> Self
219 where
220 F: Fn(FileChange, &mut Context) -> Fut + Send + Sync + 'static,
221 Fut: Future<Output = Result<Action, FloxideError>> + Send + 'static,
222 {
223 self.change_handler = Some(Box::new(move |change, ctx| Box::pin(handler(change, ctx))));
224 self
225 }
226}
227
/// Snapshot describing a file at the moment a change was noticed.
#[derive(Clone, Debug)]
pub struct FileChange {
    /// Path of the file the snapshot refers to.
    pub path: String,
    /// Modification time; the watcher in this file fills it with whole seconds
    /// since the UNIX epoch.
    pub modified_time: u64,
    /// File length; the watcher in this file fills it from the file metadata's `len()`.
    pub size: u64,
}
238
239pub trait DefaultReactiveNode<Context>: ReactiveNode<FileChange, Context, DefaultAction>
241where
242 Context: Send + Sync + 'static,
243{
244 fn default_react_to_change(
246 &self,
247 change: FileChange,
248 ctx: &mut Context,
249 ) -> Result<DefaultAction, FloxideError>;
250}
251
252#[async_trait]
253impl<Context, Action> ReactiveNode<FileChange, Context, Action> for FileWatcherNode<Context, Action>
254where
255 Context: Send + Sync + 'static,
256 Action: ActionType + Send + Sync + 'static + Debug,
257{
258 async fn watch(
259 &self,
260 ) -> Result<Box<dyn Stream<Item = FileChange> + Send + Unpin>, FloxideError> {
261 let file_path = self.file_path.clone();
262 let poll_interval = self.poll_interval;
263
264 if !Path::new(&file_path).exists() {
266 return Err(FloxideError::Other(format!(
267 "File not found: {}",
268 file_path
269 )));
270 }
271
272 let (tx, rx) = mpsc::channel(10);
273
274 tokio::spawn(async move {
276 let mut last_modified = 0;
277 let mut last_size = 0;
278
279 loop {
280 match metadata(&file_path).await {
281 Ok(meta) => {
282 let modified = meta
283 .modified()
284 .unwrap_or_else(|_| std::time::SystemTime::now())
285 .duration_since(std::time::UNIX_EPOCH)
286 .unwrap_or_default()
287 .as_secs();
288 let size = meta.len();
289
290 if modified > last_modified || size != last_size {
292 let change = FileChange {
293 path: file_path.clone(),
294 modified_time: modified,
295 size,
296 };
297
298 last_modified = modified;
299 last_size = size;
300
301 if tx.send(change).await.is_err() {
302 break;
304 }
305 }
306 }
307 Err(e) => {
308 warn!("Error accessing file {}: {}", file_path, e);
309 break;
311 }
312 }
313
314 sleep(poll_interval).await;
315 }
316 });
317
318 Ok(Box::new(ReceiverStream::new(rx)))
319 }
320
321 async fn react_to_change(
323 &self,
324 change: FileChange,
325 context: &mut Context,
326 ) -> Result<Action, FloxideError> {
327 debug!("Reacting to file change: {:?}", change);
328
329 if let Some(callback) = &self.change_handler {
330 callback(change.clone(), context).await
331 } else {
332 Err(FloxideError::Other(
334 "FileWatcherNode requires a change handler to create specific action types"
335 .to_string(),
336 ))
337 }
338 }
339
340 fn id(&self) -> NodeId {
341 NodeId::new()
342 }
343}
344
345pub struct CustomReactiveNode<Change, Context, Action, WatchFn, ReactFn>
348where
349 Change: Send + Sync + 'static,
350 Context: Send + Sync + 'static,
351 Action: ActionType + Send + Sync + 'static + Debug,
352 WatchFn: Fn() -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError>
353 + Send
354 + Sync
355 + 'static,
356 ReactFn: Fn(Change, &mut Context) -> Result<Action, FloxideError> + Send + Sync + 'static,
357{
358 id: NodeId,
359 watch_fn: Arc<WatchFn>,
360 react_fn: Arc<ReactFn>,
361 _phantom: PhantomData<(Change, Context, Action)>,
362}
363
364impl<Change, Context, Action, WatchFn, ReactFn>
365 CustomReactiveNode<Change, Context, Action, WatchFn, ReactFn>
366where
367 Change: Send + Sync + 'static,
368 Context: Send + Sync + 'static,
369 Action: ActionType + Send + Sync + 'static + Debug,
370 WatchFn: Fn() -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError>
371 + Send
372 + Sync
373 + 'static,
374 ReactFn: Fn(Change, &mut Context) -> Result<Action, FloxideError> + Send + Sync + 'static,
375{
376 pub fn new(watch_fn: WatchFn, react_fn: ReactFn) -> Self {
378 Self {
379 id: NodeId::new(),
380 watch_fn: Arc::new(watch_fn),
381 react_fn: Arc::new(react_fn),
382 _phantom: PhantomData,
383 }
384 }
385
386 pub fn with_id(id: impl Into<NodeId>, watch_fn: WatchFn, react_fn: ReactFn) -> Self {
388 Self {
389 id: id.into(),
390 watch_fn: Arc::new(watch_fn),
391 react_fn: Arc::new(react_fn),
392 _phantom: PhantomData,
393 }
394 }
395}
396
397#[async_trait]
398impl<Change, Context, Action, WatchFn, ReactFn> ReactiveNode<Change, Context, Action>
399 for CustomReactiveNode<Change, Context, Action, WatchFn, ReactFn>
400where
401 Change: Send + Sync + 'static,
402 Context: Send + Sync + 'static,
403 Action: ActionType + Send + Sync + 'static + Debug,
404 WatchFn: Fn() -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError>
405 + Send
406 + Sync
407 + 'static,
408 ReactFn: Fn(Change, &mut Context) -> Result<Action, FloxideError> + Send + Sync + 'static,
409{
410 async fn watch(&self) -> Result<Box<dyn Stream<Item = Change> + Send + Unpin>, FloxideError> {
411 (self.watch_fn)()
412 }
413
414 async fn react_to_change(
415 &self,
416 change: Change,
417 ctx: &mut Context,
418 ) -> Result<Action, FloxideError> {
419 (self.react_fn)(change, ctx)
420 }
421
422 fn id(&self) -> NodeId {
423 self.id.clone()
424 }
425}
426
427pub trait ReactiveActionExt: ActionType {
429 fn change_detected() -> Self;
431
432 fn no_change() -> Self;
434
435 fn is_change_detected(&self) -> bool;
437
438 fn is_no_change(&self) -> bool;
440}
441
442impl ReactiveActionExt for DefaultAction {
444 fn change_detected() -> Self {
445 DefaultAction::Custom("change_detected".to_string())
446 }
447
448 fn no_change() -> Self {
449 DefaultAction::Custom("no_change".to_string())
450 }
451
452 fn is_change_detected(&self) -> bool {
453 matches!(self, DefaultAction::Custom(s) if s == "change_detected")
454 }
455
456 fn is_no_change(&self) -> bool {
457 matches!(self, DefaultAction::Custom(s) if s == "no_change")
458 }
459}
460
461pub fn action_from_change<Change, Action>(_change: &Change) -> Action
463where
464 Action: ActionType + ReactiveActionExt,
465 Change: Send + Sync + 'static,
466{
467 Action::change_detected()
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;

    /// Context that records every change passed to the react closure.
    struct TestContext {
        values: Vec<String>,
    }

    /// Drives a `CustomReactiveNode` by hand and checks that each streamed change
    /// reaches the react closure in order.
    #[tokio::test]
    async fn test_custom_reactive_node() {
        let expected = vec!["change1", "change2", "change3"];

        let node = CustomReactiveNode::<_, _, _, _, _>::new(
            || {
                let source: Box<dyn Stream<Item = &'static str> + Send + Unpin> =
                    Box::new(stream::iter(vec!["change1", "change2", "change3"]));
                Ok(source)
            },
            |change: &str, ctx: &mut TestContext| {
                ctx.values.push(change.to_string());
                Ok(DefaultAction::change_detected())
            },
        );

        let mut ctx = TestContext { values: Vec::new() };

        let mut incoming = node.watch().await.unwrap();
        while let Some(item) = incoming.next().await {
            node.react_to_change(item, &mut ctx).await.unwrap();
        }

        assert_eq!(ctx.values, expected);
    }
}