ora_client/
schedule_handle.rsuse std::sync::Arc;
use futures::Stream;
use ora_proto::server::v1::admin_service_client::AdminServiceClient;
use parking_lot::Mutex;
use tonic::transport::Channel;
use uuid::Uuid;
use crate::{
admin::AdminClientError,
job_handle::JobHandle,
job_query::{JobFilter, JobOrder},
schedule_definition::ScheduleDetails,
schedule_query::ScheduleFilter,
AdminClient,
};
#[allow(clippy::wildcard_imports)]
use tonic::codegen::*;
#[derive(Debug, Clone)]
pub struct ScheduleHandle<C = Channel> {
id: Uuid,
client: AdminServiceClient<C>,
details: Arc<Mutex<Option<Arc<ScheduleDetails>>>>,
}
impl<C> ScheduleHandle<C>
where
C: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync + 'static,
<C as tonic::client::GrpcService<tonic::body::BoxBody>>::Future: Send,
C::Error: Into<StdError>,
C::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<C::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
pub(crate) fn new(id: Uuid, client: AdminServiceClient<C>) -> Self {
Self {
id,
client,
details: Arc::new(Mutex::new(None)),
}
}
pub fn id(&self) -> Uuid {
self.id
}
pub async fn details(&self) -> eyre::Result<Arc<ScheduleDetails>> {
if let Some(details) = self.details.lock().as_ref() {
if !details.active {
return Ok(details.clone());
}
}
let schedule = self
.client
.clone()
.list_schedules(tonic::Request::new(
ora_proto::server::v1::ListSchedulesRequest {
cursor: None,
limit: 1,
order: None,
filter: Some(ScheduleFilter::new().with_schedule_id(self.id).into()),
},
))
.await?
.into_inner()
.schedules
.into_iter()
.next()
.ok_or_else(|| eyre::eyre!("schedule details not found"))?;
let details = Arc::new(ScheduleDetails::try_from(schedule)?);
self.set_details(details.clone());
Ok(details)
}
pub fn cached_details(&self) -> Option<Arc<ScheduleDetails>> {
self.details.lock().as_ref().cloned()
}
pub fn jobs(
&self,
mut filter: JobFilter,
order: JobOrder,
) -> impl Stream<Item = Result<JobHandle<(), C>, AdminClientError>> + Send + Unpin + 'static
{
filter.schedule_ids = [self.id].into_iter().collect();
AdminClient::new(self.client.clone()).jobs(filter, order)
}
pub async fn job_count(&self) -> Result<u64, AdminClientError> {
let count = self
.client
.clone()
.count_jobs(tonic::Request::new(
ora_proto::server::v1::CountJobsRequest {
filter: Some(JobFilter::new().with_schedule_id(self.id).into()),
},
))
.await?
.into_inner()
.count;
Ok(count)
}
pub async fn active_job(&self) -> Result<Option<JobHandle<(), C>>, AdminClientError> {
let job = self
.client
.clone()
.list_jobs(tonic::Request::new(
ora_proto::server::v1::ListJobsRequest {
cursor: None,
limit: 1,
order: None,
filter: Some(
JobFilter::new()
.with_schedule_id(self.id)
.active_only()
.into(),
),
},
))
.await?
.into_inner()
.jobs
.into_iter()
.next();
Ok(job
.map(|job| {
let h = JobHandle::new(job.id.parse()?, self.client.clone());
h.set_details(Arc::new(job.try_into()?));
Result::<_, eyre::Report>::Ok(h)
})
.transpose()?)
}
pub async fn cancel(&self, cancel_jobs: bool) -> Result<(), AdminClientError> {
self.client
.clone()
.cancel_schedules(tonic::Request::new(
ora_proto::server::v1::CancelSchedulesRequest {
filter: Some(ScheduleFilter::new().with_schedule_id(self.id).into()),
cancel_jobs,
},
))
.await?;
Ok(())
}
pub(crate) fn set_details(&self, details: Arc<ScheduleDetails>) {
*self.details.lock() = Some(details);
}
}