floxide_timer/
lib.rs

1//! # Floxide Timer
2//!
3//! Timer-based node extensions for the Floxide framework.
4//!
5//! This crate provides time-based workflow capabilities through
6//! the TimerNode trait and various time-based schedule implementations.
7
8use async_trait::async_trait;
9use chrono::{DateTime, Datelike, Duration as ChronoDuration, Timelike, Utc, Weekday};
10use floxide_core::{error::FloxideError, ActionType, DefaultAction, Node, NodeId, NodeOutcome};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::time::sleep;
17use tracing::{info, warn};
18use uuid::Uuid;
19
20/// Represents a time schedule for execution
21#[derive(Debug, Clone)]
22pub enum Schedule {
23    /// Execute once at a specific time
24    Once(DateTime<Utc>),
25
26    /// Execute repeatedly at fixed intervals
27    Interval(Duration),
28
29    /// Execute daily at a specified hour and minute (24-hour format)
30    Daily(u32, u32),
31
32    /// Execute weekly on a specified day at a specified hour and minute
33    Weekly(Weekday, u32, u32),
34
35    /// Execute monthly on a specified day at a specified hour and minute
36    Monthly(u32, u32, u32),
37
38    /// Execute according to a cron expression (not fully implemented)
39    /// This is a placeholder for future implementation
40    Cron(String),
41}
42
43impl Schedule {
44    /// Calculate the next execution time based on the current time
45    pub fn next_execution(&self) -> Result<DateTime<Utc>, FloxideError> {
46        let now = Utc::now();
47
48        match self {
49            Schedule::Once(time) => {
50                if time <= &now {
51                    Err(FloxideError::Other(
52                        "Scheduled time has already passed".to_string(),
53                    ))
54                } else {
55                    Ok(*time)
56                }
57            }
58
59            Schedule::Interval(duration) => Ok(now
60                + ChronoDuration::from_std(*duration).map_err(|e| {
61                    FloxideError::Other(format!("Failed to convert duration: {}", e))
62                })?),
63
64            Schedule::Daily(hour, minute) => {
65                let mut next = now;
66
67                // Set the next execution time to today at the specified hour and minute
68                next = next
69                    .with_hour(*hour)
70                    .and_then(|dt| dt.with_minute(*minute))
71                    .and_then(|dt| dt.with_second(0))
72                    .and_then(|dt| dt.with_nanosecond(0))
73                    .ok_or_else(|| FloxideError::Other("Invalid hour or minute".to_string()))?;
74
75                // If this time has already passed today, schedule it for tomorrow
76                if next <= now {
77                    next += ChronoDuration::days(1);
78                }
79
80                Ok(next)
81            }
82
83            Schedule::Weekly(weekday, hour, minute) => {
84                let mut next = now;
85
86                // Set the next execution time to the specified hour and minute
87                next = next
88                    .with_hour(*hour)
89                    .and_then(|dt| dt.with_minute(*minute))
90                    .and_then(|dt| dt.with_second(0))
91                    .and_then(|dt| dt.with_nanosecond(0))
92                    .ok_or_else(|| FloxideError::Other("Invalid hour or minute".to_string()))?;
93
94                // Calculate days until the next occurrence of the specified weekday
95                let days_until_weekday =
96                    (*weekday as i32 - now.weekday().num_days_from_monday() as i32 + 7) % 7;
97
98                // If the time has already passed today and it's the specified weekday, schedule it for next week
99                if days_until_weekday == 0 && next <= now {
100                    next += ChronoDuration::days(7);
101                } else {
102                    next += ChronoDuration::days(days_until_weekday as i64);
103                }
104
105                Ok(next)
106            }
107
108            Schedule::Monthly(day, hour, minute) => {
109                let mut next = now;
110
111                // Set the next execution time to the specified hour and minute
112                next = next
113                    .with_hour(*hour)
114                    .and_then(|dt| dt.with_minute(*minute))
115                    .and_then(|dt| dt.with_second(0))
116                    .and_then(|dt| dt.with_nanosecond(0))
117                    .ok_or_else(|| FloxideError::Other("Invalid hour or minute".to_string()))?;
118
119                // Set the day of the month
120                let current_day = now.day();
121
122                // If the specified day is valid for the current month
123                if *day <= 31 {
124                    // Try to set the day
125                    match next.with_day(*day) {
126                        Some(date) => next = date,
127                        None => {
128                            return Err(FloxideError::Other(format!(
129                                "Invalid day {} for the current month",
130                                day
131                            )))
132                        }
133                    }
134
135                    // If this time has already passed this month, or the day is earlier than today,
136                    // schedule it for next month
137                    if next <= now || *day < current_day {
138                        // Move to the 1st of next month and then try to set the day
139                        next += ChronoDuration::days(32); // Move well into next month
140                        next = next.with_day(1).ok_or_else(|| {
141                            FloxideError::Other("Failed to set day to 1".to_string())
142                        })?;
143
144                        // Try to set the specified day in the next month
145                        next = next.with_day(*day).ok_or_else(|| {
146                            FloxideError::Other(format!("Invalid day {} for next month", day))
147                        })?;
148                    }
149                } else {
150                    return Err(FloxideError::Other(format!(
151                        "Invalid day of month: {}",
152                        day
153                    )));
154                }
155
156                Ok(next)
157            }
158
159            Schedule::Cron(_expression) => {
160                // Placeholder for cron implementation
161                // For now, just return an error
162                Err(FloxideError::Other(
163                    "Cron expressions are not yet implemented".to_string(),
164                ))
165            }
166        }
167    }
168
169    /// Calculate the duration until the next execution
170    pub fn duration_until_next(&self) -> Result<Duration, FloxideError> {
171        let next = self.next_execution()?;
172        let now = Utc::now();
173
174        let duration = next.signed_duration_since(now);
175        if duration.num_milliseconds() <= 0 {
176            return Err(FloxideError::Other(
177                "Scheduled time is in the past".to_string(),
178            ));
179        }
180
181        Ok(Duration::from_millis(duration.num_milliseconds() as u64))
182    }
183}
184
185/// A node that executes based on time schedules
186#[async_trait]
187pub trait TimerNode<Context, Action>: Send + Sync
188where
189    Context: Send + Sync + 'static,
190    Action: ActionType + Send + Sync + 'static + Default + Debug,
191{
192    /// Define the execution schedule
193    fn schedule(&self) -> Schedule;
194
195    /// Execute the node on schedule
196    async fn execute_on_schedule(&self, ctx: &mut Context) -> Result<Action, FloxideError>;
197
198    /// Get the node's unique identifier
199    fn id(&self) -> NodeId;
200}
201
202/// A simple timer that executes a function based on a schedule
203pub struct SimpleTimer<F>
204where
205    F: Send + Sync + 'static,
206{
207    id: NodeId,
208    schedule: Schedule,
209    action: F,
210}
211
212impl<F> SimpleTimer<F>
213where
214    F: Send + Sync + 'static,
215{
216    /// Create a new simple timer with a default ID
217    pub fn new(schedule: Schedule, action: F) -> Self {
218        Self {
219            id: Uuid::new_v4().to_string(),
220            schedule,
221            action,
222        }
223    }
224
225    /// Create a new simple timer with a specific ID
226    pub fn with_id(id: impl Into<String>, schedule: Schedule, action: F) -> Self {
227        Self {
228            id: id.into(),
229            schedule,
230            action,
231        }
232    }
233}
234
235#[async_trait]
236impl<Context, Action, F> TimerNode<Context, Action> for SimpleTimer<F>
237where
238    Context: Send + Sync + 'static,
239    Action: ActionType + Send + Sync + 'static + Default + Debug,
240    F: Fn(&mut Context) -> Result<Action, FloxideError> + Send + Sync + 'static,
241{
242    fn schedule(&self) -> Schedule {
243        self.schedule.clone()
244    }
245
246    async fn execute_on_schedule(&self, ctx: &mut Context) -> Result<Action, FloxideError> {
247        (self.action)(ctx)
248    }
249
250    fn id(&self) -> NodeId {
251        self.id.clone()
252    }
253}
254
255/// A timer workflow that orchestrates execution of timer nodes
256pub struct TimerWorkflow<Context, Action>
257where
258    Context: Send + Sync + 'static,
259    Action: ActionType + Send + Sync + 'static + Default + Debug,
260{
261    nodes: HashMap<NodeId, Arc<dyn TimerNode<Context, Action>>>,
262    routes: HashMap<(NodeId, Action), NodeId>,
263    initial_node: NodeId,
264    termination_action: Action,
265}
266
267impl<Context, Action> TimerWorkflow<Context, Action>
268where
269    Context: Send + Sync + 'static,
270    Action: ActionType + Send + Sync + 'static + Default + Debug,
271{
272    /// Create a new timer workflow with an initial node
273    pub fn new(
274        initial_node: Arc<dyn TimerNode<Context, Action>>,
275        termination_action: Action,
276    ) -> Self {
277        let id = initial_node.id();
278
279        let mut nodes = HashMap::new();
280        nodes.insert(id.clone(), initial_node);
281
282        Self {
283            nodes,
284            routes: HashMap::new(),
285            initial_node: id,
286            termination_action,
287        }
288    }
289
290    /// Add a node to the workflow
291    pub fn add_node(&mut self, node: Arc<dyn TimerNode<Context, Action>>) {
292        let id = node.id();
293        self.nodes.insert(id, node);
294    }
295
296    /// Set a route from one node to another based on an action
297    pub fn set_route(&mut self, from_id: &NodeId, action: Action, to_id: &NodeId) {
298        self.routes.insert((from_id.clone(), action), to_id.clone());
299    }
300
301    /// Execute the workflow until completion or error
302    pub async fn execute(&self, ctx: &mut Context) -> Result<(), FloxideError> {
303        let mut current_node_id = self.initial_node.clone();
304
305        loop {
306            let node = self.nodes.get(&current_node_id).ok_or_else(|| {
307                FloxideError::Other(format!("Node not found: {}", current_node_id))
308            })?;
309
310            // Calculate the time until the next execution
311            let wait_duration = match node.schedule().duration_until_next() {
312                Ok(duration) => duration,
313                Err(e) => {
314                    warn!(
315                        "Failed to calculate next execution time for node {}: {}",
316                        current_node_id, e
317                    );
318                    // Default to a short interval if we can't calculate the next time
319                    Duration::from_secs(5)
320                }
321            };
322
323            // Wait until it's time to execute
324            info!(
325                "Waiting {:?} until next execution of node {}",
326                wait_duration, current_node_id
327            );
328            sleep(wait_duration).await;
329
330            // Execute the node
331            let action = match node.execute_on_schedule(ctx).await {
332                Ok(action) => action,
333                Err(e) => {
334                    warn!("Error executing node {}: {}", current_node_id, e);
335                    // Continue with the next node if there's an error
336                    Action::default()
337                }
338            };
339
340            // If the action is the termination action, stop the workflow
341            if action == self.termination_action {
342                info!("Workflow terminated by node {}", current_node_id);
343                break;
344            }
345
346            // Find the next node based on the action
347            if let Some(next_node_id) = self.routes.get(&(current_node_id.clone(), action.clone()))
348            {
349                info!(
350                    "Moving from node {} to node {}",
351                    current_node_id, next_node_id
352                );
353                current_node_id = next_node_id.clone();
354            } else {
355                // If there's no route for this action, use the default action
356                if let Some(next_node_id) = self
357                    .routes
358                    .get(&(current_node_id.clone(), Action::default()))
359                {
360                    info!(
361                        "No route found for action {:?}, using default route to node {}",
362                        action, next_node_id
363                    );
364                    current_node_id = next_node_id.clone();
365                } else {
366                    // If there's no default route either, stop the workflow
367                    warn!(
368                        "No route found for node {} with action {:?} and no default route",
369                        current_node_id, action
370                    );
371                    break;
372                }
373            }
374        }
375
376        Ok(())
377    }
378}
379
380/// An adapter to use a timer node as a standard node
381pub struct TimerNodeAdapter<Context, Action>
382where
383    Context: Send + Sync + 'static,
384    Action: ActionType + Send + Sync + 'static + Default + Debug,
385{
386    node: Arc<dyn TimerNode<Context, Action>>,
387    id: NodeId,
388    execute_immediately: bool,
389}
390
391impl<Context, Action> TimerNodeAdapter<Context, Action>
392where
393    Context: Send + Sync + 'static,
394    Action: ActionType + Send + Sync + 'static + Default + Debug,
395{
396    /// Create a new timer node adapter
397    pub fn new(node: Arc<dyn TimerNode<Context, Action>>, execute_immediately: bool) -> Self {
398        let id = node.id();
399        Self {
400            node,
401            id,
402            execute_immediately,
403        }
404    }
405
406    /// Create a new timer node adapter with a specific ID
407    pub fn with_id(
408        node: Arc<dyn TimerNode<Context, Action>>,
409        id: impl Into<String>,
410        execute_immediately: bool,
411    ) -> Self {
412        Self {
413            node,
414            id: id.into(),
415            execute_immediately,
416        }
417    }
418}
419
420#[async_trait]
421impl<Context, Action> Node<Context, Action> for TimerNodeAdapter<Context, Action>
422where
423    Context: Send + Sync + 'static,
424    Action: ActionType + Send + Sync + 'static + Default + Debug,
425{
426    type Output = ();
427
428    fn id(&self) -> NodeId {
429        self.id.clone()
430    }
431
432    async fn process(
433        &self,
434        ctx: &mut Context,
435    ) -> Result<NodeOutcome<Self::Output, Action>, FloxideError> {
436        if self.execute_immediately {
437            // Execute the node immediately
438            let action = self.node.execute_on_schedule(ctx).await?;
439            Ok(NodeOutcome::RouteToAction(action))
440        } else {
441            // Wait until the scheduled time
442            let wait_duration = self.node.schedule().duration_until_next()?;
443            info!(
444                "Waiting {:?} before executing node {}",
445                wait_duration, self.id
446            );
447            sleep(wait_duration).await;
448
449            // Execute the node
450            let action = self.node.execute_on_schedule(ctx).await?;
451            Ok(NodeOutcome::RouteToAction(action))
452        }
453    }
454}
455
456/// A nested timer workflow that can be used as a standard node
457pub struct NestedTimerWorkflow<Context, Action>
458where
459    Context: Send + Sync + 'static,
460    Action: ActionType + Send + Sync + 'static + Default + Debug,
461{
462    workflow: Arc<TimerWorkflow<Context, Action>>,
463    id: NodeId,
464    complete_action: Action,
465    _phantom: PhantomData<(Context, Action)>,
466}
467
468impl<Context, Action> NestedTimerWorkflow<Context, Action>
469where
470    Context: Send + Sync + 'static,
471    Action: ActionType + Send + Sync + 'static + Default + Debug,
472{
473    /// Create a new nested timer workflow
474    pub fn new(workflow: Arc<TimerWorkflow<Context, Action>>, complete_action: Action) -> Self {
475        Self {
476            workflow,
477            id: Uuid::new_v4().to_string(),
478            complete_action,
479            _phantom: PhantomData,
480        }
481    }
482
483    /// Create a new nested timer workflow with a specific ID
484    pub fn with_id(
485        workflow: Arc<TimerWorkflow<Context, Action>>,
486        id: impl Into<String>,
487        complete_action: Action,
488    ) -> Self {
489        Self {
490            workflow,
491            id: id.into(),
492            complete_action,
493            _phantom: PhantomData,
494        }
495    }
496}
497
498#[async_trait]
499impl<Context, Action> Node<Context, Action> for NestedTimerWorkflow<Context, Action>
500where
501    Context: Send + Sync + 'static,
502    Action: ActionType + Send + Sync + 'static + Default + Debug,
503{
504    type Output = ();
505
506    fn id(&self) -> NodeId {
507        self.id.clone()
508    }
509
510    async fn process(
511        &self,
512        ctx: &mut Context,
513    ) -> Result<NodeOutcome<Self::Output, Action>, FloxideError> {
514        // Execute the timer workflow
515        let result = self.workflow.execute(ctx).await;
516
517        match result {
518            Ok(_) => Ok(NodeOutcome::RouteToAction(self.complete_action.clone())),
519            Err(e) => Err(e),
520        }
521    }
522}
523
524/// Extension trait for ActionType to provide timer-specific actions
525pub trait TimerActionExt: ActionType {
526    /// Create a complete action for timer nodes
527    fn complete() -> Self;
528
529    /// Create a retry action for timer nodes
530    fn retry() -> Self;
531}
532
533impl TimerActionExt for DefaultAction {
534    fn complete() -> Self {
535        DefaultAction::Custom("timer_complete".to_string())
536    }
537
538    fn retry() -> Self {
539        DefaultAction::Custom("timer_retry".to_string())
540    }
541}
542
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `Schedule::next_execution` for the `Once`, `Interval` and
    /// `Daily` variants.
    #[tokio::test]
    async fn test_schedule_next_execution() {
        // Once: a future instant is returned unchanged.
        let target = Utc::now() + ChronoDuration::hours(1);
        let upcoming = Schedule::Once(target).next_execution().unwrap();
        assert_eq!(upcoming, target);

        // Interval: roughly one interval from now, with a small margin for
        // the time spent between the two clock reads.
        let upcoming = Schedule::Interval(Duration::from_secs(60))
            .next_execution()
            .unwrap();
        let seconds_left = (upcoming - Utc::now()).num_seconds();
        assert!(seconds_left > 0 && seconds_left <= 61);

        // Daily: pick the top of the next hour (wrapping past midnight) and
        // check only the basic properties of the result.
        let before = Utc::now();
        let next_hour = (before.hour() + 1) % 24;
        let upcoming = Schedule::Daily(next_hour, 0).next_execution().unwrap();
        assert!(upcoming > before);
        assert_eq!(upcoming.hour(), next_hour);
        assert_eq!(upcoming.minute(), 0);
    }

    /// Checks that a `SimpleTimer` runs its closure when executed directly.
    #[tokio::test]
    async fn test_simple_timer() {
        let mut ctx = "test_context".to_string();

        // The schedule is irrelevant here: the node is executed directly
        // rather than through a workflow, so no waiting takes place.
        let timer = SimpleTimer::new(
            Schedule::Once(Utc::now() + ChronoDuration::milliseconds(100)),
            |ctx: &mut String| {
                *ctx = format!("{}_executed", ctx);
                Ok(DefaultAction::Next)
            },
        );

        let outcome = timer.execute_on_schedule(&mut ctx).await.unwrap();

        assert_eq!(outcome, DefaultAction::Next);
        assert_eq!(ctx, "test_context_executed");
    }

    /// Checks that a two-node workflow runs both nodes in order and stops on
    /// the termination action.
    #[tokio::test]
    async fn test_timer_workflow() {
        let mut ctx = 0;

        // First node: adds 1 and moves on.
        let timer1 = Arc::new(SimpleTimer::with_id(
            "timer1",
            Schedule::Once(Utc::now() + ChronoDuration::milliseconds(100)),
            |ctx: &mut i32| {
                *ctx += 1;
                Ok(DefaultAction::Next)
            },
        ));

        // Second node: adds 2 and returns the termination action.
        let timer2 = Arc::new(SimpleTimer::with_id(
            "timer2",
            Schedule::Once(Utc::now() + ChronoDuration::milliseconds(200)),
            |ctx: &mut i32| {
                *ctx += 2;
                Ok(DefaultAction::Custom("terminate".to_string()))
            },
        ));

        let mut workflow = TimerWorkflow::new(
            timer1.clone(),
            DefaultAction::Custom("terminate".to_string()),
        );
        workflow.add_node(timer2.clone());
        workflow.set_route(&timer1.id(), DefaultAction::Next, &timer2.id());

        // Run in a task so the outer timeout can stop a hanging workflow.
        let task = tokio::spawn(async move {
            workflow.execute(&mut ctx).await.unwrap();
            ctx
        });

        let total = tokio::time::timeout(Duration::from_secs(1), task)
            .await
            .unwrap()
            .unwrap();

        // 1 from timer1 + 2 from timer2
        assert_eq!(total, 3);
    }
}