use std::{future::IntoFuture, sync::Arc};
use eyre::{bail, Context, OptionExt};
use futures::{future::BoxFuture, FutureExt};
use ora_proto::server::v1::admin_service_client::AdminServiceClient;
use parking_lot::Mutex;
use tonic::transport::Channel;
use uuid::Uuid;
use crate::{
job_definition::{JobDetails, JobStatus},
job_query::JobFilter,
JobType,
};
#[allow(clippy::wildcard_imports)]
use tonic::codegen::*;
#[derive(Debug)]
pub struct JobHandle<J = (), C = Channel> {
id: Uuid,
client: AdminServiceClient<C>,
details: Arc<Mutex<Option<Arc<JobDetails>>>>,
_job_type: std::marker::PhantomData<J>,
}
impl<J> Clone for JobHandle<J> {
fn clone(&self) -> Self {
Self {
id: self.id,
client: self.client.clone(),
details: self.details.clone(),
_job_type: self._job_type,
}
}
}
impl<J, C> JobHandle<J, C>
where
C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
C::Error: Into<StdError>,
C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
pub(crate) fn new(id: Uuid, client: AdminServiceClient<C>) -> Self {
Self {
id,
client,
details: Arc::new(Mutex::new(None)),
_job_type: std::marker::PhantomData,
}
}
pub(crate) fn set_details(&self, details: Arc<JobDetails>) {
*self.details.lock() = Some(details);
}
pub fn id(&self) -> Uuid {
self.id
}
pub async fn details(&self) -> eyre::Result<Arc<JobDetails>> {
{
let details = self.details.lock();
if let Some(details) = &*details {
if !details.active {
return Ok(details.clone());
}
}
}
let job = self
.client
.clone()
.list_jobs(tonic::Request::new(
ora_proto::server::v1::ListJobsRequest {
cursor: None,
limit: 1,
order: None,
filter: Some(JobFilter::new().with_job_id(self.id).into()),
},
))
.await?
.into_inner()
.jobs
.into_iter()
.next()
.ok_or_else(|| eyre::eyre!("Job details not found"))?;
let details = Arc::new(JobDetails::try_from(job)?);
self.set_details(details.clone());
Ok(details)
}
pub async fn status(&self) -> eyre::Result<JobStatus> {
Ok(self.details().await?.status())
}
pub fn details_cached(&self) -> Option<Arc<JobDetails>> {
self.details.lock().as_ref().cloned()
}
pub async fn cancel(&self) -> eyre::Result<()> {
self.client
.clone()
.cancel_jobs(tonic::Request::new(
ora_proto::server::v1::CancelJobsRequest {
filter: Some(JobFilter::new().with_job_id(self.id).into()),
},
))
.await
.wrap_err("failed to cancel job")?;
Ok(())
}
pub fn cast_type<T: JobType>(&self) -> JobHandle<T, C> {
JobHandle {
id: self.id,
client: self.client.clone(),
details: self.details.clone(),
_job_type: std::marker::PhantomData,
}
}
pub fn cast_unknown(&self) -> JobHandle<(), C> {
JobHandle {
id: self.id,
client: self.client.clone(),
details: self.details.clone(),
_job_type: std::marker::PhantomData,
}
}
}
impl<J, C> JobHandle<J, C>
where
J: JobType,
C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone,
C::Error: Into<StdError>,
C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
pub async fn input(&self) -> eyre::Result<J> {
let details = self.details().await?;
serde_json::from_str(&details.input_payload_json)
.wrap_err("failed to deserialize job input")
}
pub async fn wait(&self) -> eyre::Result<J::Output> {
self.wait_with_interval(std::time::Duration::from_millis(100))
.await
}
pub async fn wait_with_interval(
&self,
interval: std::time::Duration,
) -> eyre::Result<J::Output> {
loop {
let details = self.details().await?;
if details.active {
tokio::time::sleep(interval).await;
continue;
}
let exec = details
.executions
.last()
.ok_or_eyre("no executions found for job")?;
if let Some(failed) = &exec.failure_reason {
return Err(eyre::eyre!("job failed: {failed}"));
}
if let Some(output) = &exec.output_payload_json {
return serde_json::from_str(output).wrap_err("failed to deserialize job output");
}
bail!("job has no output or failure reason, but it is not active");
}
}
}
impl<J, C> IntoFuture for JobHandle<J, C>
where
J: JobType,
C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
<C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
C::Error: Into<StdError> + Send,
C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
type Output = eyre::Result<J::Output>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
async move { self.wait().await }.boxed()
}
}