ora_client/
admin.rs

1//! Wrappers over client connections that provide higher-level abstractions.
2
3use std::sync::Arc;
4
5use futures::{Stream, StreamExt};
6use ora_proto::server::v1::{
7    admin_service_client::AdminServiceClient, AddJobIfNotExistsRequest, AddJobsRequest,
8    AddScheduleIfNotExistsRequest, CountJobsRequest, JobQueryOrder, ListJobsRequest,
9    ListSchedulesRequest, ScheduleQueryOrder,
10};
11use thiserror::Error;
12use tonic::{transport::Channel, Request};
13use uuid::Uuid;
14
15use crate::{
16    job_definition::JobDetails,
17    job_handle::JobHandle,
18    job_query::{JobFilter, JobOrder},
19    job_type::TypedJobDefinition,
20    schedule_definition::{ScheduleDefinition, ScheduleDetails},
21    schedule_handle::ScheduleHandle,
22    schedule_query::{ScheduleFilter, ScheduleOrder},
23    JobType,
24};
25
26#[allow(clippy::wildcard_imports)]
27use tonic::codegen::*;
28
29/// A high-level client for interacting with the Ora server admin endpoints.
30#[derive(Debug, Clone)]
31#[must_use]
32pub struct AdminClient<C = Channel> {
33    client: AdminServiceClient<C>,
34}
35
36impl<C> AdminClient<C>
37where
38    C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
39    <C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
40    C::Error: Into<StdError>,
41    C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
42    <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
43{
44    /// Create a new admin client from a gRPC client.
45    pub fn new(client: AdminServiceClient<C>) -> Self {
46        Self { client }
47    }
48
49    /// Add a new job to be executed.
50    pub async fn add_job<J>(
51        &self,
52        definition: TypedJobDefinition<J>,
53    ) -> Result<JobHandle<J, C>, AdminClientError> {
54        let res = self
55            .client
56            .clone()
57            .add_jobs(Request::new(AddJobsRequest {
58                jobs: vec![definition.into_inner().into()],
59            }))
60            .await?
61            .into_inner();
62
63        let job_id: Uuid = res
64            .job_ids
65            .into_iter()
66            .next()
67            .ok_or(AdminClientError::NotEnoughJobIds {
68                expected: 1,
69                actual: 0,
70            })?
71            .parse()?;
72
73        Ok(JobHandle::new(job_id, self.client.clone()))
74    }
75
76    /// Add a new job to be executed.
77    ///
78    /// If an active job with the same job type and
79    /// labels already exists, it will be returned instead.
80    pub async fn add_job_persistent<J>(
81        &self,
82        definition: TypedJobDefinition<J>,
83    ) -> Result<PersistentJob<J, C>, AdminClientError> {
84        let mut filters = JobFilter::default()
85            .with_job_type_id(definition.inner.job_type_id.to_string())
86            .active_only();
87
88        for (key, value) in &definition.inner.labels {
89            filters = filters.with_label_value(key, value);
90        }
91
92        let res = self
93            .client
94            .clone()
95            .add_job_if_not_exists(Request::new(AddJobIfNotExistsRequest {
96                job: Some(definition.into_inner().into()),
97                filter: Some(filters.into()),
98            }))
99            .await?
100            .into_inner();
101
102        let job_id: Uuid = res.job_id.parse()?;
103
104        if res.added {
105            Ok(PersistentJob::Added(JobHandle::new(
106                job_id,
107                self.client.clone(),
108            )))
109        } else {
110            Ok(PersistentJob::Exists(JobHandle::new(
111                job_id,
112                self.client.clone(),
113            )))
114        }
115    }
116
117    /// Add multiple new jobs to be executed.
118    pub async fn add_jobs<J>(
119        &self,
120        definitions: impl IntoIterator<Item = TypedJobDefinition<J>>,
121    ) -> Result<Vec<JobHandle<J, C>>, AdminClientError> {
122        let definitions = definitions
123            .into_iter()
124            .map(|d| d.into_inner().into())
125            .collect();
126
127        let res = self
128            .client
129            .clone()
130            .add_jobs(Request::new(AddJobsRequest { jobs: definitions }))
131            .await?
132            .into_inner();
133
134        let job_ids = res
135            .job_ids
136            .into_iter()
137            .map(|id| id.parse())
138            .collect::<Result<Vec<Uuid>, _>>()?;
139
140        Ok(job_ids
141            .into_iter()
142            .map(|id| JobHandle::new(id, self.client.clone()))
143            .collect())
144    }
145
146    /// Create a job handle for a specific job.
147    ///
148    /// Note that this method does not check if the job exists.
149    pub fn job(&self, job_id: Uuid) -> JobHandle<(), C> {
150        JobHandle::new(job_id, self.client.clone())
151    }
152
153    /// Retrieve a list of jobs of a specific type
154    /// with a specific filter and order.
155    ///
156    /// The returned job handles will have their details pre-fetched.
157    pub fn jobs_of_type<J>(
158        &self,
159        mut filter: JobFilter,
160        order_by: JobOrder,
161    ) -> impl Stream<Item = Result<JobHandle<J, C>, AdminClientError>> + Send + Unpin + 'static
162    where
163        J: JobType,
164    {
165        filter.job_type_ids = [J::id().into()].into_iter().collect();
166
167        let client = self.client.clone();
168
169        async_stream::try_stream!({
170            let mut cursor: Option<String> = None;
171
172            loop {
173                let res = client
174                    .clone()
175                    .list_jobs(Request::new(ListJobsRequest {
176                        cursor: cursor.take(),
177                        limit: 100,
178                        filter: Some(filter.clone().into()),
179                        order: Some(JobQueryOrder::from(order_by) as i32),
180                    }))
181                    .await?
182                    .into_inner();
183
184                for job in res.jobs {
185                    let job_id = job.id.parse().map_err(AdminClientError::InvalidId)?;
186                    let h = JobHandle::new(job_id, client.clone());
187                    h.set_details(Arc::new(JobDetails::try_from(job)?));
188                    yield h;
189                }
190
191                cursor = res.cursor;
192
193                if !res.has_more {
194                    break;
195                }
196            }
197        })
198        .boxed()
199    }
200
201    /// Retrieve a list of jobs with a specific filter and order.
202    ///
203    /// The returned job handles will have their details pre-fetched.
204    pub fn jobs(
205        &self,
206        filter: JobFilter,
207        order_by: JobOrder,
208    ) -> impl Stream<Item = Result<JobHandle<(), C>, AdminClientError>> + Send + Unpin + 'static
209    {
210        let client = self.client.clone();
211
212        async_stream::try_stream!({
213            let mut cursor: Option<String> = None;
214
215            loop {
216                let res = client
217                    .clone()
218                    .list_jobs(Request::new(ListJobsRequest {
219                        cursor: cursor.take(),
220                        limit: 100,
221                        filter: Some(filter.clone().into()),
222                        order: Some(JobQueryOrder::from(order_by) as i32),
223                    }))
224                    .await?
225                    .into_inner();
226
227                for job in res.jobs {
228                    let job_id = job.id.parse().map_err(AdminClientError::InvalidId)?;
229                    let h = JobHandle::new(job_id, client.clone());
230                    h.set_details(Arc::new(JobDetails::try_from(job)?));
231                    yield h;
232                }
233
234                cursor = res.cursor;
235
236                if !res.has_more {
237                    break;
238                }
239            }
240        })
241        .boxed()
242    }
243
244    /// Count the number of jobs with a specific filter.
245    pub async fn job_count(&self, filter: JobFilter) -> Result<u64, AdminClientError> {
246        let res = self
247            .client
248            .clone()
249            .count_jobs(Request::new(CountJobsRequest {
250                filter: Some(filter.into()),
251            }))
252            .await?;
253
254        Ok(res.into_inner().count)
255    }
256
257    /// Count the number of jobs of a specific type with a specific filter.
258    pub async fn job_count_of_type<J>(&self, mut filter: JobFilter) -> Result<u64, AdminClientError>
259    where
260        J: JobType,
261    {
262        filter.job_type_ids = [J::id().into()].into_iter().collect();
263
264        let res = self
265            .client
266            .clone()
267            .count_jobs(Request::new(CountJobsRequest {
268                filter: Some(filter.into()),
269            }))
270            .await?;
271
272        Ok(res.into_inner().count)
273    }
274
275    /// Cancel jobs with a specific filter.
276    ///
277    /// Returns handles to jobs that were successfully cancelled.
278    pub async fn cancel_jobs(
279        &self,
280        filter: JobFilter,
281    ) -> Result<Vec<JobHandle<(), C>>, AdminClientError> {
282        let res = self
283            .client
284            .clone()
285            .cancel_jobs(Request::new(ora_proto::server::v1::CancelJobsRequest {
286                filter: Some(filter.into()),
287            }))
288            .await?
289            .into_inner();
290
291        let job_ids = res
292            .job_ids
293            .into_iter()
294            .map(|id| id.parse())
295            .collect::<Result<Vec<Uuid>, _>>()?;
296
297        Ok(job_ids
298            .into_iter()
299            .map(|id| JobHandle::new(id, self.client.clone()))
300            .collect())
301    }
302
303    /// Delete inactive jobs and associated with a specific filter,
304    /// returning the deleted job IDs.
305    pub async fn delete_inactive_jobs(
306        &self,
307        filter: JobFilter,
308    ) -> Result<Vec<Uuid>, AdminClientError> {
309        let res = self
310            .client
311            .clone()
312            .delete_inactive_jobs(Request::new(
313                ora_proto::server::v1::DeleteInactiveJobsRequest {
314                    filter: Some(filter.into()),
315                },
316            ))
317            .await?
318            .into_inner();
319
320        let job_ids = res
321            .job_ids
322            .into_iter()
323            .map(|id| id.parse())
324            .collect::<Result<Vec<Uuid>, _>>()?;
325
326        Ok(job_ids)
327    }
328
329    /// Add a new schedule.
330    pub async fn add_schedule(
331        &self,
332        mut schedule: ScheduleDefinition,
333    ) -> Result<ScheduleHandle<C>, AdminClientError> {
334        if schedule.propagate_labels_to_jobs {
335            match &mut schedule.job_creation_policy {
336                crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
337                    job_definition,
338                ) => {
339                    for (key, value) in &schedule.labels {
340                        if job_definition.labels.contains_key(key) {
341                            continue;
342                        }
343
344                        job_definition.labels.insert(key.clone(), value.clone());
345                    }
346                }
347            }
348        }
349
350        let res = self
351            .client
352            .clone()
353            .add_schedules(Request::new(ora_proto::server::v1::AddSchedulesRequest {
354                schedules: vec![schedule.into()],
355            }))
356            .await?
357            .into_inner();
358
359        let schedule_id: Uuid = res
360            .schedule_ids
361            .into_iter()
362            .next()
363            .ok_or(AdminClientError::NotEnoughScheduleIds {
364                expected: 1,
365                actual: 0,
366            })?
367            .parse()?;
368
369        Ok(ScheduleHandle::new(schedule_id, self.client.clone()))
370    }
371
372    /// Add a new schedule.
373    ///
374    /// If an active schedule with the same job type and
375    /// labels already exists, it will be returned instead.
376    pub async fn add_schedule_persistent(
377        &self,
378        mut schedule: ScheduleDefinition,
379    ) -> Result<PersistentSchedule<C>, AdminClientError> {
380        if schedule.propagate_labels_to_jobs {
381            match &mut schedule.job_creation_policy {
382                crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
383                    job_definition,
384                ) => {
385                    for (key, value) in &schedule.labels {
386                        if job_definition.labels.contains_key(key) {
387                            continue;
388                        }
389
390                        job_definition.labels.insert(key.clone(), value.clone());
391                    }
392                }
393            }
394        }
395
396        let job_type_id = match schedule.job_creation_policy {
397            crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
398                ref job_definition,
399            ) => job_definition.job_type_id.to_string(),
400        };
401
402        let mut filters = ScheduleFilter::default()
403            .with_job_type_id(job_type_id)
404            .active_only();
405
406        for (key, value) in &schedule.labels {
407            filters = filters.with_label_value(key, value);
408        }
409
410        let res = self
411            .client
412            .clone()
413            .add_schedule_if_not_exists(Request::new(AddScheduleIfNotExistsRequest {
414                schedule: Some(schedule.into()),
415                filter: Some(filters.into()),
416            }))
417            .await?
418            .into_inner();
419
420        let schedule_id: Uuid = res.schedule_id.parse()?;
421
422        if res.added {
423            Ok(PersistentSchedule::Added(ScheduleHandle::new(
424                schedule_id,
425                self.client.clone(),
426            )))
427        } else {
428            Ok(PersistentSchedule::Exists(ScheduleHandle::new(
429                schedule_id,
430                self.client.clone(),
431            )))
432        }
433    }
434
435    /// Add multiple new schedules.
436    ///
437    /// Returns handles to the created schedules.
438    pub async fn add_schedules(
439        &self,
440        schedules: impl IntoIterator<Item = ScheduleDefinition>,
441    ) -> Result<Vec<ScheduleHandle<C>>, AdminClientError> {
442        let schedules = schedules
443            .into_iter()
444            .map(|mut schedule| {
445                if schedule.propagate_labels_to_jobs {
446                    match &mut schedule.job_creation_policy {
447                        crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
448                            job_definition,
449                        ) => {
450                            for (key, value) in &schedule.labels {
451                                if job_definition.labels.contains_key(key) {
452                                    continue;
453                                }
454
455                                job_definition.labels.insert(key.clone(), value.clone());
456                            }
457                        }
458                    }
459                }
460
461                schedule
462            })
463            .map(Into::into)
464            .collect::<Vec<_>>();
465
466        let res = self
467            .client
468            .clone()
469            .add_schedules(Request::new(ora_proto::server::v1::AddSchedulesRequest {
470                schedules,
471            }))
472            .await?
473            .into_inner();
474
475        let schedule_ids = res
476            .schedule_ids
477            .into_iter()
478            .map(|id| id.parse())
479            .collect::<Result<Vec<Uuid>, _>>()?;
480
481        Ok(schedule_ids
482            .into_iter()
483            .map(|id| ScheduleHandle::new(id, self.client.clone()))
484            .collect())
485    }
486
487    /// Create a new schedule handle for a specific schedule.
488    ///
489    /// Note that this method does not check if the schedule exists.
490    pub fn schedule(&self, schedule_id: Uuid) -> ScheduleHandle<C> {
491        ScheduleHandle::new(schedule_id, self.client.clone())
492    }
493
494    /// Retrieve a list of schedules with a specific filter and order.
495    ///
496    /// The returned schedule handles will have their details pre-fetched.
497    pub fn schedules(
498        &self,
499        filter: ScheduleFilter,
500        order: ScheduleOrder,
501    ) -> impl Stream<Item = Result<ScheduleHandle<C>, AdminClientError>> + Send + Unpin + 'static
502    {
503        let client = self.client.clone();
504
505        async_stream::try_stream!({
506            let mut cursor: Option<String> = None;
507
508            loop {
509                let res = client
510                    .clone()
511                    .list_schedules(Request::new(ListSchedulesRequest {
512                        cursor: cursor.take(),
513                        limit: 100,
514                        filter: Some(filter.clone().into()),
515                        order: Some(ScheduleQueryOrder::from(order) as i32),
516                    }))
517                    .await?
518                    .into_inner();
519
520                for schedule in res.schedules {
521                    let schedule_id = schedule.id.parse().map_err(AdminClientError::InvalidId)?;
522                    let h = ScheduleHandle::new(schedule_id, client.clone());
523                    h.set_details(Arc::new(ScheduleDetails::try_from(schedule)?));
524                    yield h;
525                }
526
527                cursor = res.cursor;
528
529                if !res.has_more {
530                    break;
531                }
532            }
533        })
534        .boxed()
535    }
536
537    /// Return the number of schedules that match a specific filter.
538    pub async fn schedule_count(&self, filter: ScheduleFilter) -> Result<u64, AdminClientError> {
539        let res = self
540            .client
541            .clone()
542            .count_schedules(Request::new(ora_proto::server::v1::CountSchedulesRequest {
543                filter: Some(filter.into()),
544            }))
545            .await?;
546
547        Ok(res.into_inner().count)
548    }
549
550    /// Cancel schedules with a specific filter,
551    /// optionally cancelling the jobs associated with them.
552    pub async fn cancel_schedules(
553        &self,
554        filter: ScheduleFilter,
555        cancel_jobs: bool,
556    ) -> Result<ScheduleCancellationResult<C>, AdminClientError> {
557        let res = self
558            .client
559            .clone()
560            .cancel_schedules(Request::new(
561                ora_proto::server::v1::CancelSchedulesRequest {
562                    filter: Some(filter.into()),
563                    cancel_jobs,
564                },
565            ))
566            .await?
567            .into_inner();
568
569        let schedule_ids = res
570            .schedule_ids
571            .into_iter()
572            .map(|id| id.parse())
573            .collect::<Result<Vec<Uuid>, _>>()?;
574
575        let job_ids = res
576            .job_ids
577            .into_iter()
578            .map(|id| id.parse())
579            .collect::<Result<Vec<Uuid>, _>>()?;
580
581        Ok(ScheduleCancellationResult {
582            schedules: schedule_ids
583                .into_iter()
584                .map(|id| ScheduleHandle::new(id, self.client.clone()))
585                .collect(),
586            jobs: job_ids
587                .into_iter()
588                .map(|id| JobHandle::new(id, self.client.clone()))
589                .collect(),
590        })
591    }
592}
593
594/// The result of a schedule cancellation.
595pub struct ScheduleCancellationResult<C> {
596    /// Cancelled schedules.
597    pub schedules: Vec<ScheduleHandle<C>>,
598    /// Cancelled jobs.
599    pub jobs: Vec<JobHandle<(), C>>,
600}
601
602impl From<AdminServiceClient<Channel>> for AdminClient {
603    fn from(client: AdminServiceClient<Channel>) -> Self {
604        Self { client }
605    }
606}
607
608/// The result of a persistent job creation.
609pub enum PersistentJob<J, C> {
610    /// The job was added successfully.
611    Added(JobHandle<J, C>),
612    /// The job already exists and is active.
613    Exists(JobHandle<J, C>),
614}
615
616impl<J, C> PersistentJob<J, C> {
617    /// Get the job handle, no matter if it was added or already exists.
618    pub fn into_inner(self) -> JobHandle<J, C> {
619        match self {
620            Self::Added(handle) | Self::Exists(handle) => handle,
621        }
622    }
623}
624
625/// The result of a persistent schedule creation.
626pub enum PersistentSchedule<C> {
627    /// The schedule was added successfully.
628    Added(ScheduleHandle<C>),
629    /// The schedule already exists and is active.
630    Exists(ScheduleHandle<C>),
631}
632
633impl<C> PersistentSchedule<C> {
634    /// Get the schedule handle, no matter if it was added or already exists.
635    pub fn into_inner(self) -> ScheduleHandle<C> {
636        match self {
637            Self::Added(handle) | Self::Exists(handle) => handle,
638        }
639    }
640}
641
642/// Errors that can occur when interacting with the admin client.
643#[allow(missing_docs)]
644#[derive(Debug, Error)]
645pub enum AdminClientError {
646    #[error("gRPC error: {0}")]
647    Grpc(#[from] tonic::Status),
648    #[error("not enough job IDs were returned by the server (expected {expected}, got {actual})")]
649    NotEnoughJobIds { expected: usize, actual: usize },
650    #[error("invalid ID: {0}")]
651    InvalidId(#[from] uuid::Error),
652    #[error(
653        "not enough schedule IDs were returned by the server (expected {expected}, got {actual})"
654    )]
655    NotEnoughScheduleIds { expected: usize, actual: usize },
656    #[error("{0}")]
657    Other(#[from] eyre::Error),
658}