1use std::{
4 cmp,
5 marker::PhantomData,
6 pin::{Pin, pin},
7 time::SystemTime,
8};
9
10use eyre::{Context, ContextCompat, OptionExt, bail};
11use futures::{FutureExt, Stream, TryStreamExt};
12use tonic::Request;
13use uuid::Uuid;
14
15use crate::{
16 admin::{AdminClient, schedules::Schedule},
17 common::{AddedOrExisting, LabelFilter, TimeRange},
18 execution::{ExecutionId, ExecutionStatus},
19 executor::ExecutorId,
20 job::{JobDefinition, JobId},
21 job_type::{AnyJobType, JobType, JobTypeId},
22 proto::{
23 self,
24 admin::v1::{AddJobsRequest, ListJobsRequest, PaginationOptions},
25 },
26 schedule::ScheduleId,
27};
28
29impl AdminClient {
30 pub async fn add_job<J>(&self, job: JobDefinition<J>) -> crate::Result<Job<J>>
32 where
33 J: JobType,
34 {
35 let id = self
36 .inner
37 .add_jobs(Request::new(AddJobsRequest {
38 jobs: vec![job.try_into()?],
39 if_not_exists: None,
40 }))
41 .await?
42 .into_inner()
43 .job_ids
44 .pop()
45 .ok_or_eyre("job that was just added was not found")?
46 .parse::<Uuid>()
47 .map(JobId)
48 .wrap_err("server returned invalid job ID")?;
49
50 Ok(Job {
51 client: self.clone(),
52 id,
53 phantom: PhantomData,
54 raw: None,
55 })
56 }
57
58 pub async fn add_jobs<I>(&self, jobs: I) -> crate::Result<Vec<Job<AnyJobType>>>
60 where
61 I: IntoIterator<Item: TryInto<proto::jobs::v1::Job, Error: Into<crate::Error>>>,
62 {
63 let jobs = jobs
64 .into_iter()
65 .map(TryInto::try_into)
66 .collect::<Result<Vec<_>, _>>()
67 .map_err(Into::into)?;
68
69 self.inner
70 .add_jobs(Request::new(AddJobsRequest {
71 jobs,
72 if_not_exists: None,
73 }))
74 .await?
75 .into_inner()
76 .job_ids
77 .into_iter()
78 .map(|id| {
79 Result::<_, crate::Error>::Ok(Job {
80 client: self.clone(),
81 id: JobId(
82 id.parse::<Uuid>()
83 .wrap_err("server returned invalid job ID")?,
84 ),
85 raw: None,
86 phantom: PhantomData,
87 })
88 })
89 .collect::<Result<Vec<_>, _>>()
90 }
91
92 pub async fn add_job_if_not_exists<J>(
99 &self,
100 job: JobDefinition<J>,
101 filters: JobFilters,
102 ) -> crate::Result<AddedOrExisting<Job<AnyJobType>>>
103 where
104 J: JobType,
105 {
106 let res = self
107 .inner
108 .add_jobs(Request::new(AddJobsRequest {
109 jobs: vec![job.try_into()?],
110 if_not_exists: Some(filters.into()),
111 }))
112 .await?
113 .into_inner();
114
115 let added_job_id = res
116 .job_ids
117 .into_iter()
118 .next()
119 .map(|id| {
120 id.parse::<Uuid>()
121 .map(JobId)
122 .wrap_err("server returned invalid job ID")
123 })
124 .transpose()?;
125
126 let existing_job_id = res
127 .existing_job_ids
128 .into_iter()
129 .next()
130 .map(|id| {
131 id.parse::<Uuid>()
132 .map(JobId)
133 .wrap_err("server returned invalid job ID")
134 })
135 .transpose()?;
136
137 match (added_job_id, existing_job_id) {
138 (Some(added_job_id), None) => Ok(AddedOrExisting::Added(Job {
139 client: self.clone(),
140 id: added_job_id,
141 raw: None,
142 phantom: PhantomData,
143 })),
144 (None, Some(existing_job_id)) => Ok(AddedOrExisting::Existing(Job {
145 client: self.clone(),
146 id: existing_job_id,
147 raw: None,
148 phantom: PhantomData,
149 })),
150 (None, None) => Err(eyre::eyre!(
151 "no job was added but no existing job was returned by the server"
152 )
153 .into()),
154 (Some(_), Some(_)) => Err(eyre::eyre!(
155 "server returned both an added job ID and an existing job ID, which is unexpected"
156 )
157 .into()),
158 }
159 }
160
161 pub async fn add_jobs_if_not_exists<I>(
163 &self,
164 jobs: I,
165 filters: JobFilters,
166 ) -> crate::Result<AddedOrExisting<Vec<Job<AnyJobType>>>>
167 where
168 I: IntoIterator<Item: TryInto<proto::jobs::v1::Job, Error = crate::Error>>,
169 {
170 let jobs = jobs
171 .into_iter()
172 .map(TryInto::try_into)
173 .collect::<Result<Vec<_>, _>>()?;
174
175 let res = self
176 .inner
177 .add_jobs(Request::new(AddJobsRequest {
178 jobs,
179 if_not_exists: Some(filters.into()),
180 }))
181 .await?
182 .into_inner();
183
184 if res.job_ids.is_empty() {
185 Ok(AddedOrExisting::Existing(
186 res.existing_job_ids
187 .into_iter()
188 .map(|id| {
189 Result::<_, crate::Error>::Ok(Job {
190 client: self.clone(),
191 id: JobId(
192 id.parse::<Uuid>()
193 .wrap_err("server returned invalid job ID")?,
194 ),
195 raw: None,
196 phantom: PhantomData,
197 })
198 })
199 .collect::<Result<Vec<_>, _>>()?,
200 ))
201 } else {
202 Ok(AddedOrExisting::Added(
203 res.job_ids
204 .into_iter()
205 .map(|id| {
206 Result::<_, crate::Error>::Ok(Job {
207 client: self.clone(),
208 id: JobId(
209 id.parse::<Uuid>()
210 .wrap_err("server returned invalid job ID")?,
211 ),
212 raw: None,
213 phantom: PhantomData,
214 })
215 })
216 .collect::<Result<Vec<_>, _>>()?,
217 ))
218 }
219 }
220
221 pub fn list_jobs(
223 &self,
224 filters: JobFilters,
225 order: JobOrderBy,
226 limit: Option<u32>,
227 ) -> impl Stream<Item = crate::Result<Job<AnyJobType>>> {
228 async_stream::try_stream!({
229 let mut total_count = 0;
230 let mut next_page_token = None;
231
232 let filters: proto::admin::v1::JobFilters = filters.into();
233
234 loop {
235 if let Some(limit) = limit
236 && total_count >= limit
237 {
238 break;
239 }
240
241 let response = self
242 .inner
243 .list_jobs(Request::new(ListJobsRequest {
244 filters: Some(filters.clone()),
245 order_by: match order {
246 JobOrderBy::TargetExecutionTimeAsc => {
247 proto::admin::v1::JobOrderBy::TargetExecutionTimeAsc as i32
248 }
249 JobOrderBy::TargetExecutionTimeDesc => {
250 proto::admin::v1::JobOrderBy::TargetExecutionTimeDesc as i32
251 }
252 JobOrderBy::CreatedAtAsc => {
253 proto::admin::v1::JobOrderBy::CreatedAtAsc as i32
254 }
255 JobOrderBy::CreatedAtDesc => {
256 proto::admin::v1::JobOrderBy::CreatedAtDesc as i32
257 }
258 },
259 pagination: Some(PaginationOptions {
260 page_size: if let Some(limit) = limit {
261 cmp::min(25, limit)
262 } else {
263 25
264 },
265 next_page_token: next_page_token.clone(),
266 }),
267 }))
268 .await?
269 .into_inner();
270
271 for job_proto in response.jobs {
272 let job_id = JobId(
273 job_proto
274 .id
275 .parse::<Uuid>()
276 .wrap_err("server returned invalid job ID")?,
277 );
278
279 yield Job {
280 client: self.clone(),
281 id: job_id,
282 raw: Some(job_proto),
283 phantom: PhantomData,
284 };
285 total_count += 1;
286 }
287
288 next_page_token = response.next_page_token;
289
290 if next_page_token.is_none() {
291 break;
292 }
293 }
294 })
295 }
296
297 pub async fn first_job(
301 &self,
302 filters: JobFilters,
303 order: JobOrderBy,
304 ) -> crate::Result<Option<Job<AnyJobType>>> {
305 pin!(self.list_jobs(filters, order, Some(1)))
306 .try_next()
307 .await
308 }
309
310 pub async fn job_exists(&self, filters: JobFilters) -> crate::Result<bool> {
312 let count = self.count_jobs(filters).await?;
315 Ok(count > 0)
316 }
317
318 pub async fn count_jobs(&self, filters: JobFilters) -> crate::Result<u64> {
320 Ok(self
321 .inner
322 .count_jobs(Request::new(proto::admin::v1::CountJobsRequest {
323 filters: Some(filters.into()),
324 }))
325 .await?
326 .into_inner()
327 .count)
328 }
329
330 pub async fn cancel_jobs(&self, filters: JobFilters) -> crate::Result<Vec<Job<AnyJobType>>> {
334 Ok(self
335 .inner
336 .cancel_jobs(Request::new(proto::admin::v1::CancelJobsRequest {
337 filters: Some(filters.into()),
338 }))
339 .await?
340 .into_inner()
341 .cancelled_job_ids
342 .into_iter()
343 .map(|id| {
344 Ok(Job {
345 client: self.clone(),
346 id: JobId(id.parse()?),
347 raw: None,
348 phantom: PhantomData,
349 })
350 })
351 .collect::<Result<Vec<_>, eyre::Report>>()?)
352 }
353}
354
355#[derive(Debug, Default, Clone)]
357#[must_use]
358pub struct JobFilters {
359 pub job_ids: Option<Vec<JobId>>,
361 pub job_type_ids: Option<Vec<JobTypeId>>,
363 pub executor_ids: Option<Vec<ExecutorId>>,
365 pub execution_ids: Option<Vec<ExecutionId>>,
367 pub execution_statuses: Option<Vec<ExecutionStatus>>,
369 pub target_execution_time: Option<TimeRange>,
371 pub created_at: Option<TimeRange>,
373 pub labels: Option<Vec<LabelFilter>>,
375 pub schedule_ids: Option<Vec<ScheduleId>>,
377}
378
379impl JobFilters {
380 pub fn all() -> Self {
382 Self::default()
383 }
384
385 pub fn job_type<J: JobType>(mut self) -> Self {
387 self.job_type_ids = Some(vec![J::job_type_id()]);
388 self
389 }
390
391 pub fn completed_only(mut self) -> Self {
393 self.execution_statuses = Some(vec![
394 ExecutionStatus::Succeeded,
395 ExecutionStatus::Failed,
396 ExecutionStatus::Cancelled,
397 ]);
398
399 self
400 }
401
402 pub fn active_only(mut self) -> Self {
404 self.execution_statuses = Some(vec![ExecutionStatus::Pending, ExecutionStatus::InProgress]);
405
406 self
407 }
408
409 pub fn has_label<K: Into<String>>(mut self, key: K) -> Self {
411 self.labels.get_or_insert_with(Vec::new).push(LabelFilter {
412 key: key.into(),
413 value: None,
414 });
415
416 self
417 }
418
419 pub fn has_label_value<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
421 self.labels.get_or_insert_with(Vec::new).push(LabelFilter {
422 key: key.into(),
423 value: Some(value.into()),
424 });
425
426 self
427 }
428}
429
/// Sort order for job listings.
///
/// Each variant is mapped to the protobuf `JobOrderBy` variant of the same
/// name when a listing request is built.
#[derive(Clone, Copy, Debug, Default)]
pub enum JobOrderBy {
    /// Target execution time, ascending. This is the default.
    #[default]
    TargetExecutionTimeAsc,
    /// Target execution time, descending.
    TargetExecutionTimeDesc,
    /// Creation time, ascending.
    CreatedAtAsc,
    /// Creation time, descending.
    CreatedAtDesc,
}
443
444pub struct Job<J> {
446 client: AdminClient,
447 id: JobId,
448 raw: Option<proto::admin::v1::Job>,
449 phantom: PhantomData<J>,
450}
451
452impl<J> Job<J> {
453 #[must_use]
455 pub fn id(&self) -> JobId {
456 self.id
457 }
458
459 pub async fn raw(&mut self) -> crate::Result<proto::admin::v1::Job> {
461 let job = self.fetch_raw().await?;
462
463 Ok(job
464 .or_else(|| self.raw.clone())
465 .ok_or_eyre("failed to retrieve job")?)
466 }
467
468 #[must_use]
472 pub fn raw_cached(&self) -> Option<&proto::admin::v1::Job> {
473 self.raw.as_ref()
474 }
475
476 #[must_use]
479 pub fn into_raw(self) -> Option<proto::admin::v1::Job> {
480 self.raw
481 }
482
483 pub async fn cancel(&mut self) -> crate::Result<()> {
485 self.client
486 .inner
487 .cancel_jobs(Request::new(proto::admin::v1::CancelJobsRequest {
488 filters: Some(proto::admin::v1::JobFilters {
489 job_ids: vec![self.id.to_string()],
490 ..Default::default()
491 }),
492 }))
493 .await?;
494
495 Ok(())
496 }
497
498 pub async fn executions(&mut self) -> crate::Result<Vec<Execution<J>>> {
500 let job = self.fetch_raw().await?;
501
502 let job = job
503 .as_ref()
504 .or(self.raw.as_ref())
505 .ok_or_eyre("failed to fetch job")?;
506
507 Ok(job
508 .executions
509 .iter()
510 .map(|e| {
511 Ok(Execution {
512 id: ExecutionId(e.id.parse()?),
513 status: match e.status() {
514 proto::admin::v1::ExecutionStatus::Unspecified => {
515 bail!("unknown execution status")
516 }
517 proto::admin::v1::ExecutionStatus::Pending => ExecutionStatus::Pending,
518 proto::admin::v1::ExecutionStatus::InProgress => {
519 ExecutionStatus::InProgress
520 }
521 proto::admin::v1::ExecutionStatus::Succeeded => ExecutionStatus::Succeeded,
522 proto::admin::v1::ExecutionStatus::Failed => ExecutionStatus::Failed,
523 proto::admin::v1::ExecutionStatus::Cancelled => ExecutionStatus::Cancelled,
524 },
525 executor_id: e
526 .executor_id
527 .as_ref()
528 .map(|id| Result::<_, eyre::Report>::Ok(ExecutorId(id.parse()?)))
529 .transpose()?,
530
531 created_at: e
532 .created_at
533 .map(TryFrom::try_from)
534 .transpose()?
535 .ok_or_eyre("missing created_at for execution")?,
536 started_at: e.started_at.map(TryFrom::try_from).transpose()?,
537 succeeded_at: e.succeeded_at.map(TryFrom::try_from).transpose()?,
538 failed_at: e.failed_at.map(TryFrom::try_from).transpose()?,
539 cancelled_at: e.cancelled_at.map(TryFrom::try_from).transpose()?,
540 output_json: e.output_json.clone(),
541 failure_reason: e.failure_reason.clone(),
542 _job_type: PhantomData,
543 })
544 })
545 .collect::<Result<Vec<_>, eyre::Report>>()?)
546 }
547
548 #[cfg(not(target_arch = "wasm32"))]
550 pub async fn terminated(&mut self) -> crate::Result<()> {
551 loop {
552 let executions = self.executions().await?;
553
554 let Some(last_execution) = executions.last() else {
555 return Err(eyre::eyre!("no executions found for job").into());
556 };
557
558 if last_execution.status().is_terminal() {
559 return Ok(());
560 }
561
562 tokio::time::sleep(self.client.poll_interval).await;
563 }
564 }
565
566 pub async fn output_json(&mut self) -> crate::Result<Option<String>> {
571 let Some(last_execution) = self.executions().await?.pop() else {
572 return Err(eyre::eyre!("no executions found for job").into());
573 };
574
575 match last_execution.status() {
576 ExecutionStatus::Succeeded => Ok(last_execution.output_json().map(String::from)),
577 _ => Ok(None),
578 }
579 }
580
581 pub async fn failure_reason(&mut self) -> crate::Result<Option<String>> {
586 let Some(last_execution) = self.executions().await?.pop() else {
587 return Err(eyre::eyre!("no executions found for job").into());
588 };
589
590 match last_execution.status() {
591 ExecutionStatus::Failed => Ok(last_execution.failure_reason().map(String::from)),
592 _ => Ok(None),
593 }
594 }
595
596 pub async fn schedule(&mut self) -> crate::Result<Option<Schedule<J>>> {
598 let job = self.fetch_raw().await?;
599
600 let job = job
601 .as_ref()
602 .or(self.raw.as_ref())
603 .ok_or_eyre("failed to fetch job")?;
604
605 if let Some(schedule_id) = &job.schedule_id {
606 let schedule_id = ScheduleId(
607 schedule_id
608 .parse::<Uuid>()
609 .wrap_err("server returned invalid schedule ID")?,
610 );
611
612 Ok(Some(Schedule {
613 client: self.client.clone(),
614 id: schedule_id,
615 raw: None,
616 phantom: PhantomData,
617 }))
618 } else {
619 Ok(None)
620 }
621 }
622
623 async fn fetch_raw(&mut self) -> crate::Result<Option<proto::admin::v1::Job>> {
628 if let Some(cached) = &self.raw
630 && cached
631 .executions
632 .last()
633 .is_some_and(|e| ExecutionStatus::from(e.status()).is_terminal())
634 {
635 return Ok(None);
636 }
637
638 let job = self
639 .client
640 .inner
641 .list_jobs(Request::new(ListJobsRequest {
642 filters: Some(proto::admin::v1::JobFilters {
643 job_ids: vec![self.id.to_string()],
644 ..Default::default()
645 }),
646 order_by: proto::admin::v1::JobOrderBy::Unspecified as _,
647 pagination: Some(PaginationOptions {
648 page_size: 1,
649 next_page_token: None,
650 }),
651 }))
652 .await?
653 .into_inner()
654 .jobs
655 .pop()
656 .ok_or_eyre("job not found")?;
657
658 match self.client.caching_strategy {
659 crate::admin::CachingStrategy::Cache => {
660 self.raw = Some(job);
661 Ok(None)
662 }
663 crate::admin::CachingStrategy::NoCache => Ok(Some(job)),
664 }
665 }
666}
667
668impl Job<AnyJobType> {
669 pub async fn cast<J>(mut self) -> crate::Result<Option<Job<J>>>
675 where
676 J: JobType,
677 {
678 let job = self.fetch_raw().await?;
679
680 let job = job
681 .as_ref()
682 .or(self.raw.as_ref())
683 .ok_or_eyre("failed to fetch job")?;
684
685 let job = job.job.as_ref().ok_or_eyre("job definition is missing")?;
686
687 let job_type_id = &job.job_type_id;
688
689 if job_type_id != J::job_type_id().as_str() {
690 return Ok(None);
691 }
692
693 Ok(Some(Job {
694 client: self.client,
695 id: self.id,
696 raw: self.raw,
697 phantom: PhantomData,
698 }))
699 }
700
701 #[must_use]
706 pub fn cast_unchecked<J>(self) -> Job<J>
707 where
708 J: JobType,
709 {
710 Job {
711 client: self.client,
712 id: self.id,
713 raw: self.raw,
714 phantom: PhantomData,
715 }
716 }
717
718 pub(super) fn cast_any<J>(self) -> Job<J> {
719 Job {
720 client: self.client,
721 id: self.id,
722 raw: self.raw,
723 phantom: PhantomData,
724 }
725 }
726}
727
728impl<J> Job<J>
729where
730 J: JobType,
731{
732 pub async fn definition(&mut self) -> crate::Result<JobDefinition<J>> {
734 let job = self.fetch_raw().await?;
735
736 let job = job
737 .as_ref()
738 .or(self.raw.as_ref())
739 .ok_or_eyre("failed to fetch job")?;
740
741 let job = job.job.as_ref().ok_or_eyre("job definition is missing")?;
742
743 let input: J = serde_json::from_str(&job.input_payload_json)
744 .wrap_err("failed to deserialize job input")?;
745
746 Ok(JobDefinition {
747 target_execution_time: job
748 .target_execution_time
749 .as_ref()
750 .copied()
751 .map(TryFrom::try_from)
752 .transpose()
753 .wrap_err("invalid target execution time")?
754 .ok_or_eyre("missing target execution time for job")?,
755 input,
756 labels: job
757 .labels
758 .iter()
759 .map(|label| (label.key.clone(), label.value.clone()))
760 .collect(),
761 timeout_policy: job.timeout_policy.unwrap_or_default().into(),
762 retry_policy: job.retry_policy.unwrap_or_default().into(),
763 })
764 }
765
766 pub async fn wait_result(&mut self) -> Result<<J as JobType>::Output, crate::Error> {
771 self.terminated().await?;
772
773 let Some(last_execution) = self.executions().await?.pop() else {
774 return Err(eyre::eyre!("no executions found for job").into());
775 };
776
777 match last_execution.status() {
778 ExecutionStatus::Succeeded => Ok(last_execution
779 .output()?
780 .ok_or_eyre("job succeeded but no output was found")?),
781 ExecutionStatus::Failed => Err(crate::Error::JobFailed(
782 last_execution
783 .failure_reason()
784 .wrap_err("job failed but no reason was provided")?
785 .into(),
786 )),
787 ExecutionStatus::Cancelled => Err(crate::Error::JobCancelled),
788 _ => {
789 Err(eyre::eyre!("job is in unexpected state: {:?}", last_execution.status()).into())
790 }
791 }
792 }
793
794 async fn into_result(mut self) -> crate::Result<J::Output> {
795 self.wait_result().await
796 }
797}
798
799impl<J> IntoFuture for Job<J>
800where
801 J: JobType,
802{
803 type Output = crate::Result<J::Output>;
804
805 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
806
807 fn into_future(self) -> Self::IntoFuture {
808 self.into_result().boxed()
809 }
810}
811
812#[derive(Debug)]
814pub struct Execution<J> {
815 id: ExecutionId,
816 executor_id: Option<ExecutorId>,
817 status: ExecutionStatus,
818 created_at: SystemTime,
819 started_at: Option<SystemTime>,
820 succeeded_at: Option<SystemTime>,
821 failed_at: Option<SystemTime>,
822 cancelled_at: Option<SystemTime>,
823 output_json: Option<String>,
824 failure_reason: Option<String>,
825 _job_type: PhantomData<J>,
826}
827
828impl<J> Execution<J> {
829 #[inline]
831 #[must_use]
832 pub const fn id(&self) -> ExecutionId {
833 self.id
834 }
835
836 #[inline]
838 #[must_use]
839 pub const fn executor_id(&self) -> Option<ExecutorId> {
840 self.executor_id
841 }
842
843 #[inline]
845 #[must_use]
846 pub const fn status(&self) -> ExecutionStatus {
847 self.status
848 }
849
850 #[inline]
852 #[must_use]
853 pub const fn created_at(&self) -> SystemTime {
854 self.created_at
855 }
856
857 #[inline]
859 #[must_use]
860 pub const fn started_at(&self) -> Option<SystemTime> {
861 self.started_at
862 }
863
864 #[inline]
866 #[must_use]
867 pub const fn succeeded_at(&self) -> Option<SystemTime> {
868 self.succeeded_at
869 }
870
871 #[inline]
873 #[must_use]
874 pub const fn failed_at(&self) -> Option<SystemTime> {
875 self.failed_at
876 }
877
878 #[inline]
880 #[must_use]
881 pub const fn cancelled_at(&self) -> Option<SystemTime> {
882 self.cancelled_at
883 }
884
885 #[inline]
889 #[must_use]
890 pub fn ended_at(&self) -> Option<SystemTime> {
891 self.succeeded_at.or(self.failed_at).or(self.cancelled_at)
892 }
893
894 pub fn output(&self) -> crate::Result<Option<J::Output>>
896 where
897 J: JobType,
898 {
899 if let Some(ref output_json) = self.output_json {
900 let output = serde_json::from_str::<J::Output>(output_json)?;
901 Ok(Some(output))
902 } else {
903 Ok(None)
904 }
905 }
906
907 #[inline]
909 #[must_use]
910 pub fn output_json(&self) -> Option<&str> {
911 self.output_json.as_deref()
912 }
913
914 #[inline]
916 #[must_use]
917 pub fn failure_reason(&self) -> Option<&str> {
918 self.failure_reason.as_deref()
919 }
920}