ora_client/
admin.rs

1//! Wrappers over client connections that provide higher-level abstractions.
2
3use std::sync::Arc;
4
5use futures::{Stream, StreamExt};
6use ora_proto::server::v1::{
7    admin_service_client::AdminServiceClient, AddJobsRequest, CountJobsRequest, JobQueryOrder,
8    ListJobsRequest, ListSchedulesRequest, ScheduleQueryOrder,
9};
10use thiserror::Error;
11use tonic::{transport::Channel, Request};
12use uuid::Uuid;
13
14use crate::{
15    job_definition::JobDetails,
16    job_handle::JobHandle,
17    job_query::{JobFilter, JobOrder},
18    job_type::TypedJobDefinition,
19    schedule_definition::{ScheduleDefinition, ScheduleDetails},
20    schedule_handle::ScheduleHandle,
21    schedule_query::{ScheduleFilter, ScheduleOrder},
22    JobType,
23};
24
25#[allow(clippy::wildcard_imports)]
26use tonic::codegen::*;
27
28/// A high-level client for interacting with the Ora server admin endpoints.
29#[derive(Debug, Clone)]
30#[must_use]
31pub struct AdminClient<C = Channel> {
32    client: AdminServiceClient<C>,
33}
34
35impl<C> AdminClient<C>
36where
37    C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
38    <C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
39    C::Error: Into<StdError>,
40    C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
41    <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
42{
43    /// Create a new admin client from a gRPC client.
44    pub fn new(client: AdminServiceClient<C>) -> Self {
45        Self { client }
46    }
47
48    /// Add a new job to be executed.
49    pub async fn add_job<J>(
50        &self,
51        definition: TypedJobDefinition<J>,
52    ) -> Result<JobHandle<J, C>, AdminClientError> {
53        let res = self
54            .client
55            .clone()
56            .add_jobs(Request::new(AddJobsRequest {
57                jobs: vec![definition.into_inner().into()],
58            }))
59            .await?
60            .into_inner();
61
62        let job_id: Uuid = res
63            .job_ids
64            .into_iter()
65            .next()
66            .ok_or(AdminClientError::NotEnoughJobIds {
67                expected: 1,
68                actual: 0,
69            })?
70            .parse()?;
71
72        Ok(JobHandle::new(job_id, self.client.clone()))
73    }
74
75    /// Add multiple new jobs to be executed.
76    pub async fn add_jobs<J>(
77        &self,
78        definitions: impl IntoIterator<Item = TypedJobDefinition<J>>,
79    ) -> Result<Vec<JobHandle<J, C>>, AdminClientError> {
80        let definitions = definitions
81            .into_iter()
82            .map(|d| d.into_inner().into())
83            .collect();
84
85        let res = self
86            .client
87            .clone()
88            .add_jobs(Request::new(AddJobsRequest { jobs: definitions }))
89            .await?
90            .into_inner();
91
92        let job_ids = res
93            .job_ids
94            .into_iter()
95            .map(|id| id.parse())
96            .collect::<Result<Vec<Uuid>, _>>()?;
97
98        Ok(job_ids
99            .into_iter()
100            .map(|id| JobHandle::new(id, self.client.clone()))
101            .collect())
102    }
103
104    /// Create a job handle for a specific job.
105    ///
106    /// Note that this method does not check if the job exists.
107    pub fn job(&self, job_id: Uuid) -> JobHandle<(), C> {
108        JobHandle::new(job_id, self.client.clone())
109    }
110
111    /// Retrieve a list of jobs of a specific type
112    /// with a specific filter and order.
113    ///
114    /// The returned job handles will have their details pre-fetched.
115    pub fn jobs_of_type<J>(
116        &self,
117        mut filter: JobFilter,
118        order_by: JobOrder,
119    ) -> impl Stream<Item = Result<JobHandle<J, C>, AdminClientError>> + Send + Unpin + 'static
120    where
121        J: JobType,
122    {
123        filter.job_type_ids = [J::id().into()].into_iter().collect();
124
125        let client = self.client.clone();
126
127        async_stream::try_stream!({
128            let mut cursor: Option<String> = None;
129
130            loop {
131                let res = client
132                    .clone()
133                    .list_jobs(Request::new(ListJobsRequest {
134                        cursor: cursor.take(),
135                        limit: 100,
136                        filter: Some(filter.clone().into()),
137                        order: Some(JobQueryOrder::from(order_by) as i32),
138                    }))
139                    .await?
140                    .into_inner();
141
142                for job in res.jobs {
143                    let job_id = job.id.parse().map_err(AdminClientError::InvalidId)?;
144                    let h = JobHandle::new(job_id, client.clone());
145                    h.set_details(Arc::new(JobDetails::try_from(job)?));
146                    yield h;
147                }
148
149                cursor = res.cursor;
150
151                if !res.has_more {
152                    break;
153                }
154            }
155        })
156        .boxed()
157    }
158
159    /// Retrieve a list of jobs with a specific filter and order.
160    ///
161    /// The returned job handles will have their details pre-fetched.
162    pub fn jobs(
163        &self,
164        filter: JobFilter,
165        order_by: JobOrder,
166    ) -> impl Stream<Item = Result<JobHandle<(), C>, AdminClientError>> + Send + Unpin + 'static
167    {
168        let client = self.client.clone();
169
170        async_stream::try_stream!({
171            let mut cursor: Option<String> = None;
172
173            loop {
174                let res = client
175                    .clone()
176                    .list_jobs(Request::new(ListJobsRequest {
177                        cursor: cursor.take(),
178                        limit: 100,
179                        filter: Some(filter.clone().into()),
180                        order: Some(JobQueryOrder::from(order_by) as i32),
181                    }))
182                    .await?
183                    .into_inner();
184
185                for job in res.jobs {
186                    let job_id = job.id.parse().map_err(AdminClientError::InvalidId)?;
187                    let h = JobHandle::new(job_id, client.clone());
188                    h.set_details(Arc::new(JobDetails::try_from(job)?));
189                    yield h;
190                }
191
192                cursor = res.cursor;
193
194                if !res.has_more {
195                    break;
196                }
197            }
198        })
199        .boxed()
200    }
201
202    /// Count the number of jobs with a specific filter.
203    pub async fn job_count(&self, filter: JobFilter) -> Result<u64, AdminClientError> {
204        let res = self
205            .client
206            .clone()
207            .count_jobs(Request::new(CountJobsRequest {
208                filter: Some(filter.into()),
209            }))
210            .await?;
211
212        Ok(res.into_inner().count)
213    }
214
215    /// Count the number of jobs of a specific type with a specific filter.
216    pub async fn job_count_of_type<J>(&self, mut filter: JobFilter) -> Result<u64, AdminClientError>
217    where
218        J: JobType,
219    {
220        filter.job_type_ids = [J::id().into()].into_iter().collect();
221
222        let res = self
223            .client
224            .clone()
225            .count_jobs(Request::new(CountJobsRequest {
226                filter: Some(filter.into()),
227            }))
228            .await?;
229
230        Ok(res.into_inner().count)
231    }
232
233    /// Cancel jobs with a specific filter.
234    ///
235    /// Returns handles to jobs that were successfully cancelled.
236    pub async fn cancel_jobs(
237        &self,
238        filter: JobFilter,
239    ) -> Result<Vec<JobHandle<(), C>>, AdminClientError> {
240        let res = self
241            .client
242            .clone()
243            .cancel_jobs(Request::new(ora_proto::server::v1::CancelJobsRequest {
244                filter: Some(filter.into()),
245            }))
246            .await?
247            .into_inner();
248
249        let job_ids = res
250            .job_ids
251            .into_iter()
252            .map(|id| id.parse())
253            .collect::<Result<Vec<Uuid>, _>>()?;
254
255        Ok(job_ids
256            .into_iter()
257            .map(|id| JobHandle::new(id, self.client.clone()))
258            .collect())
259    }
260
261    /// Delete inactive jobs and associated with a specific filter,
262    /// returning the deleted job IDs.
263    pub async fn delete_inactive_jobs(
264        &self,
265        filter: JobFilter,
266    ) -> Result<Vec<Uuid>, AdminClientError> {
267        let res = self
268            .client
269            .clone()
270            .delete_inactive_jobs(Request::new(
271                ora_proto::server::v1::DeleteInactiveJobsRequest {
272                    filter: Some(filter.into()),
273                },
274            ))
275            .await?
276            .into_inner();
277
278        let job_ids = res
279            .job_ids
280            .into_iter()
281            .map(|id| id.parse())
282            .collect::<Result<Vec<Uuid>, _>>()?;
283
284        Ok(job_ids)
285    }
286
287    /// Add a new schedule.
288    pub async fn add_schedule(
289        &self,
290        mut schedule: ScheduleDefinition,
291    ) -> Result<ScheduleHandle<C>, AdminClientError> {
292        if schedule.propagate_labels_to_jobs {
293            match &mut schedule.job_creation_policy {
294                crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
295                    job_definition,
296                ) => {
297                    for (key, value) in &schedule.labels {
298                        if job_definition.labels.contains_key(key) {
299                            continue;
300                        }
301
302                        job_definition.labels.insert(key.clone(), value.clone());
303                    }
304                }
305            }
306        }
307
308        let res = self
309            .client
310            .clone()
311            .create_schedules(Request::new(
312                ora_proto::server::v1::CreateSchedulesRequest {
313                    schedules: vec![schedule.into()],
314                },
315            ))
316            .await?
317            .into_inner();
318
319        let schedule_id: Uuid = res
320            .schedule_ids
321            .into_iter()
322            .next()
323            .ok_or(AdminClientError::NotEnoughScheduleIds {
324                expected: 1,
325                actual: 0,
326            })?
327            .parse()?;
328
329        Ok(ScheduleHandle::new(schedule_id, self.client.clone()))
330    }
331
332    /// Add multiple new schedules.
333    ///
334    /// Returns handles to the created schedules.
335    pub async fn add_schedules(
336        &self,
337        schedules: impl IntoIterator<Item = ScheduleDefinition>,
338    ) -> Result<Vec<ScheduleHandle<C>>, AdminClientError> {
339        let schedules = schedules
340            .into_iter()
341            .map(|mut schedule| {
342                if schedule.propagate_labels_to_jobs {
343                    match &mut schedule.job_creation_policy {
344                        crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
345                            job_definition,
346                        ) => {
347                            for (key, value) in &schedule.labels {
348                                if job_definition.labels.contains_key(key) {
349                                    continue;
350                                }
351
352                                job_definition.labels.insert(key.clone(), value.clone());
353                            }
354                        }
355                    }
356                }
357
358                schedule
359            })
360            .map(Into::into)
361            .collect::<Vec<_>>();
362
363        let res = self
364            .client
365            .clone()
366            .create_schedules(Request::new(
367                ora_proto::server::v1::CreateSchedulesRequest { schedules },
368            ))
369            .await?
370            .into_inner();
371
372        let schedule_ids = res
373            .schedule_ids
374            .into_iter()
375            .map(|id| id.parse())
376            .collect::<Result<Vec<Uuid>, _>>()?;
377
378        Ok(schedule_ids
379            .into_iter()
380            .map(|id| ScheduleHandle::new(id, self.client.clone()))
381            .collect())
382    }
383
384    /// Create a new schedule handle for a specific schedule.
385    ///
386    /// Note that this method does not check if the schedule exists.
387    pub fn schedule(&self, schedule_id: Uuid) -> ScheduleHandle<C> {
388        ScheduleHandle::new(schedule_id, self.client.clone())
389    }
390
391    /// Retrieve a list of schedules with a specific filter and order.
392    ///
393    /// The returned schedule handles will have their details pre-fetched.
394    pub fn schedules(
395        &self,
396        filter: ScheduleFilter,
397        order: ScheduleOrder,
398    ) -> impl Stream<Item = Result<ScheduleHandle<C>, AdminClientError>> + Send + Unpin + 'static
399    {
400        let client = self.client.clone();
401
402        async_stream::try_stream!({
403            let mut cursor: Option<String> = None;
404
405            loop {
406                let res = client
407                    .clone()
408                    .list_schedules(Request::new(ListSchedulesRequest {
409                        cursor: cursor.take(),
410                        limit: 100,
411                        filter: Some(filter.clone().into()),
412                        order: Some(ScheduleQueryOrder::from(order) as i32),
413                    }))
414                    .await?
415                    .into_inner();
416
417                for schedule in res.schedules {
418                    let schedule_id = schedule.id.parse().map_err(AdminClientError::InvalidId)?;
419                    let h = ScheduleHandle::new(schedule_id, client.clone());
420                    h.set_details(Arc::new(ScheduleDetails::try_from(schedule)?));
421                    yield h;
422                }
423
424                cursor = res.cursor;
425
426                if !res.has_more {
427                    break;
428                }
429            }
430        })
431        .boxed()
432    }
433
434    /// Return the number of schedules that match a specific filter.
435    pub async fn schedule_count(&self, filter: ScheduleFilter) -> Result<u64, AdminClientError> {
436        let res = self
437            .client
438            .clone()
439            .count_schedules(Request::new(ora_proto::server::v1::CountSchedulesRequest {
440                filter: Some(filter.into()),
441            }))
442            .await?;
443
444        Ok(res.into_inner().count)
445    }
446
447    /// Cancel schedules with a specific filter,
448    /// optionally cancelling the jobs associated with them.
449    pub async fn cancel_schedules(
450        &self,
451        filter: ScheduleFilter,
452        cancel_jobs: bool,
453    ) -> Result<ScheduleCancellationResult<C>, AdminClientError> {
454        let res = self
455            .client
456            .clone()
457            .cancel_schedules(Request::new(
458                ora_proto::server::v1::CancelSchedulesRequest {
459                    filter: Some(filter.into()),
460                    cancel_jobs,
461                },
462            ))
463            .await?
464            .into_inner();
465
466        let schedule_ids = res
467            .schedule_ids
468            .into_iter()
469            .map(|id| id.parse())
470            .collect::<Result<Vec<Uuid>, _>>()?;
471
472        let job_ids = res
473            .job_ids
474            .into_iter()
475            .map(|id| id.parse())
476            .collect::<Result<Vec<Uuid>, _>>()?;
477
478        Ok(ScheduleCancellationResult {
479            schedules: schedule_ids
480                .into_iter()
481                .map(|id| ScheduleHandle::new(id, self.client.clone()))
482                .collect(),
483            jobs: job_ids
484                .into_iter()
485                .map(|id| JobHandle::new(id, self.client.clone()))
486                .collect(),
487        })
488    }
489}
490
491/// The result of a schedule cancellation.
492pub struct ScheduleCancellationResult<C> {
493    /// Cancelled schedules.
494    pub schedules: Vec<ScheduleHandle<C>>,
495    /// Cancelled jobs.
496    pub jobs: Vec<JobHandle<(), C>>,
497}
498
499impl From<AdminServiceClient<Channel>> for AdminClient {
500    fn from(client: AdminServiceClient<Channel>) -> Self {
501        Self { client }
502    }
503}
504
505/// Errors that can occur when interacting with the admin client.
506#[allow(missing_docs)]
507#[derive(Debug, Error)]
508pub enum AdminClientError {
509    #[error("gRPC error: {0}")]
510    Grpc(#[from] tonic::Status),
511    #[error("not enough job IDs were returned by the server (expected {expected}, got {actual})")]
512    NotEnoughJobIds { expected: usize, actual: usize },
513    #[error("invalid ID: {0}")]
514    InvalidId(#[from] uuid::Error),
515    #[error(
516        "not enough schedule IDs were returned by the server (expected {expected}, got {actual})"
517    )]
518    NotEnoughScheduleIds { expected: usize, actual: usize },
519    #[error("{0}")]
520    Other(#[from] eyre::Error),
521}