Skip to main content

ora/admin/
jobs.rs

1//! Job management.
2
3use std::{
4    cmp,
5    marker::PhantomData,
6    pin::{Pin, pin},
7    time::SystemTime,
8};
9
10use eyre::{Context, ContextCompat, OptionExt, bail};
11use futures::{FutureExt, Stream, TryStreamExt};
12use tonic::Request;
13use uuid::Uuid;
14
15use crate::{
16    admin::{AdminClient, schedules::Schedule},
17    common::{AddedOrExisting, LabelFilter, TimeRange},
18    execution::{ExecutionId, ExecutionStatus},
19    executor::ExecutorId,
20    job::{JobDefinition, JobId},
21    job_type::{AnyJobType, JobType, JobTypeId},
22    proto::{
23        self,
24        admin::v1::{AddJobsRequest, ListJobsRequest, PaginationOptions},
25    },
26    schedule::ScheduleId,
27};
28
29impl AdminClient {
30    /// Add a job to be executed.
31    pub async fn add_job<J>(&self, job: JobDefinition<J>) -> crate::Result<Job<J>>
32    where
33        J: JobType,
34    {
35        let id = self
36            .inner
37            .add_jobs(Request::new(AddJobsRequest {
38                jobs: vec![job.try_into()?],
39                if_not_exists: None,
40            }))
41            .await?
42            .into_inner()
43            .job_ids
44            .pop()
45            .ok_or_eyre("job that was just added was not found")?
46            .parse::<Uuid>()
47            .map(JobId)
48            .wrap_err("server returned invalid job ID")?;
49
50        Ok(Job {
51            client: self.clone(),
52            id,
53            phantom: PhantomData,
54            raw: None,
55        })
56    }
57
58    /// Add multiple jobs to be executed.
59    pub async fn add_jobs<I>(&self, jobs: I) -> crate::Result<Vec<Job<AnyJobType>>>
60    where
61        I: IntoIterator<Item: TryInto<proto::jobs::v1::Job, Error: Into<crate::Error>>>,
62    {
63        let jobs = jobs
64            .into_iter()
65            .map(TryInto::try_into)
66            .collect::<Result<Vec<_>, _>>()
67            .map_err(Into::into)?;
68
69        self.inner
70            .add_jobs(Request::new(AddJobsRequest {
71                jobs,
72                if_not_exists: None,
73            }))
74            .await?
75            .into_inner()
76            .job_ids
77            .into_iter()
78            .map(|id| {
79                Result::<_, crate::Error>::Ok(Job {
80                    client: self.clone(),
81                    id: JobId(
82                        id.parse::<Uuid>()
83                            .wrap_err("server returned invalid job ID")?,
84                    ),
85                    raw: None,
86                    phantom: PhantomData,
87                })
88            })
89            .collect::<Result<Vec<_>, _>>()
90    }
91
92    /// Add a job to be executed.
93    ///
94    /// If any jobs exists based on the given filters, no
95    /// jobs will be added.
96    /// If multiple existing jobs are matched by the filter,
97    /// one is arbitrarily chosen and returned.
98    pub async fn add_job_if_not_exists<J>(
99        &self,
100        job: JobDefinition<J>,
101        filters: JobFilters,
102    ) -> crate::Result<AddedOrExisting<Job<AnyJobType>>>
103    where
104        J: JobType,
105    {
106        let res = self
107            .inner
108            .add_jobs(Request::new(AddJobsRequest {
109                jobs: vec![job.try_into()?],
110                if_not_exists: Some(filters.into()),
111            }))
112            .await?
113            .into_inner();
114
115        let added_job_id = res
116            .job_ids
117            .into_iter()
118            .next()
119            .map(|id| {
120                id.parse::<Uuid>()
121                    .map(JobId)
122                    .wrap_err("server returned invalid job ID")
123            })
124            .transpose()?;
125
126        let existing_job_id = res
127            .existing_job_ids
128            .into_iter()
129            .next()
130            .map(|id| {
131                id.parse::<Uuid>()
132                    .map(JobId)
133                    .wrap_err("server returned invalid job ID")
134            })
135            .transpose()?;
136
137        match (added_job_id, existing_job_id) {
138            (Some(added_job_id), None) => Ok(AddedOrExisting::Added(Job {
139                client: self.clone(),
140                id: added_job_id,
141                raw: None,
142                phantom: PhantomData,
143            })),
144            (None, Some(existing_job_id)) => Ok(AddedOrExisting::Existing(Job {
145                client: self.clone(),
146                id: existing_job_id,
147                raw: None,
148                phantom: PhantomData,
149            })),
150            (None, None) => Err(eyre::eyre!(
151                "no job was added but no existing job was returned by the server"
152            )
153            .into()),
154            (Some(_), Some(_)) => Err(eyre::eyre!(
155                "server returned both an added job ID and an existing job ID, which is unexpected"
156            )
157            .into()),
158        }
159    }
160
161    /// Add multiple jobs if no existing jobs match the given filters.
162    pub async fn add_jobs_if_not_exists<I>(
163        &self,
164        jobs: I,
165        filters: JobFilters,
166    ) -> crate::Result<AddedOrExisting<Vec<Job<AnyJobType>>>>
167    where
168        I: IntoIterator<Item: TryInto<proto::jobs::v1::Job, Error = crate::Error>>,
169    {
170        let jobs = jobs
171            .into_iter()
172            .map(TryInto::try_into)
173            .collect::<Result<Vec<_>, _>>()?;
174
175        let res = self
176            .inner
177            .add_jobs(Request::new(AddJobsRequest {
178                jobs,
179                if_not_exists: Some(filters.into()),
180            }))
181            .await?
182            .into_inner();
183
184        if res.job_ids.is_empty() {
185            Ok(AddedOrExisting::Existing(
186                res.existing_job_ids
187                    .into_iter()
188                    .map(|id| {
189                        Result::<_, crate::Error>::Ok(Job {
190                            client: self.clone(),
191                            id: JobId(
192                                id.parse::<Uuid>()
193                                    .wrap_err("server returned invalid job ID")?,
194                            ),
195                            raw: None,
196                            phantom: PhantomData,
197                        })
198                    })
199                    .collect::<Result<Vec<_>, _>>()?,
200            ))
201        } else {
202            Ok(AddedOrExisting::Added(
203                res.job_ids
204                    .into_iter()
205                    .map(|id| {
206                        Result::<_, crate::Error>::Ok(Job {
207                            client: self.clone(),
208                            id: JobId(
209                                id.parse::<Uuid>()
210                                    .wrap_err("server returned invalid job ID")?,
211                            ),
212                            raw: None,
213                            phantom: PhantomData,
214                        })
215                    })
216                    .collect::<Result<Vec<_>, _>>()?,
217            ))
218        }
219    }
220
221    /// List jobs based on the given filters.
222    pub fn list_jobs(
223        &self,
224        filters: JobFilters,
225        order: JobOrderBy,
226        limit: Option<u32>,
227    ) -> impl Stream<Item = crate::Result<Job<AnyJobType>>> {
228        async_stream::try_stream!({
229            let mut total_count = 0;
230            let mut next_page_token = None;
231
232            let filters: proto::admin::v1::JobFilters = filters.into();
233
234            loop {
235                if let Some(limit) = limit
236                    && total_count >= limit
237                {
238                    break;
239                }
240
241                let response = self
242                    .inner
243                    .list_jobs(Request::new(ListJobsRequest {
244                        filters: Some(filters.clone()),
245                        order_by: match order {
246                            JobOrderBy::TargetExecutionTimeAsc => {
247                                proto::admin::v1::JobOrderBy::TargetExecutionTimeAsc as i32
248                            }
249                            JobOrderBy::TargetExecutionTimeDesc => {
250                                proto::admin::v1::JobOrderBy::TargetExecutionTimeDesc as i32
251                            }
252                            JobOrderBy::CreatedAtAsc => {
253                                proto::admin::v1::JobOrderBy::CreatedAtAsc as i32
254                            }
255                            JobOrderBy::CreatedAtDesc => {
256                                proto::admin::v1::JobOrderBy::CreatedAtDesc as i32
257                            }
258                        },
259                        pagination: Some(PaginationOptions {
260                            page_size: if let Some(limit) = limit {
261                                cmp::min(25, limit)
262                            } else {
263                                25
264                            },
265                            next_page_token: next_page_token.clone(),
266                        }),
267                    }))
268                    .await?
269                    .into_inner();
270
271                for job_proto in response.jobs {
272                    let job_id = JobId(
273                        job_proto
274                            .id
275                            .parse::<Uuid>()
276                            .wrap_err("server returned invalid job ID")?,
277                    );
278
279                    yield Job {
280                        client: self.clone(),
281                        id: job_id,
282                        raw: Some(job_proto),
283                        phantom: PhantomData,
284                    };
285                    total_count += 1;
286                }
287
288                next_page_token = response.next_page_token;
289
290                if next_page_token.is_none() {
291                    break;
292                }
293            }
294        })
295    }
296
297    /// Return the first job that matches the given filters, if any.
298    ///
299    /// This is just a convenience function that calls `list_jobs` with a limit of 1.
300    pub async fn first_job(
301        &self,
302        filters: JobFilters,
303        order: JobOrderBy,
304    ) -> crate::Result<Option<Job<AnyJobType>>> {
305        pin!(self.list_jobs(filters, order, Some(1)))
306            .try_next()
307            .await
308    }
309
310    /// Return whether any jobs exist based on the given filters.
311    pub async fn job_exists(&self, filters: JobFilters) -> crate::Result<bool> {
312        // As of writing this the API doesn't have a separate endpoint
313        // for this, so we just count the jobs.
314        let count = self.count_jobs(filters).await?;
315        Ok(count > 0)
316    }
317
318    /// Count the number of jobs based on the given filters.
319    pub async fn count_jobs(&self, filters: JobFilters) -> crate::Result<u64> {
320        Ok(self
321            .inner
322            .count_jobs(Request::new(proto::admin::v1::CountJobsRequest {
323                filters: Some(filters.into()),
324            }))
325            .await?
326            .into_inner()
327            .count)
328    }
329
330    /// Cancel all jobs that match the given filters.
331    ///
332    /// Returns the cancelled jobs.
333    pub async fn cancel_jobs(&self, filters: JobFilters) -> crate::Result<Vec<Job<AnyJobType>>> {
334        Ok(self
335            .inner
336            .cancel_jobs(Request::new(proto::admin::v1::CancelJobsRequest {
337                filters: Some(filters.into()),
338            }))
339            .await?
340            .into_inner()
341            .cancelled_job_ids
342            .into_iter()
343            .map(|id| {
344                Ok(Job {
345                    client: self.clone(),
346                    id: JobId(id.parse()?),
347                    raw: None,
348                    phantom: PhantomData,
349                })
350            })
351            .collect::<Result<Vec<_>, eyre::Report>>()?)
352    }
353}
354
355/// Filters for querying jobs.
356#[derive(Debug, Default, Clone)]
357#[must_use]
358pub struct JobFilters {
359    /// Filter by job IDs.
360    pub job_ids: Option<Vec<JobId>>,
361    /// Filter by job type IDs.
362    pub job_type_ids: Option<Vec<JobTypeId>>,
363    /// Filter by executor IDs.
364    pub executor_ids: Option<Vec<ExecutorId>>,
365    /// Filter by execution IDs.
366    pub execution_ids: Option<Vec<ExecutionId>>,
367    /// Execution status.
368    pub execution_statuses: Option<Vec<ExecutionStatus>>,
369    /// Filter by target execution time range.
370    pub target_execution_time: Option<TimeRange>,
371    /// Filter by creation time range.
372    pub created_at: Option<TimeRange>,
373    /// Filter by labels.
374    pub labels: Option<Vec<LabelFilter>>,
375    /// Filter by schedule IDs.
376    pub schedule_ids: Option<Vec<ScheduleId>>,
377}
378
379impl JobFilters {
380    /// Include all jobs.
381    pub fn all() -> Self {
382        Self::default()
383    }
384
385    /// Filter for a job type.
386    pub fn job_type<J: JobType>(mut self) -> Self {
387        self.job_type_ids = Some(vec![J::job_type_id()]);
388        self
389    }
390
391    /// Filter for completed (succeeded, failed, or cancelled) jobs only.
392    pub fn completed_only(mut self) -> Self {
393        self.execution_statuses = Some(vec![
394            ExecutionStatus::Succeeded,
395            ExecutionStatus::Failed,
396            ExecutionStatus::Cancelled,
397        ]);
398
399        self
400    }
401
402    /// Filter for pending or in-progress jobs only.
403    pub fn active_only(mut self) -> Self {
404        self.execution_statuses = Some(vec![ExecutionStatus::Pending, ExecutionStatus::InProgress]);
405
406        self
407    }
408
409    /// Filter by existence of a label.
410    pub fn has_label<K: Into<String>>(mut self, key: K) -> Self {
411        self.labels.get_or_insert_with(Vec::new).push(LabelFilter {
412            key: key.into(),
413            value: None,
414        });
415
416        self
417    }
418
419    /// Filter by a label key and value.
420    pub fn has_label_value<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
421        self.labels.get_or_insert_with(Vec::new).push(LabelFilter {
422            key: key.into(),
423            value: Some(value.into()),
424        });
425
426        self
427    }
428}
429
/// The ordering options for listing jobs.
///
/// The default is [`JobOrderBy::TargetExecutionTimeAsc`].
#[derive(Clone, Copy, Debug, Default)]
pub enum JobOrderBy {
    /// Order by target execution time ascending.
    #[default]
    TargetExecutionTimeAsc,
    /// Order by target execution time descending.
    TargetExecutionTimeDesc,
    /// Order by creation time ascending.
    CreatedAtAsc,
    /// Order by creation time descending.
    CreatedAtDesc,
}
443
444/// A job in the ora scheduler.
445pub struct Job<J> {
446    client: AdminClient,
447    id: JobId,
448    raw: Option<proto::admin::v1::Job>,
449    phantom: PhantomData<J>,
450}
451
452impl<J> Job<J> {
453    /// Return the ID of the job.
454    #[must_use]
455    pub fn id(&self) -> JobId {
456        self.id
457    }
458
459    /// Fetch and return the raw job without any processing.
460    pub async fn raw(&mut self) -> crate::Result<proto::admin::v1::Job> {
461        let job = self.fetch_raw().await?;
462
463        Ok(job
464            .or_else(|| self.raw.clone())
465            .ok_or_eyre("failed to retrieve job")?)
466    }
467
468    /// Return the cached raw job, if any.
469    ///
470    /// This will not fetch any data from the server.
471    #[must_use]
472    pub fn raw_cached(&self) -> Option<&proto::admin::v1::Job> {
473        self.raw.as_ref()
474    }
475
476    /// Turn this job into its raw representation,
477    /// if it was cached.
478    #[must_use]
479    pub fn into_raw(self) -> Option<proto::admin::v1::Job> {
480        self.raw
481    }
482
483    /// Cancel the job.
484    pub async fn cancel(&mut self) -> crate::Result<()> {
485        self.client
486            .inner
487            .cancel_jobs(Request::new(proto::admin::v1::CancelJobsRequest {
488                filters: Some(proto::admin::v1::JobFilters {
489                    job_ids: vec![self.id.to_string()],
490                    ..Default::default()
491                }),
492            }))
493            .await?;
494
495        Ok(())
496    }
497
498    /// Fetch and return the executions of the job.
499    pub async fn executions(&mut self) -> crate::Result<Vec<Execution<J>>> {
500        let job = self.fetch_raw().await?;
501
502        let job = job
503            .as_ref()
504            .or(self.raw.as_ref())
505            .ok_or_eyre("failed to fetch job")?;
506
507        Ok(job
508            .executions
509            .iter()
510            .map(|e| {
511                Ok(Execution {
512                    id: ExecutionId(e.id.parse()?),
513                    status: match e.status() {
514                        proto::admin::v1::ExecutionStatus::Unspecified => {
515                            bail!("unknown execution status")
516                        }
517                        proto::admin::v1::ExecutionStatus::Pending => ExecutionStatus::Pending,
518                        proto::admin::v1::ExecutionStatus::InProgress => {
519                            ExecutionStatus::InProgress
520                        }
521                        proto::admin::v1::ExecutionStatus::Succeeded => ExecutionStatus::Succeeded,
522                        proto::admin::v1::ExecutionStatus::Failed => ExecutionStatus::Failed,
523                        proto::admin::v1::ExecutionStatus::Cancelled => ExecutionStatus::Cancelled,
524                    },
525                    executor_id: e
526                        .executor_id
527                        .as_ref()
528                        .map(|id| Result::<_, eyre::Report>::Ok(ExecutorId(id.parse()?)))
529                        .transpose()?,
530
531                    created_at: e
532                        .created_at
533                        .map(TryFrom::try_from)
534                        .transpose()?
535                        .ok_or_eyre("missing created_at for execution")?,
536                    started_at: e.started_at.map(TryFrom::try_from).transpose()?,
537                    succeeded_at: e.succeeded_at.map(TryFrom::try_from).transpose()?,
538                    failed_at: e.failed_at.map(TryFrom::try_from).transpose()?,
539                    cancelled_at: e.cancelled_at.map(TryFrom::try_from).transpose()?,
540                    output_json: e.output_json.clone(),
541                    failure_reason: e.failure_reason.clone(),
542                    _job_type: PhantomData,
543                })
544            })
545            .collect::<Result<Vec<_>, eyre::Report>>()?)
546    }
547
548    /// Wait for the job to terminate.
549    #[cfg(not(target_arch = "wasm32"))]
550    pub async fn terminated(&mut self) -> crate::Result<()> {
551        loop {
552            let executions = self.executions().await?;
553
554            let Some(last_execution) = executions.last() else {
555                return Err(eyre::eyre!("no executions found for job").into());
556            };
557
558            if last_execution.status().is_terminal() {
559                return Ok(());
560            }
561
562            tokio::time::sleep(self.client.poll_interval).await;
563        }
564    }
565
566    /// Fetch the output JSON of the job.
567    ///
568    /// Returns `None` if the job has not finished
569    /// or failed.
570    pub async fn output_json(&mut self) -> crate::Result<Option<String>> {
571        let Some(last_execution) = self.executions().await?.pop() else {
572            return Err(eyre::eyre!("no executions found for job").into());
573        };
574
575        match last_execution.status() {
576            ExecutionStatus::Succeeded => Ok(last_execution.output_json().map(String::from)),
577            _ => Ok(None),
578        }
579    }
580
581    /// Fetch the failure reason of the job.
582    ///
583    /// Returns `None` if the job has not failed.
584    /// Note that cancellation is not considered a failure.
585    pub async fn failure_reason(&mut self) -> crate::Result<Option<String>> {
586        let Some(last_execution) = self.executions().await?.pop() else {
587            return Err(eyre::eyre!("no executions found for job").into());
588        };
589
590        match last_execution.status() {
591            ExecutionStatus::Failed => Ok(last_execution.failure_reason().map(String::from)),
592            _ => Ok(None),
593        }
594    }
595
596    /// Return the schedule of the job, if any.
597    pub async fn schedule(&mut self) -> crate::Result<Option<Schedule<J>>> {
598        let job = self.fetch_raw().await?;
599
600        let job = job
601            .as_ref()
602            .or(self.raw.as_ref())
603            .ok_or_eyre("failed to fetch job")?;
604
605        if let Some(schedule_id) = &job.schedule_id {
606            let schedule_id = ScheduleId(
607                schedule_id
608                    .parse::<Uuid>()
609                    .wrap_err("server returned invalid schedule ID")?,
610            );
611
612            Ok(Some(Schedule {
613                client: self.client.clone(),
614                id: schedule_id,
615                raw: None,
616                phantom: PhantomData,
617            }))
618        } else {
619            Ok(None)
620        }
621    }
622
623    /// Always fetch the raw raw data from the server.
624    ///
625    /// Returns `None` if the data was cached, this is
626    /// to avoid cloning.
627    async fn fetch_raw(&mut self) -> crate::Result<Option<proto::admin::v1::Job>> {
628        // We avoid fetching if we already have a job that terminated.
629        if let Some(cached) = &self.raw
630            && cached
631                .executions
632                .last()
633                .is_some_and(|e| ExecutionStatus::from(e.status()).is_terminal())
634        {
635            return Ok(None);
636        }
637
638        let job = self
639            .client
640            .inner
641            .list_jobs(Request::new(ListJobsRequest {
642                filters: Some(proto::admin::v1::JobFilters {
643                    job_ids: vec![self.id.to_string()],
644                    ..Default::default()
645                }),
646                order_by: proto::admin::v1::JobOrderBy::Unspecified as _,
647                pagination: Some(PaginationOptions {
648                    page_size: 1,
649                    next_page_token: None,
650                }),
651            }))
652            .await?
653            .into_inner()
654            .jobs
655            .pop()
656            .ok_or_eyre("job not found")?;
657
658        match self.client.caching_strategy {
659            crate::admin::CachingStrategy::Cache => {
660                self.raw = Some(job);
661                Ok(None)
662            }
663            crate::admin::CachingStrategy::NoCache => Ok(Some(job)),
664        }
665    }
666}
667
668impl Job<AnyJobType> {
669    /// Try to cast the job to the given job type,
670    /// validating that the job is of the expected type.
671    ///
672    /// This function will fetch the required data
673    /// from the server if necessary.
674    pub async fn cast<J>(mut self) -> crate::Result<Option<Job<J>>>
675    where
676        J: JobType,
677    {
678        let job = self.fetch_raw().await?;
679
680        let job = job
681            .as_ref()
682            .or(self.raw.as_ref())
683            .ok_or_eyre("failed to fetch job")?;
684
685        let job = job.job.as_ref().ok_or_eyre("job definition is missing")?;
686
687        let job_type_id = &job.job_type_id;
688
689        if job_type_id != J::job_type_id().as_str() {
690            return Ok(None);
691        }
692
693        Ok(Some(Job {
694            client: self.client,
695            id: self.id,
696            raw: self.raw,
697            phantom: PhantomData,
698        }))
699    }
700
701    /// Cast this job to the given job type.
702    ///
703    /// Note that no validation is performed to ensure
704    /// that the job is actually of the given type.
705    #[must_use]
706    pub fn cast_unchecked<J>(self) -> Job<J>
707    where
708        J: JobType,
709    {
710        Job {
711            client: self.client,
712            id: self.id,
713            raw: self.raw,
714            phantom: PhantomData,
715        }
716    }
717
718    pub(super) fn cast_any<J>(self) -> Job<J> {
719        Job {
720            client: self.client,
721            id: self.id,
722            raw: self.raw,
723            phantom: PhantomData,
724        }
725    }
726}
727
728impl<J> Job<J>
729where
730    J: JobType,
731{
732    /// Fetch and return the job definition of the job.
733    pub async fn definition(&mut self) -> crate::Result<JobDefinition<J>> {
734        let job = self.fetch_raw().await?;
735
736        let job = job
737            .as_ref()
738            .or(self.raw.as_ref())
739            .ok_or_eyre("failed to fetch job")?;
740
741        let job = job.job.as_ref().ok_or_eyre("job definition is missing")?;
742
743        let input: J = serde_json::from_str(&job.input_payload_json)
744            .wrap_err("failed to deserialize job input")?;
745
746        Ok(JobDefinition {
747            target_execution_time: job
748                .target_execution_time
749                .as_ref()
750                .copied()
751                .map(TryFrom::try_from)
752                .transpose()
753                .wrap_err("invalid target execution time")?
754                .ok_or_eyre("missing target execution time for job")?,
755            input,
756            labels: job
757                .labels
758                .iter()
759                .map(|label| (label.key.clone(), label.value.clone()))
760                .collect(),
761            timeout_policy: job.timeout_policy.unwrap_or_default().into(),
762            retry_policy: job.retry_policy.unwrap_or_default().into(),
763        })
764    }
765
766    /// Wait for the job to complete and retrieve
767    /// its result.
768    ///
769    /// If a job was cancelled, it will be treated as an error.
770    pub async fn wait_result(&mut self) -> Result<<J as JobType>::Output, crate::Error> {
771        self.terminated().await?;
772
773        let Some(last_execution) = self.executions().await?.pop() else {
774            return Err(eyre::eyre!("no executions found for job").into());
775        };
776
777        match last_execution.status() {
778            ExecutionStatus::Succeeded => Ok(last_execution
779                .output()?
780                .ok_or_eyre("job succeeded but no output was found")?),
781            ExecutionStatus::Failed => Err(crate::Error::JobFailed(
782                last_execution
783                    .failure_reason()
784                    .wrap_err("job failed but no reason was provided")?
785                    .into(),
786            )),
787            ExecutionStatus::Cancelled => Err(crate::Error::JobCancelled),
788            _ => {
789                Err(eyre::eyre!("job is in unexpected state: {:?}", last_execution.status()).into())
790            }
791        }
792    }
793
794    async fn into_result(mut self) -> crate::Result<J::Output> {
795        self.wait_result().await
796    }
797}
798
799impl<J> IntoFuture for Job<J>
800where
801    J: JobType,
802{
803    type Output = crate::Result<J::Output>;
804
805    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
806
807    fn into_future(self) -> Self::IntoFuture {
808        self.into_result().boxed()
809    }
810}
811
812/// Details of an execution of a job.
813#[derive(Debug)]
814pub struct Execution<J> {
815    id: ExecutionId,
816    executor_id: Option<ExecutorId>,
817    status: ExecutionStatus,
818    created_at: SystemTime,
819    started_at: Option<SystemTime>,
820    succeeded_at: Option<SystemTime>,
821    failed_at: Option<SystemTime>,
822    cancelled_at: Option<SystemTime>,
823    output_json: Option<String>,
824    failure_reason: Option<String>,
825    _job_type: PhantomData<J>,
826}
827
828impl<J> Execution<J> {
829    /// Return the ID of the execution.
830    #[inline]
831    #[must_use]
832    pub const fn id(&self) -> ExecutionId {
833        self.id
834    }
835
836    /// Return the executor ID that executed the job, if any.
837    #[inline]
838    #[must_use]
839    pub const fn executor_id(&self) -> Option<ExecutorId> {
840        self.executor_id
841    }
842
843    /// Return the status of the execution.
844    #[inline]
845    #[must_use]
846    pub const fn status(&self) -> ExecutionStatus {
847        self.status
848    }
849
850    /// Return the timestamp when the execution was created.
851    #[inline]
852    #[must_use]
853    pub const fn created_at(&self) -> SystemTime {
854        self.created_at
855    }
856
857    /// Return the timestamp when the execution started.
858    #[inline]
859    #[must_use]
860    pub const fn started_at(&self) -> Option<SystemTime> {
861        self.started_at
862    }
863
864    /// Return the timestamp when the execution successfully completed.
865    #[inline]
866    #[must_use]
867    pub const fn succeeded_at(&self) -> Option<SystemTime> {
868        self.succeeded_at
869    }
870
871    /// Return the timestamp when the execution failed.
872    #[inline]
873    #[must_use]
874    pub const fn failed_at(&self) -> Option<SystemTime> {
875        self.failed_at
876    }
877
878    /// Return the timestamp when the execution was cancelled.
879    #[inline]
880    #[must_use]
881    pub const fn cancelled_at(&self) -> Option<SystemTime> {
882        self.cancelled_at
883    }
884
885    /// Get the end time of the execution, if any.
886    ///
887    /// This is the time when the execution either succeeded, failed, or was cancelled.
888    #[inline]
889    #[must_use]
890    pub fn ended_at(&self) -> Option<SystemTime> {
891        self.succeeded_at.or(self.failed_at).or(self.cancelled_at)
892    }
893
894    /// Parse the output of the execution.
895    pub fn output(&self) -> crate::Result<Option<J::Output>>
896    where
897        J: JobType,
898    {
899        if let Some(ref output_json) = self.output_json {
900            let output = serde_json::from_str::<J::Output>(output_json)?;
901            Ok(Some(output))
902        } else {
903            Ok(None)
904        }
905    }
906
907    /// Return the raw JSON output of the execution.
908    #[inline]
909    #[must_use]
910    pub fn output_json(&self) -> Option<&str> {
911        self.output_json.as_deref()
912    }
913
914    /// Return the failure reason if the execution failed or was cancelled.
915    #[inline]
916    #[must_use]
917    pub fn failure_reason(&self) -> Option<&str> {
918        self.failure_reason.as_deref()
919    }
920}