ora_client/
schedule_handle.rs1use std::sync::Arc;
4
5use futures::Stream;
6use ora_proto::server::v1::admin_service_client::AdminServiceClient;
7use parking_lot::Mutex;
8use tonic::transport::Channel;
9use uuid::Uuid;
10
11use crate::{
12 admin::AdminClientError,
13 job_handle::JobHandle,
14 job_query::{JobFilter, JobOrder},
15 schedule_definition::ScheduleDetails,
16 schedule_query::ScheduleFilter,
17 AdminClient,
18};
19#[allow(clippy::wildcard_imports)]
20use tonic::codegen::*;
21
22#[derive(Debug, Clone)]
24pub struct ScheduleHandle<C = Channel> {
25 id: Uuid,
27 client: AdminServiceClient<C>,
29 details: Arc<Mutex<Option<Arc<ScheduleDetails>>>>,
30}
31
32impl<C> ScheduleHandle<C>
33where
34 C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
35 <C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
36 C::Error: Into<StdError>,
37 C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
38 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
39{
40 pub(crate) fn new(id: Uuid, client: AdminServiceClient<C>) -> Self {
42 Self {
43 id,
44 client,
45 details: Arc::new(Mutex::new(None)),
46 }
47 }
48
49 pub fn id(&self) -> Uuid {
51 self.id
52 }
53
54 pub async fn details(&self) -> eyre::Result<Arc<ScheduleDetails>> {
56 if let Some(details) = self.details.lock().as_ref() {
57 if !details.active {
58 return Ok(details.clone());
59 }
60 }
61
62 let schedule = self
63 .client
64 .clone()
65 .list_schedules(tonic::Request::new(
66 ora_proto::server::v1::ListSchedulesRequest {
67 cursor: None,
68 limit: 1,
69 order: None,
70 filter: Some(ScheduleFilter::new().with_schedule_id(self.id).into()),
71 },
72 ))
73 .await?
74 .into_inner()
75 .schedules
76 .into_iter()
77 .next()
78 .ok_or_else(|| eyre::eyre!("schedule details not found"))?;
79
80 let details = Arc::new(ScheduleDetails::try_from(schedule)?);
81 self.set_details(details.clone());
82
83 Ok(details)
84 }
85
86 pub fn cached_details(&self) -> Option<Arc<ScheduleDetails>> {
88 self.details.lock().as_ref().cloned()
89 }
90
91 pub fn jobs(
93 &self,
94 mut filter: JobFilter,
95 order: JobOrder,
96 ) -> impl Stream<Item = Result<JobHandle<(), C>, AdminClientError>> + Send + Unpin + 'static
97 {
98 filter.schedule_ids = [self.id].into_iter().collect();
99 AdminClient::new(self.client.clone()).jobs(filter, order)
100 }
101
102 pub async fn job_count(&self) -> Result<u64, AdminClientError> {
104 let count = self
105 .client
106 .clone()
107 .count_jobs(tonic::Request::new(
108 ora_proto::server::v1::CountJobsRequest {
109 filter: Some(JobFilter::new().with_schedule_id(self.id).into()),
110 },
111 ))
112 .await?
113 .into_inner()
114 .count;
115
116 Ok(count)
117 }
118
119 pub async fn active_job(&self) -> Result<Option<JobHandle<(), C>>, AdminClientError> {
121 let job = self
122 .client
123 .clone()
124 .list_jobs(tonic::Request::new(
125 ora_proto::server::v1::ListJobsRequest {
126 cursor: None,
127 limit: 1,
128 order: None,
129 filter: Some(
130 JobFilter::new()
131 .with_schedule_id(self.id)
132 .active_only()
133 .into(),
134 ),
135 },
136 ))
137 .await?
138 .into_inner()
139 .jobs
140 .into_iter()
141 .next();
142
143 Ok(job
144 .map(|job| {
145 let h = JobHandle::new(job.id.parse()?, self.client.clone());
146 h.set_details(Arc::new(job.try_into()?));
147 Result::<_, eyre::Report>::Ok(h)
148 })
149 .transpose()?)
150 }
151
152 pub async fn cancel(&self, cancel_jobs: bool) -> Result<(), AdminClientError> {
154 self.client
155 .clone()
156 .cancel_schedules(tonic::Request::new(
157 ora_proto::server::v1::CancelSchedulesRequest {
158 filter: Some(ScheduleFilter::new().with_schedule_id(self.id).into()),
159 cancel_jobs,
160 },
161 ))
162 .await?;
163
164 Ok(())
165 }
166
167 pub(crate) fn set_details(&self, details: Arc<ScheduleDetails>) {
168 *self.details.lock() = Some(details);
169 }
170}