1use std::sync::Arc;
4
5use futures::{Stream, StreamExt};
6use ora_proto::server::v1::{
7 admin_service_client::AdminServiceClient, AddJobsRequest, CountJobsRequest, JobQueryOrder,
8 ListJobsRequest, ListSchedulesRequest, ScheduleQueryOrder,
9};
10use thiserror::Error;
11use tonic::{transport::Channel, Request};
12use uuid::Uuid;
13
14use crate::{
15 job_definition::JobDetails,
16 job_handle::JobHandle,
17 job_query::{JobFilter, JobOrder},
18 job_type::TypedJobDefinition,
19 schedule_definition::{ScheduleDefinition, ScheduleDetails},
20 schedule_handle::ScheduleHandle,
21 schedule_query::{ScheduleFilter, ScheduleOrder},
22 JobType,
23};
24
25#[allow(clippy::wildcard_imports)]
26use tonic::codegen::*;
27
/// A thin wrapper around the generated [`AdminServiceClient`] that exposes
/// job and schedule management operations.
///
/// `C` is the underlying gRPC service type; it defaults to tonic's
/// [`Channel`]. The wrapped client is cloned for every request and for every
/// job/schedule handle that is handed out.
#[derive(Debug, Clone)]
#[must_use]
pub struct AdminClient<C = Channel> {
    /// The generated gRPC client all requests are sent through.
    client: AdminServiceClient<C>,
}
34
35impl<C> AdminClient<C>
36where
37 C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
38 <C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
39 C::Error: Into<StdError>,
40 C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
41 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
42{
43 pub fn new(client: AdminServiceClient<C>) -> Self {
45 Self { client }
46 }
47
48 pub async fn add_job<J>(
50 &self,
51 definition: TypedJobDefinition<J>,
52 ) -> Result<JobHandle<J, C>, AdminClientError> {
53 let res = self
54 .client
55 .clone()
56 .add_jobs(Request::new(AddJobsRequest {
57 jobs: vec![definition.into_inner().into()],
58 }))
59 .await?
60 .into_inner();
61
62 let job_id: Uuid = res
63 .job_ids
64 .into_iter()
65 .next()
66 .ok_or(AdminClientError::NotEnoughJobIds {
67 expected: 1,
68 actual: 0,
69 })?
70 .parse()?;
71
72 Ok(JobHandle::new(job_id, self.client.clone()))
73 }
74
75 pub async fn add_jobs<J>(
77 &self,
78 definitions: impl IntoIterator<Item = TypedJobDefinition<J>>,
79 ) -> Result<Vec<JobHandle<J, C>>, AdminClientError> {
80 let definitions = definitions
81 .into_iter()
82 .map(|d| d.into_inner().into())
83 .collect();
84
85 let res = self
86 .client
87 .clone()
88 .add_jobs(Request::new(AddJobsRequest { jobs: definitions }))
89 .await?
90 .into_inner();
91
92 let job_ids = res
93 .job_ids
94 .into_iter()
95 .map(|id| id.parse())
96 .collect::<Result<Vec<Uuid>, _>>()?;
97
98 Ok(job_ids
99 .into_iter()
100 .map(|id| JobHandle::new(id, self.client.clone()))
101 .collect())
102 }
103
104 pub fn job(&self, job_id: Uuid) -> JobHandle<(), C> {
108 JobHandle::new(job_id, self.client.clone())
109 }
110
111 pub fn jobs_of_type<J>(
116 &self,
117 mut filter: JobFilter,
118 order_by: JobOrder,
119 ) -> impl Stream<Item = Result<JobHandle<J, C>, AdminClientError>> + Send + Unpin + 'static
120 where
121 J: JobType,
122 {
123 filter.job_type_ids = [J::id().into()].into_iter().collect();
124
125 let client = self.client.clone();
126
127 async_stream::try_stream!({
128 let mut cursor: Option<String> = None;
129
130 loop {
131 let res = client
132 .clone()
133 .list_jobs(Request::new(ListJobsRequest {
134 cursor: cursor.take(),
135 limit: 100,
136 filter: Some(filter.clone().into()),
137 order: Some(JobQueryOrder::from(order_by) as i32),
138 }))
139 .await?
140 .into_inner();
141
142 for job in res.jobs {
143 let job_id = job.id.parse().map_err(AdminClientError::InvalidId)?;
144 let h = JobHandle::new(job_id, client.clone());
145 h.set_details(Arc::new(JobDetails::try_from(job)?));
146 yield h;
147 }
148
149 cursor = res.cursor;
150
151 if !res.has_more {
152 break;
153 }
154 }
155 })
156 .boxed()
157 }
158
159 pub fn jobs(
163 &self,
164 filter: JobFilter,
165 order_by: JobOrder,
166 ) -> impl Stream<Item = Result<JobHandle<(), C>, AdminClientError>> + Send + Unpin + 'static
167 {
168 let client = self.client.clone();
169
170 async_stream::try_stream!({
171 let mut cursor: Option<String> = None;
172
173 loop {
174 let res = client
175 .clone()
176 .list_jobs(Request::new(ListJobsRequest {
177 cursor: cursor.take(),
178 limit: 100,
179 filter: Some(filter.clone().into()),
180 order: Some(JobQueryOrder::from(order_by) as i32),
181 }))
182 .await?
183 .into_inner();
184
185 for job in res.jobs {
186 let job_id = job.id.parse().map_err(AdminClientError::InvalidId)?;
187 let h = JobHandle::new(job_id, client.clone());
188 h.set_details(Arc::new(JobDetails::try_from(job)?));
189 yield h;
190 }
191
192 cursor = res.cursor;
193
194 if !res.has_more {
195 break;
196 }
197 }
198 })
199 .boxed()
200 }
201
202 pub async fn job_count(&self, filter: JobFilter) -> Result<u64, AdminClientError> {
204 let res = self
205 .client
206 .clone()
207 .count_jobs(Request::new(CountJobsRequest {
208 filter: Some(filter.into()),
209 }))
210 .await?;
211
212 Ok(res.into_inner().count)
213 }
214
215 pub async fn job_count_of_type<J>(&self, mut filter: JobFilter) -> Result<u64, AdminClientError>
217 where
218 J: JobType,
219 {
220 filter.job_type_ids = [J::id().into()].into_iter().collect();
221
222 let res = self
223 .client
224 .clone()
225 .count_jobs(Request::new(CountJobsRequest {
226 filter: Some(filter.into()),
227 }))
228 .await?;
229
230 Ok(res.into_inner().count)
231 }
232
233 pub async fn cancel_jobs(
237 &self,
238 filter: JobFilter,
239 ) -> Result<Vec<JobHandle<(), C>>, AdminClientError> {
240 let res = self
241 .client
242 .clone()
243 .cancel_jobs(Request::new(ora_proto::server::v1::CancelJobsRequest {
244 filter: Some(filter.into()),
245 }))
246 .await?
247 .into_inner();
248
249 let job_ids = res
250 .job_ids
251 .into_iter()
252 .map(|id| id.parse())
253 .collect::<Result<Vec<Uuid>, _>>()?;
254
255 Ok(job_ids
256 .into_iter()
257 .map(|id| JobHandle::new(id, self.client.clone()))
258 .collect())
259 }
260
261 pub async fn delete_inactive_jobs(
264 &self,
265 filter: JobFilter,
266 ) -> Result<Vec<Uuid>, AdminClientError> {
267 let res = self
268 .client
269 .clone()
270 .delete_inactive_jobs(Request::new(
271 ora_proto::server::v1::DeleteInactiveJobsRequest {
272 filter: Some(filter.into()),
273 },
274 ))
275 .await?
276 .into_inner();
277
278 let job_ids = res
279 .job_ids
280 .into_iter()
281 .map(|id| id.parse())
282 .collect::<Result<Vec<Uuid>, _>>()?;
283
284 Ok(job_ids)
285 }
286
287 pub async fn add_schedule(
289 &self,
290 mut schedule: ScheduleDefinition,
291 ) -> Result<ScheduleHandle<C>, AdminClientError> {
292 if schedule.propagate_labels_to_jobs {
293 match &mut schedule.job_creation_policy {
294 crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
295 job_definition,
296 ) => {
297 for (key, value) in &schedule.labels {
298 if job_definition.labels.contains_key(key) {
299 continue;
300 }
301
302 job_definition.labels.insert(key.clone(), value.clone());
303 }
304 }
305 }
306 }
307
308 let res = self
309 .client
310 .clone()
311 .create_schedules(Request::new(
312 ora_proto::server::v1::CreateSchedulesRequest {
313 schedules: vec![schedule.into()],
314 },
315 ))
316 .await?
317 .into_inner();
318
319 let schedule_id: Uuid = res
320 .schedule_ids
321 .into_iter()
322 .next()
323 .ok_or(AdminClientError::NotEnoughScheduleIds {
324 expected: 1,
325 actual: 0,
326 })?
327 .parse()?;
328
329 Ok(ScheduleHandle::new(schedule_id, self.client.clone()))
330 }
331
332 pub async fn add_schedules(
336 &self,
337 schedules: impl IntoIterator<Item = ScheduleDefinition>,
338 ) -> Result<Vec<ScheduleHandle<C>>, AdminClientError> {
339 let schedules = schedules
340 .into_iter()
341 .map(|mut schedule| {
342 if schedule.propagate_labels_to_jobs {
343 match &mut schedule.job_creation_policy {
344 crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
345 job_definition,
346 ) => {
347 for (key, value) in &schedule.labels {
348 if job_definition.labels.contains_key(key) {
349 continue;
350 }
351
352 job_definition.labels.insert(key.clone(), value.clone());
353 }
354 }
355 }
356 }
357
358 schedule
359 })
360 .map(Into::into)
361 .collect::<Vec<_>>();
362
363 let res = self
364 .client
365 .clone()
366 .create_schedules(Request::new(
367 ora_proto::server::v1::CreateSchedulesRequest { schedules },
368 ))
369 .await?
370 .into_inner();
371
372 let schedule_ids = res
373 .schedule_ids
374 .into_iter()
375 .map(|id| id.parse())
376 .collect::<Result<Vec<Uuid>, _>>()?;
377
378 Ok(schedule_ids
379 .into_iter()
380 .map(|id| ScheduleHandle::new(id, self.client.clone()))
381 .collect())
382 }
383
384 pub fn schedule(&self, schedule_id: Uuid) -> ScheduleHandle<C> {
388 ScheduleHandle::new(schedule_id, self.client.clone())
389 }
390
391 pub fn schedules(
395 &self,
396 filter: ScheduleFilter,
397 order: ScheduleOrder,
398 ) -> impl Stream<Item = Result<ScheduleHandle<C>, AdminClientError>> + Send + Unpin + 'static
399 {
400 let client = self.client.clone();
401
402 async_stream::try_stream!({
403 let mut cursor: Option<String> = None;
404
405 loop {
406 let res = client
407 .clone()
408 .list_schedules(Request::new(ListSchedulesRequest {
409 cursor: cursor.take(),
410 limit: 100,
411 filter: Some(filter.clone().into()),
412 order: Some(ScheduleQueryOrder::from(order) as i32),
413 }))
414 .await?
415 .into_inner();
416
417 for schedule in res.schedules {
418 let schedule_id = schedule.id.parse().map_err(AdminClientError::InvalidId)?;
419 let h = ScheduleHandle::new(schedule_id, client.clone());
420 h.set_details(Arc::new(ScheduleDetails::try_from(schedule)?));
421 yield h;
422 }
423
424 cursor = res.cursor;
425
426 if !res.has_more {
427 break;
428 }
429 }
430 })
431 .boxed()
432 }
433
434 pub async fn schedule_count(&self, filter: ScheduleFilter) -> Result<u64, AdminClientError> {
436 let res = self
437 .client
438 .clone()
439 .count_schedules(Request::new(ora_proto::server::v1::CountSchedulesRequest {
440 filter: Some(filter.into()),
441 }))
442 .await?;
443
444 Ok(res.into_inner().count)
445 }
446
447 pub async fn cancel_schedules(
450 &self,
451 filter: ScheduleFilter,
452 cancel_jobs: bool,
453 ) -> Result<ScheduleCancellationResult<C>, AdminClientError> {
454 let res = self
455 .client
456 .clone()
457 .cancel_schedules(Request::new(
458 ora_proto::server::v1::CancelSchedulesRequest {
459 filter: Some(filter.into()),
460 cancel_jobs,
461 },
462 ))
463 .await?
464 .into_inner();
465
466 let schedule_ids = res
467 .schedule_ids
468 .into_iter()
469 .map(|id| id.parse())
470 .collect::<Result<Vec<Uuid>, _>>()?;
471
472 let job_ids = res
473 .job_ids
474 .into_iter()
475 .map(|id| id.parse())
476 .collect::<Result<Vec<Uuid>, _>>()?;
477
478 Ok(ScheduleCancellationResult {
479 schedules: schedule_ids
480 .into_iter()
481 .map(|id| ScheduleHandle::new(id, self.client.clone()))
482 .collect(),
483 jobs: job_ids
484 .into_iter()
485 .map(|id| JobHandle::new(id, self.client.clone()))
486 .collect(),
487 })
488 }
489}
490
/// The value returned by [`AdminClient::cancel_schedules`].
pub struct ScheduleCancellationResult<C> {
    /// Handles for the schedule IDs listed in the server's response
    /// (presumably the schedules that were cancelled).
    pub schedules: Vec<ScheduleHandle<C>>,
    /// Untyped handles for the job IDs listed in the server's response
    /// (presumably the jobs that were cancelled along with the schedules).
    pub jobs: Vec<JobHandle<(), C>>,
}
498
499impl From<AdminServiceClient<Channel>> for AdminClient {
500 fn from(client: AdminServiceClient<Channel>) -> Self {
501 Self { client }
502 }
503}
504
/// Errors returned by [`AdminClient`] operations.
#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum AdminClientError {
    /// A gRPC call returned a non-OK [`tonic::Status`].
    #[error("gRPC error: {0}")]
    Grpc(#[from] tonic::Status),
    /// The server's response contained fewer job IDs than were expected.
    #[error("not enough job IDs were returned by the server (expected {expected}, got {actual})")]
    NotEnoughJobIds { expected: usize, actual: usize },
    /// An ID in a server response could not be parsed as a UUID.
    #[error("invalid ID: {0}")]
    InvalidId(#[from] uuid::Error),
    /// The server's response contained fewer schedule IDs than were expected.
    #[error(
        "not enough schedule IDs were returned by the server (expected {expected}, got {actual})"
    )]
    NotEnoughScheduleIds { expected: usize, actual: usize },
    /// Any other error, carried as an [`eyre::Error`].
    #[error("{0}")]
    Other(#[from] eyre::Error),
}