1use async_trait::async_trait;
9use chrono::{DateTime, Datelike, Duration as ChronoDuration, Timelike, Utc, Weekday};
10use floxide_core::{error::FloxideError, ActionType, DefaultAction, Node, NodeId, NodeOutcome};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::time::sleep;
17use tracing::{debug, warn};
18use uuid::Uuid;
19
20#[derive(Debug, Clone)]
22pub enum Schedule {
23 Once(DateTime<Utc>),
25
26 Interval(Duration),
28
29 Daily(u32, u32),
31
32 Weekly(Weekday, u32, u32),
34
35 Monthly(u32, u32, u32),
37
38 Cron(String),
41}
42
43impl Schedule {
44 pub fn next_execution(&self) -> Result<DateTime<Utc>, FloxideError> {
46 let now = Utc::now();
47
48 match self {
49 Schedule::Once(time) => {
50 if time <= &now {
51 Err(FloxideError::Other(
52 "Scheduled time has already passed".to_string(),
53 ))
54 } else {
55 Ok(*time)
56 }
57 }
58
59 Schedule::Interval(duration) => Ok(now
60 + ChronoDuration::from_std(*duration).map_err(|e| {
61 FloxideError::Other(format!("Failed to convert duration: {}", e))
62 })?),
63
64 Schedule::Daily(hour, minute) => {
65 let mut next = now;
66
67 next = next
69 .with_hour(*hour)
70 .and_then(|dt| dt.with_minute(*minute))
71 .and_then(|dt| dt.with_second(0))
72 .and_then(|dt| dt.with_nanosecond(0))
73 .ok_or_else(|| FloxideError::Other("Invalid hour or minute".to_string()))?;
74
75 if next <= now {
77 next += ChronoDuration::days(1);
78 }
79
80 Ok(next)
81 }
82
83 Schedule::Weekly(weekday, hour, minute) => {
84 let mut next = now;
85
86 next = next
88 .with_hour(*hour)
89 .and_then(|dt| dt.with_minute(*minute))
90 .and_then(|dt| dt.with_second(0))
91 .and_then(|dt| dt.with_nanosecond(0))
92 .ok_or_else(|| FloxideError::Other("Invalid hour or minute".to_string()))?;
93
94 let days_until_weekday =
96 (*weekday as i32 - now.weekday().num_days_from_monday() as i32 + 7) % 7;
97
98 if days_until_weekday == 0 && next <= now {
100 next += ChronoDuration::days(7);
101 } else {
102 next += ChronoDuration::days(days_until_weekday as i64);
103 }
104
105 Ok(next)
106 }
107
108 Schedule::Monthly(day, hour, minute) => {
109 let mut next = now;
110
111 next = next
113 .with_hour(*hour)
114 .and_then(|dt| dt.with_minute(*minute))
115 .and_then(|dt| dt.with_second(0))
116 .and_then(|dt| dt.with_nanosecond(0))
117 .ok_or_else(|| FloxideError::Other("Invalid hour or minute".to_string()))?;
118
119 let current_day = now.day();
121
122 if *day <= 31 {
124 match next.with_day(*day) {
126 Some(date) => next = date,
127 None => {
128 return Err(FloxideError::Other(format!(
129 "Invalid day {} for the current month",
130 day
131 )))
132 }
133 }
134
135 if next <= now || *day < current_day {
138 next += ChronoDuration::days(32); next = next.with_day(1).ok_or_else(|| {
141 FloxideError::Other("Failed to set day to 1".to_string())
142 })?;
143
144 next = next.with_day(*day).ok_or_else(|| {
146 FloxideError::Other(format!("Invalid day {} for next month", day))
147 })?;
148 }
149 } else {
150 return Err(FloxideError::Other(format!(
151 "Invalid day of month: {}",
152 day
153 )));
154 }
155
156 Ok(next)
157 }
158
159 Schedule::Cron(_expression) => {
160 Err(FloxideError::Other(
163 "Cron expressions are not yet implemented".to_string(),
164 ))
165 }
166 }
167 }
168
169 pub fn duration_until_next(&self) -> Result<Duration, FloxideError> {
171 let next = self.next_execution()?;
172 let now = Utc::now();
173
174 let duration = next.signed_duration_since(now);
175 if duration.num_milliseconds() <= 0 {
176 return Err(FloxideError::Other(
177 "Scheduled time is in the past".to_string(),
178 ));
179 }
180
181 Ok(Duration::from_millis(duration.num_milliseconds() as u64))
182 }
183}
184
185#[async_trait]
187pub trait TimerNode<Context, Action>: Send + Sync
188where
189 Context: Send + Sync + 'static,
190 Action: ActionType + Send + Sync + 'static + Default + Debug,
191{
192 fn schedule(&self) -> Schedule;
194
195 async fn execute_on_schedule(&self, ctx: &mut Context) -> Result<Action, FloxideError>;
197
198 fn id(&self) -> NodeId;
200}
201
202pub struct SimpleTimer<F>
204where
205 F: Send + Sync + 'static,
206{
207 id: NodeId,
208 schedule: Schedule,
209 action: F,
210}
211
212impl<F> SimpleTimer<F>
213where
214 F: Send + Sync + 'static,
215{
216 pub fn new(schedule: Schedule, action: F) -> Self {
218 Self {
219 id: Uuid::new_v4().to_string(),
220 schedule,
221 action,
222 }
223 }
224
225 pub fn with_id(id: impl Into<String>, schedule: Schedule, action: F) -> Self {
227 Self {
228 id: id.into(),
229 schedule,
230 action,
231 }
232 }
233}
234
235#[async_trait]
236impl<Context, Action, F> TimerNode<Context, Action> for SimpleTimer<F>
237where
238 Context: Send + Sync + 'static,
239 Action: ActionType + Send + Sync + 'static + Default + Debug,
240 F: Fn(&mut Context) -> Result<Action, FloxideError> + Send + Sync + 'static,
241{
242 fn schedule(&self) -> Schedule {
243 self.schedule.clone()
244 }
245
246 async fn execute_on_schedule(&self, ctx: &mut Context) -> Result<Action, FloxideError> {
247 (self.action)(ctx)
248 }
249
250 fn id(&self) -> NodeId {
251 self.id.clone()
252 }
253}
254
255pub struct TimerWorkflow<Context, Action>
257where
258 Context: Send + Sync + 'static,
259 Action: ActionType + Send + Sync + 'static + Default + Debug,
260{
261 nodes: HashMap<NodeId, Arc<dyn TimerNode<Context, Action>>>,
262 routes: HashMap<(NodeId, Action), NodeId>,
263 initial_node: NodeId,
264 termination_action: Action,
265}
266
267impl<Context, Action> TimerWorkflow<Context, Action>
268where
269 Context: Send + Sync + 'static,
270 Action: ActionType + Send + Sync + 'static + Default + Debug,
271{
272 pub fn new(
274 initial_node: Arc<dyn TimerNode<Context, Action>>,
275 termination_action: Action,
276 ) -> Self {
277 let id = initial_node.id();
278
279 let mut nodes = HashMap::new();
280 nodes.insert(id.clone(), initial_node);
281
282 Self {
283 nodes,
284 routes: HashMap::new(),
285 initial_node: id,
286 termination_action,
287 }
288 }
289
290 pub fn add_node(&mut self, node: Arc<dyn TimerNode<Context, Action>>) {
292 let id = node.id();
293 self.nodes.insert(id, node);
294 }
295
296 pub fn set_route(&mut self, from_id: &NodeId, action: Action, to_id: &NodeId) {
298 self.routes.insert((from_id.clone(), action), to_id.clone());
299 }
300
301 pub async fn execute(&self, ctx: &mut Context) -> Result<(), FloxideError> {
303 let mut current_node_id = self.initial_node.clone();
304
305 loop {
306 let node = self.nodes.get(¤t_node_id).ok_or_else(|| {
307 FloxideError::Other(format!("Node not found: {}", current_node_id))
308 })?;
309
310 let wait_duration = match node.schedule().duration_until_next() {
312 Ok(duration) => duration,
313 Err(e) => {
314 warn!(
315 "Failed to calculate next execution time for node {}: {}",
316 current_node_id, e
317 );
318 Duration::from_secs(5)
320 }
321 };
322
323 debug!(
325 "Waiting {:?} until next execution of node {}",
326 wait_duration, current_node_id
327 );
328 sleep(wait_duration).await;
329
330 let action = match node.execute_on_schedule(ctx).await {
332 Ok(action) => action,
333 Err(e) => {
334 warn!("Error executing node {}: {}", current_node_id, e);
335 Action::default()
337 }
338 };
339
340 if action == self.termination_action {
342 debug!("Workflow terminated by node {}", current_node_id);
343 break;
344 }
345
346 if let Some(next_node_id) = self.routes.get(&(current_node_id.clone(), action.clone()))
348 {
349 debug!(
350 "Moving from node {} to node {}",
351 current_node_id, next_node_id
352 );
353 current_node_id = next_node_id.clone();
354 } else {
355 if let Some(next_node_id) = self
357 .routes
358 .get(&(current_node_id.clone(), Action::default()))
359 {
360 debug!(
361 "No route found for action {:?}, using default route to node {}",
362 action, next_node_id
363 );
364 current_node_id = next_node_id.clone();
365 } else {
366 warn!(
368 "No route found for node {} with action {:?} and no default route",
369 current_node_id, action
370 );
371 break;
372 }
373 }
374 }
375
376 Ok(())
377 }
378}
379
380pub struct TimerNodeAdapter<Context, Action>
382where
383 Context: Send + Sync + 'static,
384 Action: ActionType + Send + Sync + 'static + Default + Debug,
385{
386 node: Arc<dyn TimerNode<Context, Action>>,
387 id: NodeId,
388 execute_immediately: bool,
389}
390
391impl<Context, Action> TimerNodeAdapter<Context, Action>
392where
393 Context: Send + Sync + 'static,
394 Action: ActionType + Send + Sync + 'static + Default + Debug,
395{
396 pub fn new(node: Arc<dyn TimerNode<Context, Action>>, execute_immediately: bool) -> Self {
398 let id = node.id();
399 Self {
400 node,
401 id,
402 execute_immediately,
403 }
404 }
405
406 pub fn with_id(
408 node: Arc<dyn TimerNode<Context, Action>>,
409 id: impl Into<String>,
410 execute_immediately: bool,
411 ) -> Self {
412 Self {
413 node,
414 id: id.into(),
415 execute_immediately,
416 }
417 }
418}
419
420#[async_trait]
421impl<Context, Action> Node<Context, Action> for TimerNodeAdapter<Context, Action>
422where
423 Context: Send + Sync + 'static,
424 Action: ActionType + Send + Sync + 'static + Default + Debug,
425{
426 type Output = ();
427
428 fn id(&self) -> NodeId {
429 self.id.clone()
430 }
431
432 async fn process(
433 &self,
434 ctx: &mut Context,
435 ) -> Result<NodeOutcome<Self::Output, Action>, FloxideError> {
436 if self.execute_immediately {
437 let action = self.node.execute_on_schedule(ctx).await?;
439 Ok(NodeOutcome::RouteToAction(action))
440 } else {
441 let wait_duration = self.node.schedule().duration_until_next()?;
443 debug!(
444 "Waiting {:?} before executing node {}",
445 wait_duration, self.id
446 );
447 sleep(wait_duration).await;
448
449 let action = self.node.execute_on_schedule(ctx).await?;
451 Ok(NodeOutcome::RouteToAction(action))
452 }
453 }
454}
455
456pub struct NestedTimerWorkflow<Context, Action>
458where
459 Context: Send + Sync + 'static,
460 Action: ActionType + Send + Sync + 'static + Default + Debug,
461{
462 workflow: Arc<TimerWorkflow<Context, Action>>,
463 id: NodeId,
464 complete_action: Action,
465 _phantom: PhantomData<(Context, Action)>,
466}
467
468impl<Context, Action> NestedTimerWorkflow<Context, Action>
469where
470 Context: Send + Sync + 'static,
471 Action: ActionType + Send + Sync + 'static + Default + Debug,
472{
473 pub fn new(workflow: Arc<TimerWorkflow<Context, Action>>, complete_action: Action) -> Self {
475 Self {
476 workflow,
477 id: Uuid::new_v4().to_string(),
478 complete_action,
479 _phantom: PhantomData,
480 }
481 }
482
483 pub fn with_id(
485 workflow: Arc<TimerWorkflow<Context, Action>>,
486 id: impl Into<String>,
487 complete_action: Action,
488 ) -> Self {
489 Self {
490 workflow,
491 id: id.into(),
492 complete_action,
493 _phantom: PhantomData,
494 }
495 }
496}
497
498#[async_trait]
499impl<Context, Action> Node<Context, Action> for NestedTimerWorkflow<Context, Action>
500where
501 Context: Send + Sync + 'static,
502 Action: ActionType + Send + Sync + 'static + Default + Debug,
503{
504 type Output = ();
505
506 fn id(&self) -> NodeId {
507 self.id.clone()
508 }
509
510 async fn process(
511 &self,
512 ctx: &mut Context,
513 ) -> Result<NodeOutcome<Self::Output, Action>, FloxideError> {
514 let result = self.workflow.execute(ctx).await;
516
517 match result {
518 Ok(_) => Ok(NodeOutcome::RouteToAction(self.complete_action.clone())),
519 Err(e) => Err(e),
520 }
521 }
522}
523
524pub trait TimerActionExt: ActionType {
526 fn complete() -> Self;
528
529 fn retry() -> Self;
531}
532
533impl TimerActionExt for DefaultAction {
534 fn complete() -> Self {
535 DefaultAction::Custom("timer_complete".to_string())
536 }
537
538 fn retry() -> Self {
539 DefaultAction::Custom("timer_retry".to_string())
540 }
541}
542
#[cfg(test)]
mod tests {
    use super::*;

    /// `Once`, `Interval` and `Daily` schedules each yield a future instant.
    #[tokio::test]
    async fn test_schedule_next_execution() {
        // Once: the stored instant is returned unchanged while it is still ahead.
        let target = Utc::now() + ChronoDuration::hours(1);
        let once = Schedule::Once(target);
        assert_eq!(once.next_execution().unwrap(), target);

        // Interval: roughly one interval from now.
        let every_minute = Schedule::Interval(Duration::from_secs(60));
        let upcoming = every_minute.next_execution().unwrap();
        let seconds_ahead = (upcoming - Utc::now()).num_seconds();
        assert!(seconds_ahead > 0 && seconds_ahead <= 61);

        // Daily: the top of the next hour (wrapping past midnight).
        let now = Utc::now();
        let target_hour = (now.hour() + 1) % 24;
        let daily = Schedule::Daily(target_hour, 0);
        let upcoming = daily.next_execution().unwrap();
        assert!(upcoming > now);
        assert_eq!(upcoming.hour(), target_hour);
        assert_eq!(upcoming.minute(), 0);
    }

    /// Executing a `SimpleTimer` runs its callback against the context.
    #[tokio::test]
    async fn test_simple_timer() {
        let mut ctx = String::from("test_context");

        let fire_at = Utc::now() + ChronoDuration::milliseconds(100);
        let timer = SimpleTimer::new(Schedule::Once(fire_at), |state: &mut String| {
            state.push_str("_executed");
            Ok(DefaultAction::Next)
        });

        let action = timer.execute_on_schedule(&mut ctx).await.unwrap();

        assert_eq!(action, DefaultAction::Next);
        assert_eq!(ctx, "test_context_executed");
    }

    /// Two chained timers run in order and the second one ends the workflow.
    #[tokio::test]
    async fn test_timer_workflow() {
        let mut ctx = 0;

        // First timer: adds 1 and moves on.
        let timer1 = Arc::new(SimpleTimer::with_id(
            "timer1",
            Schedule::Once(Utc::now() + ChronoDuration::milliseconds(100)),
            |counter: &mut i32| {
                *counter += 1;
                Ok(DefaultAction::Next)
            },
        ));

        // Second timer: adds 2 and returns the termination action.
        let timer2 = Arc::new(SimpleTimer::with_id(
            "timer2",
            Schedule::Once(Utc::now() + ChronoDuration::milliseconds(200)),
            |counter: &mut i32| {
                *counter += 2;
                Ok(DefaultAction::Custom("terminate".to_string()))
            },
        ));

        let mut workflow = TimerWorkflow::new(
            timer1.clone(),
            DefaultAction::Custom("terminate".to_string()),
        );

        workflow.add_node(timer2.clone());
        workflow.set_route(&timer1.id(), DefaultAction::Next, &timer2.id());

        // Run on a separate task so the whole workflow can be bounded by a timeout.
        let handle = tokio::spawn(async move {
            workflow.execute(&mut ctx).await.unwrap();
            ctx
        });

        let result = tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .unwrap()
            .unwrap();

        // 1 from timer1 plus 2 from timer2.
        assert_eq!(result, 3);
    }
}