1use std::sync::Arc;
4
5use futures::{Stream, StreamExt};
6use ora_proto::server::v1::{
7 admin_service_client::AdminServiceClient, AddJobIfNotExistsRequest, AddJobsRequest,
8 AddScheduleIfNotExistsRequest, CountJobsRequest, JobQueryOrder, ListJobsRequest,
9 ListSchedulesRequest, ScheduleQueryOrder,
10};
11use thiserror::Error;
12use tonic::{transport::Channel, Request};
13use uuid::Uuid;
14
15use crate::{
16 job_definition::JobDetails,
17 job_handle::JobHandle,
18 job_query::{JobFilter, JobOrder},
19 job_type::TypedJobDefinition,
20 schedule_definition::{ScheduleDefinition, ScheduleDetails},
21 schedule_handle::ScheduleHandle,
22 schedule_query::{ScheduleFilter, ScheduleOrder},
23 JobType,
24};
25
26#[allow(clippy::wildcard_imports)]
27use tonic::codegen::*;
28
29#[derive(Debug, Clone)]
31#[must_use]
32pub struct AdminClient<C = Channel> {
33 client: AdminServiceClient<C>,
34}
35
36impl<C> AdminClient<C>
37where
38 C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
39 <C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
40 C::Error: Into<StdError>,
41 C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
42 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
43{
44 pub fn new(client: AdminServiceClient<C>) -> Self {
46 Self { client }
47 }
48
49 pub async fn add_job<J>(
51 &self,
52 definition: TypedJobDefinition<J>,
53 ) -> Result<JobHandle<J, C>, AdminClientError> {
54 let res = self
55 .client
56 .clone()
57 .add_jobs(Request::new(AddJobsRequest {
58 jobs: vec![definition.into_inner().into()],
59 }))
60 .await?
61 .into_inner();
62
63 let job_id: Uuid = res
64 .job_ids
65 .into_iter()
66 .next()
67 .ok_or(AdminClientError::NotEnoughJobIds {
68 expected: 1,
69 actual: 0,
70 })?
71 .parse()?;
72
73 Ok(JobHandle::new(job_id, self.client.clone()))
74 }
75
76 pub async fn add_job_persistent<J>(
81 &self,
82 definition: TypedJobDefinition<J>,
83 ) -> Result<PersistentJob<J, C>, AdminClientError> {
84 let mut filters = JobFilter::default()
85 .with_job_type_id(definition.inner.job_type_id.to_string())
86 .active_only();
87
88 for (key, value) in &definition.inner.labels {
89 filters = filters.with_label_value(key, value);
90 }
91
92 let res = self
93 .client
94 .clone()
95 .add_job_if_not_exists(Request::new(AddJobIfNotExistsRequest {
96 job: Some(definition.into_inner().into()),
97 filter: Some(filters.into()),
98 }))
99 .await?
100 .into_inner();
101
102 let job_id: Uuid = res.job_id.parse()?;
103
104 if res.added {
105 Ok(PersistentJob::Added(JobHandle::new(
106 job_id,
107 self.client.clone(),
108 )))
109 } else {
110 Ok(PersistentJob::Exists(JobHandle::new(
111 job_id,
112 self.client.clone(),
113 )))
114 }
115 }
116
117 pub async fn add_jobs<J>(
119 &self,
120 definitions: impl IntoIterator<Item = TypedJobDefinition<J>>,
121 ) -> Result<Vec<JobHandle<J, C>>, AdminClientError> {
122 let definitions = definitions
123 .into_iter()
124 .map(|d| d.into_inner().into())
125 .collect();
126
127 let res = self
128 .client
129 .clone()
130 .add_jobs(Request::new(AddJobsRequest { jobs: definitions }))
131 .await?
132 .into_inner();
133
134 let job_ids = res
135 .job_ids
136 .into_iter()
137 .map(|id| id.parse())
138 .collect::<Result<Vec<Uuid>, _>>()?;
139
140 Ok(job_ids
141 .into_iter()
142 .map(|id| JobHandle::new(id, self.client.clone()))
143 .collect())
144 }
145
146 pub fn job(&self, job_id: Uuid) -> JobHandle<(), C> {
150 JobHandle::new(job_id, self.client.clone())
151 }
152
153 pub fn jobs_of_type<J>(
158 &self,
159 mut filter: JobFilter,
160 order_by: JobOrder,
161 ) -> impl Stream<Item = Result<JobHandle<J, C>, AdminClientError>> + Send + Unpin + 'static
162 where
163 J: JobType,
164 {
165 filter.job_type_ids = [J::id().into()].into_iter().collect();
166
167 let client = self.client.clone();
168
169 async_stream::try_stream!({
170 let mut cursor: Option<String> = None;
171
172 loop {
173 let res = client
174 .clone()
175 .list_jobs(Request::new(ListJobsRequest {
176 cursor: cursor.take(),
177 limit: 100,
178 filter: Some(filter.clone().into()),
179 order: Some(JobQueryOrder::from(order_by) as i32),
180 }))
181 .await?
182 .into_inner();
183
184 for job in res.jobs {
185 let job_id = job.id.parse().map_err(AdminClientError::InvalidId)?;
186 let h = JobHandle::new(job_id, client.clone());
187 h.set_details(Arc::new(JobDetails::try_from(job)?));
188 yield h;
189 }
190
191 cursor = res.cursor;
192
193 if !res.has_more {
194 break;
195 }
196 }
197 })
198 .boxed()
199 }
200
201 pub fn jobs(
205 &self,
206 filter: JobFilter,
207 order_by: JobOrder,
208 ) -> impl Stream<Item = Result<JobHandle<(), C>, AdminClientError>> + Send + Unpin + 'static
209 {
210 let client = self.client.clone();
211
212 async_stream::try_stream!({
213 let mut cursor: Option<String> = None;
214
215 loop {
216 let res = client
217 .clone()
218 .list_jobs(Request::new(ListJobsRequest {
219 cursor: cursor.take(),
220 limit: 100,
221 filter: Some(filter.clone().into()),
222 order: Some(JobQueryOrder::from(order_by) as i32),
223 }))
224 .await?
225 .into_inner();
226
227 for job in res.jobs {
228 let job_id = job.id.parse().map_err(AdminClientError::InvalidId)?;
229 let h = JobHandle::new(job_id, client.clone());
230 h.set_details(Arc::new(JobDetails::try_from(job)?));
231 yield h;
232 }
233
234 cursor = res.cursor;
235
236 if !res.has_more {
237 break;
238 }
239 }
240 })
241 .boxed()
242 }
243
244 pub async fn job_count(&self, filter: JobFilter) -> Result<u64, AdminClientError> {
246 let res = self
247 .client
248 .clone()
249 .count_jobs(Request::new(CountJobsRequest {
250 filter: Some(filter.into()),
251 }))
252 .await?;
253
254 Ok(res.into_inner().count)
255 }
256
257 pub async fn job_count_of_type<J>(&self, mut filter: JobFilter) -> Result<u64, AdminClientError>
259 where
260 J: JobType,
261 {
262 filter.job_type_ids = [J::id().into()].into_iter().collect();
263
264 let res = self
265 .client
266 .clone()
267 .count_jobs(Request::new(CountJobsRequest {
268 filter: Some(filter.into()),
269 }))
270 .await?;
271
272 Ok(res.into_inner().count)
273 }
274
275 pub async fn cancel_jobs(
279 &self,
280 filter: JobFilter,
281 ) -> Result<Vec<JobHandle<(), C>>, AdminClientError> {
282 let res = self
283 .client
284 .clone()
285 .cancel_jobs(Request::new(ora_proto::server::v1::CancelJobsRequest {
286 filter: Some(filter.into()),
287 }))
288 .await?
289 .into_inner();
290
291 let job_ids = res
292 .job_ids
293 .into_iter()
294 .map(|id| id.parse())
295 .collect::<Result<Vec<Uuid>, _>>()?;
296
297 Ok(job_ids
298 .into_iter()
299 .map(|id| JobHandle::new(id, self.client.clone()))
300 .collect())
301 }
302
303 pub async fn delete_inactive_jobs(
306 &self,
307 filter: JobFilter,
308 ) -> Result<Vec<Uuid>, AdminClientError> {
309 let res = self
310 .client
311 .clone()
312 .delete_inactive_jobs(Request::new(
313 ora_proto::server::v1::DeleteInactiveJobsRequest {
314 filter: Some(filter.into()),
315 },
316 ))
317 .await?
318 .into_inner();
319
320 let job_ids = res
321 .job_ids
322 .into_iter()
323 .map(|id| id.parse())
324 .collect::<Result<Vec<Uuid>, _>>()?;
325
326 Ok(job_ids)
327 }
328
329 pub async fn add_schedule(
331 &self,
332 mut schedule: ScheduleDefinition,
333 ) -> Result<ScheduleHandle<C>, AdminClientError> {
334 if schedule.propagate_labels_to_jobs {
335 match &mut schedule.job_creation_policy {
336 crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
337 job_definition,
338 ) => {
339 for (key, value) in &schedule.labels {
340 if job_definition.labels.contains_key(key) {
341 continue;
342 }
343
344 job_definition.labels.insert(key.clone(), value.clone());
345 }
346 }
347 }
348 }
349
350 let res = self
351 .client
352 .clone()
353 .add_schedules(Request::new(ora_proto::server::v1::AddSchedulesRequest {
354 schedules: vec![schedule.into()],
355 }))
356 .await?
357 .into_inner();
358
359 let schedule_id: Uuid = res
360 .schedule_ids
361 .into_iter()
362 .next()
363 .ok_or(AdminClientError::NotEnoughScheduleIds {
364 expected: 1,
365 actual: 0,
366 })?
367 .parse()?;
368
369 Ok(ScheduleHandle::new(schedule_id, self.client.clone()))
370 }
371
372 pub async fn add_schedule_persistent(
377 &self,
378 mut schedule: ScheduleDefinition,
379 ) -> Result<PersistentSchedule<C>, AdminClientError> {
380 if schedule.propagate_labels_to_jobs {
381 match &mut schedule.job_creation_policy {
382 crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
383 job_definition,
384 ) => {
385 for (key, value) in &schedule.labels {
386 if job_definition.labels.contains_key(key) {
387 continue;
388 }
389
390 job_definition.labels.insert(key.clone(), value.clone());
391 }
392 }
393 }
394 }
395
396 let job_type_id = match schedule.job_creation_policy {
397 crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
398 ref job_definition,
399 ) => job_definition.job_type_id.to_string(),
400 };
401
402 let mut filters = ScheduleFilter::default()
403 .with_job_type_id(job_type_id)
404 .active_only();
405
406 for (key, value) in &schedule.labels {
407 filters = filters.with_label_value(key, value);
408 }
409
410 let res = self
411 .client
412 .clone()
413 .add_schedule_if_not_exists(Request::new(AddScheduleIfNotExistsRequest {
414 schedule: Some(schedule.into()),
415 filter: Some(filters.into()),
416 }))
417 .await?
418 .into_inner();
419
420 let schedule_id: Uuid = res.schedule_id.parse()?;
421
422 if res.added {
423 Ok(PersistentSchedule::Added(ScheduleHandle::new(
424 schedule_id,
425 self.client.clone(),
426 )))
427 } else {
428 Ok(PersistentSchedule::Exists(ScheduleHandle::new(
429 schedule_id,
430 self.client.clone(),
431 )))
432 }
433 }
434
435 pub async fn add_schedules(
439 &self,
440 schedules: impl IntoIterator<Item = ScheduleDefinition>,
441 ) -> Result<Vec<ScheduleHandle<C>>, AdminClientError> {
442 let schedules = schedules
443 .into_iter()
444 .map(|mut schedule| {
445 if schedule.propagate_labels_to_jobs {
446 match &mut schedule.job_creation_policy {
447 crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(
448 job_definition,
449 ) => {
450 for (key, value) in &schedule.labels {
451 if job_definition.labels.contains_key(key) {
452 continue;
453 }
454
455 job_definition.labels.insert(key.clone(), value.clone());
456 }
457 }
458 }
459 }
460
461 schedule
462 })
463 .map(Into::into)
464 .collect::<Vec<_>>();
465
466 let res = self
467 .client
468 .clone()
469 .add_schedules(Request::new(ora_proto::server::v1::AddSchedulesRequest {
470 schedules,
471 }))
472 .await?
473 .into_inner();
474
475 let schedule_ids = res
476 .schedule_ids
477 .into_iter()
478 .map(|id| id.parse())
479 .collect::<Result<Vec<Uuid>, _>>()?;
480
481 Ok(schedule_ids
482 .into_iter()
483 .map(|id| ScheduleHandle::new(id, self.client.clone()))
484 .collect())
485 }
486
487 pub fn schedule(&self, schedule_id: Uuid) -> ScheduleHandle<C> {
491 ScheduleHandle::new(schedule_id, self.client.clone())
492 }
493
494 pub fn schedules(
498 &self,
499 filter: ScheduleFilter,
500 order: ScheduleOrder,
501 ) -> impl Stream<Item = Result<ScheduleHandle<C>, AdminClientError>> + Send + Unpin + 'static
502 {
503 let client = self.client.clone();
504
505 async_stream::try_stream!({
506 let mut cursor: Option<String> = None;
507
508 loop {
509 let res = client
510 .clone()
511 .list_schedules(Request::new(ListSchedulesRequest {
512 cursor: cursor.take(),
513 limit: 100,
514 filter: Some(filter.clone().into()),
515 order: Some(ScheduleQueryOrder::from(order) as i32),
516 }))
517 .await?
518 .into_inner();
519
520 for schedule in res.schedules {
521 let schedule_id = schedule.id.parse().map_err(AdminClientError::InvalidId)?;
522 let h = ScheduleHandle::new(schedule_id, client.clone());
523 h.set_details(Arc::new(ScheduleDetails::try_from(schedule)?));
524 yield h;
525 }
526
527 cursor = res.cursor;
528
529 if !res.has_more {
530 break;
531 }
532 }
533 })
534 .boxed()
535 }
536
537 pub async fn schedule_count(&self, filter: ScheduleFilter) -> Result<u64, AdminClientError> {
539 let res = self
540 .client
541 .clone()
542 .count_schedules(Request::new(ora_proto::server::v1::CountSchedulesRequest {
543 filter: Some(filter.into()),
544 }))
545 .await?;
546
547 Ok(res.into_inner().count)
548 }
549
550 pub async fn cancel_schedules(
553 &self,
554 filter: ScheduleFilter,
555 cancel_jobs: bool,
556 ) -> Result<ScheduleCancellationResult<C>, AdminClientError> {
557 let res = self
558 .client
559 .clone()
560 .cancel_schedules(Request::new(
561 ora_proto::server::v1::CancelSchedulesRequest {
562 filter: Some(filter.into()),
563 cancel_jobs,
564 },
565 ))
566 .await?
567 .into_inner();
568
569 let schedule_ids = res
570 .schedule_ids
571 .into_iter()
572 .map(|id| id.parse())
573 .collect::<Result<Vec<Uuid>, _>>()?;
574
575 let job_ids = res
576 .job_ids
577 .into_iter()
578 .map(|id| id.parse())
579 .collect::<Result<Vec<Uuid>, _>>()?;
580
581 Ok(ScheduleCancellationResult {
582 schedules: schedule_ids
583 .into_iter()
584 .map(|id| ScheduleHandle::new(id, self.client.clone()))
585 .collect(),
586 jobs: job_ids
587 .into_iter()
588 .map(|id| JobHandle::new(id, self.client.clone()))
589 .collect(),
590 })
591 }
592}
593
594pub struct ScheduleCancellationResult<C> {
596 pub schedules: Vec<ScheduleHandle<C>>,
598 pub jobs: Vec<JobHandle<(), C>>,
600}
601
602impl From<AdminServiceClient<Channel>> for AdminClient {
603 fn from(client: AdminServiceClient<Channel>) -> Self {
604 Self { client }
605 }
606}
607
608pub enum PersistentJob<J, C> {
610 Added(JobHandle<J, C>),
612 Exists(JobHandle<J, C>),
614}
615
616impl<J, C> PersistentJob<J, C> {
617 pub fn into_inner(self) -> JobHandle<J, C> {
619 match self {
620 Self::Added(handle) | Self::Exists(handle) => handle,
621 }
622 }
623}
624
625pub enum PersistentSchedule<C> {
627 Added(ScheduleHandle<C>),
629 Exists(ScheduleHandle<C>),
631}
632
633impl<C> PersistentSchedule<C> {
634 pub fn into_inner(self) -> ScheduleHandle<C> {
636 match self {
637 Self::Added(handle) | Self::Exists(handle) => handle,
638 }
639 }
640}
641
642#[allow(missing_docs)]
644#[derive(Debug, Error)]
645pub enum AdminClientError {
646 #[error("gRPC error: {0}")]
647 Grpc(#[from] tonic::Status),
648 #[error("not enough job IDs were returned by the server (expected {expected}, got {actual})")]
649 NotEnoughJobIds { expected: usize, actual: usize },
650 #[error("invalid ID: {0}")]
651 InvalidId(#[from] uuid::Error),
652 #[error(
653 "not enough schedule IDs were returned by the server (expected {expected}, got {actual})"
654 )]
655 NotEnoughScheduleIds { expected: usize, actual: usize },
656 #[error("{0}")]
657 Other(#[from] eyre::Error),
658}