1use std::{future::IntoFuture, sync::Arc};
4
5use eyre::{bail, Context, OptionExt};
6use futures::{future::BoxFuture, FutureExt};
7use ora_proto::server::v1::admin_service_client::AdminServiceClient;
8use parking_lot::Mutex;
9use tonic::transport::Channel;
10use uuid::Uuid;
11
12use crate::{
13 job_definition::{JobDetails, JobStatus},
14 job_query::JobFilter,
15 JobType,
16};
17#[allow(clippy::wildcard_imports)]
18use tonic::codegen::*;
19
20#[derive(Debug)]
25pub struct JobHandle<J = (), C = Channel> {
26 id: Uuid,
27 client: AdminServiceClient<C>,
28 details: Arc<Mutex<Option<Arc<JobDetails>>>>,
29 _job_type: std::marker::PhantomData<J>,
30}
31
32impl<J> Clone for JobHandle<J> {
33 fn clone(&self) -> Self {
34 Self {
35 id: self.id,
36 client: self.client.clone(),
37 details: self.details.clone(),
38 _job_type: self._job_type,
39 }
40 }
41}
42
43impl<J, C> JobHandle<J, C>
44where
45 C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
46 C::Error: Into<StdError>,
47 C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
48 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
49{
50 pub(crate) fn new(id: Uuid, client: AdminServiceClient<C>) -> Self {
52 Self {
53 id,
54 client,
55 details: Arc::new(Mutex::new(None)),
56 _job_type: std::marker::PhantomData,
57 }
58 }
59
60 pub(crate) fn set_details(&self, details: Arc<JobDetails>) {
61 *self.details.lock() = Some(details);
62 }
63
64 pub fn id(&self) -> Uuid {
66 self.id
67 }
68
69 pub async fn details(&self) -> eyre::Result<Arc<JobDetails>> {
71 {
72 let details = self.details.lock();
73
74 if let Some(details) = &*details {
75 if !details.active {
76 return Ok(details.clone());
77 }
78 }
79 }
80
81 let job = self
82 .client
83 .clone()
84 .list_jobs(tonic::Request::new(
85 ora_proto::server::v1::ListJobsRequest {
86 cursor: None,
87 limit: 1,
88 order: None,
89 filter: Some(JobFilter::new().with_job_id(self.id).into()),
90 },
91 ))
92 .await?
93 .into_inner()
94 .jobs
95 .into_iter()
96 .next()
97 .ok_or_else(|| eyre::eyre!("Job details not found"))?;
98
99 let details = Arc::new(JobDetails::try_from(job)?);
100 self.set_details(details.clone());
101
102 Ok(details)
103 }
104
105 pub async fn status(&self) -> eyre::Result<JobStatus> {
107 Ok(self.details().await?.status())
108 }
109
110 pub fn details_cached(&self) -> Option<Arc<JobDetails>> {
115 self.details.lock().as_ref().cloned()
116 }
117
118 pub async fn cancel(&self) -> eyre::Result<()> {
120 self.client
121 .clone()
122 .cancel_jobs(tonic::Request::new(
123 ora_proto::server::v1::CancelJobsRequest {
124 filter: Some(JobFilter::new().with_job_id(self.id).into()),
125 },
126 ))
127 .await
128 .wrap_err("failed to cancel job")?;
129
130 Ok(())
131 }
132
133 pub fn cast_type<T: JobType>(&self) -> JobHandle<T, C> {
138 JobHandle {
139 id: self.id,
140 client: self.client.clone(),
141 details: self.details.clone(),
142 _job_type: std::marker::PhantomData,
143 }
144 }
145
146 pub fn cast_unknown(&self) -> JobHandle<(), C> {
148 JobHandle {
149 id: self.id,
150 client: self.client.clone(),
151 details: self.details.clone(),
152 _job_type: std::marker::PhantomData,
153 }
154 }
155}
156
157impl<J, C> JobHandle<J, C>
158where
159 J: JobType,
160 C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
161 C::Error: Into<StdError>,
162 C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
163 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
164{
165 pub async fn input(&self) -> eyre::Result<J> {
167 let details = self.details().await?;
168 serde_json::from_str(&details.input_payload_json)
169 .wrap_err("failed to deserialize job input")
170 }
171
172 pub async fn wait(&self) -> eyre::Result<J::Output> {
174 self.wait_with_interval(std::time::Duration::from_millis(100))
175 .await
176 }
177
178 pub async fn wait_with_interval(
180 &self,
181 interval: std::time::Duration,
182 ) -> eyre::Result<J::Output> {
183 loop {
184 let details = self.details().await?;
185
186 if details.active {
187 tokio::time::sleep(interval).await;
188 continue;
189 }
190
191 let exec = details
192 .executions
193 .last()
194 .ok_or_eyre("no executions found for job")?;
195
196 if let Some(failed) = &exec.failure_reason {
197 return Err(eyre::eyre!("job failed: {failed}"));
198 }
199
200 if let Some(output) = &exec.output_payload_json {
201 return serde_json::from_str(output).wrap_err("failed to deserialize job output");
202 }
203
204 bail!("job has no output or failure reason, but it is not active");
205 }
206 }
207}
208
209impl<J, C> IntoFuture for JobHandle<J, C>
210where
211 J: JobType,
212 C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
213 <C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
214 C::Error: Into<StdError> + Send,
215 C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
216 <C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
217{
218 type Output = eyre::Result<J::Output>;
219
220 type IntoFuture = BoxFuture<'static, Self::Output>;
221
222 fn into_future(self) -> Self::IntoFuture {
223 async move { self.wait().await }.boxed()
224 }
225}