use std::fmt;
use std::time::Duration;

/// Error describing a task or a whole workflow that exceeded its timeout limit.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutError {
    /// A single task exceeded its timeout limit.
    TaskTimeout {
        /// Identifier of the task that timed out.
        task_id: String,
        /// The limit that was exceeded.
        timeout: Duration,
    },
    /// The workflow as a whole exceeded its timeout limit.
    WorkflowTimeout {
        /// The limit that was exceeded.
        timeout: Duration,
    },
}

impl TimeoutError {
    /// Returns the limit that was exceeded, whichever variant this is.
    ///
    /// Saves callers from matching on the variant when they only need the
    /// duration (for example to log it or to compute a retry budget).
    #[must_use]
    pub fn timeout(&self) -> Duration {
        match self {
            Self::TaskTimeout { timeout, .. } | Self::WorkflowTimeout { timeout } => *timeout,
        }
    }
}

impl fmt::Display for TimeoutError {
    /// Writes a one-line, human-readable description of the timeout.
    ///
    /// `Duration` has no `Display` impl, so the limit is rendered with its
    /// `Debug` form (e.g. `30s`, `1.5s`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::TaskTimeout { task_id, timeout } => write!(
                f,
                "Task '{}' exceeded timeout limit of {:?}",
                task_id, timeout
            ),
            Self::WorkflowTimeout { timeout } => {
                write!(f, "Workflow exceeded timeout limit of {:?}", timeout)
            }
        }
    }
}

impl std::error::Error for TimeoutError {}

/// Timeout limit for a single task, stored as a [`Duration`].
///
/// A dedicated newtype keeps a task limit from being confused with other
/// durations at the type level.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct TaskTimeout(Duration);

impl TaskTimeout {
    /// Number of seconds used by the [`Default`] implementation.
    const DEFAULT_SECS: u64 = 30;

    /// Creates a task timeout from an arbitrary [`Duration`].
    #[must_use]
    pub const fn new(duration: Duration) -> Self {
        Self(duration)
    }

    /// Creates a task timeout of `secs` whole seconds.
    #[must_use]
    pub const fn from_secs(secs: u64) -> Self {
        Self(Duration::from_secs(secs))
    }

    /// Creates a task timeout of `millis` milliseconds.
    #[must_use]
    pub const fn from_millis(millis: u64) -> Self {
        Self(Duration::from_millis(millis))
    }

    /// Returns the wrapped [`Duration`].
    #[must_use]
    pub const fn duration(&self) -> Duration {
        self.0
    }
}

impl Default for TaskTimeout {
    /// Returns a timeout of 30 seconds.
    fn default() -> Self {
        Self::from_secs(Self::DEFAULT_SECS)
    }
}

impl From<Duration> for TaskTimeout {
    /// Equivalent to [`TaskTimeout::new`].
    fn from(duration: Duration) -> Self {
        Self::new(duration)
    }
}

impl From<TaskTimeout> for Duration {
    /// Equivalent to [`TaskTimeout::duration`].
    fn from(timeout: TaskTimeout) -> Self {
        timeout.duration()
    }
}

/// Timeout limit for an entire workflow, stored as a [`Duration`].
///
/// A dedicated newtype keeps a workflow limit from being confused with other
/// durations at the type level.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct WorkflowTimeout(Duration);

impl WorkflowTimeout {
    /// Number of seconds used by the [`Default`] implementation.
    const DEFAULT_SECS: u64 = 300;

    /// Creates a workflow timeout from an arbitrary [`Duration`].
    #[must_use]
    pub const fn new(duration: Duration) -> Self {
        Self(duration)
    }

    /// Creates a workflow timeout of `secs` whole seconds.
    #[must_use]
    pub const fn from_secs(secs: u64) -> Self {
        Self(Duration::from_secs(secs))
    }

    /// Creates a workflow timeout of `millis` milliseconds.
    #[must_use]
    pub const fn from_millis(millis: u64) -> Self {
        Self(Duration::from_millis(millis))
    }

    /// Returns the wrapped [`Duration`].
    #[must_use]
    pub const fn duration(&self) -> Duration {
        self.0
    }
}

impl Default for WorkflowTimeout {
    /// Returns a timeout of 300 seconds.
    fn default() -> Self {
        Self::from_secs(Self::DEFAULT_SECS)
    }
}

impl From<Duration> for WorkflowTimeout {
    /// Equivalent to [`WorkflowTimeout::new`].
    fn from(duration: Duration) -> Self {
        Self::new(duration)
    }
}

impl From<WorkflowTimeout> for Duration {
    /// Equivalent to [`WorkflowTimeout::duration`].
    fn from(timeout: WorkflowTimeout) -> Self {
        timeout.duration()
    }
}

204#[derive(Clone, Debug)]
224pub struct TimeoutConfig {
225 pub task_timeout: Option<TaskTimeout>,
227 pub workflow_timeout: Option<WorkflowTimeout>,
229}
230
231impl TimeoutConfig {
232 pub fn new() -> Self {
247 Self {
248 task_timeout: Some(TaskTimeout::default()),
249 workflow_timeout: Some(WorkflowTimeout::default()),
250 }
251 }
252
253 pub fn no_task_timeout() -> Self {
265 Self {
266 task_timeout: None,
267 workflow_timeout: Some(WorkflowTimeout::default()),
268 }
269 }
270
271 pub fn no_workflow_timeout() -> Self {
283 Self {
284 task_timeout: Some(TaskTimeout::default()),
285 workflow_timeout: None,
286 }
287 }
288
289 pub fn no_timeouts() -> Self {
301 Self {
302 task_timeout: None,
303 workflow_timeout: None,
304 }
305 }
306}
307
308impl Default for TimeoutConfig {
309 fn default() -> Self {
311 Self::new()
312 }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Display` text names the task (when there is one) and the limit.
    #[test]
    fn test_timeout_error_display() {
        let task_msg = TimeoutError::TaskTimeout {
            task_id: "task-1".to_string(),
            timeout: Duration::from_secs(30),
        }
        .to_string();
        assert!(task_msg.contains("task-1"));
        assert!(task_msg.contains("30s"));

        let workflow_msg = TimeoutError::WorkflowTimeout {
            timeout: Duration::from_secs(300),
        }
        .to_string();
        assert!(workflow_msg.contains("Workflow"));
        assert!(workflow_msg.contains("300s"));
    }

    /// `TaskTimeout::new` stores the duration it is given.
    #[test]
    fn test_task_timeout_creation() {
        let timeout = TaskTimeout::new(Duration::from_secs(45));
        assert_eq!(timeout.duration(), Duration::from_secs(45));
    }

    /// The seconds/milliseconds constructors agree with the `Duration` ones.
    #[test]
    fn test_task_timeout_convenience_constructors() {
        let secs = TaskTimeout::from_secs(60);
        assert_eq!(secs.duration(), Duration::from_secs(60));

        let millis = TaskTimeout::from_millis(5000);
        assert_eq!(millis.duration(), Duration::from_millis(5000));
    }

    /// The default task timeout is 30 seconds.
    #[test]
    fn test_task_timeout_default() {
        let timeout = TaskTimeout::default();
        assert_eq!(timeout.duration(), Duration::from_secs(30));
    }

    /// `WorkflowTimeout::new` stores the duration it is given.
    #[test]
    fn test_workflow_timeout_creation() {
        let timeout = WorkflowTimeout::new(Duration::from_secs(600));
        assert_eq!(timeout.duration(), Duration::from_secs(600));
    }

    /// The seconds/milliseconds constructors agree with the `Duration` ones.
    #[test]
    fn test_workflow_timeout_convenience_constructors() {
        let secs = WorkflowTimeout::from_secs(300);
        assert_eq!(secs.duration(), Duration::from_secs(300));

        let millis = WorkflowTimeout::from_millis(10000);
        assert_eq!(millis.duration(), Duration::from_millis(10000));
    }

    /// The default workflow timeout is 300 seconds.
    #[test]
    fn test_workflow_timeout_default() {
        let timeout = WorkflowTimeout::default();
        assert_eq!(timeout.duration(), Duration::from_secs(300));
    }

    /// `TimeoutConfig::new` enables both limits with their default values.
    #[test]
    fn test_timeout_config_defaults() {
        let config = TimeoutConfig::new();
        // Comparing against `Some(..)` checks presence and value in one step.
        assert_eq!(
            config.task_timeout.map(|t| t.duration()),
            Some(Duration::from_secs(30))
        );
        assert_eq!(
            config.workflow_timeout.map(|t| t.duration()),
            Some(Duration::from_secs(300))
        );
    }

    /// `no_task_timeout` clears only the task limit.
    #[test]
    fn test_timeout_config_disable_task_timeout() {
        let config = TimeoutConfig::no_task_timeout();
        assert!(config.task_timeout.is_none());
        assert!(config.workflow_timeout.is_some());
    }

    /// `no_workflow_timeout` clears only the workflow limit.
    #[test]
    fn test_timeout_config_disable_workflow_timeout() {
        let config = TimeoutConfig::no_workflow_timeout();
        assert!(config.task_timeout.is_some());
        assert!(config.workflow_timeout.is_none());
    }

    /// `no_timeouts` clears both limits.
    #[test]
    fn test_timeout_config_no_timeouts() {
        let config = TimeoutConfig::no_timeouts();
        assert!(config.task_timeout.is_none());
        assert!(config.workflow_timeout.is_none());
    }

    /// `Default` enables both limits, like `new`.
    #[test]
    fn test_timeout_config_default_impl() {
        let config = TimeoutConfig::default();
        assert!(config.task_timeout.is_some());
        assert!(config.workflow_timeout.is_some());
    }

    /// `TaskTimeout` is `Copy`: the source stays usable after assignment.
    #[test]
    fn test_task_timeout_copy() {
        let timeout1 = TaskTimeout::from_secs(30);
        let timeout2 = timeout1;
        assert_eq!(timeout1, timeout2);
    }

    /// `WorkflowTimeout` is `Copy`: the source stays usable after assignment.
    #[test]
    fn test_workflow_timeout_copy() {
        let timeout1 = WorkflowTimeout::from_secs(300);
        let timeout2 = timeout1;
        assert_eq!(timeout1, timeout2);
    }

    /// Errors compare equal only when both the task id and the limit match.
    #[test]
    fn test_timeout_error_equality() {
        let err1 = TimeoutError::TaskTimeout {
            task_id: "task-1".to_string(),
            timeout: Duration::from_secs(30),
        };
        let err2 = TimeoutError::TaskTimeout {
            task_id: "task-1".to_string(),
            timeout: Duration::from_secs(30),
        };
        let err3 = TimeoutError::TaskTimeout {
            task_id: "task-2".to_string(),
            timeout: Duration::from_secs(30),
        };

        assert_eq!(err1, err2);
        assert_ne!(err1, err3);
    }

    /// Runs a workflow whose only task sleeps for 200 ms under a 100 ms task
    /// timeout and checks that `execute()` returns `Ok`.
    #[tokio::test]
    async fn test_workflow_with_task_timeout() {
        use crate::workflow::task::{TaskContext, TaskResult, WorkflowTask};
        use crate::workflow::{dag::Workflow, executor::WorkflowExecutor, task::TaskId};
        use async_trait::async_trait;

        /// Task that sleeps for `sleep_duration` and then reports success.
        struct SlowTask {
            id: TaskId,
            name: String,
            sleep_duration: Duration,
        }

        #[async_trait]
        impl WorkflowTask for SlowTask {
            async fn execute(
                &self,
                _context: &TaskContext,
            ) -> Result<TaskResult, crate::workflow::TaskError> {
                tokio::time::sleep(self.sleep_duration).await;
                Ok(TaskResult::Success)
            }

            fn id(&self) -> TaskId {
                self.id.clone()
            }

            fn name(&self) -> &str {
                &self.name
            }
        }

        let mut workflow = Workflow::new();
        workflow.add_task(Box::new(SlowTask {
            id: TaskId::new("slow-task"),
            name: "Slow Task".to_string(),
            sleep_duration: Duration::from_millis(200),
        }));

        let config = TimeoutConfig {
            task_timeout: Some(TaskTimeout::from_millis(100)),
            workflow_timeout: None,
        };

        let mut executor = WorkflowExecutor::new(workflow).with_timeout_config(config);

        let result = executor.execute().await;

        // NOTE(review): the task sleeps longer (200 ms) than the configured task
        // limit (100 ms), yet only `is_ok()` is asserted. Whether the executor
        // reports a task timeout inside an `Ok` result, or does not enforce the
        // limit on this path, cannot be determined from this file — confirm
        // against the executor and tighten this assertion.
        assert!(result.is_ok());
    }

    /// Runs five 100 ms tasks under a 200 ms workflow timeout through
    /// `execute_with_timeout()` and checks that it returns `Ok`.
    #[tokio::test]
    async fn test_workflow_with_workflow_timeout() {
        use crate::workflow::task::{TaskContext, TaskResult, WorkflowTask};
        use crate::workflow::{dag::Workflow, executor::WorkflowExecutor, task::TaskId};
        use async_trait::async_trait;

        /// Task that sleeps for 100 ms and then reports success.
        struct SlowTask {
            id: TaskId,
            name: String,
        }

        #[async_trait]
        impl WorkflowTask for SlowTask {
            async fn execute(
                &self,
                _context: &TaskContext,
            ) -> Result<TaskResult, crate::workflow::TaskError> {
                tokio::time::sleep(Duration::from_millis(100)).await;
                Ok(TaskResult::Success)
            }

            fn id(&self) -> TaskId {
                self.id.clone()
            }

            fn name(&self) -> &str {
                &self.name
            }
        }

        let mut workflow = Workflow::new();
        for i in 1..=5 {
            workflow.add_task(Box::new(SlowTask {
                id: TaskId::new(format!("task-{}", i)),
                name: format!("Task {}", i),
            }));
        }

        let config = TimeoutConfig {
            task_timeout: None,
            workflow_timeout: Some(WorkflowTimeout::from_millis(200)),
        };

        let mut executor = WorkflowExecutor::new(workflow).with_timeout_config(config);

        let result = executor.execute_with_timeout().await;

        // NOTE(review): if the five tasks run one after another they need about
        // 500 ms, more than the 200 ms workflow limit, yet only `is_ok()` is
        // asserted. Whether tasks run concurrently or the timeout is reported
        // inside an `Ok` result is not visible here — confirm against the
        // executor and tighten this assertion.
        assert!(result.is_ok());
    }

    /// A task that returns immediately succeeds when both limits are generous.
    #[tokio::test]
    async fn test_workflow_timeout_configured_but_not_exceeded() {
        use crate::workflow::task::{TaskContext, TaskResult, WorkflowTask};
        use crate::workflow::{dag::Workflow, executor::WorkflowExecutor, task::TaskId};
        use async_trait::async_trait;

        /// Task that reports success without awaiting anything.
        struct FastTask {
            id: TaskId,
            name: String,
        }

        #[async_trait]
        impl WorkflowTask for FastTask {
            async fn execute(
                &self,
                _context: &TaskContext,
            ) -> Result<TaskResult, crate::workflow::TaskError> {
                Ok(TaskResult::Success)
            }

            fn id(&self) -> TaskId {
                self.id.clone()
            }

            fn name(&self) -> &str {
                &self.name
            }
        }

        let mut workflow = Workflow::new();
        workflow.add_task(Box::new(FastTask {
            id: TaskId::new("fast-task"),
            name: "Fast Task".to_string(),
        }));

        let config = TimeoutConfig {
            task_timeout: Some(TaskTimeout::from_secs(5)),
            workflow_timeout: Some(WorkflowTimeout::from_secs(10)),
        };

        let mut executor = WorkflowExecutor::new(workflow).with_timeout_config(config);

        let result = executor.execute().await;
        assert!(result.is_ok());
        assert!(result.unwrap().success);
    }
}