Skip to main content

temporalio_client/
schedules.rs

1use crate::{Client, NamespacedClient, grpc::WorkflowService};
2use futures_util::{FutureExt, future::BoxFuture, stream};
3use std::{
4    collections::VecDeque,
5    pin::Pin,
6    sync::Arc,
7    task::{Context, Poll},
8    time::{Duration, SystemTime},
9};
10use temporalio_common::{
11    HasWorkflowDefinition,
12    data_converters::{
13        DataConverter, PayloadConversionError, SerializationContextData, TemporalSerializable,
14    },
15    protos::{
16        coresdk::IntoPayloadsExt,
17        proto_ts_to_system_time,
18        temporal::api::{
19            common::v1 as common_proto, schedule::v1 as schedule_proto,
20            taskqueue::v1 as taskqueue_proto, workflow::v1 as workflow_proto,
21            workflowservice::v1::*,
22        },
23    },
24};
25use tonic::IntoRequest;
26use uuid::Uuid;
27
28/// Errors returned by schedule operations.
29#[derive(Debug, thiserror::Error)]
30#[non_exhaustive]
31pub enum ScheduleError {
32    /// An rpc error from the server.
33    #[error("Server error: {0}")]
34    Rpc(#[from] tonic::Status),
35    /// Failed to encode workflow input payloads.
36    #[error("Payload conversion error: {0}")]
37    PayloadConversion(#[from] PayloadConversionError),
38}
39
40trait SerializableScheduleInput: Send + Sync {
41    fn to_payloads<'a>(
42        &'a self,
43        dc: &'a DataConverter,
44        context: &'a SerializationContextData,
45    ) -> BoxFuture<'a, Result<Vec<common_proto::Payload>, PayloadConversionError>>;
46}
47
48impl<T> SerializableScheduleInput for T
49where
50    T: TemporalSerializable + Send + Sync + 'static,
51{
52    fn to_payloads<'a>(
53        &'a self,
54        dc: &'a DataConverter,
55        context: &'a SerializationContextData,
56    ) -> BoxFuture<'a, Result<Vec<common_proto::Payload>, PayloadConversionError>> {
57        dc.to_payloads(context, self).boxed()
58    }
59}
60
61/// Workflow input for a schedule action, stored unencoded until the schedule is created.
62#[derive(derive_more::Debug, Clone)]
63pub struct ScheduleWorkflowInput {
64    repr: ScheduleWorkflowInputRepr,
65}
66
67#[derive(derive_more::Debug, Clone)]
68enum ScheduleWorkflowInputRepr {
69    #[debug("Deferred(...)")]
70    Deferred(#[debug(skip)] Arc<dyn SerializableScheduleInput>),
71}
72
73impl ScheduleWorkflowInput {
74    fn new_deferred<T>(val: T) -> Self
75    where
76        T: SerializableScheduleInput + 'static,
77    {
78        Self {
79            repr: ScheduleWorkflowInputRepr::Deferred(Arc::new(val)),
80        }
81    }
82
83    pub(crate) async fn into_payloads(
84        self,
85        dc: &DataConverter,
86    ) -> Result<Vec<common_proto::Payload>, PayloadConversionError> {
87        let ScheduleWorkflowInputRepr::Deferred(v) = self.repr;
88        v.to_payloads(dc, &SerializationContextData::Workflow).await
89    }
90}
91
92/// Options for creating a schedule.
93#[derive(Debug, Clone, bon::Builder)]
94#[builder(on(String, into))]
95#[non_exhaustive]
96pub struct CreateScheduleOptions {
97    /// The action the schedule should perform on each trigger.
98    pub action: ScheduleAction,
99    /// Defines when the schedule should trigger.
100    pub spec: ScheduleSpec,
101    /// Whether to trigger the schedule immediately upon creation.
102    #[builder(default)]
103    pub trigger_immediately: bool,
104    /// Overlap policy for the schedule. Also used for the initial trigger when
105    /// `trigger_immediately` is true.
106    #[builder(default)]
107    pub overlap_policy: ScheduleOverlapPolicy,
108    /// Whether the schedule starts in a paused state.
109    #[builder(default)]
110    pub paused: bool,
111    /// A note to attach to the schedule state (e.g., reason for pausing).
112    #[builder(default)]
113    pub note: String,
114}
115
116/// The action a schedule should perform on each trigger.
117// TODO: The proto supports other action types beyond StartWorkflow.
118#[derive(derive_more::Debug, Clone)]
119#[non_exhaustive]
120pub enum ScheduleAction {
121    /// Start a workflow execution.
122    StartWorkflow {
123        /// The workflow type name.
124        workflow_type: String,
125        /// The task queue to run the workflow on.
126        task_queue: String,
127        /// The workflow ID prefix. The server may append a timestamp.
128        workflow_id: String,
129        /// Workflow input to pass on each execution. `None` means no input.
130        input: Option<ScheduleWorkflowInput>,
131    },
132}
133
134impl ScheduleAction {
135    /// Create a start-workflow action. Input is encoded when the schedule is created.
136    pub fn start_workflow<W>(
137        workflow: W,
138        input: W::Input,
139        task_queue: impl Into<String>,
140        workflow_id: impl Into<String>,
141    ) -> Self
142    where
143        W: HasWorkflowDefinition,
144        W::Input: TemporalSerializable + Send + Sync + 'static,
145    {
146        Self::StartWorkflow {
147            workflow_type: workflow.name().to_string(),
148            task_queue: task_queue.into(),
149            workflow_id: workflow_id.into(),
150            input: Some(ScheduleWorkflowInput::new_deferred(input)),
151        }
152    }
153
154    pub(crate) async fn into_proto(
155        self,
156        dc: &DataConverter,
157    ) -> Result<schedule_proto::ScheduleAction, PayloadConversionError> {
158        match self {
159            Self::StartWorkflow {
160                workflow_type,
161                task_queue,
162                workflow_id,
163                input,
164            } => {
165                let input = if let Some(wi) = input {
166                    wi.into_payloads(dc).await?.into_payloads()
167                } else {
168                    None
169                };
170                Ok(schedule_proto::ScheduleAction {
171                    action: Some(schedule_proto::schedule_action::Action::StartWorkflow(
172                        workflow_proto::NewWorkflowExecutionInfo {
173                            workflow_id,
174                            workflow_type: Some(common_proto::WorkflowType {
175                                name: workflow_type,
176                            }),
177                            task_queue: Some(taskqueue_proto::TaskQueue {
178                                name: task_queue,
179                                ..Default::default()
180                            }),
181                            input,
182                            ..Default::default()
183                        },
184                    )),
185                })
186            }
187        }
188    }
189}
190
191/// Defines when a schedule should trigger.
192///
193/// Note: `set_spec` on [`ScheduleUpdate`] replaces the entire spec. Fields not
194/// set here will use their proto defaults on the server.
195#[derive(Debug, Clone, Default, PartialEq, bon::Builder)]
196#[builder(on(String, into))]
197pub struct ScheduleSpec {
198    /// Interval-based triggers (e.g., every 1 hour).
199    #[builder(default)]
200    pub intervals: Vec<ScheduleIntervalSpec>,
201    /// Calendar-based triggers using range strings.
202    #[builder(default)]
203    pub calendars: Vec<ScheduleCalendarSpec>,
204    /// Calendar-based exclusions. Matching times are skipped.
205    #[builder(default)]
206    pub exclude_calendars: Vec<ScheduleCalendarSpec>,
207    /// Cron expression triggers (e.g., `"0 12 * * MON-FRI"`).
208    #[builder(default)]
209    pub cron_strings: Vec<String>,
210    /// IANA timezone name (e.g., `"US/Eastern"`). Empty uses UTC.
211    #[builder(default)]
212    pub timezone_name: String,
213    /// Earliest time the schedule is active.
214    pub start_time: Option<SystemTime>,
215    /// Latest time the schedule is active.
216    pub end_time: Option<SystemTime>,
217    /// Random jitter applied to each action time.
218    pub jitter: Option<Duration>,
219}
220
221impl ScheduleSpec {
222    /// Create a spec that triggers on a single interval.
223    pub fn from_interval(every: Duration) -> Self {
224        Self {
225            intervals: vec![every.into()],
226            ..Default::default()
227        }
228    }
229
230    /// Create a spec that triggers on a single calendar schedule.
231    pub fn from_calendar(calendar: ScheduleCalendarSpec) -> Self {
232        Self {
233            calendars: vec![calendar],
234            ..Default::default()
235        }
236    }
237
238    pub(crate) fn into_proto(self) -> schedule_proto::ScheduleSpec {
239        #[allow(deprecated)]
240        schedule_proto::ScheduleSpec {
241            interval: self.intervals.into_iter().map(Into::into).collect(),
242            calendar: self.calendars.into_iter().map(Into::into).collect(),
243            exclude_calendar: self.exclude_calendars.into_iter().map(Into::into).collect(),
244            cron_string: self.cron_strings,
245            timezone_name: self.timezone_name,
246            start_time: self.start_time.map(Into::into),
247            end_time: self.end_time.map(Into::into),
248            jitter: self.jitter.and_then(|d| d.try_into().ok()),
249            ..Default::default()
250        }
251    }
252}
253
254/// An interval-based schedule trigger.
255#[derive(Debug, Clone, PartialEq)]
256#[non_exhaustive]
257pub struct ScheduleIntervalSpec {
258    /// How often the action should repeat.
259    pub every: Duration,
260    /// Fixed offset added to each interval.
261    pub offset: Option<Duration>,
262}
263
264impl ScheduleIntervalSpec {
265    /// Create an interval with an optional offset.
266    pub fn new(every: Duration, offset: Option<Duration>) -> Self {
267        Self { every, offset }
268    }
269}
270
271impl From<Duration> for ScheduleIntervalSpec {
272    fn from(every: Duration) -> Self {
273        Self {
274            every,
275            offset: None,
276        }
277    }
278}
279
280impl From<ScheduleIntervalSpec> for schedule_proto::IntervalSpec {
281    fn from(s: ScheduleIntervalSpec) -> Self {
282        Self {
283            interval: Some(s.every.try_into().unwrap_or_default()),
284            phase: s.offset.and_then(|d| d.try_into().ok()),
285        }
286    }
287}
288
289/// A calendar-based schedule trigger using range strings (e.g., `"2-7"` for hours 2 through 7).
290///
291/// Empty strings use server defaults (typically `"*"` for most fields, `"0"` for seconds/minutes).
292#[derive(Debug, Clone, Default, PartialEq, bon::Builder)]
293#[builder(on(String, into))]
294#[non_exhaustive]
295pub struct ScheduleCalendarSpec {
296    /// Second within the minute. Default: `"0"`.
297    #[builder(default)]
298    pub second: String,
299    /// Minute within the hour. Default: `"0"`.
300    #[builder(default)]
301    pub minute: String,
302    /// Hour of the day. Default: `"0"`.
303    #[builder(default)]
304    pub hour: String,
305    /// Day of the month. Default: `"*"`.
306    #[builder(default)]
307    pub day_of_month: String,
308    /// Month of the year. Default: `"*"`.
309    #[builder(default)]
310    pub month: String,
311    /// Day of the week. Default: `"*"`.
312    #[builder(default)]
313    pub day_of_week: String,
314    /// Year. Default: `"*"`.
315    #[builder(default)]
316    pub year: String,
317    /// Free-form comment.
318    #[builder(default)]
319    pub comment: String,
320}
321
322impl From<ScheduleCalendarSpec> for schedule_proto::CalendarSpec {
323    fn from(s: ScheduleCalendarSpec) -> Self {
324        Self {
325            second: s.second,
326            minute: s.minute,
327            hour: s.hour,
328            day_of_month: s.day_of_month,
329            month: s.month,
330            day_of_week: s.day_of_week,
331            year: s.year,
332            comment: s.comment,
333        }
334    }
335}
336
337/// Options for listing schedules.
338#[derive(Debug, Clone, Default, bon::Builder)]
339#[non_exhaustive]
340pub struct ListSchedulesOptions {
341    /// Maximum number of results per page (server-side hint).
342    #[builder(default)]
343    pub maximum_page_size: i32,
344    /// Query filter string.
345    #[builder(default)]
346    pub query: String,
347}
348
349/// A stream of schedule summaries from a list operation.
350/// Internally paginates through results from the server.
351pub struct ListSchedulesStream {
352    inner: Pin<Box<dyn futures_util::Stream<Item = Result<ScheduleSummary, ScheduleError>> + Send>>,
353}
354
355impl ListSchedulesStream {
356    pub(crate) fn new(
357        inner: Pin<
358            Box<dyn futures_util::Stream<Item = Result<ScheduleSummary, ScheduleError>> + Send>,
359        >,
360    ) -> Self {
361        Self { inner }
362    }
363}
364
365impl futures_util::Stream for ListSchedulesStream {
366    type Item = Result<ScheduleSummary, ScheduleError>;
367
368    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
369        self.inner.as_mut().poll_next(cx)
370    }
371}
372
373/// A recent action taken by a schedule.
374#[derive(Debug, Clone, PartialEq)]
375#[non_exhaustive]
376pub struct ScheduleRecentAction {
377    /// When this action was scheduled to occur (including jitter).
378    pub schedule_time: Option<SystemTime>,
379    /// When this action actually occurred.
380    pub actual_time: Option<SystemTime>,
381    /// Workflow ID of the started workflow.
382    pub workflow_id: String,
383    /// Run ID of the started workflow.
384    pub run_id: String,
385}
386
387/// A currently-running workflow started by a schedule.
388#[derive(Debug, Clone, PartialEq)]
389#[non_exhaustive]
390pub struct ScheduleRunningAction {
391    /// Workflow ID of the running workflow.
392    pub workflow_id: String,
393    /// Run ID of the running workflow.
394    pub run_id: String,
395}
396
397impl From<&schedule_proto::ScheduleActionResult> for ScheduleRecentAction {
398    fn from(a: &schedule_proto::ScheduleActionResult) -> Self {
399        let workflow_result = a
400            .start_workflow_result
401            .as_ref()
402            .expect("unsupported schedule action: start_workflow_result should be present");
403        ScheduleRecentAction {
404            schedule_time: a.schedule_time.as_ref().and_then(proto_ts_to_system_time),
405            actual_time: a.actual_time.as_ref().and_then(proto_ts_to_system_time),
406            workflow_id: workflow_result.workflow_id.clone(),
407            run_id: workflow_result.run_id.clone(),
408        }
409    }
410}
411
412/// Description of a schedule returned by describe().
413///
414/// Provides ergonomic accessors over the raw `DescribeScheduleResponse` proto.
415/// Use [`raw()`](Self::raw) or [`into_raw()`](Self::into_raw) to access the
416/// full proto when needed.
417#[derive(Debug, Clone)]
418pub struct ScheduleDescription {
419    raw: DescribeScheduleResponse,
420}
421
422impl ScheduleDescription {
423    /// Token used for optimistic concurrency on updates.
424    pub fn conflict_token(&self) -> &[u8] {
425        &self.raw.conflict_token
426    }
427
428    /// Whether the schedule is paused.
429    pub fn paused(&self) -> bool {
430        self.raw
431            .schedule
432            .as_ref()
433            .and_then(|s| s.state.as_ref())
434            .is_some_and(|st| st.paused)
435    }
436
437    /// Note on the schedule state (e.g., reason for pause).
438    /// Returns `None` if no note is set or the note is empty.
439    pub fn note(&self) -> Option<&str> {
440        self.raw
441            .schedule
442            .as_ref()
443            .and_then(|s| s.state.as_ref())
444            .map(|st| st.notes.as_str())
445            .filter(|s| !s.is_empty())
446    }
447
448    /// Total number of actions taken by this schedule.
449    pub fn action_count(&self) -> i64 {
450        self.info().map_or(0, |i| i.action_count)
451    }
452
453    /// Number of times a scheduled action was skipped due to missing the catchup window.
454    pub fn missed_catchup_window(&self) -> i64 {
455        self.info().map_or(0, |i| i.missed_catchup_window)
456    }
457
458    /// Number of skipped actions due to overlap.
459    pub fn overlap_skipped(&self) -> i64 {
460        self.info().map_or(0, |i| i.overlap_skipped)
461    }
462
463    /// Most recent action results (up to 10).
464    pub fn recent_actions(&self) -> Vec<ScheduleRecentAction> {
465        self.info()
466            .map(|i| {
467                i.recent_actions
468                    .iter()
469                    .map(ScheduleRecentAction::from)
470                    .collect()
471            })
472            .unwrap_or_default()
473    }
474
475    /// Currently-running workflows started by this schedule.
476    pub fn running_actions(&self) -> Vec<ScheduleRunningAction> {
477        self.info()
478            .map(|i| {
479                i.running_workflows
480                    .iter()
481                    .map(|w| ScheduleRunningAction {
482                        workflow_id: w.workflow_id.clone(),
483                        run_id: w.run_id.clone(),
484                    })
485                    .collect()
486            })
487            .unwrap_or_default()
488    }
489
490    /// Next scheduled action times.
491    pub fn future_action_times(&self) -> Vec<SystemTime> {
492        self.info()
493            .map(|i| {
494                i.future_action_times
495                    .iter()
496                    .filter_map(proto_ts_to_system_time)
497                    .collect()
498            })
499            .unwrap_or_default()
500    }
501
502    /// When the schedule was created.
503    pub fn create_time(&self) -> Option<SystemTime> {
504        self.info()
505            .and_then(|i| i.create_time.as_ref())
506            .and_then(proto_ts_to_system_time)
507    }
508
509    /// When the schedule was last updated.
510    pub fn update_time(&self) -> Option<SystemTime> {
511        self.info()
512            .and_then(|i| i.update_time.as_ref())
513            .and_then(proto_ts_to_system_time)
514    }
515
516    /// Memo attached to the schedule.
517    pub fn memo(&self) -> Option<&common_proto::Memo> {
518        self.raw.memo.as_ref()
519    }
520
521    /// Search attributes on the schedule.
522    pub fn search_attributes(&self) -> Option<&common_proto::SearchAttributes> {
523        self.raw.search_attributes.as_ref()
524    }
525
526    /// Access the raw proto for additional fields not exposed via accessors.
527    pub fn raw(&self) -> &DescribeScheduleResponse {
528        &self.raw
529    }
530
531    /// Consume the wrapper and return the raw proto.
532    pub fn into_raw(self) -> DescribeScheduleResponse {
533        self.raw
534    }
535
536    fn info(&self) -> Option<&schedule_proto::ScheduleInfo> {
537        self.raw.info.as_ref()
538    }
539
540    /// Convert this description into a [`ScheduleUpdate`] for use with
541    /// [`ScheduleHandle::send_update()`].
542    ///
543    /// Extracts the schedule definition from the description.
544    pub fn into_update(self) -> ScheduleUpdate {
545        ScheduleUpdate {
546            schedule: self.raw.schedule.unwrap_or_default(),
547            pending_action: None,
548        }
549    }
550}
551
552impl From<DescribeScheduleResponse> for ScheduleDescription {
553    fn from(raw: DescribeScheduleResponse) -> Self {
554        Self { raw }
555    }
556}
557
558/// Controls what happens when a scheduled workflow would overlap with a running one.
559#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
560pub enum ScheduleOverlapPolicy {
561    /// Use the server default (currently Skip).
562    #[default]
563    Unspecified,
564    /// Don't start a new workflow if one is already running.
565    Skip,
566    /// Buffer one workflow start, to run after the current one completes.
567    BufferOne,
568    /// Buffer all workflow starts, to run sequentially.
569    BufferAll,
570    /// Cancel the running workflow and start a new one.
571    CancelOther,
572    /// Terminate the running workflow and start a new one.
573    TerminateOther,
574    /// Start any number of concurrent workflows.
575    AllowAll,
576}
577
578impl ScheduleOverlapPolicy {
579    pub(crate) fn to_proto(self) -> i32 {
580        match self {
581            Self::Unspecified => 0,
582            Self::Skip => 1,
583            Self::BufferOne => 2,
584            Self::BufferAll => 3,
585            Self::CancelOther => 4,
586            Self::TerminateOther => 5,
587            Self::AllowAll => 6,
588        }
589    }
590}
591
592/// A backfill request for a schedule, specifying a time range of missed runs.
593#[derive(Debug, Clone, PartialEq, bon::Builder)]
594#[non_exhaustive]
595#[builder(start_fn = new)]
596pub struct ScheduleBackfill {
597    /// Start of the time range to backfill.
598    #[builder(start_fn)]
599    pub start_time: SystemTime,
600    /// End of the time range to backfill.
601    #[builder(start_fn)]
602    pub end_time: SystemTime,
603    /// How overlapping runs are handled during backfill.
604    #[builder(default)]
605    pub overlap_policy: ScheduleOverlapPolicy,
606}
607
608/// An update to apply to a schedule definition.
609///
610/// Obtain from [`ScheduleDescription::into_update()`], modify the schedule
611/// using the setter methods, then pass to [`ScheduleHandle::update()`].
612#[derive(Debug, Clone)]
613pub struct ScheduleUpdate {
614    schedule: schedule_proto::Schedule,
615    pending_action: Option<ScheduleAction>,
616}
617
618impl ScheduleUpdate {
619    /// Replace the schedule spec (when to trigger).
620    pub fn set_spec(&mut self, spec: ScheduleSpec) -> &mut Self {
621        self.schedule.spec = Some(spec.into_proto());
622        self
623    }
624
625    /// Replace the schedule action (what to do on trigger).
626    pub fn set_action(&mut self, action: ScheduleAction) -> &mut Self {
627        self.pending_action = Some(action);
628        self
629    }
630
631    /// Set whether the schedule is paused.
632    pub fn set_paused(&mut self, paused: bool) -> &mut Self {
633        self.state_mut().paused = paused;
634        self
635    }
636
637    /// Set the note on the schedule state.
638    pub fn set_note(&mut self, note: impl Into<String>) -> &mut Self {
639        self.state_mut().notes = note.into();
640        self
641    }
642
643    /// Set the overlap policy.
644    pub fn set_overlap_policy(&mut self, policy: ScheduleOverlapPolicy) -> &mut Self {
645        self.policies_mut().overlap_policy = policy.to_proto();
646        self
647    }
648
649    /// Set the catchup window. Actions missed by more than this duration are
650    /// skipped.
651    pub fn set_catchup_window(&mut self, window: Duration) -> &mut Self {
652        self.policies_mut().catchup_window = window.try_into().ok();
653        self
654    }
655
656    /// Set whether to pause the schedule when a workflow run fails or times out.
657    pub fn set_pause_on_failure(&mut self, pause_on_failure: bool) -> &mut Self {
658        self.policies_mut().pause_on_failure = pause_on_failure;
659        self
660    }
661
662    /// Set whether to keep the original workflow ID without appending a
663    /// timestamp.
664    pub fn set_keep_original_workflow_id(&mut self, keep: bool) -> &mut Self {
665        self.policies_mut().keep_original_workflow_id = keep;
666        self
667    }
668
669    /// Limit the schedule to a fixed number of remaining actions, after which
670    /// it stops triggering. Passing `None` removes the limit.
671    pub fn set_remaining_actions(&mut self, count: Option<i64>) -> &mut Self {
672        let state = self.state_mut();
673        match count {
674            Some(n) => {
675                state.limited_actions = true;
676                state.remaining_actions = n;
677            }
678            None => {
679                state.limited_actions = false;
680                state.remaining_actions = 0;
681            }
682        }
683        self
684    }
685
686    /// Access the raw schedule proto.
687    pub fn raw(&self) -> &schedule_proto::Schedule {
688        &self.schedule
689    }
690
691    /// Consume and return the raw schedule proto.
692    pub fn into_raw(self) -> schedule_proto::Schedule {
693        self.schedule
694    }
695
696    fn state_mut(&mut self) -> &mut schedule_proto::ScheduleState {
697        self.schedule.state.get_or_insert_with(Default::default)
698    }
699
700    fn policies_mut(&mut self) -> &mut schedule_proto::SchedulePolicies {
701        self.schedule.policies.get_or_insert_with(Default::default)
702    }
703}
704
705/// Summary of a schedule returned in list operations.
706///
707/// Provides ergonomic accessors over the raw `ScheduleListEntry` proto.
708/// Use [`raw()`](Self::raw) or [`into_raw()`](Self::into_raw) to access the
709/// full proto when needed.
710#[derive(Debug, Clone)]
711pub struct ScheduleSummary {
712    raw: schedule_proto::ScheduleListEntry,
713}
714
715impl ScheduleSummary {
716    /// The schedule ID.
717    pub fn schedule_id(&self) -> &str {
718        &self.raw.schedule_id
719    }
720
721    /// The workflow type name for start-workflow actions.
722    pub fn workflow_type(&self) -> Option<&str> {
723        self.info()
724            .and_then(|i| i.workflow_type.as_ref())
725            .map(|wt| wt.name.as_str())
726    }
727
728    /// Note on the schedule state.
729    /// Returns `None` if no note is set or the note is empty.
730    pub fn note(&self) -> Option<&str> {
731        self.info()
732            .map(|i| i.notes.as_str())
733            .filter(|s| !s.is_empty())
734    }
735
736    /// Whether the schedule is paused.
737    pub fn paused(&self) -> bool {
738        self.info().is_some_and(|i| i.paused)
739    }
740
741    /// Most recent action results (up to 10).
742    pub fn recent_actions(&self) -> Vec<ScheduleRecentAction> {
743        self.info()
744            .map(|i| {
745                i.recent_actions
746                    .iter()
747                    .map(ScheduleRecentAction::from)
748                    .collect()
749            })
750            .unwrap_or_default()
751    }
752
753    /// Next scheduled action times.
754    pub fn future_action_times(&self) -> Vec<SystemTime> {
755        self.info()
756            .map(|i| {
757                i.future_action_times
758                    .iter()
759                    .filter_map(proto_ts_to_system_time)
760                    .collect()
761            })
762            .unwrap_or_default()
763    }
764
765    /// Memo attached to the schedule.
766    pub fn memo(&self) -> Option<&common_proto::Memo> {
767        self.raw.memo.as_ref()
768    }
769
770    /// Search attributes on the schedule.
771    pub fn search_attributes(&self) -> Option<&common_proto::SearchAttributes> {
772        self.raw.search_attributes.as_ref()
773    }
774
775    /// Access the raw proto for additional fields not exposed via accessors.
776    pub fn raw(&self) -> &schedule_proto::ScheduleListEntry {
777        &self.raw
778    }
779
780    /// Consume the wrapper and return the raw proto.
781    pub fn into_raw(self) -> schedule_proto::ScheduleListEntry {
782        self.raw
783    }
784
785    fn info(&self) -> Option<&schedule_proto::ScheduleListInfo> {
786        self.raw.info.as_ref()
787    }
788}
789
790impl From<schedule_proto::ScheduleListEntry> for ScheduleSummary {
791    fn from(raw: schedule_proto::ScheduleListEntry) -> Self {
792        Self { raw }
793    }
794}
795
796/// Handle to an existing schedule. Obtained from
797/// [`Client::create_schedule`](crate::Client::create_schedule) or
798/// [`Client::get_schedule_handle`](crate::Client::get_schedule_handle).
799#[derive(Clone, derive_more::Debug)]
800pub struct ScheduleHandle<CT> {
801    #[debug(skip)]
802    client: CT,
803    namespace: String,
804    schedule_id: String,
805}
806
807impl<CT> ScheduleHandle<CT>
808where
809    CT: WorkflowService + NamespacedClient + Clone + Send + Sync,
810{
811    pub(crate) fn new(client: CT, namespace: String, schedule_id: String) -> Self {
812        Self {
813            client,
814            namespace,
815            schedule_id,
816        }
817    }
818
819    /// The namespace the schedule belongs to.
820    pub fn namespace(&self) -> &str {
821        &self.namespace
822    }
823
824    /// The schedule ID.
825    pub fn schedule_id(&self) -> &str {
826        &self.schedule_id
827    }
828
829    /// Describe this schedule, returning its full definition, info, and conflict token.
830    pub async fn describe(&self) -> Result<ScheduleDescription, ScheduleError> {
831        let resp = WorkflowService::describe_schedule(
832            &mut self.client.clone(),
833            DescribeScheduleRequest {
834                namespace: self.namespace.clone(),
835                schedule_id: self.schedule_id.clone(),
836            }
837            .into_request(),
838        )
839        .await?
840        .into_inner();
841
842        Ok(ScheduleDescription::from(resp))
843    }
844
845    /// Update the schedule definition.
846    ///
847    /// Describes the current schedule, applies the closure to modify it, and
848    /// sends the update. The conflict token is managed automatically.
849    ///
850    /// ```no_run
851    /// # async fn hidden(
852    /// #     handle: &temporalio_client::schedules::ScheduleHandle<temporalio_client::Client>,
853    /// # ) -> Result<(), temporalio_client::schedules::ScheduleError> {
854    /// handle
855    ///     .update(|u| {
856    ///         u.set_note("updated").set_paused(true);
857    ///     })
858    ///     .await?;
859    /// # Ok(())
860    /// # }
861    /// ```
862    // TODO: Add a retry loop for conflict token mismatch. The server
863    // returns FailedPrecondition with "mismatched conflict token".
864    pub async fn update(
865        &self,
866        updater: impl FnOnce(&mut ScheduleUpdate),
867    ) -> Result<(), ScheduleError> {
868        let desc = self.describe().await?;
869        let mut update = desc.into_update();
870        updater(&mut update);
871        self.send_update(update).await
872    }
873
874    /// Send a pre-built [`ScheduleUpdate`] to the server.
875    ///
876    /// Prefer [`update()`](Self::update) for most use cases. Use this when you
877    /// need to inspect the [`ScheduleDescription`] before deciding what to
878    /// change.
879    pub async fn send_update(&self, mut update: ScheduleUpdate) -> Result<(), ScheduleError> {
880        if let Some(action) = update.pending_action.take() {
881            update.schedule.action = Some(action.into_proto(self.client.data_converter()).await?);
882        }
883        WorkflowService::update_schedule(
884            &mut self.client.clone(),
885            UpdateScheduleRequest {
886                namespace: self.namespace.clone(),
887                schedule_id: self.schedule_id.clone(),
888                schedule: Some(update.schedule),
889                identity: self.client.identity(),
890                request_id: Uuid::new_v4().to_string(),
891                ..Default::default()
892            }
893            .into_request(),
894        )
895        .await?;
896        Ok(())
897    }
898
899    /// Delete this schedule.
900    pub async fn delete(&self) -> Result<(), ScheduleError> {
901        WorkflowService::delete_schedule(
902            &mut self.client.clone(),
903            DeleteScheduleRequest {
904                namespace: self.namespace.clone(),
905                schedule_id: self.schedule_id.clone(),
906                identity: self.client.identity(),
907            }
908            .into_request(),
909        )
910        .await?;
911        Ok(())
912    }
913
914    /// Pause the schedule with an optional note.
915    ///
916    /// If `note` is `None`, a default note is used.
917    pub async fn pause(&self, note: Option<impl Into<String>>) -> Result<(), ScheduleError> {
918        let note = note.map_or_else(|| "Paused via Rust SDK".to_string(), |s| s.into());
919        WorkflowService::patch_schedule(
920            &mut self.client.clone(),
921            PatchScheduleRequest {
922                namespace: self.namespace.clone(),
923                schedule_id: self.schedule_id.clone(),
924                patch: Some(schedule_proto::SchedulePatch {
925                    pause: note,
926                    ..Default::default()
927                }),
928                identity: self.client.identity(),
929                request_id: Uuid::new_v4().to_string(),
930            }
931            .into_request(),
932        )
933        .await?;
934        Ok(())
935    }
936
937    /// Unpause the schedule with an optional note.
938    ///
939    /// If `note` is `None`, a default note is used.
940    pub async fn unpause(&self, note: Option<impl Into<String>>) -> Result<(), ScheduleError> {
941        let note = note.map_or_else(|| "Unpaused via Rust SDK".to_string(), |s| s.into());
942        WorkflowService::patch_schedule(
943            &mut self.client.clone(),
944            PatchScheduleRequest {
945                namespace: self.namespace.clone(),
946                schedule_id: self.schedule_id.clone(),
947                patch: Some(schedule_proto::SchedulePatch {
948                    unpause: note,
949                    ..Default::default()
950                }),
951                identity: self.client.identity(),
952                request_id: Uuid::new_v4().to_string(),
953            }
954            .into_request(),
955        )
956        .await?;
957        Ok(())
958    }
959
960    /// Trigger the schedule to run immediately with the given overlap policy.
961    pub async fn trigger(
962        &self,
963        overlap_policy: ScheduleOverlapPolicy,
964    ) -> Result<(), ScheduleError> {
965        WorkflowService::patch_schedule(
966            &mut self.client.clone(),
967            PatchScheduleRequest {
968                namespace: self.namespace.clone(),
969                schedule_id: self.schedule_id.clone(),
970                patch: Some(schedule_proto::SchedulePatch {
971                    trigger_immediately: Some(schedule_proto::TriggerImmediatelyRequest {
972                        overlap_policy: overlap_policy.to_proto(),
973                        scheduled_time: None,
974                    }),
975                    ..Default::default()
976                }),
977                identity: self.client.identity(),
978                request_id: Uuid::new_v4().to_string(),
979            }
980            .into_request(),
981        )
982        .await?;
983        Ok(())
984    }
985
986    /// Request backfill of missed runs.
987    pub async fn backfill(
988        &self,
989        backfills: impl IntoIterator<Item = ScheduleBackfill>,
990    ) -> Result<(), ScheduleError> {
991        let backfill_requests: Vec<schedule_proto::BackfillRequest> = backfills
992            .into_iter()
993            .map(|b| schedule_proto::BackfillRequest {
994                start_time: Some(b.start_time.into()),
995                end_time: Some(b.end_time.into()),
996                overlap_policy: b.overlap_policy.to_proto(),
997            })
998            .collect();
999        WorkflowService::patch_schedule(
1000            &mut self.client.clone(),
1001            PatchScheduleRequest {
1002                namespace: self.namespace.clone(),
1003                schedule_id: self.schedule_id.clone(),
1004                patch: Some(schedule_proto::SchedulePatch {
1005                    backfill_request: backfill_requests,
1006                    ..Default::default()
1007                }),
1008                identity: self.client.identity(),
1009                request_id: Uuid::new_v4().to_string(),
1010            }
1011            .into_request(),
1012        )
1013        .await?;
1014        Ok(())
1015    }
1016}
1017
1018// Schedule operations on Client.
1019impl Client {
1020    /// Create a schedule and return a handle to it.
1021    pub async fn create_schedule(
1022        &self,
1023        schedule_id: impl Into<String>,
1024        opts: CreateScheduleOptions,
1025    ) -> Result<ScheduleHandle<Self>, ScheduleError> {
1026        let schedule_id = schedule_id.into();
1027        let namespace = self.namespace();
1028
1029        let initial_patch = if opts.trigger_immediately {
1030            Some(schedule_proto::SchedulePatch {
1031                trigger_immediately: Some(schedule_proto::TriggerImmediatelyRequest {
1032                    // Always use AllowAll for the initial trigger so the
1033                    // schedule fires immediately regardless of overlap state.
1034                    overlap_policy: ScheduleOverlapPolicy::AllowAll.to_proto(),
1035                    scheduled_time: None,
1036                }),
1037                ..Default::default()
1038            })
1039        } else {
1040            None
1041        };
1042        // Only send explicit policies when the user set a non-default overlap
1043        // policy, so the server uses its own defaults otherwise.
1044        let policies = (opts.overlap_policy != ScheduleOverlapPolicy::Unspecified).then(|| {
1045            schedule_proto::SchedulePolicies {
1046                overlap_policy: opts.overlap_policy.to_proto(),
1047                ..Default::default()
1048            }
1049        });
1050        let schedule = schedule_proto::Schedule {
1051            spec: Some(opts.spec.into_proto()),
1052            action: Some(opts.action.into_proto(self.data_converter()).await?),
1053            policies,
1054            state: Some(schedule_proto::ScheduleState {
1055                paused: opts.paused,
1056                notes: opts.note,
1057                ..Default::default()
1058            }),
1059        };
1060        WorkflowService::create_schedule(
1061            &mut self.clone(),
1062            CreateScheduleRequest {
1063                namespace: namespace.clone(),
1064                schedule_id: schedule_id.clone(),
1065                schedule: Some(schedule),
1066                initial_patch,
1067                identity: self.identity(),
1068                request_id: Uuid::new_v4().to_string(),
1069                ..Default::default()
1070            }
1071            .into_request(),
1072        )
1073        .await?;
1074        Ok(ScheduleHandle::new(self.clone(), namespace, schedule_id))
1075    }
1076
1077    /// Get a handle to an existing schedule by ID.
1078    pub fn get_schedule_handle(&self, schedule_id: impl Into<String>) -> ScheduleHandle<Self> {
1079        ScheduleHandle::new(self.clone(), self.namespace(), schedule_id.into())
1080    }
1081
1082    /// List schedules matching the query, returning a stream that lazily
1083    /// paginates through results.
1084    pub fn list_schedules(&self, opts: ListSchedulesOptions) -> ListSchedulesStream {
1085        let client = self.clone();
1086        let namespace = self.namespace();
1087        let query = opts.query;
1088        let page_size = opts.maximum_page_size;
1089
1090        let stream = stream::unfold(
1091            (Vec::new(), VecDeque::new(), false),
1092            move |(next_page_token, mut buffer, exhausted)| {
1093                let mut client = client.clone();
1094                let namespace = namespace.clone();
1095                let query = query.clone();
1096
1097                async move {
1098                    if let Some(item) = buffer.pop_front() {
1099                        return Some((Ok(item), (next_page_token, buffer, exhausted)));
1100                    } else if exhausted {
1101                        return None;
1102                    }
1103
1104                    let response = WorkflowService::list_schedules(
1105                        &mut client,
1106                        ListSchedulesRequest {
1107                            namespace,
1108                            maximum_page_size: page_size,
1109                            next_page_token: next_page_token.clone(),
1110                            query,
1111                        }
1112                        .into_request(),
1113                    )
1114                    .await;
1115
1116                    match response {
1117                        Ok(resp) => {
1118                            let resp = resp.into_inner();
1119                            let new_exhausted = resp.next_page_token.is_empty();
1120                            let new_token = resp.next_page_token;
1121
1122                            buffer = resp
1123                                .schedules
1124                                .into_iter()
1125                                .map(ScheduleSummary::from)
1126                                .collect();
1127
1128                            buffer
1129                                .pop_front()
1130                                .map(|item| (Ok(item), (new_token, buffer, new_exhausted)))
1131                        }
1132                        Err(e) => Some((Err(e.into()), (next_page_token, buffer, true))),
1133                    }
1134                }
1135            },
1136        );
1137
1138        ListSchedulesStream::new(Box::pin(stream))
1139    }
1140}
1141
1142#[cfg(test)]
1143mod tests {
1144    use super::*;
1145    use crate::{NamespacedClient, grpc::WorkflowService};
1146    use futures_util::FutureExt;
1147    use std::{
1148        sync::{
1149            Arc,
1150            atomic::{AtomicUsize, Ordering},
1151        },
1152        time::SystemTime,
1153    };
1154    use temporalio_common::protos::temporal::api::{
1155        common::v1::{
1156            Memo, SearchAttributes, WorkflowExecution as ProtoWorkflowExecution, WorkflowType,
1157        },
1158        schedule::v1::{
1159            Schedule, ScheduleActionResult, ScheduleInfo, ScheduleListEntry, ScheduleListInfo,
1160            ScheduleSpec, ScheduleState,
1161        },
1162        workflowservice::v1::{
1163            DeleteScheduleResponse, DescribeScheduleResponse, PatchScheduleResponse,
1164            UpdateScheduleResponse,
1165        },
1166    };
1167    use tonic::{Request, Response};
1168
1169    #[derive(Default)]
1170    struct CapturedRequests {
1171        describe: AtomicUsize,
1172        update: AtomicUsize,
1173        delete: AtomicUsize,
1174        patch: AtomicUsize,
1175    }
1176
1177    #[derive(Clone, Default)]
1178    struct MockScheduleClient {
1179        captured: Arc<CapturedRequests>,
1180        describe_response: DescribeScheduleResponse,
1181        should_error: bool,
1182    }
1183
1184    impl NamespacedClient for MockScheduleClient {
1185        fn namespace(&self) -> String {
1186            "test-namespace".to_string()
1187        }
1188        fn identity(&self) -> String {
1189            "test-identity".to_string()
1190        }
1191    }
1192
1193    impl WorkflowService for MockScheduleClient {
1194        fn describe_schedule(
1195            &mut self,
1196            _request: Request<DescribeScheduleRequest>,
1197        ) -> futures_util::future::BoxFuture<
1198            '_,
1199            Result<Response<DescribeScheduleResponse>, tonic::Status>,
1200        > {
1201            self.captured.describe.fetch_add(1, Ordering::SeqCst);
1202            let resp = self.describe_response.clone();
1203            let should_error = self.should_error;
1204            async move {
1205                if should_error {
1206                    Err(tonic::Status::not_found("schedule not found"))
1207                } else {
1208                    Ok(Response::new(resp))
1209                }
1210            }
1211            .boxed()
1212        }
1213
1214        fn update_schedule(
1215            &mut self,
1216            _request: Request<UpdateScheduleRequest>,
1217        ) -> futures_util::future::BoxFuture<
1218            '_,
1219            Result<Response<UpdateScheduleResponse>, tonic::Status>,
1220        > {
1221            self.captured.update.fetch_add(1, Ordering::SeqCst);
1222            let should_error = self.should_error;
1223            async move {
1224                if should_error {
1225                    Err(tonic::Status::internal("update failed"))
1226                } else {
1227                    Ok(Response::new(UpdateScheduleResponse::default()))
1228                }
1229            }
1230            .boxed()
1231        }
1232
1233        fn delete_schedule(
1234            &mut self,
1235            _request: Request<DeleteScheduleRequest>,
1236        ) -> futures_util::future::BoxFuture<
1237            '_,
1238            Result<Response<DeleteScheduleResponse>, tonic::Status>,
1239        > {
1240            self.captured.delete.fetch_add(1, Ordering::SeqCst);
1241            let should_error = self.should_error;
1242            async move {
1243                if should_error {
1244                    Err(tonic::Status::internal("delete failed"))
1245                } else {
1246                    Ok(Response::new(DeleteScheduleResponse::default()))
1247                }
1248            }
1249            .boxed()
1250        }
1251
1252        fn patch_schedule(
1253            &mut self,
1254            _request: Request<PatchScheduleRequest>,
1255        ) -> futures_util::future::BoxFuture<
1256            '_,
1257            Result<Response<PatchScheduleResponse>, tonic::Status>,
1258        > {
1259            self.captured.patch.fetch_add(1, Ordering::SeqCst);
1260            let should_error = self.should_error;
1261            async move {
1262                if should_error {
1263                    Err(tonic::Status::internal("patch failed"))
1264                } else {
1265                    Ok(Response::new(PatchScheduleResponse::default()))
1266                }
1267            }
1268            .boxed()
1269        }
1270    }
1271
1272    fn make_schedule_handle(client: MockScheduleClient) -> ScheduleHandle<MockScheduleClient> {
1273        ScheduleHandle::new(
1274            client,
1275            "test-namespace".to_string(),
1276            "test-schedule-id".to_string(),
1277        )
1278    }
1279
1280    #[test]
1281    fn schedule_handle_exposes_namespace_and_id() {
1282        let handle = make_schedule_handle(MockScheduleClient::default());
1283        assert_eq!(handle.namespace(), "test-namespace");
1284        assert_eq!(handle.schedule_id(), "test-schedule-id");
1285    }
1286
1287    #[tokio::test]
1288    async fn schedule_describe_returns_response_fields() {
1289        let conflict_token = b"token-123".to_vec();
1290
1291        let client = MockScheduleClient {
1292            describe_response: DescribeScheduleResponse {
1293                schedule: Some(Schedule::default()),
1294                info: Some(ScheduleInfo::default()),
1295                memo: Some(Memo {
1296                    fields: Default::default(),
1297                }),
1298                search_attributes: Some(SearchAttributes {
1299                    indexed_fields: Default::default(),
1300                }),
1301                conflict_token: conflict_token.clone(),
1302            },
1303            ..Default::default()
1304        };
1305
1306        let handle = make_schedule_handle(client.clone());
1307        let desc = handle.describe().await.unwrap();
1308
1309        assert_eq!(client.captured.describe.load(Ordering::SeqCst), 1);
1310        assert!(desc.raw().schedule.is_some());
1311        assert!(desc.raw().info.is_some());
1312        assert!(desc.raw().memo.is_some());
1313        assert!(desc.raw().search_attributes.is_some());
1314        assert_eq!(desc.conflict_token(), conflict_token);
1315    }
1316
1317    #[tokio::test]
1318    async fn schedule_update_describes_then_sends() {
1319        let client = MockScheduleClient::default();
1320        let handle = make_schedule_handle(client.clone());
1321
1322        handle
1323            .update(|u| {
1324                u.set_note("hi");
1325            })
1326            .await
1327            .unwrap();
1328
1329        assert_eq!(client.captured.describe.load(Ordering::SeqCst), 1);
1330        assert_eq!(client.captured.update.load(Ordering::SeqCst), 1);
1331    }
1332
1333    #[tokio::test]
1334    async fn schedule_multiple_updates_each_call_service() {
1335        let client = MockScheduleClient::default();
1336        let handle = make_schedule_handle(client.clone());
1337
1338        handle.update(|_| {}).await.unwrap();
1339        handle.update(|_| {}).await.unwrap();
1340
1341        assert_eq!(client.captured.update.load(Ordering::SeqCst), 2);
1342    }
1343
1344    #[tokio::test]
1345    async fn schedule_delete_calls_service() {
1346        let client = MockScheduleClient::default();
1347        let handle = make_schedule_handle(client.clone());
1348
1349        handle.delete().await.unwrap();
1350
1351        assert_eq!(client.captured.delete.load(Ordering::SeqCst), 1);
1352    }
1353
1354    #[tokio::test]
1355    async fn schedule_pause_calls_patch() {
1356        let client = MockScheduleClient::default();
1357        let handle = make_schedule_handle(client.clone());
1358
1359        handle.pause(Some("taking a break")).await.unwrap();
1360
1361        assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1362    }
1363
1364    #[tokio::test]
1365    async fn schedule_pause_with_none_uses_default() {
1366        let client = MockScheduleClient::default();
1367        let handle = make_schedule_handle(client.clone());
1368
1369        handle.pause(None::<&str>).await.unwrap();
1370
1371        assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1372    }
1373
1374    #[tokio::test]
1375    async fn schedule_unpause_calls_patch() {
1376        let client = MockScheduleClient::default();
1377        let handle = make_schedule_handle(client.clone());
1378
1379        handle.unpause(Some("resuming work")).await.unwrap();
1380
1381        assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1382    }
1383
1384    #[tokio::test]
1385    async fn schedule_trigger_calls_patch() {
1386        let client = MockScheduleClient::default();
1387        let handle = make_schedule_handle(client.clone());
1388
1389        handle
1390            .trigger(ScheduleOverlapPolicy::Unspecified)
1391            .await
1392            .unwrap();
1393
1394        assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1395    }
1396
1397    #[tokio::test]
1398    async fn schedule_backfill_calls_patch() {
1399        let client = MockScheduleClient::default();
1400        let handle = make_schedule_handle(client.clone());
1401
1402        let now = SystemTime::now();
1403        handle
1404            .backfill(vec![
1405                ScheduleBackfill::new(now, now)
1406                    .overlap_policy(ScheduleOverlapPolicy::Skip)
1407                    .build(),
1408                ScheduleBackfill::new(now, now)
1409                    .overlap_policy(ScheduleOverlapPolicy::BufferOne)
1410                    .build(),
1411            ])
1412            .await
1413            .unwrap();
1414
1415        assert_eq!(client.captured.patch.load(Ordering::SeqCst), 1);
1416    }
1417
1418    #[tokio::test]
1419    async fn schedule_describe_propagates_rpc_errors() {
1420        let client = MockScheduleClient {
1421            should_error: true,
1422            ..Default::default()
1423        };
1424        let handle = make_schedule_handle(client);
1425
1426        let err = handle.describe().await.unwrap_err();
1427        assert!(
1428            matches!(err, ScheduleError::Rpc(_)),
1429            "expected Rpc variant, got: {err:?}"
1430        );
1431        assert!(err.to_string().contains("schedule not found"));
1432    }
1433
1434    #[tokio::test]
1435    async fn schedule_update_propagates_rpc_errors() {
1436        let client = MockScheduleClient {
1437            should_error: true,
1438            ..Default::default()
1439        };
1440        let handle = make_schedule_handle(client);
1441
1442        let err = handle.update(|_| {}).await.unwrap_err();
1443        assert!(matches!(err, ScheduleError::Rpc(_)));
1444    }
1445
1446    #[tokio::test]
1447    async fn schedule_delete_propagates_rpc_errors() {
1448        let client = MockScheduleClient {
1449            should_error: true,
1450            ..Default::default()
1451        };
1452        let handle = make_schedule_handle(client);
1453
1454        let err = handle.delete().await.unwrap_err();
1455        assert!(matches!(err, ScheduleError::Rpc(_)));
1456    }
1457
1458    #[tokio::test]
1459    async fn schedule_patch_operations_propagate_rpc_errors() {
1460        let client = MockScheduleClient {
1461            should_error: true,
1462            ..Default::default()
1463        };
1464        let handle = make_schedule_handle(client);
1465
1466        assert!(handle.pause(Some("")).await.is_err());
1467        assert!(handle.unpause(Some("")).await.is_err());
1468        assert!(handle.trigger(Default::default()).await.is_err());
1469        assert!(handle.backfill(vec![]).await.is_err());
1470    }
1471
1472    #[tokio::test]
1473    async fn schedule_all_patch_operations_call_service() {
1474        let client = MockScheduleClient::default();
1475        let handle = make_schedule_handle(client.clone());
1476
1477        handle.pause(Some("p")).await.unwrap();
1478        handle.unpause(Some("u")).await.unwrap();
1479        handle.trigger(Default::default()).await.unwrap();
1480        handle.backfill(vec![]).await.unwrap();
1481
1482        assert_eq!(client.captured.patch.load(Ordering::SeqCst), 4);
1483    }
1484
1485    #[tokio::test]
1486    async fn schedule_describe_accessors_with_populated_fields() {
1487        let client = MockScheduleClient {
1488            describe_response: DescribeScheduleResponse {
1489                schedule: Some(Schedule {
1490                    spec: Some(ScheduleSpec {
1491                        timezone_name: "US/Eastern".to_string(),
1492                        ..Default::default()
1493                    }),
1494                    state: Some(ScheduleState {
1495                        paused: true,
1496                        notes: "maintenance window".to_string(),
1497                        ..Default::default()
1498                    }),
1499                    ..Default::default()
1500                }),
1501                info: Some(ScheduleInfo {
1502                    action_count: 42,
1503                    missed_catchup_window: 3,
1504                    overlap_skipped: 5,
1505                    recent_actions: vec![ScheduleActionResult {
1506                        start_workflow_result: Some(ProtoWorkflowExecution {
1507                            workflow_id: "ra-wf".to_string(),
1508                            run_id: "ra-run".to_string(),
1509                        }),
1510                        ..Default::default()
1511                    }],
1512                    running_workflows: vec![ProtoWorkflowExecution {
1513                        workflow_id: "wf-1".to_string(),
1514                        run_id: "run-1".to_string(),
1515                    }],
1516                    create_time: Some(prost_types::Timestamp {
1517                        seconds: 1_700_000_000,
1518                        nanos: 0,
1519                    }),
1520                    update_time: Some(prost_types::Timestamp {
1521                        seconds: 1_700_001_000,
1522                        nanos: 0,
1523                    }),
1524                    future_action_times: vec![prost_types::Timestamp {
1525                        seconds: 1_700_002_000,
1526                        nanos: 0,
1527                    }],
1528                    ..Default::default()
1529                }),
1530                conflict_token: b"tok".to_vec(),
1531                ..Default::default()
1532            },
1533            ..Default::default()
1534        };
1535
1536        let handle = make_schedule_handle(client);
1537        let desc = handle.describe().await.unwrap();
1538
1539        assert!(desc.paused());
1540        assert_eq!(desc.note(), Some("maintenance window"));
1541        assert_eq!(desc.action_count(), 42);
1542        assert_eq!(desc.missed_catchup_window(), 3);
1543        assert_eq!(desc.overlap_skipped(), 5);
1544
1545        assert_eq!(desc.recent_actions().len(), 1);
1546        assert_eq!(
1547            desc.running_actions(),
1548            vec![ScheduleRunningAction {
1549                workflow_id: "wf-1".to_string(),
1550                run_id: "run-1".to_string(),
1551            }]
1552        );
1553
1554        assert_eq!(desc.future_action_times().len(), 1);
1555        assert!(desc.create_time().is_some());
1556        assert!(desc.update_time().is_some());
1557    }
1558
1559    #[tokio::test]
1560    async fn schedule_describe_defaults_when_nested_fields_are_none() {
1561        let client = MockScheduleClient::default();
1562
1563        let handle = make_schedule_handle(client);
1564        let desc = handle.describe().await.unwrap();
1565
1566        assert!(!desc.paused());
1567        assert_eq!(desc.note(), None);
1568        assert_eq!(desc.action_count(), 0);
1569        assert_eq!(desc.missed_catchup_window(), 0);
1570        assert_eq!(desc.overlap_skipped(), 0);
1571        assert!(desc.recent_actions().is_empty());
1572        assert!(desc.running_actions().is_empty());
1573        assert!(desc.future_action_times().is_empty());
1574        assert!(desc.create_time().is_none());
1575        assert!(desc.update_time().is_none());
1576        assert!(desc.conflict_token().is_empty());
1577    }
1578
1579    #[tokio::test]
1580    async fn schedule_note_returns_none_for_empty_string() {
1581        let client = MockScheduleClient {
1582            describe_response: DescribeScheduleResponse {
1583                schedule: Some(Schedule {
1584                    state: Some(ScheduleState {
1585                        notes: String::new(),
1586                        ..Default::default()
1587                    }),
1588                    ..Default::default()
1589                }),
1590                ..Default::default()
1591            },
1592            ..Default::default()
1593        };
1594
1595        let handle = make_schedule_handle(client);
1596        let desc = handle.describe().await.unwrap();
1597        assert_eq!(desc.note(), None);
1598    }
1599
1600    #[test]
1601    fn schedule_summary_note_returns_none_for_empty_string() {
1602        let entry = ScheduleListEntry {
1603            schedule_id: "s".to_string(),
1604            info: Some(ScheduleListInfo {
1605                notes: String::new(),
1606                ..Default::default()
1607            }),
1608            ..Default::default()
1609        };
1610        let summary = ScheduleSummary::from(entry);
1611        assert_eq!(summary.note(), None);
1612    }
1613
1614    #[test]
1615    fn schedule_summary_accessors() {
1616        let entry = ScheduleListEntry {
1617            schedule_id: "sched-1".to_string(),
1618            memo: Some(Memo {
1619                fields: Default::default(),
1620            }),
1621            search_attributes: Some(SearchAttributes {
1622                indexed_fields: Default::default(),
1623            }),
1624            info: Some(ScheduleListInfo {
1625                spec: Some(ScheduleSpec::default()),
1626                workflow_type: Some(WorkflowType {
1627                    name: "MyWorkflow".to_string(),
1628                }),
1629                notes: "some note".to_string(),
1630                paused: true,
1631                recent_actions: vec![ScheduleActionResult {
1632                    start_workflow_result: Some(ProtoWorkflowExecution {
1633                        workflow_id: "ra-wf".to_string(),
1634                        run_id: "ra-run".to_string(),
1635                    }),
1636                    ..Default::default()
1637                }],
1638                future_action_times: vec![prost_types::Timestamp {
1639                    seconds: 1_700_000_000,
1640                    nanos: 0,
1641                }],
1642                state_size_bytes: 0,
1643            }),
1644        };
1645
1646        let summary = ScheduleSummary::from(entry);
1647        assert_eq!(summary.schedule_id(), "sched-1");
1648        assert!(summary.raw().memo.is_some());
1649        assert!(summary.raw().search_attributes.is_some());
1650        assert_eq!(summary.workflow_type(), Some("MyWorkflow"));
1651        assert_eq!(summary.note(), Some("some note"));
1652        assert!(summary.paused());
1653        assert_eq!(summary.recent_actions().len(), 1);
1654        assert_eq!(summary.future_action_times().len(), 1);
1655    }
1656
1657    #[test]
1658    fn schedule_summary_defaults_when_info_is_none() {
1659        let entry = ScheduleListEntry {
1660            schedule_id: "sched-2".to_string(),
1661            ..Default::default()
1662        };
1663
1664        let summary = ScheduleSummary::from(entry);
1665        assert_eq!(summary.schedule_id(), "sched-2");
1666        assert!(summary.raw().memo.is_none());
1667        assert!(summary.raw().search_attributes.is_none());
1668        assert_eq!(summary.workflow_type(), None);
1669        assert_eq!(summary.note(), None);
1670        assert!(!summary.paused());
1671        assert!(summary.recent_actions().is_empty());
1672        assert!(summary.future_action_times().is_empty());
1673    }
1674
1675    #[test]
1676    fn schedule_description_raw_round_trip() {
1677        let resp = DescribeScheduleResponse {
1678            conflict_token: b"ct".to_vec(),
1679            schedule: Some(Schedule::default()),
1680            ..Default::default()
1681        };
1682        let desc = ScheduleDescription::from(resp.clone());
1683        assert_eq!(desc.raw().conflict_token, b"ct");
1684        let recovered = desc.into_raw();
1685        assert_eq!(recovered.conflict_token, resp.conflict_token);
1686        assert!(recovered.schedule.is_some());
1687    }
1688
1689    #[test]
1690    fn schedule_summary_raw_round_trip() {
1691        let entry = ScheduleListEntry {
1692            schedule_id: "rt-1".to_string(),
1693            ..Default::default()
1694        };
1695        let summary = ScheduleSummary::from(entry.clone());
1696        assert_eq!(summary.raw().schedule_id, "rt-1");
1697        let recovered = summary.into_raw();
1698        assert_eq!(recovered.schedule_id, entry.schedule_id);
1699    }
1700
1701    #[test]
1702    fn schedule_into_update_preserves_schedule() {
1703        let resp = DescribeScheduleResponse {
1704            schedule: Some(Schedule {
1705                state: Some(ScheduleState {
1706                    notes: "my notes".to_string(),
1707                    ..Default::default()
1708                }),
1709                ..Default::default()
1710            }),
1711            ..Default::default()
1712        };
1713        let desc = ScheduleDescription::from(resp);
1714        let update = desc.into_update();
1715
1716        assert_eq!(update.raw().state.as_ref().unwrap().notes, "my notes");
1717    }
1718
1719    #[test]
1720    fn schedule_update_setters_are_chainable() {
1721        let resp = DescribeScheduleResponse {
1722            schedule: Some(Schedule::default()),
1723            ..Default::default()
1724        };
1725        let desc = ScheduleDescription::from(resp);
1726        let mut update = desc.into_update();
1727        update.set_note("chained").set_paused(true);
1728        assert_eq!(update.raw().state.as_ref().unwrap().notes, "chained");
1729        assert!(update.raw().state.as_ref().unwrap().paused);
1730    }
1731
1732    #[test]
1733    fn schedule_recent_action_from_proto_with_timestamps() {
1734        let ts = prost_types::Timestamp {
1735            seconds: 1_700_000_000,
1736            nanos: 0,
1737        };
1738        let proto = ScheduleActionResult {
1739            schedule_time: Some(ts),
1740            actual_time: Some(ts),
1741            start_workflow_result: Some(ProtoWorkflowExecution {
1742                workflow_id: "wf-abc".to_string(),
1743                run_id: "run-xyz".to_string(),
1744            }),
1745            ..Default::default()
1746        };
1747
1748        let action = ScheduleRecentAction::from(&proto);
1749
1750        assert!(action.schedule_time.is_some());
1751        assert!(action.actual_time.is_some());
1752        assert_eq!(action.workflow_id, "wf-abc");
1753        assert_eq!(action.run_id, "run-xyz");
1754    }
1755
1756    #[test]
1757    #[should_panic(expected = "unsupported schedule action")]
1758    fn schedule_recent_action_panics_without_workflow_result() {
1759        let _ = ScheduleRecentAction::from(&ScheduleActionResult::default());
1760    }
1761
1762    #[test]
1763    fn schedule_overlap_policy_default_is_unspecified() {
1764        assert_eq!(
1765            ScheduleOverlapPolicy::default(),
1766            ScheduleOverlapPolicy::Unspecified
1767        );
1768    }
1769
1770    #[tokio::test]
1771    async fn schedule_action_start_workflow_with_input_into_proto() {
1772        use temporalio_common::{
1773            UntypedWorkflow,
1774            data_converters::{DataConverter, RawValue},
1775            protos::temporal::api::common::v1::Payload,
1776        };
1777
1778        let payload = Payload {
1779            metadata: [("encoding".to_string(), b"json/plain".to_vec())]
1780                .into_iter()
1781                .collect(),
1782            data: b"42".to_vec(),
1783            ..Default::default()
1784        };
1785        let action = ScheduleAction::start_workflow(
1786            UntypedWorkflow::new("MyWorkflow"),
1787            RawValue::new(vec![payload.clone()]),
1788            "my-queue",
1789            "my-wf-id",
1790        );
1791        let proto = action.into_proto(&DataConverter::default()).await.unwrap();
1792        #[allow(irrefutable_let_patterns)]
1793        let schedule_proto::schedule_action::Action::StartWorkflow(wf_info) = proto.action.unwrap()
1794        else {
1795            panic!("expected StartWorkflow action")
1796        };
1797        assert_eq!(wf_info.input.unwrap().payloads, vec![payload]);
1798    }
1799}