1use crate::{Client, NamespacedClient, grpc::WorkflowService};
2use futures_util::{FutureExt, future::BoxFuture, stream};
3use std::{
4 collections::VecDeque,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8 time::{Duration, SystemTime},
9};
10use temporalio_common::{
11 HasWorkflowDefinition,
12 data_converters::{
13 DataConverter, PayloadConversionError, SerializationContextData, TemporalSerializable,
14 },
15 protos::{
16 coresdk::IntoPayloadsExt,
17 proto_ts_to_system_time,
18 temporal::api::{
19 common::v1 as common_proto, schedule::v1 as schedule_proto,
20 taskqueue::v1 as taskqueue_proto, workflow::v1 as workflow_proto,
21 workflowservice::v1::*,
22 },
23 },
24};
25use tonic::IntoRequest;
26use uuid::Uuid;
27
28#[derive(Debug, thiserror::Error)]
30#[non_exhaustive]
31pub enum ScheduleError {
32 #[error("Server error: {0}")]
34 Rpc(#[from] tonic::Status),
35 #[error("Payload conversion error: {0}")]
37 PayloadConversion(#[from] PayloadConversionError),
38}
39
40trait SerializableScheduleInput: Send + Sync {
41 fn to_payloads<'a>(
42 &'a self,
43 dc: &'a DataConverter,
44 context: &'a SerializationContextData,
45 ) -> BoxFuture<'a, Result<Vec<common_proto::Payload>, PayloadConversionError>>;
46}
47
48impl<T> SerializableScheduleInput for T
49where
50 T: TemporalSerializable + Send + Sync + 'static,
51{
52 fn to_payloads<'a>(
53 &'a self,
54 dc: &'a DataConverter,
55 context: &'a SerializationContextData,
56 ) -> BoxFuture<'a, Result<Vec<common_proto::Payload>, PayloadConversionError>> {
57 dc.to_payloads(context, self).boxed()
58 }
59}
60
61#[derive(derive_more::Debug, Clone)]
63pub struct ScheduleWorkflowInput {
64 repr: ScheduleWorkflowInputRepr,
65}
66
67#[derive(derive_more::Debug, Clone)]
68enum ScheduleWorkflowInputRepr {
69 #[debug("Deferred(...)")]
70 Deferred(#[debug(skip)] Arc<dyn SerializableScheduleInput>),
71}
72
73impl ScheduleWorkflowInput {
74 fn new_deferred<T>(val: T) -> Self
75 where
76 T: SerializableScheduleInput + 'static,
77 {
78 Self {
79 repr: ScheduleWorkflowInputRepr::Deferred(Arc::new(val)),
80 }
81 }
82
83 pub(crate) async fn into_payloads(
84 self,
85 dc: &DataConverter,
86 ) -> Result<Vec<common_proto::Payload>, PayloadConversionError> {
87 let ScheduleWorkflowInputRepr::Deferred(v) = self.repr;
88 v.to_payloads(dc, &SerializationContextData::Workflow).await
89 }
90}
91
92#[derive(Debug, Clone, bon::Builder)]
94#[builder(on(String, into))]
95#[non_exhaustive]
96pub struct CreateScheduleOptions {
97 pub action: ScheduleAction,
99 pub spec: ScheduleSpec,
101 #[builder(default)]
103 pub trigger_immediately: bool,
104 #[builder(default)]
107 pub overlap_policy: ScheduleOverlapPolicy,
108 #[builder(default)]
110 pub paused: bool,
111 #[builder(default)]
113 pub note: String,
114}
115
116#[derive(derive_more::Debug, Clone)]
119#[non_exhaustive]
120pub enum ScheduleAction {
121 StartWorkflow {
123 workflow_type: String,
125 task_queue: String,
127 workflow_id: String,
129 input: Option<ScheduleWorkflowInput>,
131 },
132}
133
134impl ScheduleAction {
135 pub fn start_workflow<W>(
137 workflow: W,
138 input: W::Input,
139 task_queue: impl Into<String>,
140 workflow_id: impl Into<String>,
141 ) -> Self
142 where
143 W: HasWorkflowDefinition,
144 W::Input: TemporalSerializable + Send + Sync + 'static,
145 {
146 Self::StartWorkflow {
147 workflow_type: workflow.name().to_string(),
148 task_queue: task_queue.into(),
149 workflow_id: workflow_id.into(),
150 input: Some(ScheduleWorkflowInput::new_deferred(input)),
151 }
152 }
153
154 pub(crate) async fn into_proto(
155 self,
156 dc: &DataConverter,
157 ) -> Result<schedule_proto::ScheduleAction, PayloadConversionError> {
158 match self {
159 Self::StartWorkflow {
160 workflow_type,
161 task_queue,
162 workflow_id,
163 input,
164 } => {
165 let input = if let Some(wi) = input {
166 wi.into_payloads(dc).await?.into_payloads()
167 } else {
168 None
169 };
170 Ok(schedule_proto::ScheduleAction {
171 action: Some(schedule_proto::schedule_action::Action::StartWorkflow(
172 workflow_proto::NewWorkflowExecutionInfo {
173 workflow_id,
174 workflow_type: Some(common_proto::WorkflowType {
175 name: workflow_type,
176 }),
177 task_queue: Some(taskqueue_proto::TaskQueue {
178 name: task_queue,
179 ..Default::default()
180 }),
181 input,
182 ..Default::default()
183 },
184 )),
185 })
186 }
187 }
188 }
189}
190
191#[derive(Debug, Clone, Default, PartialEq, bon::Builder)]
196#[builder(on(String, into))]
197pub struct ScheduleSpec {
198 #[builder(default)]
200 pub intervals: Vec<ScheduleIntervalSpec>,
201 #[builder(default)]
203 pub calendars: Vec<ScheduleCalendarSpec>,
204 #[builder(default)]
206 pub exclude_calendars: Vec<ScheduleCalendarSpec>,
207 #[builder(default)]
209 pub cron_strings: Vec<String>,
210 #[builder(default)]
212 pub timezone_name: String,
213 pub start_time: Option<SystemTime>,
215 pub end_time: Option<SystemTime>,
217 pub jitter: Option<Duration>,
219}
220
221impl ScheduleSpec {
222 pub fn from_interval(every: Duration) -> Self {
224 Self {
225 intervals: vec![every.into()],
226 ..Default::default()
227 }
228 }
229
230 pub fn from_calendar(calendar: ScheduleCalendarSpec) -> Self {
232 Self {
233 calendars: vec![calendar],
234 ..Default::default()
235 }
236 }
237
238 pub(crate) fn into_proto(self) -> schedule_proto::ScheduleSpec {
239 #[allow(deprecated)]
240 schedule_proto::ScheduleSpec {
241 interval: self.intervals.into_iter().map(Into::into).collect(),
242 calendar: self.calendars.into_iter().map(Into::into).collect(),
243 exclude_calendar: self.exclude_calendars.into_iter().map(Into::into).collect(),
244 cron_string: self.cron_strings,
245 timezone_name: self.timezone_name,
246 start_time: self.start_time.map(Into::into),
247 end_time: self.end_time.map(Into::into),
248 jitter: self.jitter.and_then(|d| d.try_into().ok()),
249 ..Default::default()
250 }
251 }
252}
253
/// An interval rule: a period plus an optional offset.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct ScheduleIntervalSpec {
    /// The period; sent as the proto `interval` field.
    pub every: Duration,
    /// Optional offset; sent as the proto `phase` field.
    pub offset: Option<Duration>,
}

impl ScheduleIntervalSpec {
    /// Creates an interval rule from a period and an optional offset.
    pub fn new(every: Duration, offset: Option<Duration>) -> Self {
        ScheduleIntervalSpec { every, offset }
    }
}

/// A bare [`Duration`] becomes an interval rule with no offset.
impl From<Duration> for ScheduleIntervalSpec {
    fn from(every: Duration) -> Self {
        Self::new(every, None)
    }
}
279
280impl From<ScheduleIntervalSpec> for schedule_proto::IntervalSpec {
281 fn from(s: ScheduleIntervalSpec) -> Self {
282 Self {
283 interval: Some(s.every.try_into().unwrap_or_default()),
284 phase: s.offset.and_then(|d| d.try_into().ok()),
285 }
286 }
287}
288
289#[derive(Debug, Clone, Default, PartialEq, bon::Builder)]
293#[builder(on(String, into))]
294#[non_exhaustive]
295pub struct ScheduleCalendarSpec {
296 #[builder(default)]
298 pub second: String,
299 #[builder(default)]
301 pub minute: String,
302 #[builder(default)]
304 pub hour: String,
305 #[builder(default)]
307 pub day_of_month: String,
308 #[builder(default)]
310 pub month: String,
311 #[builder(default)]
313 pub day_of_week: String,
314 #[builder(default)]
316 pub year: String,
317 #[builder(default)]
319 pub comment: String,
320}
321
322impl From<ScheduleCalendarSpec> for schedule_proto::CalendarSpec {
323 fn from(s: ScheduleCalendarSpec) -> Self {
324 Self {
325 second: s.second,
326 minute: s.minute,
327 hour: s.hour,
328 day_of_month: s.day_of_month,
329 month: s.month,
330 day_of_week: s.day_of_week,
331 year: s.year,
332 comment: s.comment,
333 }
334 }
335}
336
337#[derive(Debug, Clone, Default, bon::Builder)]
339#[non_exhaustive]
340pub struct ListSchedulesOptions {
341 #[builder(default)]
343 pub maximum_page_size: i32,
344 #[builder(default)]
346 pub query: String,
347}
348
349pub struct ListSchedulesStream {
352 inner: Pin<Box<dyn futures_util::Stream<Item = Result<ScheduleSummary, ScheduleError>> + Send>>,
353}
354
355impl ListSchedulesStream {
356 pub(crate) fn new(
357 inner: Pin<
358 Box<dyn futures_util::Stream<Item = Result<ScheduleSummary, ScheduleError>> + Send>,
359 >,
360 ) -> Self {
361 Self { inner }
362 }
363}
364
365impl futures_util::Stream for ListSchedulesStream {
366 type Item = Result<ScheduleSummary, ScheduleError>;
367
368 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
369 self.inner.as_mut().poll_next(cx)
370 }
371}
372
/// One entry from a schedule's list of recent actions.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct ScheduleRecentAction {
    /// The proto `schedule_time`, when present and convertible.
    pub schedule_time: Option<SystemTime>,
    /// The proto `actual_time`, when present and convertible.
    pub actual_time: Option<SystemTime>,
    /// Workflow id taken from the action's start-workflow result.
    pub workflow_id: String,
    /// Run id taken from the action's start-workflow result.
    pub run_id: String,
}
386
/// One entry from a schedule's list of currently running workflows.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct ScheduleRunningAction {
    /// Workflow id of the running execution.
    pub workflow_id: String,
    /// Run id of the running execution.
    pub run_id: String,
}
396
397impl From<&schedule_proto::ScheduleActionResult> for ScheduleRecentAction {
398 fn from(a: &schedule_proto::ScheduleActionResult) -> Self {
399 let workflow_result = a
400 .start_workflow_result
401 .as_ref()
402 .expect("unsupported schedule action: start_workflow_result should be present");
403 ScheduleRecentAction {
404 schedule_time: a.schedule_time.as_ref().and_then(proto_ts_to_system_time),
405 actual_time: a.actual_time.as_ref().and_then(proto_ts_to_system_time),
406 workflow_id: workflow_result.workflow_id.clone(),
407 run_id: workflow_result.run_id.clone(),
408 }
409 }
410}
411
412#[derive(Debug, Clone)]
418pub struct ScheduleDescription {
419 raw: DescribeScheduleResponse,
420}
421
422impl ScheduleDescription {
423 pub fn conflict_token(&self) -> &[u8] {
425 &self.raw.conflict_token
426 }
427
428 pub fn paused(&self) -> bool {
430 self.raw
431 .schedule
432 .as_ref()
433 .and_then(|s| s.state.as_ref())
434 .is_some_and(|st| st.paused)
435 }
436
437 pub fn note(&self) -> Option<&str> {
440 self.raw
441 .schedule
442 .as_ref()
443 .and_then(|s| s.state.as_ref())
444 .map(|st| st.notes.as_str())
445 .filter(|s| !s.is_empty())
446 }
447
448 pub fn action_count(&self) -> i64 {
450 self.info().map_or(0, |i| i.action_count)
451 }
452
453 pub fn missed_catchup_window(&self) -> i64 {
455 self.info().map_or(0, |i| i.missed_catchup_window)
456 }
457
458 pub fn overlap_skipped(&self) -> i64 {
460 self.info().map_or(0, |i| i.overlap_skipped)
461 }
462
463 pub fn recent_actions(&self) -> Vec<ScheduleRecentAction> {
465 self.info()
466 .map(|i| {
467 i.recent_actions
468 .iter()
469 .map(ScheduleRecentAction::from)
470 .collect()
471 })
472 .unwrap_or_default()
473 }
474
475 pub fn running_actions(&self) -> Vec<ScheduleRunningAction> {
477 self.info()
478 .map(|i| {
479 i.running_workflows
480 .iter()
481 .map(|w| ScheduleRunningAction {
482 workflow_id: w.workflow_id.clone(),
483 run_id: w.run_id.clone(),
484 })
485 .collect()
486 })
487 .unwrap_or_default()
488 }
489
490 pub fn future_action_times(&self) -> Vec<SystemTime> {
492 self.info()
493 .map(|i| {
494 i.future_action_times
495 .iter()
496 .filter_map(proto_ts_to_system_time)
497 .collect()
498 })
499 .unwrap_or_default()
500 }
501
502 pub fn create_time(&self) -> Option<SystemTime> {
504 self.info()
505 .and_then(|i| i.create_time.as_ref())
506 .and_then(proto_ts_to_system_time)
507 }
508
509 pub fn update_time(&self) -> Option<SystemTime> {
511 self.info()
512 .and_then(|i| i.update_time.as_ref())
513 .and_then(proto_ts_to_system_time)
514 }
515
516 pub fn memo(&self) -> Option<&common_proto::Memo> {
518 self.raw.memo.as_ref()
519 }
520
521 pub fn search_attributes(&self) -> Option<&common_proto::SearchAttributes> {
523 self.raw.search_attributes.as_ref()
524 }
525
526 pub fn raw(&self) -> &DescribeScheduleResponse {
528 &self.raw
529 }
530
531 pub fn into_raw(self) -> DescribeScheduleResponse {
533 self.raw
534 }
535
536 fn info(&self) -> Option<&schedule_proto::ScheduleInfo> {
537 self.raw.info.as_ref()
538 }
539
540 pub fn into_update(self) -> ScheduleUpdate {
545 ScheduleUpdate {
546 schedule: self.raw.schedule.unwrap_or_default(),
547 pending_action: None,
548 }
549 }
550}
551
552impl From<DescribeScheduleResponse> for ScheduleDescription {
553 fn from(raw: DescribeScheduleResponse) -> Self {
554 Self { raw }
555 }
556}
557
/// Overlap policy of a schedule.
///
/// The explicit discriminants are the integer values sent on the wire by
/// [`ScheduleOverlapPolicy::to_proto`].
// NOTE(review): the exact server-side meaning of each policy is defined by
// the Temporal API, not by this file — see the API documentation.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ScheduleOverlapPolicy {
    /// No policy specified (wire value `0`). This is the default.
    #[default]
    Unspecified = 0,
    /// Wire value `1`.
    Skip = 1,
    /// Wire value `2`.
    BufferOne = 2,
    /// Wire value `3`.
    BufferAll = 3,
    /// Wire value `4`.
    CancelOther = 4,
    /// Wire value `5`.
    TerminateOther = 5,
    /// Wire value `6`.
    AllowAll = 6,
}

impl ScheduleOverlapPolicy {
    /// Returns the integer used for this policy in proto messages.
    pub(crate) fn to_proto(self) -> i32 {
        // A fieldless enum casts to its declared discriminant.
        self as i32
    }
}
591
592#[derive(Debug, Clone, PartialEq, bon::Builder)]
594#[non_exhaustive]
595#[builder(start_fn = new)]
596pub struct ScheduleBackfill {
597 #[builder(start_fn)]
599 pub start_time: SystemTime,
600 #[builder(start_fn)]
602 pub end_time: SystemTime,
603 #[builder(default)]
605 pub overlap_policy: ScheduleOverlapPolicy,
606}
607
608#[derive(Debug, Clone)]
613pub struct ScheduleUpdate {
614 schedule: schedule_proto::Schedule,
615 pending_action: Option<ScheduleAction>,
616}
617
618impl ScheduleUpdate {
619 pub fn set_spec(&mut self, spec: ScheduleSpec) -> &mut Self {
621 self.schedule.spec = Some(spec.into_proto());
622 self
623 }
624
625 pub fn set_action(&mut self, action: ScheduleAction) -> &mut Self {
627 self.pending_action = Some(action);
628 self
629 }
630
631 pub fn set_paused(&mut self, paused: bool) -> &mut Self {
633 self.state_mut().paused = paused;
634 self
635 }
636
637 pub fn set_note(&mut self, note: impl Into<String>) -> &mut Self {
639 self.state_mut().notes = note.into();
640 self
641 }
642
643 pub fn set_overlap_policy(&mut self, policy: ScheduleOverlapPolicy) -> &mut Self {
645 self.policies_mut().overlap_policy = policy.to_proto();
646 self
647 }
648
649 pub fn set_catchup_window(&mut self, window: Duration) -> &mut Self {
652 self.policies_mut().catchup_window = window.try_into().ok();
653 self
654 }
655
656 pub fn set_pause_on_failure(&mut self, pause_on_failure: bool) -> &mut Self {
658 self.policies_mut().pause_on_failure = pause_on_failure;
659 self
660 }
661
662 pub fn set_keep_original_workflow_id(&mut self, keep: bool) -> &mut Self {
665 self.policies_mut().keep_original_workflow_id = keep;
666 self
667 }
668
669 pub fn set_remaining_actions(&mut self, count: Option<i64>) -> &mut Self {
672 let state = self.state_mut();
673 match count {
674 Some(n) => {
675 state.limited_actions = true;
676 state.remaining_actions = n;
677 }
678 None => {
679 state.limited_actions = false;
680 state.remaining_actions = 0;
681 }
682 }
683 self
684 }
685
686 pub fn raw(&self) -> &schedule_proto::Schedule {
688 &self.schedule
689 }
690
691 pub fn into_raw(self) -> schedule_proto::Schedule {
693 self.schedule
694 }
695
696 fn state_mut(&mut self) -> &mut schedule_proto::ScheduleState {
697 self.schedule.state.get_or_insert_with(Default::default)
698 }
699
700 fn policies_mut(&mut self) -> &mut schedule_proto::SchedulePolicies {
701 self.schedule.policies.get_or_insert_with(Default::default)
702 }
703}
704
705#[derive(Debug, Clone)]
711pub struct ScheduleSummary {
712 raw: schedule_proto::ScheduleListEntry,
713}
714
715impl ScheduleSummary {
716 pub fn schedule_id(&self) -> &str {
718 &self.raw.schedule_id
719 }
720
721 pub fn workflow_type(&self) -> Option<&str> {
723 self.info()
724 .and_then(|i| i.workflow_type.as_ref())
725 .map(|wt| wt.name.as_str())
726 }
727
728 pub fn note(&self) -> Option<&str> {
731 self.info()
732 .map(|i| i.notes.as_str())
733 .filter(|s| !s.is_empty())
734 }
735
736 pub fn paused(&self) -> bool {
738 self.info().is_some_and(|i| i.paused)
739 }
740
741 pub fn recent_actions(&self) -> Vec<ScheduleRecentAction> {
743 self.info()
744 .map(|i| {
745 i.recent_actions
746 .iter()
747 .map(ScheduleRecentAction::from)
748 .collect()
749 })
750 .unwrap_or_default()
751 }
752
753 pub fn future_action_times(&self) -> Vec<SystemTime> {
755 self.info()
756 .map(|i| {
757 i.future_action_times
758 .iter()
759 .filter_map(proto_ts_to_system_time)
760 .collect()
761 })
762 .unwrap_or_default()
763 }
764
765 pub fn memo(&self) -> Option<&common_proto::Memo> {
767 self.raw.memo.as_ref()
768 }
769
770 pub fn search_attributes(&self) -> Option<&common_proto::SearchAttributes> {
772 self.raw.search_attributes.as_ref()
773 }
774
775 pub fn raw(&self) -> &schedule_proto::ScheduleListEntry {
777 &self.raw
778 }
779
780 pub fn into_raw(self) -> schedule_proto::ScheduleListEntry {
782 self.raw
783 }
784
785 fn info(&self) -> Option<&schedule_proto::ScheduleListInfo> {
786 self.raw.info.as_ref()
787 }
788}
789
790impl From<schedule_proto::ScheduleListEntry> for ScheduleSummary {
791 fn from(raw: schedule_proto::ScheduleListEntry) -> Self {
792 Self { raw }
793 }
794}
795
796#[derive(Clone, derive_more::Debug)]
800pub struct ScheduleHandle<CT> {
801 #[debug(skip)]
802 client: CT,
803 namespace: String,
804 schedule_id: String,
805}
806
807impl<CT> ScheduleHandle<CT>
808where
809 CT: WorkflowService + NamespacedClient + Clone + Send + Sync,
810{
811 pub(crate) fn new(client: CT, namespace: String, schedule_id: String) -> Self {
812 Self {
813 client,
814 namespace,
815 schedule_id,
816 }
817 }
818
819 pub fn namespace(&self) -> &str {
821 &self.namespace
822 }
823
824 pub fn schedule_id(&self) -> &str {
826 &self.schedule_id
827 }
828
829 pub async fn describe(&self) -> Result<ScheduleDescription, ScheduleError> {
831 let resp = WorkflowService::describe_schedule(
832 &mut self.client.clone(),
833 DescribeScheduleRequest {
834 namespace: self.namespace.clone(),
835 schedule_id: self.schedule_id.clone(),
836 }
837 .into_request(),
838 )
839 .await?
840 .into_inner();
841
842 Ok(ScheduleDescription::from(resp))
843 }
844
845 pub async fn update(
865 &self,
866 updater: impl FnOnce(&mut ScheduleUpdate),
867 ) -> Result<(), ScheduleError> {
868 let desc = self.describe().await?;
869 let mut update = desc.into_update();
870 updater(&mut update);
871 self.send_update(update).await
872 }
873
874 pub async fn send_update(&self, mut update: ScheduleUpdate) -> Result<(), ScheduleError> {
880 if let Some(action) = update.pending_action.take() {
881 update.schedule.action = Some(action.into_proto(self.client.data_converter()).await?);
882 }
883 WorkflowService::update_schedule(
884 &mut self.client.clone(),
885 UpdateScheduleRequest {
886 namespace: self.namespace.clone(),
887 schedule_id: self.schedule_id.clone(),
888 schedule: Some(update.schedule),
889 identity: self.client.identity(),
890 request_id: Uuid::new_v4().to_string(),
891 ..Default::default()
892 }
893 .into_request(),
894 )
895 .await?;
896 Ok(())
897 }
898
899 pub async fn delete(&self) -> Result<(), ScheduleError> {
901 WorkflowService::delete_schedule(
902 &mut self.client.clone(),
903 DeleteScheduleRequest {
904 namespace: self.namespace.clone(),
905 schedule_id: self.schedule_id.clone(),
906 identity: self.client.identity(),
907 }
908 .into_request(),
909 )
910 .await?;
911 Ok(())
912 }
913
914 pub async fn pause(&self, note: Option<impl Into<String>>) -> Result<(), ScheduleError> {
918 let note = note.map_or_else(|| "Paused via Rust SDK".to_string(), |s| s.into());
919 WorkflowService::patch_schedule(
920 &mut self.client.clone(),
921 PatchScheduleRequest {
922 namespace: self.namespace.clone(),
923 schedule_id: self.schedule_id.clone(),
924 patch: Some(schedule_proto::SchedulePatch {
925 pause: note,
926 ..Default::default()
927 }),
928 identity: self.client.identity(),
929 request_id: Uuid::new_v4().to_string(),
930 }
931 .into_request(),
932 )
933 .await?;
934 Ok(())
935 }
936
937 pub async fn unpause(&self, note: Option<impl Into<String>>) -> Result<(), ScheduleError> {
941 let note = note.map_or_else(|| "Unpaused via Rust SDK".to_string(), |s| s.into());
942 WorkflowService::patch_schedule(
943 &mut self.client.clone(),
944 PatchScheduleRequest {
945 namespace: self.namespace.clone(),
946 schedule_id: self.schedule_id.clone(),
947 patch: Some(schedule_proto::SchedulePatch {
948 unpause: note,
949 ..Default::default()
950 }),
951 identity: self.client.identity(),
952 request_id: Uuid::new_v4().to_string(),
953 }
954 .into_request(),
955 )
956 .await?;
957 Ok(())
958 }
959
960 pub async fn trigger(
962 &self,
963 overlap_policy: ScheduleOverlapPolicy,
964 ) -> Result<(), ScheduleError> {
965 WorkflowService::patch_schedule(
966 &mut self.client.clone(),
967 PatchScheduleRequest {
968 namespace: self.namespace.clone(),
969 schedule_id: self.schedule_id.clone(),
970 patch: Some(schedule_proto::SchedulePatch {
971 trigger_immediately: Some(schedule_proto::TriggerImmediatelyRequest {
972 overlap_policy: overlap_policy.to_proto(),
973 scheduled_time: None,
974 }),
975 ..Default::default()
976 }),
977 identity: self.client.identity(),
978 request_id: Uuid::new_v4().to_string(),
979 }
980 .into_request(),
981 )
982 .await?;
983 Ok(())
984 }
985
986 pub async fn backfill(
988 &self,
989 backfills: impl IntoIterator<Item = ScheduleBackfill>,
990 ) -> Result<(), ScheduleError> {
991 let backfill_requests: Vec<schedule_proto::BackfillRequest> = backfills
992 .into_iter()
993 .map(|b| schedule_proto::BackfillRequest {
994 start_time: Some(b.start_time.into()),
995 end_time: Some(b.end_time.into()),
996 overlap_policy: b.overlap_policy.to_proto(),
997 })
998 .collect();
999 WorkflowService::patch_schedule(
1000 &mut self.client.clone(),
1001 PatchScheduleRequest {
1002 namespace: self.namespace.clone(),
1003 schedule_id: self.schedule_id.clone(),
1004 patch: Some(schedule_proto::SchedulePatch {
1005 backfill_request: backfill_requests,
1006 ..Default::default()
1007 }),
1008 identity: self.client.identity(),
1009 request_id: Uuid::new_v4().to_string(),
1010 }
1011 .into_request(),
1012 )
1013 .await?;
1014 Ok(())
1015 }
1016}
1017
1018impl Client {
1020 pub async fn create_schedule(
1022 &self,
1023 schedule_id: impl Into<String>,
1024 opts: CreateScheduleOptions,
1025 ) -> Result<ScheduleHandle<Self>, ScheduleError> {
1026 let schedule_id = schedule_id.into();
1027 let namespace = self.namespace();
1028
1029 let initial_patch = if opts.trigger_immediately {
1030 Some(schedule_proto::SchedulePatch {
1031 trigger_immediately: Some(schedule_proto::TriggerImmediatelyRequest {
1032 overlap_policy: ScheduleOverlapPolicy::AllowAll.to_proto(),
1035 scheduled_time: None,
1036 }),
1037 ..Default::default()
1038 })
1039 } else {
1040 None
1041 };
1042 let policies = (opts.overlap_policy != ScheduleOverlapPolicy::Unspecified).then(|| {
1045 schedule_proto::SchedulePolicies {
1046 overlap_policy: opts.overlap_policy.to_proto(),
1047 ..Default::default()
1048 }
1049 });
1050 let schedule = schedule_proto::Schedule {
1051 spec: Some(opts.spec.into_proto()),
1052 action: Some(opts.action.into_proto(self.data_converter()).await?),
1053 policies,
1054 state: Some(schedule_proto::ScheduleState {
1055 paused: opts.paused,
1056 notes: opts.note,
1057 ..Default::default()
1058 }),
1059 };
1060 WorkflowService::create_schedule(
1061 &mut self.clone(),
1062 CreateScheduleRequest {
1063 namespace: namespace.clone(),
1064 schedule_id: schedule_id.clone(),
1065 schedule: Some(schedule),
1066 initial_patch,
1067 identity: self.identity(),
1068 request_id: Uuid::new_v4().to_string(),
1069 ..Default::default()
1070 }
1071 .into_request(),
1072 )
1073 .await?;
1074 Ok(ScheduleHandle::new(self.clone(), namespace, schedule_id))
1075 }
1076
1077 pub fn get_schedule_handle(&self, schedule_id: impl Into<String>) -> ScheduleHandle<Self> {
1079 ScheduleHandle::new(self.clone(), self.namespace(), schedule_id.into())
1080 }
1081
1082 pub fn list_schedules(&self, opts: ListSchedulesOptions) -> ListSchedulesStream {
1085 let client = self.clone();
1086 let namespace = self.namespace();
1087 let query = opts.query;
1088 let page_size = opts.maximum_page_size;
1089
1090 let stream = stream::unfold(
1091 (Vec::new(), VecDeque::new(), false),
1092 move |(next_page_token, mut buffer, exhausted)| {
1093 let mut client = client.clone();
1094 let namespace = namespace.clone();
1095 let query = query.clone();
1096
1097 async move {
1098 if let Some(item) = buffer.pop_front() {
1099 return Some((Ok(item), (next_page_token, buffer, exhausted)));
1100 } else if exhausted {
1101 return None;
1102 }
1103
1104 let response = WorkflowService::list_schedules(
1105 &mut client,
1106 ListSchedulesRequest {
1107 namespace,
1108 maximum_page_size: page_size,
1109 next_page_token: next_page_token.clone(),
1110 query,
1111 }
1112 .into_request(),
1113 )
1114 .await;
1115
1116 match response {
1117 Ok(resp) => {
1118 let resp = resp.into_inner();
1119 let new_exhausted = resp.next_page_token.is_empty();
1120 let new_token = resp.next_page_token;
1121
1122 buffer = resp
1123 .schedules
1124 .into_iter()
1125 .map(ScheduleSummary::from)
1126 .collect();
1127
1128 buffer
1129 .pop_front()
1130 .map(|item| (Ok(item), (new_token, buffer, new_exhausted)))
1131 }
1132 Err(e) => Some((Err(e.into()), (next_page_token, buffer, true))),
1133 }
1134 }
1135 },
1136 );
1137
1138 ListSchedulesStream::new(Box::pin(stream))
1139 }
1140}
1141
1142#[cfg(test)]
1143mod tests {
1144 use super::*;
1145 use crate::{NamespacedClient, grpc::WorkflowService};
1146 use futures_util::FutureExt;
1147 use std::{
1148 sync::{
1149 Arc,
1150 atomic::{AtomicUsize, Ordering},
1151 },
1152 time::SystemTime,
1153 };
1154 use temporalio_common::protos::temporal::api::{
1155 common::v1::{
1156 Memo, SearchAttributes, WorkflowExecution as ProtoWorkflowExecution, WorkflowType,
1157 },
1158 schedule::v1::{
1159 Schedule, ScheduleActionResult, ScheduleInfo, ScheduleListEntry, ScheduleListInfo,
1160 ScheduleSpec, ScheduleState,
1161 },
1162 workflowservice::v1::{
1163 DeleteScheduleResponse, DescribeScheduleResponse, PatchScheduleResponse,
1164 UpdateScheduleResponse,
1165 },
1166 };
1167 use tonic::{Request, Response};
1168
1169 #[derive(Default)]
1170 struct CapturedRequests {
1171 describe: AtomicUsize,
1172 update: AtomicUsize,
1173 delete: AtomicUsize,
1174 patch: AtomicUsize,
1175 }
1176
1177 #[derive(Clone, Default)]
1178 struct MockScheduleClient {
1179 captured: Arc<CapturedRequests>,
1180 describe_response: DescribeScheduleResponse,
1181 should_error: bool,
1182 }
1183
1184 impl NamespacedClient for MockScheduleClient {
1185 fn namespace(&self) -> String {
1186 "test-namespace".to_string()
1187 }
1188 fn identity(&self) -> String {
1189 "test-identity".to_string()
1190 }
1191 }
1192
1193 impl WorkflowService for MockScheduleClient {
1194 fn describe_schedule(
1195 &mut self,
1196 _request: Request<DescribeScheduleRequest>,
1197 ) -> futures_util::future::BoxFuture<
1198 '_,
1199 Result<Response<DescribeScheduleResponse>, tonic::Status>,
1200 > {
1201 self.captured.describe.fetch_add(1, Ordering::SeqCst);
1202 let resp = self.describe_response.clone();
1203 let should_error = self.should_error;
1204 async move {
1205 if should_error {
1206 Err(tonic::Status::not_found("schedule not found"))
1207 } else {
1208 Ok(Response::new(resp))
1209 }
1210 }
1211 .boxed()
1212 }
1213
1214 fn update_schedule(
1215 &mut self,
1216 _request: Request<UpdateScheduleRequest>,
1217 ) -> futures_util::future::BoxFuture<
1218 '_,
1219 Result<Response<UpdateScheduleResponse>, tonic::Status>,
1220 > {
1221 self.captured.update.fetch_add(1, Ordering::SeqCst);
1222 let should_error = self.should_error;
1223 async move {
1224 if should_error {
1225 Err(tonic::Status::internal("update failed"))
1226 } else {
1227 Ok(Response::new(UpdateScheduleResponse::default()))
1228 }
1229 }
1230 .boxed()
1231 }
1232
1233 fn delete_schedule(
1234 &mut self,
1235 _request: Request<DeleteScheduleRequest>,
1236 ) -> futures_util::future::BoxFuture<
1237 '_,
1238 Result<Response<DeleteScheduleResponse>, tonic::Status>,
1239 > {
1240 self.captured.delete.fetch_add(1, Ordering::SeqCst);
1241 let should_error = self.should_error;
1242 async move {
1243 if should_error {
1244 Err(tonic::Status::internal("delete failed"))
1245 } else {
1246 Ok(Response::new(DeleteScheduleResponse::default()))
1247 }
1248 }
1249 .boxed()
1250 }
1251
1252 fn patch_schedule(
1253 &mut self,
1254 _request: Request<PatchScheduleRequest>,
1255 ) -> futures_util::future::BoxFuture<
1256 '_,
1257 Result<Response<PatchScheduleResponse>, tonic::Status>,
1258 > {
1259 self.captured.patch.fetch_add(1, Ordering::SeqCst);
1260 let should_error = self.should_error;
1261 async move {
1262 if should_error {
1263 Err(tonic::Status::internal("patch failed"))
1264 } else {
1265 Ok(Response::new(PatchScheduleResponse::default()))
1266 }
1267 }
1268 .boxed()
1269 }
1270 }
1271
1272 fn make_schedule_handle(client: MockScheduleClient) -> ScheduleHandle<MockScheduleClient> {
1273 ScheduleHandle::new(
1274 client,
1275 "test-namespace".to_string(),
1276 "test-schedule-id".to_string(),
1277 )
1278 }
1279
1280 #[test]
1281 fn schedule_handle_exposes_namespace_and_id() {
1282 let handle = make_schedule_handle(MockScheduleClient::default());
1283 assert_eq!(handle.namespace(), "test-namespace");
1284 assert_eq!(handle.schedule_id(), "test-schedule-id");
1285 }
1286
1287 #[tokio::test]
1288 async fn schedule_describe_returns_response_fields() {
1289 let conflict_token = b"token-123".to_vec();
1290
1291 let client = MockScheduleClient {
1292 describe_response: DescribeScheduleResponse {
1293 schedule: Some(Schedule::default()),
1294 info: Some(ScheduleInfo::default()),
1295 memo: Some(Memo {
1296 fields: Default::default(),
1297 }),
1298 search_attributes: Some(SearchAttributes {
1299 indexed_fields: Default::default(),
1300 }),
1301 conflict_token: conflict_token.clone(),
1302 },
1303 ..Default::default()
1304 };
1305
1306 let handle = make_schedule_handle(client.clone());
1307 let desc = handle.describe().await.unwrap();
1308
1309 assert_eq!(client.captured.describe.load(Ordering::SeqCst), 1);
1310 assert!(desc.raw().schedule.is_some());
1311 assert!(desc.raw().info.is_some());
1312 assert!(desc.raw().memo.is_some());
1313 assert!(desc.raw().search_attributes.is_some());
1314 assert_eq!(desc.conflict_token(), conflict_token);
1315 }
1316
1317 #[tokio::test]
1318 async fn schedule_update_describes_then_sends() {
1319 let client = MockScheduleClient::default();
1320 let handle = make_schedule_handle(client.clone());
1321
1322 handle
1323 .update(|u| {
1324 u.set_note("hi");
1325 })
1326 .await
1327 .unwrap();
1328
1329 assert_eq!(client.captured.describe.load(Ordering::SeqCst), 1);
1330 assert_eq!(client.captured.update.load(Ordering::SeqCst), 1);
1331 }
1332
1333 #[tokio::test]
1334 async fn schedule_multiple_updates_each_call_service() {
1335 let client = MockScheduleClient::default();
1336 let handle = make_schedule_handle(client.clone());
1337
1338 handle.update(|_| {}).await.unwrap();
1339 handle.update(|_| {}).await.unwrap();
1340
1341 assert_eq!(client.captured.update.load(Ordering::SeqCst), 2);
1342 }
1343
1344 #[tokio::test]
1345 async fn schedule_delete_calls_service() {
1346 let client = MockScheduleClient::default();
1347 let handle = make_schedule_handle(client.clone());
1348
1349 handle.delete().await.unwrap();
1350
1351 assert_eq!(client.captured.delete.load(Ordering::SeqCst), 1);
1352 }
1353
1354 #[tokio::test]
1355 async fn schedule_pause_calls_patch() {
1356 let client = MockScheduleClient::default();
1357 let handle = make_schedule_handle(client.clone());
1358
1359 handle.pause(Some("taking a break")).await.unwrap();
1360
1361 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1362 }
1363
1364 #[tokio::test]
1365 async fn schedule_pause_with_none_uses_default() {
1366 let client = MockScheduleClient::default();
1367 let handle = make_schedule_handle(client.clone());
1368
1369 handle.pause(None::<&str>).await.unwrap();
1370
1371 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1372 }
1373
1374 #[tokio::test]
1375 async fn schedule_unpause_calls_patch() {
1376 let client = MockScheduleClient::default();
1377 let handle = make_schedule_handle(client.clone());
1378
1379 handle.unpause(Some("resuming work")).await.unwrap();
1380
1381 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1382 }
1383
1384 #[tokio::test]
1385 async fn schedule_trigger_calls_patch() {
1386 let client = MockScheduleClient::default();
1387 let handle = make_schedule_handle(client.clone());
1388
1389 handle
1390 .trigger(ScheduleOverlapPolicy::Unspecified)
1391 .await
1392 .unwrap();
1393
1394 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1395 }
1396
1397 #[tokio::test]
1398 async fn schedule_backfill_calls_patch() {
1399 let client = MockScheduleClient::default();
1400 let handle = make_schedule_handle(client.clone());
1401
1402 let now = SystemTime::now();
1403 handle
1404 .backfill(vec![
1405 ScheduleBackfill::new(now, now)
1406 .overlap_policy(ScheduleOverlapPolicy::Skip)
1407 .build(),
1408 ScheduleBackfill::new(now, now)
1409 .overlap_policy(ScheduleOverlapPolicy::BufferOne)
1410 .build(),
1411 ])
1412 .await
1413 .unwrap();
1414
1415 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1416 }
1417
1418 #[tokio::test]
1419 async fn schedule_describe_propagates_rpc_errors() {
1420 let client = MockScheduleClient {
1421 should_error: true,
1422 ..Default::default()
1423 };
1424 let handle = make_schedule_handle(client);
1425
1426 let err = handle.describe().await.unwrap_err();
1427 assert!(
1428 matches!(err, ScheduleError::Rpc(_)),
1429 "expected Rpc variant, got: {err:?}"
1430 );
1431 assert!(err.to_string().contains("schedule not found"));
1432 }
1433
1434 #[tokio::test]
1435 async fn schedule_update_propagates_rpc_errors() {
1436 let client = MockScheduleClient {
1437 should_error: true,
1438 ..Default::default()
1439 };
1440 let handle = make_schedule_handle(client);
1441
1442 let err = handle.update(|_| {}).await.unwrap_err();
1443 assert!(matches!(err, ScheduleError::Rpc(_)));
1444 }
1445
1446 #[tokio::test]
1447 async fn schedule_delete_propagates_rpc_errors() {
1448 let client = MockScheduleClient {
1449 should_error: true,
1450 ..Default::default()
1451 };
1452 let handle = make_schedule_handle(client);
1453
1454 let err = handle.delete().await.unwrap_err();
1455 assert!(matches!(err, ScheduleError::Rpc(_)));
1456 }
1457
1458 #[tokio::test]
1459 async fn schedule_patch_operations_propagate_rpc_errors() {
1460 let client = MockScheduleClient {
1461 should_error: true,
1462 ..Default::default()
1463 };
1464 let handle = make_schedule_handle(client);
1465
1466 assert!(handle.pause(Some("")).await.is_err());
1467 assert!(handle.unpause(Some("")).await.is_err());
1468 assert!(handle.trigger(Default::default()).await.is_err());
1469 assert!(handle.backfill(vec![]).await.is_err());
1470 }
1471
1472 #[tokio::test]
1473 async fn schedule_all_patch_operations_call_service() {
1474 let client = MockScheduleClient::default();
1475 let handle = make_schedule_handle(client.clone());
1476
1477 handle.pause(Some("p")).await.unwrap();
1478 handle.unpause(Some("u")).await.unwrap();
1479 handle.trigger(Default::default()).await.unwrap();
1480 handle.backfill(vec![]).await.unwrap();
1481
1482 assert_eq!(client.captured.patch.load(Ordering::SeqCst), 4);
1483 }
1484
1485 #[tokio::test]
1486 async fn schedule_describe_accessors_with_populated_fields() {
1487 let client = MockScheduleClient {
1488 describe_response: DescribeScheduleResponse {
1489 schedule: Some(Schedule {
1490 spec: Some(ScheduleSpec {
1491 timezone_name: "US/Eastern".to_string(),
1492 ..Default::default()
1493 }),
1494 state: Some(ScheduleState {
1495 paused: true,
1496 notes: "maintenance window".to_string(),
1497 ..Default::default()
1498 }),
1499 ..Default::default()
1500 }),
1501 info: Some(ScheduleInfo {
1502 action_count: 42,
1503 missed_catchup_window: 3,
1504 overlap_skipped: 5,
1505 recent_actions: vec![ScheduleActionResult {
1506 start_workflow_result: Some(ProtoWorkflowExecution {
1507 workflow_id: "ra-wf".to_string(),
1508 run_id: "ra-run".to_string(),
1509 }),
1510 ..Default::default()
1511 }],
1512 running_workflows: vec![ProtoWorkflowExecution {
1513 workflow_id: "wf-1".to_string(),
1514 run_id: "run-1".to_string(),
1515 }],
1516 create_time: Some(prost_types::Timestamp {
1517 seconds: 1_700_000_000,
1518 nanos: 0,
1519 }),
1520 update_time: Some(prost_types::Timestamp {
1521 seconds: 1_700_001_000,
1522 nanos: 0,
1523 }),
1524 future_action_times: vec![prost_types::Timestamp {
1525 seconds: 1_700_002_000,
1526 nanos: 0,
1527 }],
1528 ..Default::default()
1529 }),
1530 conflict_token: b"tok".to_vec(),
1531 ..Default::default()
1532 },
1533 ..Default::default()
1534 };
1535
1536 let handle = make_schedule_handle(client);
1537 let desc = handle.describe().await.unwrap();
1538
1539 assert!(desc.paused());
1540 assert_eq!(desc.note(), Some("maintenance window"));
1541 assert_eq!(desc.action_count(), 42);
1542 assert_eq!(desc.missed_catchup_window(), 3);
1543 assert_eq!(desc.overlap_skipped(), 5);
1544
1545 assert_eq!(desc.recent_actions().len(), 1);
1546 assert_eq!(
1547 desc.running_actions(),
1548 vec![ScheduleRunningAction {
1549 workflow_id: "wf-1".to_string(),
1550 run_id: "run-1".to_string(),
1551 }]
1552 );
1553
1554 assert_eq!(desc.future_action_times().len(), 1);
1555 assert!(desc.create_time().is_some());
1556 assert!(desc.update_time().is_some());
1557 }
1558
1559 #[tokio::test]
1560 async fn schedule_describe_defaults_when_nested_fields_are_none() {
1561 let client = MockScheduleClient::default();
1562
1563 let handle = make_schedule_handle(client);
1564 let desc = handle.describe().await.unwrap();
1565
1566 assert!(!desc.paused());
1567 assert_eq!(desc.note(), None);
1568 assert_eq!(desc.action_count(), 0);
1569 assert_eq!(desc.missed_catchup_window(), 0);
1570 assert_eq!(desc.overlap_skipped(), 0);
1571 assert!(desc.recent_actions().is_empty());
1572 assert!(desc.running_actions().is_empty());
1573 assert!(desc.future_action_times().is_empty());
1574 assert!(desc.create_time().is_none());
1575 assert!(desc.update_time().is_none());
1576 assert!(desc.conflict_token().is_empty());
1577 }
1578
1579 #[tokio::test]
1580 async fn schedule_note_returns_none_for_empty_string() {
1581 let client = MockScheduleClient {
1582 describe_response: DescribeScheduleResponse {
1583 schedule: Some(Schedule {
1584 state: Some(ScheduleState {
1585 notes: String::new(),
1586 ..Default::default()
1587 }),
1588 ..Default::default()
1589 }),
1590 ..Default::default()
1591 },
1592 ..Default::default()
1593 };
1594
1595 let handle = make_schedule_handle(client);
1596 let desc = handle.describe().await.unwrap();
1597 assert_eq!(desc.note(), None);
1598 }
1599
// A list entry whose info carries an empty notes string must be reported as
// having no note.
#[test]
fn schedule_summary_note_returns_none_for_empty_string() {
    let blank_info = ScheduleListInfo {
        notes: String::new(),
        ..Default::default()
    };
    let entry = ScheduleListEntry {
        schedule_id: "s".to_string(),
        info: Some(blank_info),
        ..Default::default()
    };

    let summary = ScheduleSummary::from(entry);
    assert_eq!(summary.note(), None);
}
1613
// A fully populated list entry must be surfaced through the summary's
// accessors and through its raw view.
#[test]
fn schedule_summary_accessors() {
    let recent = ScheduleActionResult {
        start_workflow_result: Some(ProtoWorkflowExecution {
            workflow_id: "ra-wf".to_string(),
            run_id: "ra-run".to_string(),
        }),
        ..Default::default()
    };
    let upcoming = prost_types::Timestamp {
        seconds: 1_700_000_000,
        nanos: 0,
    };
    let info = ScheduleListInfo {
        spec: Some(ScheduleSpec::default()),
        workflow_type: Some(WorkflowType {
            name: "MyWorkflow".to_string(),
        }),
        notes: "some note".to_string(),
        paused: true,
        recent_actions: vec![recent],
        future_action_times: vec![upcoming],
        state_size_bytes: 0,
    };
    let entry = ScheduleListEntry {
        schedule_id: "sched-1".to_string(),
        memo: Some(Memo {
            fields: Default::default(),
        }),
        search_attributes: Some(SearchAttributes {
            indexed_fields: Default::default(),
        }),
        info: Some(info),
    };

    let summary = ScheduleSummary::from(entry);

    // Identity and raw passthrough.
    assert_eq!(summary.schedule_id(), "sched-1");
    let raw = summary.raw();
    assert!(raw.memo.is_some());
    assert!(raw.search_attributes.is_some());

    // Values derived from the list info.
    assert_eq!(summary.workflow_type(), Some("MyWorkflow"));
    assert_eq!(summary.note(), Some("some note"));
    assert!(summary.paused());
    assert_eq!(summary.recent_actions().len(), 1);
    assert_eq!(summary.future_action_times().len(), 1);
}
1656
// A list entry that only carries an id must yield empty or `None` values from
// every other accessor.
#[test]
fn schedule_summary_defaults_when_info_is_none() {
    let bare_entry = ScheduleListEntry {
        schedule_id: "sched-2".to_string(),
        ..Default::default()
    };
    let summary = ScheduleSummary::from(bare_entry);

    assert_eq!(summary.schedule_id(), "sched-2");

    let raw = summary.raw();
    assert!(raw.memo.is_none());
    assert!(raw.search_attributes.is_none());

    assert_eq!(summary.workflow_type(), None);
    assert_eq!(summary.note(), None);
    assert!(!summary.paused());
    assert!(summary.recent_actions().is_empty());
    assert!(summary.future_action_times().is_empty());
}
1674
// Wrapping a describe response and unwrapping it again must keep the conflict
// token and the schedule.
#[test]
fn schedule_description_raw_round_trip() {
    let original = DescribeScheduleResponse {
        conflict_token: b"ct".to_vec(),
        schedule: Some(Schedule::default()),
        ..Default::default()
    };

    let description = ScheduleDescription::from(original.clone());
    assert_eq!(description.raw().conflict_token, b"ct");

    let unwrapped = description.into_raw();
    assert_eq!(unwrapped.conflict_token, original.conflict_token);
    assert!(unwrapped.schedule.is_some());
}
1688
// Wrapping a list entry and unwrapping it again must keep the schedule id.
#[test]
fn schedule_summary_raw_round_trip() {
    let original = ScheduleListEntry {
        schedule_id: "rt-1".to_string(),
        ..Default::default()
    };

    let summary = ScheduleSummary::from(original.clone());
    assert_eq!(summary.raw().schedule_id, "rt-1");

    let unwrapped = summary.into_raw();
    assert_eq!(unwrapped.schedule_id, original.schedule_id);
}
1700
// Turning a description into an update must carry the existing schedule
// state (here: its notes) over unchanged.
#[test]
fn schedule_into_update_preserves_schedule() {
    let noted_state = ScheduleState {
        notes: "my notes".to_string(),
        ..Default::default()
    };
    let response = DescribeScheduleResponse {
        schedule: Some(Schedule {
            state: Some(noted_state),
            ..Default::default()
        }),
        ..Default::default()
    };

    let update = ScheduleDescription::from(response).into_update();

    let raw = update.raw();
    let state = raw.state.as_ref().unwrap();
    assert_eq!(state.notes, "my notes");
}
1718
// Setters on the update can be chained, and each one must be reflected in
// the underlying schedule state.
#[test]
fn schedule_update_setters_are_chainable() {
    let response = DescribeScheduleResponse {
        schedule: Some(Schedule::default()),
        ..Default::default()
    };
    let mut update = ScheduleDescription::from(response).into_update();

    update.set_note("chained").set_paused(true);

    let raw = update.raw();
    let state = raw.state.as_ref().unwrap();
    assert_eq!(state.notes, "chained");
    assert!(state.paused);
}
1731
// Converting a proto action result must keep both timestamps and the started
// workflow's ids.
#[test]
fn schedule_recent_action_from_proto_with_timestamps() {
    let stamp = prost_types::Timestamp {
        seconds: 1_700_000_000,
        nanos: 0,
    };
    let started = ProtoWorkflowExecution {
        workflow_id: "wf-abc".to_string(),
        run_id: "run-xyz".to_string(),
    };
    let result = ScheduleActionResult {
        schedule_time: Some(stamp),
        actual_time: Some(stamp),
        start_workflow_result: Some(started),
        ..Default::default()
    };

    let converted = ScheduleRecentAction::from(&result);

    assert!(converted.schedule_time.is_some());
    assert!(converted.actual_time.is_some());
    assert_eq!(converted.workflow_id, "wf-abc");
    assert_eq!(converted.run_id, "run-xyz");
}
1755
// Converting a default action result is expected to panic with the
// "unsupported schedule action" message.
#[test]
#[should_panic(expected = "unsupported schedule action")]
fn schedule_recent_action_panics_without_workflow_result() {
    let empty_result = ScheduleActionResult::default();
    let _ = ScheduleRecentAction::from(&empty_result);
}
1761
// The overlap policy's `Default` implementation must yield `Unspecified`.
#[test]
fn schedule_overlap_policy_default_is_unspecified() {
    let default_policy = ScheduleOverlapPolicy::default();
    assert_eq!(default_policy, ScheduleOverlapPolicy::Unspecified);
}
1769
1770 #[tokio::test]
1771 async fn schedule_action_start_workflow_with_input_into_proto() {
1772 use temporalio_common::{
1773 UntypedWorkflow,
1774 data_converters::{DataConverter, RawValue},
1775 protos::temporal::api::common::v1::Payload,
1776 };
1777
1778 let payload = Payload {
1779 metadata: [("encoding".to_string(), b"json/plain".to_vec())]
1780 .into_iter()
1781 .collect(),
1782 data: b"42".to_vec(),
1783 ..Default::default()
1784 };
1785 let action = ScheduleAction::start_workflow(
1786 UntypedWorkflow::new("MyWorkflow"),
1787 RawValue::new(vec![payload.clone()]),
1788 "my-queue",
1789 "my-wf-id",
1790 );
1791 let proto = action.into_proto(&DataConverter::default()).await.unwrap();
1792 #[allow(irrefutable_let_patterns)]
1793 let schedule_proto::schedule_action::Action::StartWorkflow(wf_info) = proto.action.unwrap()
1794 else {
1795 panic!("expected StartWorkflow action")
1796 };
1797 assert_eq!(wf_info.input.unwrap().payloads, vec![payload]);
1798 }
1799}