ora_client/
job_handle.rs

1//! Wrappers for interacting with jobs.
2
3use std::{future::IntoFuture, sync::Arc};
4
5use eyre::{bail, Context, OptionExt};
6use futures::{future::BoxFuture, FutureExt};
7use ora_proto::server::v1::admin_service_client::AdminServiceClient;
8use parking_lot::Mutex;
9use tonic::transport::Channel;
10use uuid::Uuid;
11
12use crate::{
13    job_definition::{JobDetails, JobStatus},
14    job_query::JobFilter,
15    JobType,
16};
17#[allow(clippy::wildcard_imports)]
18use tonic::codegen::*;
19
20/// A handle to a single job.
21///
22/// The handle also caches the details of the job, so that
23/// they can be accessed without making a network request.
24#[derive(Debug)]
25pub struct JobHandle<J = (), C = Channel> {
26    id: Uuid,
27    client: AdminServiceClient<C>,
28    details: Arc<Mutex<Option<Arc<JobDetails>>>>,
29    _job_type: std::marker::PhantomData<J>,
30}
31
32impl<J> Clone for JobHandle<J> {
33    fn clone(&self) -> Self {
34        Self {
35            id: self.id,
36            client: self.client.clone(),
37            details: self.details.clone(),
38            _job_type: self._job_type,
39        }
40    }
41}
42
43impl<J, C> JobHandle<J, C>
44where
45    C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
46    C::Error: Into<StdError>,
47    C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
48    <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
49{
50    /// Create a new job handle.
51    pub(crate) fn new(id: Uuid, client: AdminServiceClient<C>) -> Self {
52        Self {
53            id,
54            client,
55            details: Arc::new(Mutex::new(None)),
56            _job_type: std::marker::PhantomData,
57        }
58    }
59
60    pub(crate) fn set_details(&self, details: Arc<JobDetails>) {
61        *self.details.lock() = Some(details);
62    }
63
64    /// Get the ID of the job.
65    pub fn id(&self) -> Uuid {
66        self.id
67    }
68
69    /// Get the details of the job.
70    pub async fn details(&self) -> eyre::Result<Arc<JobDetails>> {
71        {
72            let details = self.details.lock();
73
74            if let Some(details) = &*details {
75                if !details.active {
76                    return Ok(details.clone());
77                }
78            }
79        }
80
81        let job = self
82            .client
83            .clone()
84            .list_jobs(tonic::Request::new(
85                ora_proto::server::v1::ListJobsRequest {
86                    cursor: None,
87                    limit: 1,
88                    order: None,
89                    filter: Some(JobFilter::new().with_job_id(self.id).into()),
90                },
91            ))
92            .await?
93            .into_inner()
94            .jobs
95            .into_iter()
96            .next()
97            .ok_or_else(|| eyre::eyre!("Job details not found"))?;
98
99        let details = Arc::new(JobDetails::try_from(job)?);
100        self.set_details(details.clone());
101
102        Ok(details)
103    }
104
105    /// Get the status of the job.
106    pub async fn status(&self) -> eyre::Result<JobStatus> {
107        Ok(self.details().await?.status())
108    }
109
110    /// Get the cached details of the job, if available.
111    ///
112    /// This is useful for getting the details of a job without
113    /// making a network request.
114    pub fn details_cached(&self) -> Option<Arc<JobDetails>> {
115        self.details.lock().as_ref().cloned()
116    }
117
118    /// Cancel the job.
119    pub async fn cancel(&self) -> eyre::Result<()> {
120        self.client
121            .clone()
122            .cancel_jobs(tonic::Request::new(
123                ora_proto::server::v1::CancelJobsRequest {
124                    filter: Some(JobFilter::new().with_job_id(self.id).into()),
125                },
126            ))
127            .await
128            .wrap_err("failed to cancel job")?;
129
130        Ok(())
131    }
132
133    /// Cast the job handle to a specific job type.
134    ///
135    /// This does not perform any runtime checks, so it is up to the caller
136    /// to ensure that the job type is correct.
137    pub fn cast_type<T: JobType>(&self) -> JobHandle<T, C> {
138        JobHandle {
139            id: self.id,
140            client: self.client.clone(),
141            details: self.details.clone(),
142            _job_type: std::marker::PhantomData,
143        }
144    }
145
146    /// Cast the job handle to an unknown job type.
147    pub fn cast_unknown(&self) -> JobHandle<(), C> {
148        JobHandle {
149            id: self.id,
150            client: self.client.clone(),
151            details: self.details.clone(),
152            _job_type: std::marker::PhantomData,
153        }
154    }
155}
156
157impl<J, C> JobHandle<J, C>
158where
159    J: JobType,
160    C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
161    C::Error: Into<StdError>,
162    C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
163    <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
164{
165    /// Retrieve the input of the job.
166    pub async fn input(&self) -> eyre::Result<J> {
167        let details = self.details().await?;
168        serde_json::from_str(&details.input_payload_json)
169            .wrap_err("failed to deserialize job input")
170    }
171
172    /// Retrieve the output of the job.
173    pub async fn wait(&self) -> eyre::Result<J::Output> {
174        self.wait_with_interval(std::time::Duration::from_millis(100))
175            .await
176    }
177
178    /// Retrieve the output of the job with a given polling interval.
179    pub async fn wait_with_interval(
180        &self,
181        interval: std::time::Duration,
182    ) -> eyre::Result<J::Output> {
183        loop {
184            let details = self.details().await?;
185
186            if details.active {
187                tokio::time::sleep(interval).await;
188                continue;
189            }
190
191            let exec = details
192                .executions
193                .last()
194                .ok_or_eyre("no executions found for job")?;
195
196            if let Some(failed) = &exec.failure_reason {
197                return Err(eyre::eyre!("job failed: {failed}"));
198            }
199
200            if let Some(output) = &exec.output_payload_json {
201                return serde_json::from_str(output).wrap_err("failed to deserialize job output");
202            }
203
204            bail!("job has no output or failure reason, but it is not active");
205        }
206    }
207}
208
209impl<J, C> IntoFuture for JobHandle<J, C>
210where
211    J: JobType,
212    C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
213    <C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
214    C::Error: Into<StdError> + Send,
215    C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
216    <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
217{
218    type Output = eyre::Result<J::Output>;
219
220    type IntoFuture = BoxFuture<'static, Self::Output>;
221
222    fn into_future(self) -> Self::IntoFuture {
223        async move { self.wait().await }.boxed()
224    }
225}