ora_client/
schedule_handle.rs

1//! A handle to a schedule.
2
3use std::sync::Arc;
4
5use futures::Stream;
6use ora_proto::server::v1::admin_service_client::AdminServiceClient;
7use parking_lot::Mutex;
8use tonic::transport::Channel;
9use uuid::Uuid;
10
11use crate::{
12    admin::AdminClientError,
13    job_handle::JobHandle,
14    job_query::{JobFilter, JobOrder},
15    schedule_definition::ScheduleDetails,
16    schedule_query::ScheduleFilter,
17    AdminClient,
18};
19#[allow(clippy::wildcard_imports)]
20use tonic::codegen::*;
21
22/// A handle to a schedule.
23#[derive(Debug, Clone)]
24pub struct ScheduleHandle<C = Channel> {
25    /// The ID of the schedule.
26    id: Uuid,
27    /// The client used to interact with the schedule.
28    client: AdminServiceClient<C>,
29    details: Arc<Mutex<Option<Arc<ScheduleDetails>>>>,
30}
31
32impl<C> ScheduleHandle<C>
33where
34    C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
35    <C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
36    C::Error: Into<StdError>,
37    C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
38    <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
39{
40    /// Create a new schedule handle.
41    pub(crate) fn new(id: Uuid, client: AdminServiceClient<C>) -> Self {
42        Self {
43            id,
44            client,
45            details: Arc::new(Mutex::new(None)),
46        }
47    }
48
49    /// Get the ID of the schedule.
50    pub fn id(&self) -> Uuid {
51        self.id
52    }
53
54    /// Get the details of the schedule.
55    pub async fn details(&self) -> eyre::Result<Arc<ScheduleDetails>> {
56        if let Some(details) = self.details.lock().as_ref() {
57            if !details.active {
58                return Ok(details.clone());
59            }
60        }
61
62        let schedule = self
63            .client
64            .clone()
65            .list_schedules(tonic::Request::new(
66                ora_proto::server::v1::ListSchedulesRequest {
67                    cursor: None,
68                    limit: 1,
69                    order: None,
70                    filter: Some(ScheduleFilter::new().with_schedule_id(self.id).into()),
71                },
72            ))
73            .await?
74            .into_inner()
75            .schedules
76            .into_iter()
77            .next()
78            .ok_or_else(|| eyre::eyre!("schedule details not found"))?;
79
80        let details = Arc::new(ScheduleDetails::try_from(schedule)?);
81        self.set_details(details.clone());
82
83        Ok(details)
84    }
85
86    /// Get the cached details of the schedule, if available.
87    pub fn cached_details(&self) -> Option<Arc<ScheduleDetails>> {
88        self.details.lock().as_ref().cloned()
89    }
90
91    /// Get the jobs that were created by this schedule with the given filter and order.
92    pub fn jobs(
93        &self,
94        mut filter: JobFilter,
95        order: JobOrder,
96    ) -> impl Stream<Item = Result<JobHandle<(), C>, AdminClientError>> + Send + Unpin + 'static
97    {
98        filter.schedule_ids = [self.id].into_iter().collect();
99        AdminClient::new(self.client.clone()).jobs(filter, order)
100    }
101
102    /// Return the amount of jobs that were created by this schedule.
103    pub async fn job_count(&self) -> Result<u64, AdminClientError> {
104        let count = self
105            .client
106            .clone()
107            .count_jobs(tonic::Request::new(
108                ora_proto::server::v1::CountJobsRequest {
109                    filter: Some(JobFilter::new().with_schedule_id(self.id).into()),
110                },
111            ))
112            .await?
113            .into_inner()
114            .count;
115
116        Ok(count)
117    }
118
119    /// Get the active job of the schedule.
120    pub async fn active_job(&self) -> Result<Option<JobHandle<(), C>>, AdminClientError> {
121        let job = self
122            .client
123            .clone()
124            .list_jobs(tonic::Request::new(
125                ora_proto::server::v1::ListJobsRequest {
126                    cursor: None,
127                    limit: 1,
128                    order: None,
129                    filter: Some(
130                        JobFilter::new()
131                            .with_schedule_id(self.id)
132                            .active_only()
133                            .into(),
134                    ),
135                },
136            ))
137            .await?
138            .into_inner()
139            .jobs
140            .into_iter()
141            .next();
142
143        Ok(job
144            .map(|job| {
145                let h = JobHandle::new(job.id.parse()?, self.client.clone());
146                h.set_details(Arc::new(job.try_into()?));
147                Result::<_, eyre::Report>::Ok(h)
148            })
149            .transpose()?)
150    }
151
152    /// Cancel the schedule, optionally cancelling all jobs created by the schedule.
153    pub async fn cancel(&self, cancel_jobs: bool) -> Result<(), AdminClientError> {
154        self.client
155            .clone()
156            .cancel_schedules(tonic::Request::new(
157                ora_proto::server::v1::CancelSchedulesRequest {
158                    filter: Some(ScheduleFilter::new().with_schedule_id(self.id).into()),
159                    cancel_jobs,
160                },
161            ))
162            .await?;
163
164        Ok(())
165    }
166
167    pub(crate) fn set_details(&self, details: Arc<ScheduleDetails>) {
168        *self.details.lock() = Some(details);
169    }
170}