1use crate::{Client, NamespacedClient, grpc::WorkflowService};
2use futures_util::stream;
3use std::{
4 collections::VecDeque,
5 pin::Pin,
6 task::{Context, Poll},
7 time::{Duration, SystemTime},
8};
9use temporalio_common::protos::{
10 proto_ts_to_system_time,
11 temporal::api::{
12 common::v1 as common_proto, schedule::v1 as schedule_proto,
13 taskqueue::v1 as taskqueue_proto, workflow::v1 as workflow_proto, workflowservice::v1::*,
14 },
15};
16use tonic::IntoRequest;
17use uuid::Uuid;
18
19#[derive(Debug, thiserror::Error)]
21#[non_exhaustive]
22pub enum ScheduleError {
23 #[error("Server error: {0}")]
25 Rpc(#[from] tonic::Status),
26}
27
28#[derive(Debug, Clone, bon::Builder)]
30#[builder(on(String, into))]
31#[non_exhaustive]
32pub struct CreateScheduleOptions {
33 pub action: ScheduleAction,
35 pub spec: ScheduleSpec,
37 #[builder(default)]
39 pub trigger_immediately: bool,
40 #[builder(default)]
43 pub overlap_policy: ScheduleOverlapPolicy,
44 #[builder(default)]
46 pub paused: bool,
47 #[builder(default)]
49 pub note: String,
50}
51
52#[derive(Debug, Clone, PartialEq)]
55#[non_exhaustive]
56pub enum ScheduleAction {
57 StartWorkflow {
59 workflow_type: String,
61 task_queue: String,
63 workflow_id: String,
65 },
66}
67
68impl ScheduleAction {
69 pub fn start_workflow(
71 workflow_type: impl Into<String>,
72 task_queue: impl Into<String>,
73 workflow_id: impl Into<String>,
74 ) -> Self {
75 Self::StartWorkflow {
76 workflow_type: workflow_type.into(),
77 task_queue: task_queue.into(),
78 workflow_id: workflow_id.into(),
79 }
80 }
81
82 pub(crate) fn into_proto(self) -> schedule_proto::ScheduleAction {
83 match self {
84 Self::StartWorkflow {
85 workflow_type,
86 task_queue,
87 workflow_id,
88 } => schedule_proto::ScheduleAction {
89 action: Some(schedule_proto::schedule_action::Action::StartWorkflow(
90 workflow_proto::NewWorkflowExecutionInfo {
91 workflow_id,
92 workflow_type: Some(common_proto::WorkflowType {
93 name: workflow_type,
94 }),
95 task_queue: Some(taskqueue_proto::TaskQueue {
96 name: task_queue,
97 ..Default::default()
98 }),
99 ..Default::default()
100 },
101 )),
102 },
103 }
104 }
105}
106
107#[derive(Debug, Clone, Default, PartialEq, bon::Builder)]
112#[builder(on(String, into))]
113pub struct ScheduleSpec {
114 #[builder(default)]
116 pub intervals: Vec<ScheduleIntervalSpec>,
117 #[builder(default)]
119 pub calendars: Vec<ScheduleCalendarSpec>,
120 #[builder(default)]
122 pub exclude_calendars: Vec<ScheduleCalendarSpec>,
123 #[builder(default)]
125 pub cron_strings: Vec<String>,
126 #[builder(default)]
128 pub timezone_name: String,
129 pub start_time: Option<SystemTime>,
131 pub end_time: Option<SystemTime>,
133 pub jitter: Option<Duration>,
135}
136
137impl ScheduleSpec {
138 pub fn from_interval(every: Duration) -> Self {
140 Self {
141 intervals: vec![every.into()],
142 ..Default::default()
143 }
144 }
145
146 pub fn from_calendar(calendar: ScheduleCalendarSpec) -> Self {
148 Self {
149 calendars: vec![calendar],
150 ..Default::default()
151 }
152 }
153
154 pub(crate) fn into_proto(self) -> schedule_proto::ScheduleSpec {
155 #[allow(deprecated)]
156 schedule_proto::ScheduleSpec {
157 interval: self.intervals.into_iter().map(Into::into).collect(),
158 calendar: self.calendars.into_iter().map(Into::into).collect(),
159 exclude_calendar: self.exclude_calendars.into_iter().map(Into::into).collect(),
160 cron_string: self.cron_strings,
161 timezone_name: self.timezone_name,
162 start_time: self.start_time.map(Into::into),
163 end_time: self.end_time.map(Into::into),
164 jitter: self.jitter.and_then(|d| d.try_into().ok()),
165 ..Default::default()
166 }
167 }
168}
169
170#[derive(Debug, Clone, PartialEq)]
172#[non_exhaustive]
173pub struct ScheduleIntervalSpec {
174 pub every: Duration,
176 pub offset: Option<Duration>,
178}
179
180impl ScheduleIntervalSpec {
181 pub fn new(every: Duration, offset: Option<Duration>) -> Self {
183 Self { every, offset }
184 }
185}
186
187impl From<Duration> for ScheduleIntervalSpec {
188 fn from(every: Duration) -> Self {
189 Self {
190 every,
191 offset: None,
192 }
193 }
194}
195
196impl From<ScheduleIntervalSpec> for schedule_proto::IntervalSpec {
197 fn from(s: ScheduleIntervalSpec) -> Self {
198 Self {
199 interval: Some(s.every.try_into().unwrap_or_default()),
200 phase: s.offset.and_then(|d| d.try_into().ok()),
201 }
202 }
203}
204
205#[derive(Debug, Clone, Default, PartialEq, bon::Builder)]
209#[builder(on(String, into))]
210#[non_exhaustive]
211pub struct ScheduleCalendarSpec {
212 #[builder(default)]
214 pub second: String,
215 #[builder(default)]
217 pub minute: String,
218 #[builder(default)]
220 pub hour: String,
221 #[builder(default)]
223 pub day_of_month: String,
224 #[builder(default)]
226 pub month: String,
227 #[builder(default)]
229 pub day_of_week: String,
230 #[builder(default)]
232 pub year: String,
233 #[builder(default)]
235 pub comment: String,
236}
237
238impl From<ScheduleCalendarSpec> for schedule_proto::CalendarSpec {
239 fn from(s: ScheduleCalendarSpec) -> Self {
240 Self {
241 second: s.second,
242 minute: s.minute,
243 hour: s.hour,
244 day_of_month: s.day_of_month,
245 month: s.month,
246 day_of_week: s.day_of_week,
247 year: s.year,
248 comment: s.comment,
249 }
250 }
251}
252
253#[derive(Debug, Clone, Default, bon::Builder)]
255#[non_exhaustive]
256pub struct ListSchedulesOptions {
257 #[builder(default)]
259 pub maximum_page_size: i32,
260 #[builder(default)]
262 pub query: String,
263}
264
265pub struct ListSchedulesStream {
268 inner: Pin<Box<dyn futures_util::Stream<Item = Result<ScheduleSummary, ScheduleError>> + Send>>,
269}
270
271impl ListSchedulesStream {
272 pub(crate) fn new(
273 inner: Pin<
274 Box<dyn futures_util::Stream<Item = Result<ScheduleSummary, ScheduleError>> + Send>,
275 >,
276 ) -> Self {
277 Self { inner }
278 }
279}
280
281impl futures_util::Stream for ListSchedulesStream {
282 type Item = Result<ScheduleSummary, ScheduleError>;
283
284 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
285 self.inner.as_mut().poll_next(cx)
286 }
287}
288
289#[derive(Debug, Clone, PartialEq)]
291#[non_exhaustive]
292pub struct ScheduleRecentAction {
293 pub schedule_time: Option<SystemTime>,
295 pub actual_time: Option<SystemTime>,
297 pub workflow_id: String,
299 pub run_id: String,
301}
302
303#[derive(Debug, Clone, PartialEq)]
305#[non_exhaustive]
306pub struct ScheduleRunningAction {
307 pub workflow_id: String,
309 pub run_id: String,
311}
312
313impl From<&schedule_proto::ScheduleActionResult> for ScheduleRecentAction {
314 fn from(a: &schedule_proto::ScheduleActionResult) -> Self {
315 let workflow_result = a
316 .start_workflow_result
317 .as_ref()
318 .expect("unsupported schedule action: start_workflow_result should be present");
319 ScheduleRecentAction {
320 schedule_time: a.schedule_time.as_ref().and_then(proto_ts_to_system_time),
321 actual_time: a.actual_time.as_ref().and_then(proto_ts_to_system_time),
322 workflow_id: workflow_result.workflow_id.clone(),
323 run_id: workflow_result.run_id.clone(),
324 }
325 }
326}
327
328#[derive(Debug, Clone)]
334pub struct ScheduleDescription {
335 raw: DescribeScheduleResponse,
336}
337
338impl ScheduleDescription {
339 pub fn conflict_token(&self) -> &[u8] {
341 &self.raw.conflict_token
342 }
343
344 pub fn paused(&self) -> bool {
346 self.raw
347 .schedule
348 .as_ref()
349 .and_then(|s| s.state.as_ref())
350 .is_some_and(|st| st.paused)
351 }
352
353 pub fn note(&self) -> Option<&str> {
356 self.raw
357 .schedule
358 .as_ref()
359 .and_then(|s| s.state.as_ref())
360 .map(|st| st.notes.as_str())
361 .filter(|s| !s.is_empty())
362 }
363
364 pub fn action_count(&self) -> i64 {
366 self.info().map_or(0, |i| i.action_count)
367 }
368
369 pub fn missed_catchup_window(&self) -> i64 {
371 self.info().map_or(0, |i| i.missed_catchup_window)
372 }
373
374 pub fn overlap_skipped(&self) -> i64 {
376 self.info().map_or(0, |i| i.overlap_skipped)
377 }
378
379 pub fn recent_actions(&self) -> Vec<ScheduleRecentAction> {
381 self.info()
382 .map(|i| {
383 i.recent_actions
384 .iter()
385 .map(ScheduleRecentAction::from)
386 .collect()
387 })
388 .unwrap_or_default()
389 }
390
391 pub fn running_actions(&self) -> Vec<ScheduleRunningAction> {
393 self.info()
394 .map(|i| {
395 i.running_workflows
396 .iter()
397 .map(|w| ScheduleRunningAction {
398 workflow_id: w.workflow_id.clone(),
399 run_id: w.run_id.clone(),
400 })
401 .collect()
402 })
403 .unwrap_or_default()
404 }
405
406 pub fn future_action_times(&self) -> Vec<SystemTime> {
408 self.info()
409 .map(|i| {
410 i.future_action_times
411 .iter()
412 .filter_map(proto_ts_to_system_time)
413 .collect()
414 })
415 .unwrap_or_default()
416 }
417
418 pub fn create_time(&self) -> Option<SystemTime> {
420 self.info()
421 .and_then(|i| i.create_time.as_ref())
422 .and_then(proto_ts_to_system_time)
423 }
424
425 pub fn update_time(&self) -> Option<SystemTime> {
427 self.info()
428 .and_then(|i| i.update_time.as_ref())
429 .and_then(proto_ts_to_system_time)
430 }
431
432 pub fn memo(&self) -> Option<&common_proto::Memo> {
434 self.raw.memo.as_ref()
435 }
436
437 pub fn search_attributes(&self) -> Option<&common_proto::SearchAttributes> {
439 self.raw.search_attributes.as_ref()
440 }
441
442 pub fn raw(&self) -> &DescribeScheduleResponse {
444 &self.raw
445 }
446
447 pub fn into_raw(self) -> DescribeScheduleResponse {
449 self.raw
450 }
451
452 fn info(&self) -> Option<&schedule_proto::ScheduleInfo> {
453 self.raw.info.as_ref()
454 }
455
456 pub fn into_update(self) -> ScheduleUpdate {
461 ScheduleUpdate {
462 schedule: self.raw.schedule.unwrap_or_default(),
463 }
464 }
465}
466
467impl From<DescribeScheduleResponse> for ScheduleDescription {
468 fn from(raw: DescribeScheduleResponse) -> Self {
469 Self { raw }
470 }
471}
472
473#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
475pub enum ScheduleOverlapPolicy {
476 #[default]
478 Unspecified,
479 Skip,
481 BufferOne,
483 BufferAll,
485 CancelOther,
487 TerminateOther,
489 AllowAll,
491}
492
493impl ScheduleOverlapPolicy {
494 pub(crate) fn to_proto(self) -> i32 {
495 match self {
496 Self::Unspecified => 0,
497 Self::Skip => 1,
498 Self::BufferOne => 2,
499 Self::BufferAll => 3,
500 Self::CancelOther => 4,
501 Self::TerminateOther => 5,
502 Self::AllowAll => 6,
503 }
504 }
505}
506
507#[derive(Debug, Clone, PartialEq, bon::Builder)]
509#[non_exhaustive]
510#[builder(start_fn = new)]
511pub struct ScheduleBackfill {
512 #[builder(start_fn)]
514 pub start_time: SystemTime,
515 #[builder(start_fn)]
517 pub end_time: SystemTime,
518 #[builder(default)]
520 pub overlap_policy: ScheduleOverlapPolicy,
521}
522
523#[derive(Debug, Clone)]
528pub struct ScheduleUpdate {
529 schedule: schedule_proto::Schedule,
530}
531
532impl ScheduleUpdate {
533 pub fn set_spec(&mut self, spec: ScheduleSpec) -> &mut Self {
535 self.schedule.spec = Some(spec.into_proto());
536 self
537 }
538
539 pub fn set_action(&mut self, action: ScheduleAction) -> &mut Self {
541 self.schedule.action = Some(action.into_proto());
542 self
543 }
544
545 pub fn set_paused(&mut self, paused: bool) -> &mut Self {
547 self.state_mut().paused = paused;
548 self
549 }
550
551 pub fn set_note(&mut self, note: impl Into<String>) -> &mut Self {
553 self.state_mut().notes = note.into();
554 self
555 }
556
557 pub fn set_overlap_policy(&mut self, policy: ScheduleOverlapPolicy) -> &mut Self {
559 self.policies_mut().overlap_policy = policy.to_proto();
560 self
561 }
562
563 pub fn set_catchup_window(&mut self, window: Duration) -> &mut Self {
566 self.policies_mut().catchup_window = window.try_into().ok();
567 self
568 }
569
570 pub fn set_pause_on_failure(&mut self, pause_on_failure: bool) -> &mut Self {
572 self.policies_mut().pause_on_failure = pause_on_failure;
573 self
574 }
575
576 pub fn set_keep_original_workflow_id(&mut self, keep: bool) -> &mut Self {
579 self.policies_mut().keep_original_workflow_id = keep;
580 self
581 }
582
583 pub fn set_remaining_actions(&mut self, count: Option<i64>) -> &mut Self {
586 let state = self.state_mut();
587 match count {
588 Some(n) => {
589 state.limited_actions = true;
590 state.remaining_actions = n;
591 }
592 None => {
593 state.limited_actions = false;
594 state.remaining_actions = 0;
595 }
596 }
597 self
598 }
599
600 pub fn raw(&self) -> &schedule_proto::Schedule {
602 &self.schedule
603 }
604
605 pub fn into_raw(self) -> schedule_proto::Schedule {
607 self.schedule
608 }
609
610 fn state_mut(&mut self) -> &mut schedule_proto::ScheduleState {
611 self.schedule.state.get_or_insert_with(Default::default)
612 }
613
614 fn policies_mut(&mut self) -> &mut schedule_proto::SchedulePolicies {
615 self.schedule.policies.get_or_insert_with(Default::default)
616 }
617}
618
619#[derive(Debug, Clone)]
625pub struct ScheduleSummary {
626 raw: schedule_proto::ScheduleListEntry,
627}
628
629impl ScheduleSummary {
630 pub fn schedule_id(&self) -> &str {
632 &self.raw.schedule_id
633 }
634
635 pub fn workflow_type(&self) -> Option<&str> {
637 self.info()
638 .and_then(|i| i.workflow_type.as_ref())
639 .map(|wt| wt.name.as_str())
640 }
641
642 pub fn note(&self) -> Option<&str> {
645 self.info()
646 .map(|i| i.notes.as_str())
647 .filter(|s| !s.is_empty())
648 }
649
650 pub fn paused(&self) -> bool {
652 self.info().is_some_and(|i| i.paused)
653 }
654
655 pub fn recent_actions(&self) -> Vec<ScheduleRecentAction> {
657 self.info()
658 .map(|i| {
659 i.recent_actions
660 .iter()
661 .map(ScheduleRecentAction::from)
662 .collect()
663 })
664 .unwrap_or_default()
665 }
666
667 pub fn future_action_times(&self) -> Vec<SystemTime> {
669 self.info()
670 .map(|i| {
671 i.future_action_times
672 .iter()
673 .filter_map(proto_ts_to_system_time)
674 .collect()
675 })
676 .unwrap_or_default()
677 }
678
679 pub fn memo(&self) -> Option<&common_proto::Memo> {
681 self.raw.memo.as_ref()
682 }
683
684 pub fn search_attributes(&self) -> Option<&common_proto::SearchAttributes> {
686 self.raw.search_attributes.as_ref()
687 }
688
689 pub fn raw(&self) -> &schedule_proto::ScheduleListEntry {
691 &self.raw
692 }
693
694 pub fn into_raw(self) -> schedule_proto::ScheduleListEntry {
696 self.raw
697 }
698
699 fn info(&self) -> Option<&schedule_proto::ScheduleListInfo> {
700 self.raw.info.as_ref()
701 }
702}
703
704impl From<schedule_proto::ScheduleListEntry> for ScheduleSummary {
705 fn from(raw: schedule_proto::ScheduleListEntry) -> Self {
706 Self { raw }
707 }
708}
709
710#[derive(Clone, derive_more::Debug)]
714pub struct ScheduleHandle<CT> {
715 #[debug(skip)]
716 client: CT,
717 namespace: String,
718 schedule_id: String,
719}
720
721impl<CT> ScheduleHandle<CT>
722where
723 CT: WorkflowService + NamespacedClient + Clone + Send + Sync,
724{
725 pub(crate) fn new(client: CT, namespace: String, schedule_id: String) -> Self {
726 Self {
727 client,
728 namespace,
729 schedule_id,
730 }
731 }
732
733 pub fn namespace(&self) -> &str {
735 &self.namespace
736 }
737
738 pub fn schedule_id(&self) -> &str {
740 &self.schedule_id
741 }
742
743 pub async fn describe(&self) -> Result<ScheduleDescription, ScheduleError> {
745 let resp = WorkflowService::describe_schedule(
746 &mut self.client.clone(),
747 DescribeScheduleRequest {
748 namespace: self.namespace.clone(),
749 schedule_id: self.schedule_id.clone(),
750 }
751 .into_request(),
752 )
753 .await?
754 .into_inner();
755
756 Ok(ScheduleDescription::from(resp))
757 }
758
759 pub async fn update(
772 &self,
773 updater: impl FnOnce(&mut ScheduleUpdate),
774 ) -> Result<(), ScheduleError> {
775 let desc = self.describe().await?;
776 let mut update = desc.into_update();
777 updater(&mut update);
778 self.send_update(update).await
779 }
780
781 pub async fn send_update(&self, update: ScheduleUpdate) -> Result<(), ScheduleError> {
787 WorkflowService::update_schedule(
788 &mut self.client.clone(),
789 UpdateScheduleRequest {
790 namespace: self.namespace.clone(),
791 schedule_id: self.schedule_id.clone(),
792 schedule: Some(update.schedule),
793 identity: self.client.identity(),
794 request_id: Uuid::new_v4().to_string(),
795 ..Default::default()
796 }
797 .into_request(),
798 )
799 .await?;
800 Ok(())
801 }
802
803 pub async fn delete(&self) -> Result<(), ScheduleError> {
805 WorkflowService::delete_schedule(
806 &mut self.client.clone(),
807 DeleteScheduleRequest {
808 namespace: self.namespace.clone(),
809 schedule_id: self.schedule_id.clone(),
810 identity: self.client.identity(),
811 }
812 .into_request(),
813 )
814 .await?;
815 Ok(())
816 }
817
818 pub async fn pause(&self, note: Option<impl Into<String>>) -> Result<(), ScheduleError> {
822 let note = note.map_or_else(|| "Paused via Rust SDK".to_string(), |s| s.into());
823 WorkflowService::patch_schedule(
824 &mut self.client.clone(),
825 PatchScheduleRequest {
826 namespace: self.namespace.clone(),
827 schedule_id: self.schedule_id.clone(),
828 patch: Some(schedule_proto::SchedulePatch {
829 pause: note,
830 ..Default::default()
831 }),
832 identity: self.client.identity(),
833 request_id: Uuid::new_v4().to_string(),
834 }
835 .into_request(),
836 )
837 .await?;
838 Ok(())
839 }
840
841 pub async fn unpause(&self, note: Option<impl Into<String>>) -> Result<(), ScheduleError> {
845 let note = note.map_or_else(|| "Unpaused via Rust SDK".to_string(), |s| s.into());
846 WorkflowService::patch_schedule(
847 &mut self.client.clone(),
848 PatchScheduleRequest {
849 namespace: self.namespace.clone(),
850 schedule_id: self.schedule_id.clone(),
851 patch: Some(schedule_proto::SchedulePatch {
852 unpause: note,
853 ..Default::default()
854 }),
855 identity: self.client.identity(),
856 request_id: Uuid::new_v4().to_string(),
857 }
858 .into_request(),
859 )
860 .await?;
861 Ok(())
862 }
863
864 pub async fn trigger(
866 &self,
867 overlap_policy: ScheduleOverlapPolicy,
868 ) -> Result<(), ScheduleError> {
869 WorkflowService::patch_schedule(
870 &mut self.client.clone(),
871 PatchScheduleRequest {
872 namespace: self.namespace.clone(),
873 schedule_id: self.schedule_id.clone(),
874 patch: Some(schedule_proto::SchedulePatch {
875 trigger_immediately: Some(schedule_proto::TriggerImmediatelyRequest {
876 overlap_policy: overlap_policy.to_proto(),
877 scheduled_time: None,
878 }),
879 ..Default::default()
880 }),
881 identity: self.client.identity(),
882 request_id: Uuid::new_v4().to_string(),
883 }
884 .into_request(),
885 )
886 .await?;
887 Ok(())
888 }
889
890 pub async fn backfill(
892 &self,
893 backfills: impl IntoIterator<Item = ScheduleBackfill>,
894 ) -> Result<(), ScheduleError> {
895 let backfill_requests: Vec<schedule_proto::BackfillRequest> = backfills
896 .into_iter()
897 .map(|b| schedule_proto::BackfillRequest {
898 start_time: Some(b.start_time.into()),
899 end_time: Some(b.end_time.into()),
900 overlap_policy: b.overlap_policy.to_proto(),
901 })
902 .collect();
903 WorkflowService::patch_schedule(
904 &mut self.client.clone(),
905 PatchScheduleRequest {
906 namespace: self.namespace.clone(),
907 schedule_id: self.schedule_id.clone(),
908 patch: Some(schedule_proto::SchedulePatch {
909 backfill_request: backfill_requests,
910 ..Default::default()
911 }),
912 identity: self.client.identity(),
913 request_id: Uuid::new_v4().to_string(),
914 }
915 .into_request(),
916 )
917 .await?;
918 Ok(())
919 }
920}
921
922impl Client {
924 pub async fn create_schedule(
926 &self,
927 schedule_id: impl Into<String>,
928 opts: CreateScheduleOptions,
929 ) -> Result<ScheduleHandle<Self>, ScheduleError> {
930 let schedule_id = schedule_id.into();
931 let namespace = self.namespace();
932
933 let initial_patch = if opts.trigger_immediately {
934 Some(schedule_proto::SchedulePatch {
935 trigger_immediately: Some(schedule_proto::TriggerImmediatelyRequest {
936 overlap_policy: ScheduleOverlapPolicy::AllowAll.to_proto(),
939 scheduled_time: None,
940 }),
941 ..Default::default()
942 })
943 } else {
944 None
945 };
946 let policies = (opts.overlap_policy != ScheduleOverlapPolicy::Unspecified).then(|| {
949 schedule_proto::SchedulePolicies {
950 overlap_policy: opts.overlap_policy.to_proto(),
951 ..Default::default()
952 }
953 });
954 let schedule = schedule_proto::Schedule {
955 spec: Some(opts.spec.into_proto()),
956 action: Some(opts.action.into_proto()),
957 policies,
958 state: Some(schedule_proto::ScheduleState {
959 paused: opts.paused,
960 notes: opts.note,
961 ..Default::default()
962 }),
963 };
964 WorkflowService::create_schedule(
965 &mut self.clone(),
966 CreateScheduleRequest {
967 namespace: namespace.clone(),
968 schedule_id: schedule_id.clone(),
969 schedule: Some(schedule),
970 initial_patch,
971 identity: self.identity(),
972 request_id: Uuid::new_v4().to_string(),
973 ..Default::default()
974 }
975 .into_request(),
976 )
977 .await?;
978 Ok(ScheduleHandle::new(self.clone(), namespace, schedule_id))
979 }
980
981 pub fn get_schedule_handle(&self, schedule_id: impl Into<String>) -> ScheduleHandle<Self> {
983 ScheduleHandle::new(self.clone(), self.namespace(), schedule_id.into())
984 }
985
986 pub fn list_schedules(&self, opts: ListSchedulesOptions) -> ListSchedulesStream {
989 let client = self.clone();
990 let namespace = self.namespace();
991 let query = opts.query;
992 let page_size = opts.maximum_page_size;
993
994 let stream = stream::unfold(
995 (Vec::new(), VecDeque::new(), false),
996 move |(next_page_token, mut buffer, exhausted)| {
997 let mut client = client.clone();
998 let namespace = namespace.clone();
999 let query = query.clone();
1000
1001 async move {
1002 if let Some(item) = buffer.pop_front() {
1003 return Some((Ok(item), (next_page_token, buffer, exhausted)));
1004 } else if exhausted {
1005 return None;
1006 }
1007
1008 let response = WorkflowService::list_schedules(
1009 &mut client,
1010 ListSchedulesRequest {
1011 namespace,
1012 maximum_page_size: page_size,
1013 next_page_token: next_page_token.clone(),
1014 query,
1015 }
1016 .into_request(),
1017 )
1018 .await;
1019
1020 match response {
1021 Ok(resp) => {
1022 let resp = resp.into_inner();
1023 let new_exhausted = resp.next_page_token.is_empty();
1024 let new_token = resp.next_page_token;
1025
1026 buffer = resp
1027 .schedules
1028 .into_iter()
1029 .map(ScheduleSummary::from)
1030 .collect();
1031
1032 buffer
1033 .pop_front()
1034 .map(|item| (Ok(item), (new_token, buffer, new_exhausted)))
1035 }
1036 Err(e) => Some((Err(e.into()), (next_page_token, buffer, true))),
1037 }
1038 }
1039 },
1040 );
1041
1042 ListSchedulesStream::new(Box::pin(stream))
1043 }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048 use super::*;
1049 use crate::{NamespacedClient, grpc::WorkflowService};
1050 use futures_util::FutureExt;
1051 use std::{
1052 sync::{
1053 Arc,
1054 atomic::{AtomicUsize, Ordering},
1055 },
1056 time::SystemTime,
1057 };
1058 use temporalio_common::protos::temporal::api::{
1059 common::v1::{
1060 Memo, SearchAttributes, WorkflowExecution as ProtoWorkflowExecution, WorkflowType,
1061 },
1062 schedule::v1::{
1063 Schedule, ScheduleActionResult, ScheduleInfo, ScheduleListEntry, ScheduleListInfo,
1064 ScheduleSpec, ScheduleState,
1065 },
1066 workflowservice::v1::{
1067 DeleteScheduleResponse, DescribeScheduleResponse, PatchScheduleResponse,
1068 UpdateScheduleResponse,
1069 },
1070 };
1071 use tonic::{Request, Response};
1072
1073 #[derive(Default)]
1074 struct CapturedRequests {
1075 describe: AtomicUsize,
1076 update: AtomicUsize,
1077 delete: AtomicUsize,
1078 patch: AtomicUsize,
1079 }
1080
1081 #[derive(Clone, Default)]
1082 struct MockScheduleClient {
1083 captured: Arc<CapturedRequests>,
1084 describe_response: DescribeScheduleResponse,
1085 should_error: bool,
1086 }
1087
1088 impl NamespacedClient for MockScheduleClient {
1089 fn namespace(&self) -> String {
1090 "test-namespace".to_string()
1091 }
1092 fn identity(&self) -> String {
1093 "test-identity".to_string()
1094 }
1095 }
1096
1097 impl WorkflowService for MockScheduleClient {
1098 fn describe_schedule(
1099 &mut self,
1100 _request: Request<DescribeScheduleRequest>,
1101 ) -> futures_util::future::BoxFuture<
1102 '_,
1103 Result<Response<DescribeScheduleResponse>, tonic::Status>,
1104 > {
1105 self.captured.describe.fetch_add(1, Ordering::SeqCst);
1106 let resp = self.describe_response.clone();
1107 let should_error = self.should_error;
1108 async move {
1109 if should_error {
1110 Err(tonic::Status::not_found("schedule not found"))
1111 } else {
1112 Ok(Response::new(resp))
1113 }
1114 }
1115 .boxed()
1116 }
1117
1118 fn update_schedule(
1119 &mut self,
1120 _request: Request<UpdateScheduleRequest>,
1121 ) -> futures_util::future::BoxFuture<
1122 '_,
1123 Result<Response<UpdateScheduleResponse>, tonic::Status>,
1124 > {
1125 self.captured.update.fetch_add(1, Ordering::SeqCst);
1126 let should_error = self.should_error;
1127 async move {
1128 if should_error {
1129 Err(tonic::Status::internal("update failed"))
1130 } else {
1131 Ok(Response::new(UpdateScheduleResponse::default()))
1132 }
1133 }
1134 .boxed()
1135 }
1136
1137 fn delete_schedule(
1138 &mut self,
1139 _request: Request<DeleteScheduleRequest>,
1140 ) -> futures_util::future::BoxFuture<
1141 '_,
1142 Result<Response<DeleteScheduleResponse>, tonic::Status>,
1143 > {
1144 self.captured.delete.fetch_add(1, Ordering::SeqCst);
1145 let should_error = self.should_error;
1146 async move {
1147 if should_error {
1148 Err(tonic::Status::internal("delete failed"))
1149 } else {
1150 Ok(Response::new(DeleteScheduleResponse::default()))
1151 }
1152 }
1153 .boxed()
1154 }
1155
1156 fn patch_schedule(
1157 &mut self,
1158 _request: Request<PatchScheduleRequest>,
1159 ) -> futures_util::future::BoxFuture<
1160 '_,
1161 Result<Response<PatchScheduleResponse>, tonic::Status>,
1162 > {
1163 self.captured.patch.fetch_add(1, Ordering::SeqCst);
1164 let should_error = self.should_error;
1165 async move {
1166 if should_error {
1167 Err(tonic::Status::internal("patch failed"))
1168 } else {
1169 Ok(Response::new(PatchScheduleResponse::default()))
1170 }
1171 }
1172 .boxed()
1173 }
1174 }
1175
1176 fn make_schedule_handle(client: MockScheduleClient) -> ScheduleHandle<MockScheduleClient> {
1177 ScheduleHandle::new(
1178 client,
1179 "test-namespace".to_string(),
1180 "test-schedule-id".to_string(),
1181 )
1182 }
1183
1184 #[test]
1185 fn schedule_handle_exposes_namespace_and_id() {
1186 let handle = make_schedule_handle(MockScheduleClient::default());
1187 assert_eq!(handle.namespace(), "test-namespace");
1188 assert_eq!(handle.schedule_id(), "test-schedule-id");
1189 }
1190
1191 #[tokio::test]
1192 async fn schedule_describe_returns_response_fields() {
1193 let conflict_token = b"token-123".to_vec();
1194
1195 let client = MockScheduleClient {
1196 describe_response: DescribeScheduleResponse {
1197 schedule: Some(Schedule::default()),
1198 info: Some(ScheduleInfo::default()),
1199 memo: Some(Memo {
1200 fields: Default::default(),
1201 }),
1202 search_attributes: Some(SearchAttributes {
1203 indexed_fields: Default::default(),
1204 }),
1205 conflict_token: conflict_token.clone(),
1206 },
1207 ..Default::default()
1208 };
1209
1210 let handle = make_schedule_handle(client.clone());
1211 let desc = handle.describe().await.unwrap();
1212
1213 assert_eq!(client.captured.describe.load(Ordering::SeqCst), 1);
1214 assert!(desc.raw().schedule.is_some());
1215 assert!(desc.raw().info.is_some());
1216 assert!(desc.raw().memo.is_some());
1217 assert!(desc.raw().search_attributes.is_some());
1218 assert_eq!(desc.conflict_token(), conflict_token);
1219 }
1220
1221 #[tokio::test]
1222 async fn schedule_update_describes_then_sends() {
1223 let client = MockScheduleClient::default();
1224 let handle = make_schedule_handle(client.clone());
1225
1226 handle
1227 .update(|u| {
1228 u.set_note("hi");
1229 })
1230 .await
1231 .unwrap();
1232
1233 assert_eq!(client.captured.describe.load(Ordering::SeqCst), 1);
1234 assert_eq!(client.captured.update.load(Ordering::SeqCst), 1);
1235 }
1236
1237 #[tokio::test]
1238 async fn schedule_multiple_updates_each_call_service() {
1239 let client = MockScheduleClient::default();
1240 let handle = make_schedule_handle(client.clone());
1241
1242 handle.update(|_| {}).await.unwrap();
1243 handle.update(|_| {}).await.unwrap();
1244
1245 assert_eq!(client.captured.update.load(Ordering::SeqCst), 2);
1246 }
1247
1248 #[tokio::test]
1249 async fn schedule_delete_calls_service() {
1250 let client = MockScheduleClient::default();
1251 let handle = make_schedule_handle(client.clone());
1252
1253 handle.delete().await.unwrap();
1254
1255 assert_eq!(client.captured.delete.load(Ordering::SeqCst), 1);
1256 }
1257
1258 #[tokio::test]
1259 async fn schedule_pause_calls_patch() {
1260 let client = MockScheduleClient::default();
1261 let handle = make_schedule_handle(client.clone());
1262
1263 handle.pause(Some("taking a break")).await.unwrap();
1264
1265 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1266 }
1267
1268 #[tokio::test]
1269 async fn schedule_pause_with_none_uses_default() {
1270 let client = MockScheduleClient::default();
1271 let handle = make_schedule_handle(client.clone());
1272
1273 handle.pause(None::<&str>).await.unwrap();
1274
1275 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1276 }
1277
1278 #[tokio::test]
1279 async fn schedule_unpause_calls_patch() {
1280 let client = MockScheduleClient::default();
1281 let handle = make_schedule_handle(client.clone());
1282
1283 handle.unpause(Some("resuming work")).await.unwrap();
1284
1285 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1286 }
1287
1288 #[tokio::test]
1289 async fn schedule_trigger_calls_patch() {
1290 let client = MockScheduleClient::default();
1291 let handle = make_schedule_handle(client.clone());
1292
1293 handle
1294 .trigger(ScheduleOverlapPolicy::Unspecified)
1295 .await
1296 .unwrap();
1297
1298 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1299 }
1300
1301 #[tokio::test]
1302 async fn schedule_backfill_calls_patch() {
1303 let client = MockScheduleClient::default();
1304 let handle = make_schedule_handle(client.clone());
1305
1306 let now = SystemTime::now();
1307 handle
1308 .backfill(vec![
1309 ScheduleBackfill::new(now, now)
1310 .overlap_policy(ScheduleOverlapPolicy::Skip)
1311 .build(),
1312 ScheduleBackfill::new(now, now)
1313 .overlap_policy(ScheduleOverlapPolicy::BufferOne)
1314 .build(),
1315 ])
1316 .await
1317 .unwrap();
1318
1319 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1320 }
1321
1322 #[tokio::test]
1323 async fn schedule_describe_propagates_rpc_errors() {
1324 let client = MockScheduleClient {
1325 should_error: true,
1326 ..Default::default()
1327 };
1328 let handle = make_schedule_handle(client);
1329
1330 let err = handle.describe().await.unwrap_err();
1331 assert!(
1332 matches!(err, ScheduleError::Rpc(_)),
1333 "expected Rpc variant, got: {err:?}"
1334 );
1335 assert!(err.to_string().contains("schedule not found"));
1336 }
1337
1338 #[tokio::test]
1339 async fn schedule_update_propagates_rpc_errors() {
1340 let client = MockScheduleClient {
1341 should_error: true,
1342 ..Default::default()
1343 };
1344 let handle = make_schedule_handle(client);
1345
1346 let err = handle.update(|_| {}).await.unwrap_err();
1347 assert!(matches!(err, ScheduleError::Rpc(_)));
1348 }
1349
1350 #[tokio::test]
1351 async fn schedule_delete_propagates_rpc_errors() {
1352 let client = MockScheduleClient {
1353 should_error: true,
1354 ..Default::default()
1355 };
1356 let handle = make_schedule_handle(client);
1357
1358 let err = handle.delete().await.unwrap_err();
1359 assert!(matches!(err, ScheduleError::Rpc(_)));
1360 }
1361
1362 #[tokio::test]
1363 async fn schedule_patch_operations_propagate_rpc_errors() {
1364 let client = MockScheduleClient {
1365 should_error: true,
1366 ..Default::default()
1367 };
1368 let handle = make_schedule_handle(client);
1369
1370 assert!(handle.pause(Some("")).await.is_err());
1371 assert!(handle.unpause(Some("")).await.is_err());
1372 assert!(handle.trigger(Default::default()).await.is_err());
1373 assert!(handle.backfill(vec![]).await.is_err());
1374 }
1375
1376 #[tokio::test]
1377 async fn schedule_all_patch_operations_call_service() {
1378 let client = MockScheduleClient::default();
1379 let handle = make_schedule_handle(client.clone());
1380
1381 handle.pause(Some("p")).await.unwrap();
1382 handle.unpause(Some("u")).await.unwrap();
1383 handle.trigger(Default::default()).await.unwrap();
1384 handle.backfill(vec![]).await.unwrap();
1385
1386 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 4);
1387 }
1388
1389 #[tokio::test]
1390 async fn schedule_describe_accessors_with_populated_fields() {
1391 let client = MockScheduleClient {
1392 describe_response: DescribeScheduleResponse {
1393 schedule: Some(Schedule {
1394 spec: Some(ScheduleSpec {
1395 timezone_name: "US/Eastern".to_string(),
1396 ..Default::default()
1397 }),
1398 state: Some(ScheduleState {
1399 paused: true,
1400 notes: "maintenance window".to_string(),
1401 ..Default::default()
1402 }),
1403 ..Default::default()
1404 }),
1405 info: Some(ScheduleInfo {
1406 action_count: 42,
1407 missed_catchup_window: 3,
1408 overlap_skipped: 5,
1409 recent_actions: vec![ScheduleActionResult {
1410 start_workflow_result: Some(ProtoWorkflowExecution {
1411 workflow_id: "ra-wf".to_string(),
1412 run_id: "ra-run".to_string(),
1413 }),
1414 ..Default::default()
1415 }],
1416 running_workflows: vec![ProtoWorkflowExecution {
1417 workflow_id: "wf-1".to_string(),
1418 run_id: "run-1".to_string(),
1419 }],
1420 create_time: Some(prost_types::Timestamp {
1421 seconds: 1_700_000_000,
1422 nanos: 0,
1423 }),
1424 update_time: Some(prost_types::Timestamp {
1425 seconds: 1_700_001_000,
1426 nanos: 0,
1427 }),
1428 future_action_times: vec![prost_types::Timestamp {
1429 seconds: 1_700_002_000,
1430 nanos: 0,
1431 }],
1432 ..Default::default()
1433 }),
1434 conflict_token: b"tok".to_vec(),
1435 ..Default::default()
1436 },
1437 ..Default::default()
1438 };
1439
1440 let handle = make_schedule_handle(client);
1441 let desc = handle.describe().await.unwrap();
1442
1443 assert!(desc.paused());
1444 assert_eq!(desc.note(), Some("maintenance window"));
1445 assert_eq!(desc.action_count(), 42);
1446 assert_eq!(desc.missed_catchup_window(), 3);
1447 assert_eq!(desc.overlap_skipped(), 5);
1448
1449 assert_eq!(desc.recent_actions().len(), 1);
1450 assert_eq!(
1451 desc.running_actions(),
1452 vec![ScheduleRunningAction {
1453 workflow_id: "wf-1".to_string(),
1454 run_id: "run-1".to_string(),
1455 }]
1456 );
1457
1458 assert_eq!(desc.future_action_times().len(), 1);
1459 assert!(desc.create_time().is_some());
1460 assert!(desc.update_time().is_some());
1461 }
1462
1463 #[tokio::test]
1464 async fn schedule_describe_defaults_when_nested_fields_are_none() {
1465 let client = MockScheduleClient::default();
1466
1467 let handle = make_schedule_handle(client);
1468 let desc = handle.describe().await.unwrap();
1469
1470 assert!(!desc.paused());
1471 assert_eq!(desc.note(), None);
1472 assert_eq!(desc.action_count(), 0);
1473 assert_eq!(desc.missed_catchup_window(), 0);
1474 assert_eq!(desc.overlap_skipped(), 0);
1475 assert!(desc.recent_actions().is_empty());
1476 assert!(desc.running_actions().is_empty());
1477 assert!(desc.future_action_times().is_empty());
1478 assert!(desc.create_time().is_none());
1479 assert!(desc.update_time().is_none());
1480 assert!(desc.conflict_token().is_empty());
1481 }
1482
1483 #[tokio::test]
1484 async fn schedule_note_returns_none_for_empty_string() {
1485 let client = MockScheduleClient {
1486 describe_response: DescribeScheduleResponse {
1487 schedule: Some(Schedule {
1488 state: Some(ScheduleState {
1489 notes: String::new(),
1490 ..Default::default()
1491 }),
1492 ..Default::default()
1493 }),
1494 ..Default::default()
1495 },
1496 ..Default::default()
1497 };
1498
1499 let handle = make_schedule_handle(client);
1500 let desc = handle.describe().await.unwrap();
1501 assert_eq!(desc.note(), None);
1502 }
1503
1504 #[test]
1505 fn schedule_summary_note_returns_none_for_empty_string() {
1506 let entry = ScheduleListEntry {
1507 schedule_id: "s".to_string(),
1508 info: Some(ScheduleListInfo {
1509 notes: String::new(),
1510 ..Default::default()
1511 }),
1512 ..Default::default()
1513 };
1514 let summary = ScheduleSummary::from(entry);
1515 assert_eq!(summary.note(), None);
1516 }
1517
1518 #[test]
1519 fn schedule_summary_accessors() {
1520 let entry = ScheduleListEntry {
1521 schedule_id: "sched-1".to_string(),
1522 memo: Some(Memo {
1523 fields: Default::default(),
1524 }),
1525 search_attributes: Some(SearchAttributes {
1526 indexed_fields: Default::default(),
1527 }),
1528 info: Some(ScheduleListInfo {
1529 spec: Some(ScheduleSpec::default()),
1530 workflow_type: Some(WorkflowType {
1531 name: "MyWorkflow".to_string(),
1532 }),
1533 notes: "some note".to_string(),
1534 paused: true,
1535 recent_actions: vec![ScheduleActionResult {
1536 start_workflow_result: Some(ProtoWorkflowExecution {
1537 workflow_id: "ra-wf".to_string(),
1538 run_id: "ra-run".to_string(),
1539 }),
1540 ..Default::default()
1541 }],
1542 future_action_times: vec![prost_types::Timestamp {
1543 seconds: 1_700_000_000,
1544 nanos: 0,
1545 }],
1546 }),
1547 };
1548
1549 let summary = ScheduleSummary::from(entry);
1550 assert_eq!(summary.schedule_id(), "sched-1");
1551 assert!(summary.raw().memo.is_some());
1552 assert!(summary.raw().search_attributes.is_some());
1553 assert_eq!(summary.workflow_type(), Some("MyWorkflow"));
1554 assert_eq!(summary.note(), Some("some note"));
1555 assert!(summary.paused());
1556 assert_eq!(summary.recent_actions().len(), 1);
1557 assert_eq!(summary.future_action_times().len(), 1);
1558 }
1559
1560 #[test]
1561 fn schedule_summary_defaults_when_info_is_none() {
1562 let entry = ScheduleListEntry {
1563 schedule_id: "sched-2".to_string(),
1564 ..Default::default()
1565 };
1566
1567 let summary = ScheduleSummary::from(entry);
1568 assert_eq!(summary.schedule_id(), "sched-2");
1569 assert!(summary.raw().memo.is_none());
1570 assert!(summary.raw().search_attributes.is_none());
1571 assert_eq!(summary.workflow_type(), None);
1572 assert_eq!(summary.note(), None);
1573 assert!(!summary.paused());
1574 assert!(summary.recent_actions().is_empty());
1575 assert!(summary.future_action_times().is_empty());
1576 }
1577
1578 #[test]
1579 fn schedule_description_raw_round_trip() {
1580 let resp = DescribeScheduleResponse {
1581 conflict_token: b"ct".to_vec(),
1582 schedule: Some(Schedule::default()),
1583 ..Default::default()
1584 };
1585 let desc = ScheduleDescription::from(resp.clone());
1586 assert_eq!(desc.raw().conflict_token, b"ct");
1587 let recovered = desc.into_raw();
1588 assert_eq!(recovered.conflict_token, resp.conflict_token);
1589 assert!(recovered.schedule.is_some());
1590 }
1591
1592 #[test]
1593 fn schedule_summary_raw_round_trip() {
1594 let entry = ScheduleListEntry {
1595 schedule_id: "rt-1".to_string(),
1596 ..Default::default()
1597 };
1598 let summary = ScheduleSummary::from(entry.clone());
1599 assert_eq!(summary.raw().schedule_id, "rt-1");
1600 let recovered = summary.into_raw();
1601 assert_eq!(recovered.schedule_id, entry.schedule_id);
1602 }
1603
1604 #[test]
1605 fn schedule_into_update_preserves_schedule() {
1606 let resp = DescribeScheduleResponse {
1607 schedule: Some(Schedule {
1608 state: Some(ScheduleState {
1609 notes: "my notes".to_string(),
1610 ..Default::default()
1611 }),
1612 ..Default::default()
1613 }),
1614 ..Default::default()
1615 };
1616 let desc = ScheduleDescription::from(resp);
1617 let update = desc.into_update();
1618
1619 assert_eq!(update.raw().state.as_ref().unwrap().notes, "my notes");
1620 }
1621
1622 #[test]
1623 fn schedule_update_setters_are_chainable() {
1624 let resp = DescribeScheduleResponse {
1625 schedule: Some(Schedule::default()),
1626 ..Default::default()
1627 };
1628 let desc = ScheduleDescription::from(resp);
1629 let mut update = desc.into_update();
1630 update.set_note("chained").set_paused(true);
1631 assert_eq!(update.raw().state.as_ref().unwrap().notes, "chained");
1632 assert!(update.raw().state.as_ref().unwrap().paused);
1633 }
1634
1635 #[test]
1636 fn schedule_recent_action_from_proto_with_timestamps() {
1637 let ts = prost_types::Timestamp {
1638 seconds: 1_700_000_000,
1639 nanos: 0,
1640 };
1641 let proto = ScheduleActionResult {
1642 schedule_time: Some(ts),
1643 actual_time: Some(ts),
1644 start_workflow_result: Some(ProtoWorkflowExecution {
1645 workflow_id: "wf-abc".to_string(),
1646 run_id: "run-xyz".to_string(),
1647 }),
1648 ..Default::default()
1649 };
1650
1651 let action = ScheduleRecentAction::from(&proto);
1652
1653 assert!(action.schedule_time.is_some());
1654 assert!(action.actual_time.is_some());
1655 assert_eq!(action.workflow_id, "wf-abc");
1656 assert_eq!(action.run_id, "run-xyz");
1657 }
1658
1659 #[test]
1660 #[should_panic(expected = "unsupported schedule action")]
1661 fn schedule_recent_action_panics_without_workflow_result() {
1662 let _ = ScheduleRecentAction::from(&ScheduleActionResult::default());
1663 }
1664
1665 #[test]
1666 fn schedule_overlap_policy_default_is_unspecified() {
1667 assert_eq!(
1668 ScheduleOverlapPolicy::default(),
1669 ScheduleOverlapPolicy::Unspecified
1670 );
1671 }
1672}