Skip to main content

temporalio_client/
schedules.rs

1use crate::{Client, NamespacedClient, grpc::WorkflowService};
2use futures_util::stream;
3use std::{
4    collections::VecDeque,
5    pin::Pin,
6    task::{Context, Poll},
7    time::{Duration, SystemTime},
8};
9use temporalio_common::protos::{
10    proto_ts_to_system_time,
11    temporal::api::{
12        common::v1 as common_proto, schedule::v1 as schedule_proto,
13        taskqueue::v1 as taskqueue_proto, workflow::v1 as workflow_proto, workflowservice::v1::*,
14    },
15};
16use tonic::IntoRequest;
17use uuid::Uuid;
18
/// Errors returned by schedule operations.
///
/// The enum is `#[non_exhaustive]`, so matches on it need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ScheduleError {
    /// An RPC error from the server. The `#[from]` conversion lets `?` turn a
    /// [`tonic::Status`] into this variant.
    #[error("Server error: {0}")]
    Rpc(#[from] tonic::Status),
}
27
/// Options for creating a schedule.
///
/// Constructed through the derived builder; `String` fields accept any
/// `Into<String>` value.
#[derive(Debug, Clone, bon::Builder)]
#[builder(on(String, into))]
#[non_exhaustive]
pub struct CreateScheduleOptions {
    /// The action the schedule should perform on each trigger.
    pub action: ScheduleAction,
    /// Defines when the schedule should trigger.
    pub spec: ScheduleSpec,
    /// Whether to trigger the schedule immediately upon creation.
    #[builder(default)]
    pub trigger_immediately: bool,
    /// Overlap policy for the schedule.
    ///
    /// The initial trigger requested via `trigger_immediately` does not use
    /// this value: `Client::create_schedule` sends that trigger with
    /// [`ScheduleOverlapPolicy::AllowAll`].
    #[builder(default)]
    pub overlap_policy: ScheduleOverlapPolicy,
    /// Whether the schedule starts in a paused state.
    #[builder(default)]
    pub paused: bool,
    /// A note to attach to the schedule state (e.g., reason for pausing).
    #[builder(default)]
    pub note: String,
}
51
/// The action a schedule should perform on each trigger.
///
/// Only starting a workflow is modelled today; the enum is
/// `#[non_exhaustive]` so further variants can be added later.
// TODO: The proto supports other action types beyond StartWorkflow.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ScheduleAction {
    /// Start a workflow execution.
    StartWorkflow {
        /// The workflow type name.
        workflow_type: String,
        /// The task queue to run the workflow on.
        task_queue: String,
        /// The workflow ID prefix. The server may append a timestamp.
        workflow_id: String,
    },
}
67
68impl ScheduleAction {
69    /// Create a start-workflow action.
70    pub fn start_workflow(
71        workflow_type: impl Into<String>,
72        task_queue: impl Into<String>,
73        workflow_id: impl Into<String>,
74    ) -> Self {
75        Self::StartWorkflow {
76            workflow_type: workflow_type.into(),
77            task_queue: task_queue.into(),
78            workflow_id: workflow_id.into(),
79        }
80    }
81
82    pub(crate) fn into_proto(self) -> schedule_proto::ScheduleAction {
83        match self {
84            Self::StartWorkflow {
85                workflow_type,
86                task_queue,
87                workflow_id,
88            } => schedule_proto::ScheduleAction {
89                action: Some(schedule_proto::schedule_action::Action::StartWorkflow(
90                    workflow_proto::NewWorkflowExecutionInfo {
91                        workflow_id,
92                        workflow_type: Some(common_proto::WorkflowType {
93                            name: workflow_type,
94                        }),
95                        task_queue: Some(taskqueue_proto::TaskQueue {
96                            name: task_queue,
97                            ..Default::default()
98                        }),
99                        ..Default::default()
100                    },
101                )),
102            },
103        }
104    }
105}
106
/// Defines when a schedule should trigger.
///
/// All fields are optional; an all-default spec has no intervals, calendars
/// or cron strings.
///
/// Note: `set_spec` on [`ScheduleUpdate`] replaces the entire spec. Fields not
/// set here will use their proto defaults on the server.
#[derive(Debug, Clone, Default, PartialEq, bon::Builder)]
#[builder(on(String, into))]
pub struct ScheduleSpec {
    /// Interval-based triggers (e.g., every 1 hour).
    #[builder(default)]
    pub intervals: Vec<ScheduleIntervalSpec>,
    /// Calendar-based triggers using range strings.
    #[builder(default)]
    pub calendars: Vec<ScheduleCalendarSpec>,
    /// Calendar-based exclusions. Matching times are skipped.
    #[builder(default)]
    pub exclude_calendars: Vec<ScheduleCalendarSpec>,
    /// Cron expression triggers (e.g., `"0 12 * * MON-FRI"`).
    #[builder(default)]
    pub cron_strings: Vec<String>,
    /// IANA timezone name (e.g., `"US/Eastern"`). Empty uses UTC.
    #[builder(default)]
    pub timezone_name: String,
    /// Earliest time the schedule is active. `None` leaves it unset.
    pub start_time: Option<SystemTime>,
    /// Latest time the schedule is active. `None` leaves it unset.
    pub end_time: Option<SystemTime>,
    /// Random jitter applied to each action time. `None` leaves it unset.
    pub jitter: Option<Duration>,
}
136
137impl ScheduleSpec {
138    /// Create a spec that triggers on a single interval.
139    pub fn from_interval(every: Duration) -> Self {
140        Self {
141            intervals: vec![every.into()],
142            ..Default::default()
143        }
144    }
145
146    /// Create a spec that triggers on a single calendar schedule.
147    pub fn from_calendar(calendar: ScheduleCalendarSpec) -> Self {
148        Self {
149            calendars: vec![calendar],
150            ..Default::default()
151        }
152    }
153
154    pub(crate) fn into_proto(self) -> schedule_proto::ScheduleSpec {
155        #[allow(deprecated)]
156        schedule_proto::ScheduleSpec {
157            interval: self.intervals.into_iter().map(Into::into).collect(),
158            calendar: self.calendars.into_iter().map(Into::into).collect(),
159            exclude_calendar: self.exclude_calendars.into_iter().map(Into::into).collect(),
160            cron_string: self.cron_strings,
161            timezone_name: self.timezone_name,
162            start_time: self.start_time.map(Into::into),
163            end_time: self.end_time.map(Into::into),
164            jitter: self.jitter.and_then(|d| d.try_into().ok()),
165            ..Default::default()
166        }
167    }
168}
169
/// An interval-based schedule trigger.
///
/// Can be built from a bare [`Duration`] (no offset) via `From`, or with
/// [`ScheduleIntervalSpec::new`].
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct ScheduleIntervalSpec {
    /// How often the action should repeat.
    pub every: Duration,
    /// Fixed offset added to each interval. Sent as the proto `phase` field.
    pub offset: Option<Duration>,
}
179
180impl ScheduleIntervalSpec {
181    /// Create an interval with an optional offset.
182    pub fn new(every: Duration, offset: Option<Duration>) -> Self {
183        Self { every, offset }
184    }
185}
186
187impl From<Duration> for ScheduleIntervalSpec {
188    fn from(every: Duration) -> Self {
189        Self {
190            every,
191            offset: None,
192        }
193    }
194}
195
196impl From<ScheduleIntervalSpec> for schedule_proto::IntervalSpec {
197    fn from(s: ScheduleIntervalSpec) -> Self {
198        Self {
199            interval: Some(s.every.try_into().unwrap_or_default()),
200            phase: s.offset.and_then(|d| d.try_into().ok()),
201        }
202    }
203}
204
/// A calendar-based schedule trigger using range strings (e.g., `"2-7"` for hours 2 through 7).
///
/// Empty strings use server defaults: `"0"` for `second`, `minute` and
/// `hour`, and `"*"` for the remaining time fields, as listed per field below.
#[derive(Debug, Clone, Default, PartialEq, bon::Builder)]
#[builder(on(String, into))]
#[non_exhaustive]
pub struct ScheduleCalendarSpec {
    /// Second within the minute. Default: `"0"`.
    #[builder(default)]
    pub second: String,
    /// Minute within the hour. Default: `"0"`.
    #[builder(default)]
    pub minute: String,
    /// Hour of the day. Default: `"0"`.
    #[builder(default)]
    pub hour: String,
    /// Day of the month. Default: `"*"`.
    #[builder(default)]
    pub day_of_month: String,
    /// Month of the year. Default: `"*"`.
    #[builder(default)]
    pub month: String,
    /// Day of the week. Default: `"*"`.
    #[builder(default)]
    pub day_of_week: String,
    /// Year. Default: `"*"`.
    #[builder(default)]
    pub year: String,
    /// Free-form comment.
    #[builder(default)]
    pub comment: String,
}
237
238impl From<ScheduleCalendarSpec> for schedule_proto::CalendarSpec {
239    fn from(s: ScheduleCalendarSpec) -> Self {
240        Self {
241            second: s.second,
242            minute: s.minute,
243            hour: s.hour,
244            day_of_month: s.day_of_month,
245            month: s.month,
246            day_of_week: s.day_of_week,
247            year: s.year,
248            comment: s.comment,
249        }
250    }
251}
252
253/// Options for listing schedules.
254#[derive(Debug, Clone, Default, bon::Builder)]
255#[non_exhaustive]
256pub struct ListSchedulesOptions {
257    /// Maximum number of results per page (server-side hint).
258    #[builder(default)]
259    pub maximum_page_size: i32,
260    /// Query filter string.
261    #[builder(default)]
262    pub query: String,
263}
264
/// A stream of schedule summaries from a list operation.
/// Internally paginates through results from the server.
///
/// Each item is either a [`ScheduleSummary`] or the [`ScheduleError`] that
/// the underlying stream produced.
pub struct ListSchedulesStream {
    // Type-erased, heap-pinned stream that this wrapper delegates to.
    inner: Pin<Box<dyn futures_util::Stream<Item = Result<ScheduleSummary, ScheduleError>> + Send>>,
}
270
271impl ListSchedulesStream {
272    pub(crate) fn new(
273        inner: Pin<
274            Box<dyn futures_util::Stream<Item = Result<ScheduleSummary, ScheduleError>> + Send>,
275        >,
276    ) -> Self {
277        Self { inner }
278    }
279}
280
281impl futures_util::Stream for ListSchedulesStream {
282    type Item = Result<ScheduleSummary, ScheduleError>;
283
284    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
285        self.inner.as_mut().poll_next(cx)
286    }
287}
288
/// A recent action taken by a schedule.
///
/// Built from the proto `ScheduleActionResult`; timestamps that are absent
/// or cannot be converted are reported as `None`.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct ScheduleRecentAction {
    /// When this action was scheduled to occur (including jitter).
    pub schedule_time: Option<SystemTime>,
    /// When this action actually occurred.
    pub actual_time: Option<SystemTime>,
    /// Workflow ID of the started workflow.
    pub workflow_id: String,
    /// Run ID of the started workflow.
    pub run_id: String,
}
302
/// A currently-running workflow started by a schedule.
///
/// Identifies the execution by workflow ID and run ID only.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct ScheduleRunningAction {
    /// Workflow ID of the running workflow.
    pub workflow_id: String,
    /// Run ID of the running workflow.
    pub run_id: String,
}
312
313impl From<&schedule_proto::ScheduleActionResult> for ScheduleRecentAction {
314    fn from(a: &schedule_proto::ScheduleActionResult) -> Self {
315        let workflow_result = a
316            .start_workflow_result
317            .as_ref()
318            .expect("unsupported schedule action: start_workflow_result should be present");
319        ScheduleRecentAction {
320            schedule_time: a.schedule_time.as_ref().and_then(proto_ts_to_system_time),
321            actual_time: a.actual_time.as_ref().and_then(proto_ts_to_system_time),
322            workflow_id: workflow_result.workflow_id.clone(),
323            run_id: workflow_result.run_id.clone(),
324        }
325    }
326}
327
/// Description of a schedule returned by describe().
///
/// Provides ergonomic accessors over the raw `DescribeScheduleResponse` proto.
/// Use [`raw()`](Self::raw) or [`into_raw()`](Self::into_raw) to access the
/// full proto when needed.
#[derive(Debug, Clone)]
pub struct ScheduleDescription {
    // The unmodified server response; every accessor reads from it.
    raw: DescribeScheduleResponse,
}
337
338impl ScheduleDescription {
339    /// Token used for optimistic concurrency on updates.
340    pub fn conflict_token(&self) -> &[u8] {
341        &self.raw.conflict_token
342    }
343
344    /// Whether the schedule is paused.
345    pub fn paused(&self) -> bool {
346        self.raw
347            .schedule
348            .as_ref()
349            .and_then(|s| s.state.as_ref())
350            .is_some_and(|st| st.paused)
351    }
352
353    /// Note on the schedule state (e.g., reason for pause).
354    /// Returns `None` if no note is set or the note is empty.
355    pub fn note(&self) -> Option<&str> {
356        self.raw
357            .schedule
358            .as_ref()
359            .and_then(|s| s.state.as_ref())
360            .map(|st| st.notes.as_str())
361            .filter(|s| !s.is_empty())
362    }
363
364    /// Total number of actions taken by this schedule.
365    pub fn action_count(&self) -> i64 {
366        self.info().map_or(0, |i| i.action_count)
367    }
368
369    /// Number of times a scheduled action was skipped due to missing the catchup window.
370    pub fn missed_catchup_window(&self) -> i64 {
371        self.info().map_or(0, |i| i.missed_catchup_window)
372    }
373
374    /// Number of skipped actions due to overlap.
375    pub fn overlap_skipped(&self) -> i64 {
376        self.info().map_or(0, |i| i.overlap_skipped)
377    }
378
379    /// Most recent action results (up to 10).
380    pub fn recent_actions(&self) -> Vec<ScheduleRecentAction> {
381        self.info()
382            .map(|i| {
383                i.recent_actions
384                    .iter()
385                    .map(ScheduleRecentAction::from)
386                    .collect()
387            })
388            .unwrap_or_default()
389    }
390
391    /// Currently-running workflows started by this schedule.
392    pub fn running_actions(&self) -> Vec<ScheduleRunningAction> {
393        self.info()
394            .map(|i| {
395                i.running_workflows
396                    .iter()
397                    .map(|w| ScheduleRunningAction {
398                        workflow_id: w.workflow_id.clone(),
399                        run_id: w.run_id.clone(),
400                    })
401                    .collect()
402            })
403            .unwrap_or_default()
404    }
405
406    /// Next scheduled action times.
407    pub fn future_action_times(&self) -> Vec<SystemTime> {
408        self.info()
409            .map(|i| {
410                i.future_action_times
411                    .iter()
412                    .filter_map(proto_ts_to_system_time)
413                    .collect()
414            })
415            .unwrap_or_default()
416    }
417
418    /// When the schedule was created.
419    pub fn create_time(&self) -> Option<SystemTime> {
420        self.info()
421            .and_then(|i| i.create_time.as_ref())
422            .and_then(proto_ts_to_system_time)
423    }
424
425    /// When the schedule was last updated.
426    pub fn update_time(&self) -> Option<SystemTime> {
427        self.info()
428            .and_then(|i| i.update_time.as_ref())
429            .and_then(proto_ts_to_system_time)
430    }
431
432    /// Memo attached to the schedule.
433    pub fn memo(&self) -> Option<&common_proto::Memo> {
434        self.raw.memo.as_ref()
435    }
436
437    /// Search attributes on the schedule.
438    pub fn search_attributes(&self) -> Option<&common_proto::SearchAttributes> {
439        self.raw.search_attributes.as_ref()
440    }
441
442    /// Access the raw proto for additional fields not exposed via accessors.
443    pub fn raw(&self) -> &DescribeScheduleResponse {
444        &self.raw
445    }
446
447    /// Consume the wrapper and return the raw proto.
448    pub fn into_raw(self) -> DescribeScheduleResponse {
449        self.raw
450    }
451
452    fn info(&self) -> Option<&schedule_proto::ScheduleInfo> {
453        self.raw.info.as_ref()
454    }
455
456    /// Convert this description into a [`ScheduleUpdate`] for use with
457    /// [`ScheduleHandle::send_update()`].
458    ///
459    /// Extracts the schedule definition from the description.
460    pub fn into_update(self) -> ScheduleUpdate {
461        ScheduleUpdate {
462            schedule: self.raw.schedule.unwrap_or_default(),
463        }
464    }
465}
466
467impl From<DescribeScheduleResponse> for ScheduleDescription {
468    fn from(raw: DescribeScheduleResponse) -> Self {
469        Self { raw }
470    }
471}
472
/// Controls what happens when a scheduled workflow would overlap with a running one.
///
/// Each variant's discriminant is the integer sent in the proto
/// `overlap_policy` field.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ScheduleOverlapPolicy {
    /// Use the server default (currently Skip).
    #[default]
    Unspecified = 0,
    /// Don't start a new workflow if one is already running.
    Skip = 1,
    /// Buffer one workflow start, to run after the current one completes.
    BufferOne = 2,
    /// Buffer all workflow starts, to run sequentially.
    BufferAll = 3,
    /// Cancel the running workflow and start a new one.
    CancelOther = 4,
    /// Terminate the running workflow and start a new one.
    TerminateOther = 5,
    /// Start any number of concurrent workflows.
    AllowAll = 6,
}

impl ScheduleOverlapPolicy {
    /// Integer value used for this policy in proto messages.
    pub(crate) fn to_proto(self) -> i32 {
        // Lossless: the discriminants are spelled out on the enum.
        self as i32
    }
}
506
/// A backfill request for a schedule, specifying a time range of missed runs.
///
/// The builder's start function is `new` and takes `start_time` and
/// `end_time` positionally; `overlap_policy` is optional.
#[derive(Debug, Clone, PartialEq, bon::Builder)]
#[non_exhaustive]
#[builder(start_fn = new)]
pub struct ScheduleBackfill {
    /// Start of the time range to backfill.
    #[builder(start_fn)]
    pub start_time: SystemTime,
    /// End of the time range to backfill.
    #[builder(start_fn)]
    pub end_time: SystemTime,
    /// How overlapping runs are handled during backfill. Defaults to
    /// [`ScheduleOverlapPolicy::Unspecified`].
    #[builder(default)]
    pub overlap_policy: ScheduleOverlapPolicy,
}
522
/// An update to apply to a schedule definition.
///
/// Obtain from [`ScheduleDescription::into_update()`], modify the schedule
/// using the setter methods, then pass to [`ScheduleHandle::send_update()`].
/// Alternatively, [`ScheduleHandle::update()`] performs all three steps and
/// hands the update to a closure for modification.
#[derive(Debug, Clone)]
pub struct ScheduleUpdate {
    // The full schedule definition that will be sent to the server.
    schedule: schedule_proto::Schedule,
}
531
532impl ScheduleUpdate {
533    /// Replace the schedule spec (when to trigger).
534    pub fn set_spec(&mut self, spec: ScheduleSpec) -> &mut Self {
535        self.schedule.spec = Some(spec.into_proto());
536        self
537    }
538
539    /// Replace the schedule action (what to do on trigger).
540    pub fn set_action(&mut self, action: ScheduleAction) -> &mut Self {
541        self.schedule.action = Some(action.into_proto());
542        self
543    }
544
545    /// Set whether the schedule is paused.
546    pub fn set_paused(&mut self, paused: bool) -> &mut Self {
547        self.state_mut().paused = paused;
548        self
549    }
550
551    /// Set the note on the schedule state.
552    pub fn set_note(&mut self, note: impl Into<String>) -> &mut Self {
553        self.state_mut().notes = note.into();
554        self
555    }
556
557    /// Set the overlap policy.
558    pub fn set_overlap_policy(&mut self, policy: ScheduleOverlapPolicy) -> &mut Self {
559        self.policies_mut().overlap_policy = policy.to_proto();
560        self
561    }
562
563    /// Set the catchup window. Actions missed by more than this duration are
564    /// skipped.
565    pub fn set_catchup_window(&mut self, window: Duration) -> &mut Self {
566        self.policies_mut().catchup_window = window.try_into().ok();
567        self
568    }
569
570    /// Set whether to pause the schedule when a workflow run fails or times out.
571    pub fn set_pause_on_failure(&mut self, pause_on_failure: bool) -> &mut Self {
572        self.policies_mut().pause_on_failure = pause_on_failure;
573        self
574    }
575
576    /// Set whether to keep the original workflow ID without appending a
577    /// timestamp.
578    pub fn set_keep_original_workflow_id(&mut self, keep: bool) -> &mut Self {
579        self.policies_mut().keep_original_workflow_id = keep;
580        self
581    }
582
583    /// Limit the schedule to a fixed number of remaining actions, after which
584    /// it stops triggering. Passing `None` removes the limit.
585    pub fn set_remaining_actions(&mut self, count: Option<i64>) -> &mut Self {
586        let state = self.state_mut();
587        match count {
588            Some(n) => {
589                state.limited_actions = true;
590                state.remaining_actions = n;
591            }
592            None => {
593                state.limited_actions = false;
594                state.remaining_actions = 0;
595            }
596        }
597        self
598    }
599
600    /// Access the raw schedule proto.
601    pub fn raw(&self) -> &schedule_proto::Schedule {
602        &self.schedule
603    }
604
605    /// Consume and return the raw schedule proto.
606    pub fn into_raw(self) -> schedule_proto::Schedule {
607        self.schedule
608    }
609
610    fn state_mut(&mut self) -> &mut schedule_proto::ScheduleState {
611        self.schedule.state.get_or_insert_with(Default::default)
612    }
613
614    fn policies_mut(&mut self) -> &mut schedule_proto::SchedulePolicies {
615        self.schedule.policies.get_or_insert_with(Default::default)
616    }
617}
618
/// Summary of a schedule returned in list operations.
///
/// Provides ergonomic accessors over the raw `ScheduleListEntry` proto.
/// Use [`raw()`](Self::raw) or [`into_raw()`](Self::into_raw) to access the
/// full proto when needed.
#[derive(Debug, Clone)]
pub struct ScheduleSummary {
    // The unmodified list entry; every accessor reads from it.
    raw: schedule_proto::ScheduleListEntry,
}
628
629impl ScheduleSummary {
630    /// The schedule ID.
631    pub fn schedule_id(&self) -> &str {
632        &self.raw.schedule_id
633    }
634
635    /// The workflow type name for start-workflow actions.
636    pub fn workflow_type(&self) -> Option<&str> {
637        self.info()
638            .and_then(|i| i.workflow_type.as_ref())
639            .map(|wt| wt.name.as_str())
640    }
641
642    /// Note on the schedule state.
643    /// Returns `None` if no note is set or the note is empty.
644    pub fn note(&self) -> Option<&str> {
645        self.info()
646            .map(|i| i.notes.as_str())
647            .filter(|s| !s.is_empty())
648    }
649
650    /// Whether the schedule is paused.
651    pub fn paused(&self) -> bool {
652        self.info().is_some_and(|i| i.paused)
653    }
654
655    /// Most recent action results (up to 10).
656    pub fn recent_actions(&self) -> Vec<ScheduleRecentAction> {
657        self.info()
658            .map(|i| {
659                i.recent_actions
660                    .iter()
661                    .map(ScheduleRecentAction::from)
662                    .collect()
663            })
664            .unwrap_or_default()
665    }
666
667    /// Next scheduled action times.
668    pub fn future_action_times(&self) -> Vec<SystemTime> {
669        self.info()
670            .map(|i| {
671                i.future_action_times
672                    .iter()
673                    .filter_map(proto_ts_to_system_time)
674                    .collect()
675            })
676            .unwrap_or_default()
677    }
678
679    /// Memo attached to the schedule.
680    pub fn memo(&self) -> Option<&common_proto::Memo> {
681        self.raw.memo.as_ref()
682    }
683
684    /// Search attributes on the schedule.
685    pub fn search_attributes(&self) -> Option<&common_proto::SearchAttributes> {
686        self.raw.search_attributes.as_ref()
687    }
688
689    /// Access the raw proto for additional fields not exposed via accessors.
690    pub fn raw(&self) -> &schedule_proto::ScheduleListEntry {
691        &self.raw
692    }
693
694    /// Consume the wrapper and return the raw proto.
695    pub fn into_raw(self) -> schedule_proto::ScheduleListEntry {
696        self.raw
697    }
698
699    fn info(&self) -> Option<&schedule_proto::ScheduleListInfo> {
700        self.raw.info.as_ref()
701    }
702}
703
704impl From<schedule_proto::ScheduleListEntry> for ScheduleSummary {
705    fn from(raw: schedule_proto::ScheduleListEntry) -> Self {
706        Self { raw }
707    }
708}
709
/// Handle to an existing schedule. Obtained from
/// [`Client::create_schedule`](crate::Client::create_schedule) or
/// [`Client::get_schedule_handle`](crate::Client::get_schedule_handle).
///
/// The handle stores a client plus the namespace and schedule ID it targets.
#[derive(Clone, derive_more::Debug)]
pub struct ScheduleHandle<CT> {
    // Left out of the `Debug` output.
    #[debug(skip)]
    client: CT,
    namespace: String,
    schedule_id: String,
}
720
721impl<CT> ScheduleHandle<CT>
722where
723    CT: WorkflowService + NamespacedClient + Clone + Send + Sync,
724{
725    pub(crate) fn new(client: CT, namespace: String, schedule_id: String) -> Self {
726        Self {
727            client,
728            namespace,
729            schedule_id,
730        }
731    }
732
733    /// The namespace the schedule belongs to.
734    pub fn namespace(&self) -> &str {
735        &self.namespace
736    }
737
738    /// The schedule ID.
739    pub fn schedule_id(&self) -> &str {
740        &self.schedule_id
741    }
742
743    /// Describe this schedule, returning its full definition, info, and conflict token.
744    pub async fn describe(&self) -> Result<ScheduleDescription, ScheduleError> {
745        let resp = WorkflowService::describe_schedule(
746            &mut self.client.clone(),
747            DescribeScheduleRequest {
748                namespace: self.namespace.clone(),
749                schedule_id: self.schedule_id.clone(),
750            }
751            .into_request(),
752        )
753        .await?
754        .into_inner();
755
756        Ok(ScheduleDescription::from(resp))
757    }
758
759    /// Update the schedule definition.
760    ///
761    /// Describes the current schedule, applies the closure to modify it, and
762    /// sends the update. The conflict token is managed automatically.
763    ///
764    /// ```ignore
765    /// handle.update(|u| {
766    ///     u.set_note("updated").set_paused(true);
767    /// }).await?;
768    /// ```
769    // TODO: Add a retry loop for conflict token mismatch. The server
770    // returns FailedPrecondition with "mismatched conflict token".
771    pub async fn update(
772        &self,
773        updater: impl FnOnce(&mut ScheduleUpdate),
774    ) -> Result<(), ScheduleError> {
775        let desc = self.describe().await?;
776        let mut update = desc.into_update();
777        updater(&mut update);
778        self.send_update(update).await
779    }
780
781    /// Send a pre-built [`ScheduleUpdate`] to the server.
782    ///
783    /// Prefer [`update()`](Self::update) for most use cases. Use this when you
784    /// need to inspect the [`ScheduleDescription`] before deciding what to
785    /// change.
786    pub async fn send_update(&self, update: ScheduleUpdate) -> Result<(), ScheduleError> {
787        WorkflowService::update_schedule(
788            &mut self.client.clone(),
789            UpdateScheduleRequest {
790                namespace: self.namespace.clone(),
791                schedule_id: self.schedule_id.clone(),
792                schedule: Some(update.schedule),
793                identity: self.client.identity(),
794                request_id: Uuid::new_v4().to_string(),
795                ..Default::default()
796            }
797            .into_request(),
798        )
799        .await?;
800        Ok(())
801    }
802
803    /// Delete this schedule.
804    pub async fn delete(&self) -> Result<(), ScheduleError> {
805        WorkflowService::delete_schedule(
806            &mut self.client.clone(),
807            DeleteScheduleRequest {
808                namespace: self.namespace.clone(),
809                schedule_id: self.schedule_id.clone(),
810                identity: self.client.identity(),
811            }
812            .into_request(),
813        )
814        .await?;
815        Ok(())
816    }
817
818    /// Pause the schedule with an optional note.
819    ///
820    /// If `note` is `None`, a default note is used.
821    pub async fn pause(&self, note: Option<impl Into<String>>) -> Result<(), ScheduleError> {
822        let note = note.map_or_else(|| "Paused via Rust SDK".to_string(), |s| s.into());
823        WorkflowService::patch_schedule(
824            &mut self.client.clone(),
825            PatchScheduleRequest {
826                namespace: self.namespace.clone(),
827                schedule_id: self.schedule_id.clone(),
828                patch: Some(schedule_proto::SchedulePatch {
829                    pause: note,
830                    ..Default::default()
831                }),
832                identity: self.client.identity(),
833                request_id: Uuid::new_v4().to_string(),
834            }
835            .into_request(),
836        )
837        .await?;
838        Ok(())
839    }
840
841    /// Unpause the schedule with an optional note.
842    ///
843    /// If `note` is `None`, a default note is used.
844    pub async fn unpause(&self, note: Option<impl Into<String>>) -> Result<(), ScheduleError> {
845        let note = note.map_or_else(|| "Unpaused via Rust SDK".to_string(), |s| s.into());
846        WorkflowService::patch_schedule(
847            &mut self.client.clone(),
848            PatchScheduleRequest {
849                namespace: self.namespace.clone(),
850                schedule_id: self.schedule_id.clone(),
851                patch: Some(schedule_proto::SchedulePatch {
852                    unpause: note,
853                    ..Default::default()
854                }),
855                identity: self.client.identity(),
856                request_id: Uuid::new_v4().to_string(),
857            }
858            .into_request(),
859        )
860        .await?;
861        Ok(())
862    }
863
864    /// Trigger the schedule to run immediately with the given overlap policy.
865    pub async fn trigger(
866        &self,
867        overlap_policy: ScheduleOverlapPolicy,
868    ) -> Result<(), ScheduleError> {
869        WorkflowService::patch_schedule(
870            &mut self.client.clone(),
871            PatchScheduleRequest {
872                namespace: self.namespace.clone(),
873                schedule_id: self.schedule_id.clone(),
874                patch: Some(schedule_proto::SchedulePatch {
875                    trigger_immediately: Some(schedule_proto::TriggerImmediatelyRequest {
876                        overlap_policy: overlap_policy.to_proto(),
877                        scheduled_time: None,
878                    }),
879                    ..Default::default()
880                }),
881                identity: self.client.identity(),
882                request_id: Uuid::new_v4().to_string(),
883            }
884            .into_request(),
885        )
886        .await?;
887        Ok(())
888    }
889
890    /// Request backfill of missed runs.
891    pub async fn backfill(
892        &self,
893        backfills: impl IntoIterator<Item = ScheduleBackfill>,
894    ) -> Result<(), ScheduleError> {
895        let backfill_requests: Vec<schedule_proto::BackfillRequest> = backfills
896            .into_iter()
897            .map(|b| schedule_proto::BackfillRequest {
898                start_time: Some(b.start_time.into()),
899                end_time: Some(b.end_time.into()),
900                overlap_policy: b.overlap_policy.to_proto(),
901            })
902            .collect();
903        WorkflowService::patch_schedule(
904            &mut self.client.clone(),
905            PatchScheduleRequest {
906                namespace: self.namespace.clone(),
907                schedule_id: self.schedule_id.clone(),
908                patch: Some(schedule_proto::SchedulePatch {
909                    backfill_request: backfill_requests,
910                    ..Default::default()
911                }),
912                identity: self.client.identity(),
913                request_id: Uuid::new_v4().to_string(),
914            }
915            .into_request(),
916        )
917        .await?;
918        Ok(())
919    }
920}
921
922// Schedule operations on Client.
923impl Client {
924    /// Create a schedule and return a handle to it.
925    pub async fn create_schedule(
926        &self,
927        schedule_id: impl Into<String>,
928        opts: CreateScheduleOptions,
929    ) -> Result<ScheduleHandle<Self>, ScheduleError> {
930        let schedule_id = schedule_id.into();
931        let namespace = self.namespace();
932
933        let initial_patch = if opts.trigger_immediately {
934            Some(schedule_proto::SchedulePatch {
935                trigger_immediately: Some(schedule_proto::TriggerImmediatelyRequest {
936                    // Always use AllowAll for the initial trigger so the
937                    // schedule fires immediately regardless of overlap state.
938                    overlap_policy: ScheduleOverlapPolicy::AllowAll.to_proto(),
939                    scheduled_time: None,
940                }),
941                ..Default::default()
942            })
943        } else {
944            None
945        };
946        // Only send explicit policies when the user set a non-default overlap
947        // policy, so the server uses its own defaults otherwise.
948        let policies = (opts.overlap_policy != ScheduleOverlapPolicy::Unspecified).then(|| {
949            schedule_proto::SchedulePolicies {
950                overlap_policy: opts.overlap_policy.to_proto(),
951                ..Default::default()
952            }
953        });
954        let schedule = schedule_proto::Schedule {
955            spec: Some(opts.spec.into_proto()),
956            action: Some(opts.action.into_proto()),
957            policies,
958            state: Some(schedule_proto::ScheduleState {
959                paused: opts.paused,
960                notes: opts.note,
961                ..Default::default()
962            }),
963        };
964        WorkflowService::create_schedule(
965            &mut self.clone(),
966            CreateScheduleRequest {
967                namespace: namespace.clone(),
968                schedule_id: schedule_id.clone(),
969                schedule: Some(schedule),
970                initial_patch,
971                identity: self.identity(),
972                request_id: Uuid::new_v4().to_string(),
973                ..Default::default()
974            }
975            .into_request(),
976        )
977        .await?;
978        Ok(ScheduleHandle::new(self.clone(), namespace, schedule_id))
979    }
980
981    /// Get a handle to an existing schedule by ID.
982    pub fn get_schedule_handle(&self, schedule_id: impl Into<String>) -> ScheduleHandle<Self> {
983        ScheduleHandle::new(self.clone(), self.namespace(), schedule_id.into())
984    }
985
986    /// List schedules matching the query, returning a stream that lazily
987    /// paginates through results.
988    pub fn list_schedules(&self, opts: ListSchedulesOptions) -> ListSchedulesStream {
989        let client = self.clone();
990        let namespace = self.namespace();
991        let query = opts.query;
992        let page_size = opts.maximum_page_size;
993
994        let stream = stream::unfold(
995            (Vec::new(), VecDeque::new(), false),
996            move |(next_page_token, mut buffer, exhausted)| {
997                let mut client = client.clone();
998                let namespace = namespace.clone();
999                let query = query.clone();
1000
1001                async move {
1002                    if let Some(item) = buffer.pop_front() {
1003                        return Some((Ok(item), (next_page_token, buffer, exhausted)));
1004                    } else if exhausted {
1005                        return None;
1006                    }
1007
1008                    let response = WorkflowService::list_schedules(
1009                        &mut client,
1010                        ListSchedulesRequest {
1011                            namespace,
1012                            maximum_page_size: page_size,
1013                            next_page_token: next_page_token.clone(),
1014                            query,
1015                        }
1016                        .into_request(),
1017                    )
1018                    .await;
1019
1020                    match response {
1021                        Ok(resp) => {
1022                            let resp = resp.into_inner();
1023                            let new_exhausted = resp.next_page_token.is_empty();
1024                            let new_token = resp.next_page_token;
1025
1026                            buffer = resp
1027                                .schedules
1028                                .into_iter()
1029                                .map(ScheduleSummary::from)
1030                                .collect();
1031
1032                            buffer
1033                                .pop_front()
1034                                .map(|item| (Ok(item), (new_token, buffer, new_exhausted)))
1035                        }
1036                        Err(e) => Some((Err(e.into()), (next_page_token, buffer, true))),
1037                    }
1038                }
1039            },
1040        );
1041
1042        ListSchedulesStream::new(Box::pin(stream))
1043    }
1044}
1045
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NamespacedClient, grpc::WorkflowService};
    use futures_util::{FutureExt, future::BoxFuture};
    use std::{
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        time::SystemTime,
    };
    use temporalio_common::protos::temporal::api::{
        common::v1::{
            Memo, SearchAttributes, WorkflowExecution as ProtoWorkflowExecution, WorkflowType,
        },
        schedule::v1::{
            Schedule, ScheduleActionResult, ScheduleInfo, ScheduleListEntry, ScheduleListInfo,
            ScheduleSpec, ScheduleState,
        },
        workflowservice::v1::{
            DeleteScheduleResponse, DescribeScheduleResponse, PatchScheduleResponse,
            UpdateScheduleResponse,
        },
    };
    use tonic::{Request, Response};

    /// Boxed future returned by each mocked RPC.
    type MockRpc<'a, T> = BoxFuture<'a, Result<Response<T>, tonic::Status>>;

    /// Per-RPC call counters shared between a mock client and its clones.
    #[derive(Default)]
    struct CapturedRequests {
        describe: AtomicUsize,
        update: AtomicUsize,
        delete: AtomicUsize,
        patch: AtomicUsize,
    }

    /// Mock service client: counts calls, replies with canned responses, and
    /// fails every RPC when `should_error` is set.
    #[derive(Clone, Default)]
    struct MockScheduleClient {
        captured: Arc<CapturedRequests>,
        describe_response: DescribeScheduleResponse,
        should_error: bool,
    }

    impl NamespacedClient for MockScheduleClient {
        fn namespace(&self) -> String {
            String::from("test-namespace")
        }
        fn identity(&self) -> String {
            String::from("test-identity")
        }
    }

    /// Builds the future a mocked RPC returns: `failure()` as the error when
    /// `fail` is set, otherwise `success` wrapped in a [`Response`].
    fn canned_reply<T, F>(fail: bool, success: T, failure: F) -> MockRpc<'static, T>
    where
        T: Send + 'static,
        F: FnOnce() -> tonic::Status + Send + 'static,
    {
        async move {
            if fail {
                Err(failure())
            } else {
                Ok(Response::new(success))
            }
        }
        .boxed()
    }

    impl WorkflowService for MockScheduleClient {
        fn describe_schedule(
            &mut self,
            _request: Request<DescribeScheduleRequest>,
        ) -> MockRpc<'_, DescribeScheduleResponse> {
            self.captured.describe.fetch_add(1, Ordering::SeqCst);
            canned_reply(self.should_error, self.describe_response.clone(), || {
                tonic::Status::not_found("schedule not found")
            })
        }

        fn update_schedule(
            &mut self,
            _request: Request<UpdateScheduleRequest>,
        ) -> MockRpc<'_, UpdateScheduleResponse> {
            self.captured.update.fetch_add(1, Ordering::SeqCst);
            canned_reply(self.should_error, UpdateScheduleResponse::default(), || {
                tonic::Status::internal("update failed")
            })
        }

        fn delete_schedule(
            &mut self,
            _request: Request<DeleteScheduleRequest>,
        ) -> MockRpc<'_, DeleteScheduleResponse> {
            self.captured.delete.fetch_add(1, Ordering::SeqCst);
            canned_reply(self.should_error, DeleteScheduleResponse::default(), || {
                tonic::Status::internal("delete failed")
            })
        }

        fn patch_schedule(
            &mut self,
            _request: Request<PatchScheduleRequest>,
        ) -> MockRpc<'_, PatchScheduleResponse> {
            self.captured.patch.fetch_add(1, Ordering::SeqCst);
            canned_reply(self.should_error, PatchScheduleResponse::default(), || {
                tonic::Status::internal("patch failed")
            })
        }
    }

    /// Wraps `client` in a handle with fixed namespace and schedule id.
    fn make_schedule_handle(client: MockScheduleClient) -> ScheduleHandle<MockScheduleClient> {
        let namespace = String::from("test-namespace");
        let schedule_id = String::from("test-schedule-id");
        ScheduleHandle::new(client, namespace, schedule_id)
    }

    /// A default mock plus a handle built on a clone of it, so the mock's
    /// counters can be inspected after the handle has been used.
    fn mock_and_handle() -> (MockScheduleClient, ScheduleHandle<MockScheduleClient>) {
        let client = MockScheduleClient::default();
        let handle = make_schedule_handle(client.clone());
        (client, handle)
    }

    /// A handle whose underlying mock fails every RPC.
    fn failing_handle() -> ScheduleHandle<MockScheduleClient> {
        make_schedule_handle(MockScheduleClient {
            should_error: true,
            ..Default::default()
        })
    }

    /// Current value of a call counter.
    fn calls(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    /// Whole-second proto timestamp.
    fn ts(seconds: i64) -> prost_types::Timestamp {
        prost_types::Timestamp { seconds, nanos: 0 }
    }

    /// Proto workflow execution with the given ids.
    fn execution(workflow_id: &str, run_id: &str) -> ProtoWorkflowExecution {
        ProtoWorkflowExecution {
            workflow_id: workflow_id.to_string(),
            run_id: run_id.to_string(),
        }
    }

    /// Action result that only carries a started workflow.
    fn started(workflow_id: &str, run_id: &str) -> ScheduleActionResult {
        ScheduleActionResult {
            start_workflow_result: Some(execution(workflow_id, run_id)),
            ..Default::default()
        }
    }

    #[test]
    fn schedule_handle_exposes_namespace_and_id() {
        let (_client, handle) = mock_and_handle();
        assert_eq!(handle.namespace(), "test-namespace");
        assert_eq!(handle.schedule_id(), "test-schedule-id");
    }

    #[tokio::test]
    async fn schedule_describe_returns_response_fields() {
        let conflict_token = b"token-123".to_vec();
        let describe_response = DescribeScheduleResponse {
            schedule: Some(Schedule::default()),
            info: Some(ScheduleInfo::default()),
            memo: Some(Memo::default()),
            search_attributes: Some(SearchAttributes::default()),
            conflict_token: conflict_token.clone(),
        };
        let client = MockScheduleClient {
            describe_response,
            ..Default::default()
        };

        let handle = make_schedule_handle(client.clone());
        let desc = handle.describe().await.unwrap();

        assert_eq!(calls(&client.captured.describe), 1);
        let raw = desc.raw();
        assert!(raw.schedule.is_some());
        assert!(raw.info.is_some());
        assert!(raw.memo.is_some());
        assert!(raw.search_attributes.is_some());
        assert_eq!(desc.conflict_token(), conflict_token);
    }

    #[tokio::test]
    async fn schedule_update_describes_then_sends() {
        let (client, handle) = mock_and_handle();

        let outcome = handle
            .update(|u| {
                u.set_note("hi");
            })
            .await;
        outcome.unwrap();

        assert_eq!(calls(&client.captured.describe), 1);
        assert_eq!(calls(&client.captured.update), 1);
    }

    #[tokio::test]
    async fn schedule_multiple_updates_each_call_service() {
        let (client, handle) = mock_and_handle();

        for _ in 0..2 {
            handle.update(|_| {}).await.unwrap();
        }

        assert_eq!(calls(&client.captured.update), 2);
    }

    #[tokio::test]
    async fn schedule_delete_calls_service() {
        let (client, handle) = mock_and_handle();

        handle.delete().await.unwrap();

        assert_eq!(calls(&client.captured.delete), 1);
    }

    #[tokio::test]
    async fn schedule_pause_calls_patch() {
        let (client, handle) = mock_and_handle();

        handle.pause(Some("taking a break")).await.unwrap();

        assert_eq!(calls(&client.captured.patch), 1);
    }

    #[tokio::test]
    async fn schedule_pause_with_none_uses_default() {
        let (client, handle) = mock_and_handle();

        handle.pause(None::<&str>).await.unwrap();

        assert_eq!(calls(&client.captured.patch), 1);
    }

    #[tokio::test]
    async fn schedule_unpause_calls_patch() {
        let (client, handle) = mock_and_handle();

        handle.unpause(Some("resuming work")).await.unwrap();

        assert_eq!(calls(&client.captured.patch), 1);
    }

    #[tokio::test]
    async fn schedule_trigger_calls_patch() {
        let (client, handle) = mock_and_handle();

        let outcome = handle.trigger(ScheduleOverlapPolicy::Unspecified).await;
        outcome.unwrap();

        assert_eq!(calls(&client.captured.patch), 1);
    }

    #[tokio::test]
    async fn schedule_backfill_calls_patch() {
        let (client, handle) = mock_and_handle();

        let now = SystemTime::now();
        let skip = ScheduleBackfill::new(now, now)
            .overlap_policy(ScheduleOverlapPolicy::Skip)
            .build();
        let buffer_one = ScheduleBackfill::new(now, now)
            .overlap_policy(ScheduleOverlapPolicy::BufferOne)
            .build();
        handle.backfill(vec![skip, buffer_one]).await.unwrap();

        assert_eq!(calls(&client.captured.patch), 1);
    }

    #[tokio::test]
    async fn schedule_describe_propagates_rpc_errors() {
        let handle = failing_handle();

        let err = handle.describe().await.unwrap_err();
        assert!(
            matches!(err, ScheduleError::Rpc(_)),
            "expected Rpc variant, got: {err:?}"
        );
        let rendered = err.to_string();
        assert!(rendered.contains("schedule not found"));
    }

    #[tokio::test]
    async fn schedule_update_propagates_rpc_errors() {
        let handle = failing_handle();

        let err = handle.update(|_| {}).await.unwrap_err();
        assert!(matches!(err, ScheduleError::Rpc(_)));
    }

    #[tokio::test]
    async fn schedule_delete_propagates_rpc_errors() {
        let handle = failing_handle();

        let err = handle.delete().await.unwrap_err();
        assert!(matches!(err, ScheduleError::Rpc(_)));
    }

    #[tokio::test]
    async fn schedule_patch_operations_propagate_rpc_errors() {
        let handle = failing_handle();

        let paused = handle.pause(Some("")).await;
        assert!(paused.is_err());
        let unpaused = handle.unpause(Some("")).await;
        assert!(unpaused.is_err());
        let triggered = handle.trigger(ScheduleOverlapPolicy::default()).await;
        assert!(triggered.is_err());
        let backfilled = handle.backfill(Vec::new()).await;
        assert!(backfilled.is_err());
    }

    #[tokio::test]
    async fn schedule_all_patch_operations_call_service() {
        let (client, handle) = mock_and_handle();

        handle.pause(Some("p")).await.unwrap();
        handle.unpause(Some("u")).await.unwrap();
        handle
            .trigger(ScheduleOverlapPolicy::default())
            .await
            .unwrap();
        handle.backfill(Vec::new()).await.unwrap();

        assert_eq!(calls(&client.captured.patch), 4);
    }

    #[tokio::test]
    async fn schedule_describe_accessors_with_populated_fields() {
        let schedule = Schedule {
            spec: Some(ScheduleSpec {
                timezone_name: "US/Eastern".to_string(),
                ..Default::default()
            }),
            state: Some(ScheduleState {
                paused: true,
                notes: "maintenance window".to_string(),
                ..Default::default()
            }),
            ..Default::default()
        };
        let info = ScheduleInfo {
            action_count: 42,
            missed_catchup_window: 3,
            overlap_skipped: 5,
            recent_actions: vec![started("ra-wf", "ra-run")],
            running_workflows: vec![execution("wf-1", "run-1")],
            create_time: Some(ts(1_700_000_000)),
            update_time: Some(ts(1_700_001_000)),
            future_action_times: vec![ts(1_700_002_000)],
            ..Default::default()
        };
        let client = MockScheduleClient {
            describe_response: DescribeScheduleResponse {
                schedule: Some(schedule),
                info: Some(info),
                conflict_token: b"tok".to_vec(),
                ..Default::default()
            },
            ..Default::default()
        };

        let handle = make_schedule_handle(client);
        let desc = handle.describe().await.unwrap();

        // State and counters.
        assert!(desc.paused());
        assert_eq!(desc.note(), Some("maintenance window"));
        assert_eq!(desc.action_count(), 42);
        assert_eq!(desc.missed_catchup_window(), 3);
        assert_eq!(desc.overlap_skipped(), 5);

        // Recent and running actions.
        assert_eq!(desc.recent_actions().len(), 1);
        let expected_running = vec![ScheduleRunningAction {
            workflow_id: "wf-1".to_string(),
            run_id: "run-1".to_string(),
        }];
        assert_eq!(desc.running_actions(), expected_running);

        // Timestamps.
        assert_eq!(desc.future_action_times().len(), 1);
        assert!(desc.create_time().is_some());
        assert!(desc.update_time().is_some());
    }

    #[tokio::test]
    async fn schedule_describe_defaults_when_nested_fields_are_none() {
        let handle = make_schedule_handle(MockScheduleClient::default());
        let desc = handle.describe().await.unwrap();

        assert!(!desc.paused());
        assert_eq!(desc.note(), None);
        assert_eq!(desc.action_count(), 0);
        assert_eq!(desc.missed_catchup_window(), 0);
        assert_eq!(desc.overlap_skipped(), 0);
        assert!(desc.recent_actions().is_empty());
        assert!(desc.running_actions().is_empty());
        assert!(desc.future_action_times().is_empty());
        assert!(desc.create_time().is_none());
        assert!(desc.update_time().is_none());
        assert!(desc.conflict_token().is_empty());
    }

    #[tokio::test]
    async fn schedule_note_returns_none_for_empty_string() {
        let blank_note_state = ScheduleState {
            notes: String::new(),
            ..Default::default()
        };
        let client = MockScheduleClient {
            describe_response: DescribeScheduleResponse {
                schedule: Some(Schedule {
                    state: Some(blank_note_state),
                    ..Default::default()
                }),
                ..Default::default()
            },
            ..Default::default()
        };

        let handle = make_schedule_handle(client);
        let desc = handle.describe().await.unwrap();
        assert_eq!(desc.note(), None);
    }

    #[test]
    fn schedule_summary_note_returns_none_for_empty_string() {
        let blank_note_info = ScheduleListInfo {
            notes: String::new(),
            ..Default::default()
        };
        let summary = ScheduleSummary::from(ScheduleListEntry {
            schedule_id: "s".to_string(),
            info: Some(blank_note_info),
            ..Default::default()
        });
        assert_eq!(summary.note(), None);
    }

    #[test]
    fn schedule_summary_accessors() {
        let info = ScheduleListInfo {
            spec: Some(ScheduleSpec::default()),
            workflow_type: Some(WorkflowType {
                name: "MyWorkflow".to_string(),
            }),
            notes: "some note".to_string(),
            paused: true,
            recent_actions: vec![started("ra-wf", "ra-run")],
            future_action_times: vec![ts(1_700_000_000)],
        };
        let entry = ScheduleListEntry {
            schedule_id: "sched-1".to_string(),
            memo: Some(Memo::default()),
            search_attributes: Some(SearchAttributes::default()),
            info: Some(info),
        };

        let summary = ScheduleSummary::from(entry);
        assert_eq!(summary.schedule_id(), "sched-1");
        assert!(summary.raw().memo.is_some());
        assert!(summary.raw().search_attributes.is_some());
        assert_eq!(summary.workflow_type(), Some("MyWorkflow"));
        assert_eq!(summary.note(), Some("some note"));
        assert!(summary.paused());
        assert_eq!(summary.recent_actions().len(), 1);
        assert_eq!(summary.future_action_times().len(), 1);
    }

    #[test]
    fn schedule_summary_defaults_when_info_is_none() {
        let summary = ScheduleSummary::from(ScheduleListEntry {
            schedule_id: "sched-2".to_string(),
            ..Default::default()
        });

        assert_eq!(summary.schedule_id(), "sched-2");
        assert!(summary.raw().memo.is_none());
        assert!(summary.raw().search_attributes.is_none());
        assert_eq!(summary.workflow_type(), None);
        assert_eq!(summary.note(), None);
        assert!(!summary.paused());
        assert!(summary.recent_actions().is_empty());
        assert!(summary.future_action_times().is_empty());
    }

    #[test]
    fn schedule_description_raw_round_trip() {
        let original = DescribeScheduleResponse {
            conflict_token: b"ct".to_vec(),
            schedule: Some(Schedule::default()),
            ..Default::default()
        };

        let desc = ScheduleDescription::from(original.clone());
        assert_eq!(desc.raw().conflict_token, b"ct");

        let recovered = desc.into_raw();
        assert_eq!(recovered.conflict_token, original.conflict_token);
        assert!(recovered.schedule.is_some());
    }

    #[test]
    fn schedule_summary_raw_round_trip() {
        let original = ScheduleListEntry {
            schedule_id: "rt-1".to_string(),
            ..Default::default()
        };

        let summary = ScheduleSummary::from(original.clone());
        assert_eq!(summary.raw().schedule_id, "rt-1");

        let recovered = summary.into_raw();
        assert_eq!(recovered.schedule_id, original.schedule_id);
    }

    #[test]
    fn schedule_into_update_preserves_schedule() {
        let noted_state = ScheduleState {
            notes: "my notes".to_string(),
            ..Default::default()
        };
        let desc = ScheduleDescription::from(DescribeScheduleResponse {
            schedule: Some(Schedule {
                state: Some(noted_state),
                ..Default::default()
            }),
            ..Default::default()
        });

        let update = desc.into_update();

        assert_eq!(update.raw().state.as_ref().unwrap().notes, "my notes");
    }

    #[test]
    fn schedule_update_setters_are_chainable() {
        let desc = ScheduleDescription::from(DescribeScheduleResponse {
            schedule: Some(Schedule::default()),
            ..Default::default()
        });

        let mut update = desc.into_update();
        update.set_note("chained").set_paused(true);

        assert_eq!(update.raw().state.as_ref().unwrap().notes, "chained");
        assert!(update.raw().state.as_ref().unwrap().paused);
    }

    #[test]
    fn schedule_recent_action_from_proto_with_timestamps() {
        let proto = ScheduleActionResult {
            schedule_time: Some(ts(1_700_000_000)),
            actual_time: Some(ts(1_700_000_000)),
            start_workflow_result: Some(execution("wf-abc", "run-xyz")),
            ..Default::default()
        };

        let action = ScheduleRecentAction::from(&proto);

        assert!(action.schedule_time.is_some());
        assert!(action.actual_time.is_some());
        assert_eq!(action.workflow_id, "wf-abc");
        assert_eq!(action.run_id, "run-xyz");
    }

    #[test]
    #[should_panic(expected = "unsupported schedule action")]
    fn schedule_recent_action_panics_without_workflow_result() {
        let empty_result = ScheduleActionResult::default();
        let _ = ScheduleRecentAction::from(&empty_result);
    }

    #[test]
    fn schedule_overlap_policy_default_is_unspecified() {
        let default_policy = ScheduleOverlapPolicy::default();
        assert_eq!(default_policy, ScheduleOverlapPolicy::Unspecified);
    }
}