use std::{
fmt::Debug,
future::Future,
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use async_trait::async_trait;
use futures::{future::Shared, FutureExt};
use ora_client::{ClientOperations, RawTaskResult, ScheduleOperations, TaskOperations};
pub use ora_client::{
LabelMatch, LabelValueMatch, ScheduleListOrder, Schedules, TaskListOrder, Tasks,
};
use ora_common::{
schedule::ScheduleDefinition,
task::{TaskDataFormat, TaskDefinition, TaskStatus},
};
use serde::de::DeserializeOwned;
use time::OffsetDateTime;
use uuid::Uuid;
use crate::Task;
#[async_trait]
pub trait Client: core::fmt::Debug + Send + Sync + 'static {
async fn add_task<T>(&self, task: TaskDefinition<T>) -> eyre::Result<TaskHandle<T>>
where
T: Send + 'static;
async fn add_tasks<T, I>(&self, tasks: I) -> eyre::Result<Vec<TaskHandle<T>>>
where
T: Send + 'static,
I: IntoIterator<Item = TaskDefinition<T>> + Send,
I::IntoIter: ExactSizeIterator + Send;
async fn task<T>(&self, task_id: Uuid) -> eyre::Result<TaskHandle<T>>
where
T: Send + 'static;
async fn tasks(&self, options: &Tasks) -> eyre::Result<Vec<TaskHandle>>;
async fn task_count(&self, options: &Tasks) -> eyre::Result<u64>;
async fn cancel_tasks(&self, options: &Tasks) -> eyre::Result<Vec<TaskHandle>>;
async fn add_schedule(&self, schedule: ScheduleDefinition) -> eyre::Result<ScheduleHandle>;
async fn add_schedules<I>(&self, schedules: I) -> eyre::Result<Vec<ScheduleHandle>>
where
I: IntoIterator<Item = ScheduleDefinition> + Send,
I::IntoIter: ExactSizeIterator + Send;
async fn schedule(&self, schedule_id: Uuid) -> eyre::Result<ScheduleHandle>;
async fn schedules(&self, options: &Schedules) -> eyre::Result<Vec<ScheduleHandle>>;
async fn schedule_count(&self, options: &Schedules) -> eyre::Result<u64>;
async fn cancel_schedules(&self, options: &Schedules) -> eyre::Result<Vec<ScheduleHandle>>;
}
#[async_trait]
pub trait ClientObj {
async fn add_task(&self, task: TaskDefinition) -> eyre::Result<TaskHandle>;
async fn add_tasks(
&self,
tasks: &mut (dyn ExactSizeIterator<Item = TaskDefinition> + Send),
) -> eyre::Result<Vec<TaskHandle>>;
async fn task(&self, task_id: Uuid) -> eyre::Result<TaskHandle>;
async fn tasks(&self, options: &Tasks) -> eyre::Result<Vec<TaskHandle>>;
async fn cancel_tasks(&self, options: &Tasks) -> eyre::Result<Vec<TaskHandle>>;
async fn task_count(&self, options: &Tasks) -> eyre::Result<u64>;
async fn add_schedule(&self, schedule: ScheduleDefinition) -> eyre::Result<ScheduleHandle>;
async fn add_schedules(
&self,
schedules: &mut (dyn ExactSizeIterator<Item = ScheduleDefinition> + Send),
) -> eyre::Result<Vec<ScheduleHandle>>;
async fn schedule(&self, schedule_id: Uuid) -> eyre::Result<ScheduleHandle>;
async fn schedules(&self, options: &Schedules) -> eyre::Result<Vec<ScheduleHandle>>;
async fn schedule_count(&self, options: &Schedules) -> eyre::Result<u64>;
async fn cancel_schedules(&self, options: &Schedules) -> eyre::Result<Vec<ScheduleHandle>>;
}
#[async_trait]
impl<C> Client for C
where
C: ClientOperations,
{
async fn add_task<T>(&self, task: TaskDefinition<T>) -> eyre::Result<TaskHandle<T>>
where
T: Send + 'static,
{
let id = <Self as ClientOperations>::add_task(self, task.cast()).await?;
<Self as Client>::task::<T>(self, id).await
}
async fn add_tasks<T, I>(&self, tasks: I) -> eyre::Result<Vec<TaskHandle<T>>>
where
I: IntoIterator<Item = TaskDefinition<T>> + Send,
I::IntoIter: ExactSizeIterator + Send,
{
let ids = <Self as ClientOperations>::add_tasks(
self,
&mut tasks.into_iter().map(TaskDefinition::cast),
)
.await?;
Ok(<Self as ClientOperations>::tasks_by_ids(self, ids)
.await?
.into_iter()
.map(|ops| {
let ops2 = ops.clone();
TaskHandle {
id: ops.id(),
out: async move { Arc::new(ops2.wait_result().await) }
.boxed()
.shared(),
ops,
task_type: PhantomData,
}
})
.collect())
}
async fn task<T>(&self, task_id: Uuid) -> eyre::Result<TaskHandle<T>> {
Ok(TaskHandle::new_raw(
<Self as ClientOperations>::task(self, task_id).await?,
))
}
async fn tasks(&self, options: &Tasks) -> eyre::Result<Vec<TaskHandle>> {
Ok(<Self as ClientOperations>::tasks(self, options)
.await?
.into_iter()
.map(|ops| {
let ops2 = ops.clone();
TaskHandle {
id: ops.id(),
out: async move { Arc::new(ops2.wait_result().await) }
.boxed()
.shared(),
ops,
task_type: PhantomData,
}
})
.collect())
}
async fn task_count(&self, options: &Tasks) -> eyre::Result<u64> {
<Self as ClientOperations>::task_count(self, options).await
}
async fn cancel_tasks(&self, options: &Tasks) -> eyre::Result<Vec<TaskHandle>> {
<Self as Client>::cancel_tasks(self, options).await
}
async fn add_schedule(&self, schedule: ScheduleDefinition) -> eyre::Result<ScheduleHandle> {
let id = <Self as ClientOperations>::add_schedule(self, schedule).await?;
<Self as Client>::schedule(self, id).await
}
async fn add_schedules<I>(&self, schedules: I) -> eyre::Result<Vec<ScheduleHandle>>
where
I: IntoIterator<Item = ScheduleDefinition> + Send,
I::IntoIter: ExactSizeIterator + Send,
{
let ids =
<Self as ClientOperations>::add_schedules(self, &mut schedules.into_iter()).await?;
Ok(<Self as ClientOperations>::schedules_by_ids(self, ids)
.await?
.into_iter()
.map(|ops| ScheduleHandle { id: ops.id(), ops })
.collect())
}
async fn schedule(&self, schedule_id: Uuid) -> eyre::Result<ScheduleHandle> {
let ops = <Self as ClientOperations>::schedule(self, schedule_id).await?;
Ok(ScheduleHandle {
id: schedule_id,
ops,
})
}
async fn schedules(&self, options: &Schedules) -> eyre::Result<Vec<ScheduleHandle>> {
Ok(<Self as ClientOperations>::schedules(self, options)
.await?
.into_iter()
.map(|ops| ScheduleHandle { id: ops.id(), ops })
.collect())
}
async fn schedule_count(&self, options: &Schedules) -> eyre::Result<u64> {
<Self as ClientOperations>::schedule_count(self, options).await
}
async fn cancel_schedules(&self, options: &Schedules) -> eyre::Result<Vec<ScheduleHandle>> {
<Self as Client>::cancel_schedules(self, options).await
}
}
#[async_trait]
impl<C> ClientObj for C
where
C: Client,
{
async fn add_task(&self, task: TaskDefinition) -> eyre::Result<TaskHandle> {
<Self as Client>::add_task(self, task).await
}
async fn add_tasks(
&self,
tasks: &mut (dyn ExactSizeIterator<Item = TaskDefinition> + Send),
) -> eyre::Result<Vec<TaskHandle>> {
<Self as Client>::add_tasks(self, tasks).await
}
async fn task(&self, task_id: Uuid) -> eyre::Result<TaskHandle> {
<Self as Client>::task(self, task_id).await
}
async fn tasks(&self, options: &Tasks) -> eyre::Result<Vec<TaskHandle>> {
<Self as Client>::tasks(self, options).await
}
async fn cancel_tasks(&self, options: &Tasks) -> eyre::Result<Vec<TaskHandle>> {
<Self as Client>::cancel_tasks(self, options).await
}
async fn task_count(&self, options: &Tasks) -> eyre::Result<u64> {
<Self as Client>::task_count(self, options).await
}
async fn add_schedule(&self, schedule: ScheduleDefinition) -> eyre::Result<ScheduleHandle> {
<Self as Client>::add_schedule(self, schedule).await
}
async fn add_schedules(
&self,
schedules: &mut (dyn ExactSizeIterator<Item = ScheduleDefinition> + Send),
) -> eyre::Result<Vec<ScheduleHandle>> {
<Self as Client>::add_schedules(self, schedules).await
}
async fn schedule(&self, schedule_id: Uuid) -> eyre::Result<ScheduleHandle> {
<Self as Client>::schedule(self, schedule_id).await
}
async fn schedules(&self, options: &Schedules) -> eyre::Result<Vec<ScheduleHandle>> {
<Self as Client>::schedules(self, options).await
}
async fn schedule_count(&self, options: &Schedules) -> eyre::Result<u64> {
<Self as Client>::schedule_count(self, options).await
}
async fn cancel_schedules(&self, options: &Schedules) -> eyre::Result<Vec<ScheduleHandle>> {
<Self as Client>::cancel_schedules(self, options).await
}
}
type SharedTaskOutputFut =
Shared<Pin<Box<dyn Future<Output = Arc<eyre::Result<RawTaskResult>>> + Send>>>;
pub struct TaskHandle<T = ()> {
id: Uuid,
ops: Arc<dyn TaskOperations>,
out: SharedTaskOutputFut,
task_type: PhantomData<T>,
}
impl<T> Clone for TaskHandle<T> {
fn clone(&self) -> Self {
Self {
id: self.id,
ops: self.ops.clone(),
out: self.out.clone(),
task_type: self.task_type,
}
}
}
impl<T> TaskHandle<T> {
pub fn new_raw(task_operations: Arc<dyn TaskOperations>) -> Self {
let ops2 = task_operations.clone();
TaskHandle {
id: task_operations.id(),
out: async move { Arc::new(ops2.wait_result().await) }
.boxed()
.shared(),
ops: task_operations,
task_type: PhantomData,
}
}
}
impl<T> std::fmt::Debug for TaskHandle<T>
where
T: Debug + Unpin,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TaskHandle")
.field("id", &self.id)
.field("ops", &self.ops)
.finish_non_exhaustive()
}
}
#[allow(clippy::missing_errors_doc)]
impl<T> TaskHandle<T> {
#[must_use]
pub const fn id(&self) -> Uuid {
self.id
}
pub async fn status(&self) -> eyre::Result<TaskStatus> {
self.ops.status().await
}
pub async fn target(&self) -> eyre::Result<OffsetDateTime> {
self.ops.target().await.map(Into::into)
}
pub async fn definition(&self) -> eyre::Result<TaskDefinition<T>> {
self.ops.definition().await.map(TaskDefinition::cast)
}
pub async fn schedule(&self) -> eyre::Result<Option<ScheduleHandle>> {
Ok(self
.ops
.schedule()
.await?
.map(|ops| ScheduleHandle { id: ops.id(), ops }))
}
pub async fn added_at(&self) -> eyre::Result<OffsetDateTime> {
self.ops.added_at().await.map(Into::into)
}
pub async fn ready_at(&self) -> eyre::Result<Option<OffsetDateTime>> {
self.ops.ready_at().await.map(|v| v.map(Into::into))
}
pub async fn started_at(&self) -> eyre::Result<Option<OffsetDateTime>> {
self.ops.started_at().await.map(|v| v.map(Into::into))
}
pub async fn succeeded_at(&self) -> eyre::Result<Option<OffsetDateTime>> {
self.ops.succeeded_at().await.map(|v| v.map(Into::into))
}
pub async fn failed_at(&self) -> eyre::Result<Option<OffsetDateTime>> {
self.ops.failed_at().await.map(|v| v.map(Into::into))
}
pub async fn cancelled_at(&self) -> eyre::Result<Option<OffsetDateTime>> {
self.ops.cancelled_at().await.map(|v| v.map(Into::into))
}
pub async fn cancel(&self) -> eyre::Result<()> {
self.ops.cancel().await
}
pub async fn worker_id(&self) -> eyre::Result<Option<Uuid>> {
self.ops.worker_id().await
}
#[must_use]
pub fn cast<U>(self) -> TaskHandle<U> {
TaskHandle {
id: self.id,
ops: self.ops,
out: self.out,
task_type: PhantomData,
}
}
}
impl<T> Future for TaskHandle<T>
where
T: Task + Unpin,
{
type Output = eyre::Result<T::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.out.poll_unpin(cx) {
Poll::Ready(res) => match &*res {
Ok(raw_out) => Poll::Ready(match &raw_out {
RawTaskResult::Success {
output_format,
output,
} => match output_format {
TaskDataFormat::Unknown => Err(eyre::eyre!("unknown format")),
TaskDataFormat::MessagePack => Ok(rmp_serde::from_slice(output)?),
TaskDataFormat::Json => Ok(serde_json::from_slice(output)?),
},
RawTaskResult::Failure { reason } => Err(eyre::eyre!("task failed: {reason}")),
RawTaskResult::Cancelled => Err(eyre::eyre!("task cancelled")),
}),
Err(error) => Poll::Ready(Err(eyre::eyre!("{error:?}"))),
},
Poll::Pending => Poll::Pending,
}
}
}
impl Future for TaskHandle<()> {
type Output = eyre::Result<TaskOutput>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.out.poll_unpin(cx) {
Poll::Ready(res) => match &*res {
Ok(raw_out) => Poll::Ready(match &raw_out {
RawTaskResult::Success {
output_format,
output,
} => Ok(TaskOutput {
output: output.clone(),
output_format: *output_format,
}),
RawTaskResult::Failure { reason } => Err(eyre::eyre!("task failed: {reason}")),
RawTaskResult::Cancelled => Err(eyre::eyre!("task cancelled")),
}),
Err(error) => Poll::Ready(Err(eyre::eyre!("{error:?}"))),
},
Poll::Pending => Poll::Pending,
}
}
}
#[derive(Debug)]
pub struct TaskOutput {
output_format: TaskDataFormat,
output: Vec<u8>,
}
impl TaskOutput {
pub fn get<T: DeserializeOwned>(&self) -> eyre::Result<T> {
match self.output_format {
TaskDataFormat::Unknown => eyre::bail!("unknown format"),
TaskDataFormat::MessagePack => Ok(rmp_serde::from_slice(&self.output)?),
TaskDataFormat::Json => Ok(serde_json::from_slice(&self.output)?),
}
}
#[must_use]
pub fn bytes(&self) -> &[u8] {
&self.output
}
#[must_use]
pub fn into_bytes(self) -> Vec<u8> {
self.output
}
#[must_use]
pub fn format(&self) -> TaskDataFormat {
self.output_format
}
}
#[derive(Debug, Clone)]
pub struct ScheduleHandle {
id: Uuid,
ops: Arc<dyn ScheduleOperations>,
}
#[allow(clippy::missing_errors_doc)]
impl ScheduleHandle {
pub fn new_raw(ops: Arc<dyn ScheduleOperations>) -> Self {
ScheduleHandle { id: ops.id(), ops }
}
#[must_use]
pub fn id(&self) -> Uuid {
self.id
}
pub async fn definition(&self) -> eyre::Result<ScheduleDefinition> {
self.ops.definition().await
}
pub async fn is_active(&self) -> eyre::Result<bool> {
self.ops.is_active().await
}
pub async fn cancelled_at(&self) -> eyre::Result<Option<OffsetDateTime>> {
self.ops.cancelled_at().await.map(|v| v.map(Into::into))
}
pub async fn active_task(&self) -> eyre::Result<Option<TaskHandle>> {
let ops = self.ops.active_task().await?;
Ok(ops.map(|ops| {
let ops2 = ops.clone();
TaskHandle {
id: ops.id(),
ops,
out: async move { Arc::new(ops2.wait_result().await) }
.boxed()
.shared(),
task_type: PhantomData,
}
}))
}
pub async fn cancel(&self) -> eyre::Result<()> {
self.ops.cancel().await
}
}