use crate::common::{NAMESPACE, eventually, get_integ_client, rand_6_chars};
use futures::TryStreamExt;
use std::time::{Duration, SystemTime};
use temporalio_client::schedules::{
CreateScheduleOptions, ListSchedulesOptions, ScheduleAction, ScheduleBackfill,
ScheduleCalendarSpec, ScheduleOverlapPolicy, ScheduleSpec,
};
async fn test_client() -> temporalio_client::Client {
get_integ_client(NAMESPACE.to_string(), None).await
}
#[tokio::test]
async fn create_and_describe_schedule() {
let client = test_client().await;
let schedule_id = format!("sched-create-{}", rand_6_chars());
let handle = client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.paused(true)
.note("created paused for testing")
.build(),
)
.await
.unwrap();
assert_eq!(handle.schedule_id(), schedule_id);
let desc = handle.describe().await.unwrap();
assert!(desc.paused());
assert_eq!(desc.note(), Some("created paused for testing"));
assert!(!desc.conflict_token().is_empty());
handle.delete().await.unwrap();
}
#[tokio::test]
async fn create_schedule_with_calendar_spec() {
let client = test_client().await;
let schedule_id = format!("sched-calendar-{}", rand_6_chars());
let handle = client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_calendar(
ScheduleCalendarSpec::builder().hour("2-7").build(),
))
.paused(true)
.build(),
)
.await
.unwrap();
let desc = handle.describe().await.unwrap();
let raw_spec = desc.raw().schedule.as_ref().unwrap().spec.as_ref().unwrap();
assert_eq!(raw_spec.structured_calendar.len(), 1);
handle.delete().await.unwrap();
}
#[tokio::test]
async fn create_schedule_with_trigger_immediately() {
let client = test_client().await;
let schedule_id = format!("sched-trigimm-{}", rand_6_chars());
let handle = client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(86400)))
.trigger_immediately(true)
.build(),
)
.await
.unwrap();
let desc = eventually(
|| {
let handle = client.get_schedule_handle(&schedule_id);
async move {
let desc = handle.describe().await.map_err(|e| e.to_string())?;
if desc.action_count() > 0 {
Ok(desc)
} else {
Err("action_count still 0".to_string())
}
}
},
Duration::from_secs(10),
)
.await
.unwrap();
assert!(desc.action_count() >= 1);
handle.delete().await.unwrap();
}
#[tokio::test]
async fn pause_and_unpause_schedule() {
let client = test_client().await;
let schedule_id = format!("sched-pause-{}", rand_6_chars());
let handle = client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.build(),
)
.await
.unwrap();
let desc = handle.describe().await.unwrap();
assert!(!desc.paused());
handle.pause(Some("pausing for maintenance")).await.unwrap();
let desc = handle.describe().await.unwrap();
assert!(desc.paused());
assert_eq!(desc.note(), Some("pausing for maintenance"));
handle.unpause(Some("maintenance complete")).await.unwrap();
let desc = handle.describe().await.unwrap();
assert!(!desc.paused());
assert_eq!(desc.note(), Some("maintenance complete"));
handle.pause(None::<&str>).await.unwrap();
let desc = handle.describe().await.unwrap();
assert!(desc.paused());
assert!(desc.note().is_some());
handle.delete().await.unwrap();
}
#[tokio::test]
async fn update_schedule() {
let client = test_client().await;
let schedule_id = format!("sched-update-{}", rand_6_chars());
let handle = client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.paused(true)
.note("before update")
.build(),
)
.await
.unwrap();
handle
.update(|u| {
u.set_note("updated notes");
})
.await
.unwrap();
let desc = handle.describe().await.unwrap();
assert_eq!(desc.note(), Some("updated notes"));
handle.delete().await.unwrap();
}
#[tokio::test]
async fn trigger_schedule() {
let client = test_client().await;
let schedule_id = format!("sched-trigger-{}", rand_6_chars());
let handle = client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.build(),
)
.await
.unwrap();
handle
.trigger(ScheduleOverlapPolicy::AllowAll)
.await
.unwrap();
let desc = eventually(
|| {
let handle = client.get_schedule_handle(&schedule_id);
async move {
let desc = handle.describe().await.map_err(|e| e.to_string())?;
if desc.action_count() > 0 {
Ok(desc)
} else {
Err("action_count still 0".to_string())
}
}
},
Duration::from_secs(10),
)
.await
.unwrap();
assert!(desc.action_count() >= 1);
handle.delete().await.unwrap();
}
#[tokio::test]
async fn backfill_schedule() {
let client = test_client().await;
let schedule_id = format!("sched-backfill-{}", rand_6_chars());
let handle = client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.paused(true)
.build(),
)
.await
.unwrap();
let now = SystemTime::now();
let two_hours_ago = now - Duration::from_secs(7200);
handle
.backfill([ScheduleBackfill::new(two_hours_ago, now)
.overlap_policy(ScheduleOverlapPolicy::AllowAll)
.build()])
.await
.unwrap();
handle.delete().await.unwrap();
}
#[tokio::test]
async fn delete_schedule() {
let client = test_client().await;
let schedule_id = format!("sched-delete-{}", rand_6_chars());
client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.paused(true)
.build(),
)
.await
.unwrap();
let handle = client.get_schedule_handle(&schedule_id);
handle.delete().await.unwrap();
let err = handle.describe().await.unwrap_err();
assert!(err.to_string().contains("not found") || err.to_string().contains("NotFound"));
}
#[tokio::test]
async fn get_schedule_handle_for_existing_schedule() {
let client = test_client().await;
let schedule_id = format!("sched-handle-{}", rand_6_chars());
client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.paused(true)
.build(),
)
.await
.unwrap();
let handle = client.get_schedule_handle(&schedule_id);
let desc = handle.describe().await.unwrap();
assert!(desc.paused());
handle.delete().await.unwrap();
}
#[tokio::test]
async fn list_schedules() {
let client = test_client().await;
let suffix = rand_6_chars();
let num_schedules = 3;
let mut schedule_ids = Vec::new();
for i in 0..num_schedules {
let schedule_id = format!("sched-list-{suffix}-{i}");
schedule_ids.push(schedule_id.clone());
client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.paused(true)
.build(),
)
.await
.unwrap();
}
let found_ids = eventually(
|| {
let client = client.clone();
let expected = schedule_ids.clone();
async move {
let schedules: Vec<_> = client
.list_schedules(ListSchedulesOptions::default())
.try_collect()
.await
.map_err(|e| e.to_string())?;
let found: Vec<_> = schedules
.iter()
.filter(|s| expected.contains(&s.schedule_id().to_string()))
.map(|s| s.schedule_id().to_string())
.collect();
if found.len() == expected.len() {
Ok(found)
} else {
Err(format!(
"Expected {} schedules, found {}",
expected.len(),
found.len()
))
}
}
},
Duration::from_secs(10),
)
.await
.unwrap();
assert_eq!(found_ids.len(), num_schedules);
for id in &schedule_ids {
client.get_schedule_handle(id).delete().await.unwrap();
}
}
#[tokio::test]
async fn describe_accessors_match_created_values() {
let client = test_client().await;
let schedule_id = format!("sched-accessors-{}", rand_6_chars());
let handle = client
.create_schedule(
&schedule_id,
CreateScheduleOptions::builder()
.action(ScheduleAction::start_workflow(
"MyWorkflow",
"my-task-queue",
format!("wf-{}", rand_6_chars()),
))
.spec(ScheduleSpec::from_interval(Duration::from_secs(3600)))
.paused(true)
.note("accessors test")
.build(),
)
.await
.unwrap();
let desc = handle.describe().await.unwrap();
assert!(desc.paused());
assert_eq!(desc.note(), Some("accessors test"));
assert_eq!(desc.action_count(), 0);
assert_eq!(desc.missed_catchup_window(), 0);
assert_eq!(desc.overlap_skipped(), 0);
assert!(desc.recent_actions().is_empty());
assert!(desc.running_actions().is_empty());
assert!(desc.create_time().is_some());
handle.delete().await.unwrap();
}