#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use duroxide::Event;
use duroxide::EventKind;
use duroxide::providers::sqlite::SqliteProvider;
use duroxide::providers::{ExecutionMetadata, Provider, TagFilter, WorkItem};
use std::sync::Arc;
use std::time::Duration;
mod common;
use common::test_create_execution;
#[tokio::test]
async fn test_ignore_work_after_terminal_event() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
let instance = "inst-terminal";
let _ = test_create_execution(store.as_ref(), instance, "TermOrch", "1.0.0", "seed", None, None)
.await
.unwrap();
store
.append_with_execution(
instance,
1,
vec![Event::with_event_id(
2,
instance.to_string(),
1,
None,
EventKind::OrchestrationCompleted {
output: "done".to_string(),
},
)],
)
.await
.unwrap();
store
.enqueue_for_orchestrator(
WorkItem::ExternalRaised {
instance: instance.to_string(),
name: "Ignored".to_string(),
data: "x".to_string(),
},
None,
)
.await
.unwrap();
let (item, lock_token, _attempt_count) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item.instance, instance);
assert_eq!(item.messages.len(), 1);
Provider::ack_orchestration_item(
store.as_ref(),
&lock_token,
1,
vec![],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
assert!(
store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
let hist = store.read(instance).await.unwrap_or_default();
assert!(
hist.iter()
.any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { .. }))
);
}
#[tokio::test]
async fn test_fetch_orchestration_item_new_instance() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
store
.enqueue_for_orchestrator(
WorkItem::StartOrchestration {
instance: "test-instance".to_string(),
orchestration: "TestOrch".to_string(),
input: "test-input".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
execution_id: duroxide::INITIAL_EXECUTION_ID,
},
None,
)
.await
.unwrap();
let (item, _lock_token, _attempt_count) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item.instance, "test-instance");
assert_eq!(item.orchestration_name, "TestOrch");
assert_eq!(item.version, "1.0.0");
assert_eq!(item.execution_id, 1);
assert!(item.history.is_empty());
assert_eq!(item.messages.len(), 1);
assert!(matches!(
&item.messages[0],
WorkItem::StartOrchestration { orchestration, .. } if orchestration == "TestOrch"
));
}
#[tokio::test]
async fn test_fetch_orchestration_item_existing_instance() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
test_create_execution(
store.as_ref(),
"test-instance",
"TestOrch",
"1.0.0",
"test-input",
None,
None,
)
.await
.unwrap();
store
.append_with_execution(
"test-instance",
1,
vec![Event::with_event_id(
2,
"test-instance".to_string(),
1,
None,
EventKind::ActivityScheduled {
name: "TestActivity".to_string(),
input: "activity-input".to_string(),
session_id: None,
tag: None,
},
)],
)
.await
.unwrap();
store
.enqueue_for_orchestrator(
WorkItem::ActivityCompleted {
instance: "test-instance".to_string(),
execution_id: 1,
id: 1,
result: "activity-result".to_string(),
},
None,
)
.await
.unwrap();
let (item, _lock_token, _attempt_count) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item.instance, "test-instance");
assert_eq!(item.orchestration_name, "TestOrch");
assert_eq!(item.version, "1.0.0");
assert_eq!(item.execution_id, 1);
assert_eq!(item.history.len(), 2);
assert_eq!(item.messages.len(), 1);
assert!(matches!(
&item.messages[0],
WorkItem::ActivityCompleted { result, .. } if result == "activity-result"
));
}
#[tokio::test]
async fn test_fetch_orchestration_item_no_work() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
let item = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap();
assert!(item.is_none());
}
#[tokio::test]
async fn test_ack_orchestration_item_atomic() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
store
.enqueue_for_orchestrator(
WorkItem::StartOrchestration {
instance: "test-instance".to_string(),
orchestration: "TestOrch".to_string(),
input: "test-input".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
execution_id: duroxide::INITIAL_EXECUTION_ID,
},
None,
)
.await
.unwrap();
let (_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let history_delta = vec![
Event::with_event_id(
1,
"test-instance".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "test-input".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
),
Event::with_event_id(
2,
"test-instance".to_string(),
1,
None,
EventKind::ActivityScheduled {
name: "TestActivity".to_string(),
input: "activity-input".to_string(),
session_id: None,
tag: None,
},
),
];
let worker_items = vec![WorkItem::ActivityExecute {
instance: "test-instance".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "activity-input".to_string(),
session_id: None,
tag: None,
}];
store
.ack_orchestration_item(
&lock_token,
1,
history_delta,
worker_items,
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
let history = store.read("test-instance").await.unwrap_or_default();
assert_eq!(history.len(), 2);
assert!(matches!(&history[0].kind, EventKind::OrchestrationStarted { .. }));
assert!(matches!(&history[1].kind, EventKind::ActivityScheduled { .. }));
let (worker_item, _, _) = store
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert!(matches!(worker_item, WorkItem::ActivityExecute { .. }));
assert!(
store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn test_ack_orchestration_item_error_handling() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
let result = store
.ack_orchestration_item(
"invalid-token",
1,
vec![],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await;
assert!(result.is_err());
assert!(result.unwrap_err().message.contains("Invalid lock token"));
}
#[tokio::test]
async fn test_abandon_orchestration_item() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
store
.enqueue_for_orchestrator(
WorkItem::StartOrchestration {
instance: "test-instance".to_string(),
orchestration: "TestOrch".to_string(),
input: "test-input".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
execution_id: duroxide::INITIAL_EXECUTION_ID,
},
None,
)
.await
.unwrap();
let (_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
store
.abandon_orchestration_item(&lock_token, None, false)
.await
.unwrap();
let (item2, _lock_token2, _attempt_count2) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item2.instance, "test-instance");
assert!(matches!(
&item2.messages[0],
WorkItem::StartOrchestration { orchestration, .. } if orchestration == "TestOrch"
));
}
#[tokio::test]
async fn test_abandon_orchestration_item_with_delay() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
store
.enqueue_for_orchestrator(
WorkItem::StartOrchestration {
instance: "test-instance".to_string(),
orchestration: "TestOrch".to_string(),
input: "test-input".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
execution_id: duroxide::INITIAL_EXECUTION_ID,
},
None,
)
.await
.unwrap();
let lock_timeout = Duration::from_secs(30);
let (_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
store
.abandon_orchestration_item(&lock_token, Some(Duration::from_millis(500)), false)
.await
.unwrap();
assert!(
store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
tokio::time::sleep(std::time::Duration::from_millis(600)).await;
let (item2, _lock_token2, _attempt_count2) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item2.instance, "test-instance");
}
#[tokio::test]
async fn test_abandon_orchestration_item_error_handling() {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store: Arc<SqliteProvider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
let result = store.abandon_orchestration_item("invalid-token", None, false).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_in_memory_provider_atomic_operations() {
let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new_in_memory().await.unwrap());
store
.enqueue_for_orchestrator(
WorkItem::StartOrchestration {
instance: "test-instance".to_string(),
orchestration: "TestOrch".to_string(),
input: "test-input".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
execution_id: duroxide::INITIAL_EXECUTION_ID,
},
None,
)
.await
.unwrap();
let (item, lock_token, _attempt_count) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item.instance, "test-instance");
assert_eq!(item.orchestration_name, "TestOrch");
let history_delta = vec![Event::with_event_id(
1,
"test-instance".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "test-input".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)];
store
.ack_orchestration_item(
&lock_token,
1,
history_delta,
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
let history = store.read("test-instance").await.unwrap_or_default();
assert_eq!(history.len(), 1);
assert!(matches!(&history[0].kind, EventKind::OrchestrationStarted { .. }));
store
.enqueue_for_orchestrator(
WorkItem::ActivityCompleted {
instance: "test-instance".to_string(),
execution_id: 1,
id: 1,
result: "result".to_string(),
},
None,
)
.await
.unwrap();
let (_item2, lock_token2, _attempt_count2) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
store
.abandon_orchestration_item(&lock_token2, None, false)
.await
.unwrap();
let (item3, _lock_token3, _attempt_count3) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item3.instance, "test-instance");
}