use std::{cmp, marker::PhantomData, pin::pin};
use eyre::{Context, OptionExt};
use futures::{Stream, TryStreamExt};
use tonic::Request;
use uuid::Uuid;
use crate::{
admin::{
AdminClient,
jobs::{Job, JobFilters, JobOrderBy},
},
common::{AddedOrExisting, LabelFilter, TimeRange},
job_type::{AnyJobType, JobType, JobTypeId},
proto::{
self,
admin::v1::{AddSchedulesRequest, ListSchedulesRequest, PaginationOptions},
},
schedule::{ScheduleDefinition, ScheduleId},
};
impl AdminClient {
    /// Adds a single schedule and returns a handle typed with the job type `J`.
    ///
    /// The request is sent with `if_not_exists` unset and `inherit_labels`
    /// set to `true`.
    ///
    /// # Errors
    ///
    /// Fails if the definition cannot be converted into its protobuf form, if
    /// the request fails, if the response contains no schedule ID, or if the
    /// returned ID is not a valid UUID.
    pub async fn add_schedule<J>(
        &self,
        schedule: ScheduleDefinition<J>,
    ) -> crate::Result<Schedule<J>>
    where
        J: JobType,
    {
        let id = self
            .inner
            .add_schedules(Request::new(AddSchedulesRequest {
                schedules: vec![schedule.try_into()?],
                if_not_exists: None,
                inherit_labels: Some(true),
            }))
            .await?
            .into_inner()
            .schedule_ids
            .pop()
            .ok_or_eyre("schedule ID not returned after creating the schedule")?
            .parse::<Uuid>()
            .wrap_err("invalid schedule ID")
            .map(ScheduleId)?;

        Ok(Schedule {
            id,
            client: self.clone(),
            raw: None,
            phantom: PhantomData,
        })
    }

    /// Adds several schedules with one request and returns one untyped handle
    /// per schedule ID contained in the response.
    ///
    /// Every item is converted into its protobuf form before the request is
    /// sent; the first conversion failure aborts the call.
    ///
    /// # Errors
    ///
    /// Fails if an item cannot be converted, if the request fails, or if any
    /// returned ID is not a valid UUID.
    pub async fn add_schedules<I>(&self, schedules: I) -> crate::Result<Vec<Schedule<AnyJobType>>>
    where
        I: IntoIterator<Item: TryInto<proto::schedules::v1::Schedule, Error: Into<crate::Error>>>,
    {
        let schedules = schedules
            .into_iter()
            .map(TryInto::try_into)
            .collect::<Result<Vec<_>, _>>()
            .map_err(Into::into)?;

        self.inner
            .add_schedules(Request::new(AddSchedulesRequest {
                schedules,
                if_not_exists: None,
                inherit_labels: Some(true),
            }))
            .await?
            .into_inner()
            .schedule_ids
            .into_iter()
            .map(|id| {
                Result::<_, crate::Error>::Ok(Schedule {
                    client: self.clone(),
                    id: ScheduleId(
                        id.parse::<Uuid>()
                            .wrap_err("server returned invalid schedule ID")?,
                    ),
                    raw: None,
                    phantom: PhantomData,
                })
            })
            .collect::<Result<Vec<_>, _>>()
    }

    /// Adds a single schedule, passing `filters` as the request's
    /// `if_not_exists` condition.
    ///
    /// The response is expected to carry exactly one of an added schedule ID
    /// or an existing schedule ID; the result says which one was received.
    ///
    /// # Errors
    ///
    /// Fails if the definition cannot be converted, if the request fails, if
    /// a returned ID is not a valid UUID, or if the response contains neither
    /// or both kinds of ID.
    pub async fn add_schedule_if_not_exists<J>(
        &self,
        schedule: ScheduleDefinition<J>,
        filters: ScheduleFilters,
    ) -> crate::Result<AddedOrExisting<Schedule<AnyJobType>>>
    where
        J: JobType,
    {
        let res = self
            .inner
            .add_schedules(Request::new(AddSchedulesRequest {
                schedules: vec![schedule.try_into()?],
                if_not_exists: Some(filters.into()),
                inherit_labels: Some(true),
            }))
            .await?
            .into_inner();

        let added_schedule_id = res
            .schedule_ids
            .into_iter()
            .next()
            .map(|id| {
                id.parse::<Uuid>()
                    .map(ScheduleId)
                    .wrap_err("server returned invalid schedule ID")
            })
            .transpose()?;

        let existing_schedule_id = res
            .existing_schedule_ids
            .into_iter()
            .next()
            .map(|id| {
                id.parse::<Uuid>()
                    .map(ScheduleId)
                    .wrap_err("server returned invalid schedule ID")
            })
            .transpose()?;

        match (added_schedule_id, existing_schedule_id) {
            (Some(added_schedule_id), None) => Ok(AddedOrExisting::Added(Schedule {
                client: self.clone(),
                id: added_schedule_id,
                raw: None,
                phantom: PhantomData,
            })),
            (None, Some(existing_schedule_id)) => Ok(AddedOrExisting::Existing(Schedule {
                client: self.clone(),
                id: existing_schedule_id,
                raw: None,
                phantom: PhantomData,
            })),
            (None, None) => Err(eyre::eyre!(
                "no schedule was added but no existing schedule was returned by the server"
            )
            .into()),
            (Some(_), Some(_)) => Err(eyre::eyre!(
                "server returned both an added schedule ID and an existing schedule ID, which is unexpected"
            )
            .into()),
        }
    }

    /// Adds several schedules with one request, passing `filters` as the
    /// request's `if_not_exists` condition.
    ///
    /// If the response lists no added schedule IDs, the existing schedule IDs
    /// from the response are returned as [`AddedOrExisting::Existing`];
    /// otherwise the added IDs are returned as [`AddedOrExisting::Added`].
    ///
    /// # Errors
    ///
    /// Fails if an item cannot be converted, if the request fails, or if any
    /// returned ID is not a valid UUID.
    pub async fn add_schedules_if_not_exists<I>(
        &self,
        schedules: I,
        filters: ScheduleFilters,
    ) -> crate::Result<AddedOrExisting<Vec<Schedule<AnyJobType>>>>
    where
        I: IntoIterator<Item: TryInto<proto::schedules::v1::Schedule, Error = crate::Error>>,
    {
        let schedules = schedules
            .into_iter()
            .map(TryInto::try_into)
            .collect::<Result<Vec<_>, _>>()?;

        let res = self
            .inner
            .add_schedules(Request::new(AddSchedulesRequest {
                schedules,
                if_not_exists: Some(filters.into()),
                inherit_labels: Some(true),
            }))
            .await?
            .into_inner();

        // Decide once which list of IDs to use so that both outcomes share the
        // same parsing code and the same error message.
        let added = !res.schedule_ids.is_empty();
        let ids = if added {
            res.schedule_ids
        } else {
            res.existing_schedule_ids
        };

        let handles = ids
            .into_iter()
            .map(|id| {
                Result::<_, crate::Error>::Ok(Schedule {
                    client: self.clone(),
                    id: ScheduleId(
                        id.parse::<Uuid>()
                            .wrap_err("server returned invalid schedule ID")?,
                    ),
                    raw: None,
                    phantom: PhantomData,
                })
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(if added {
            AddedOrExisting::Added(handles)
        } else {
            AddedOrExisting::Existing(handles)
        })
    }

    /// Streams the schedules matching `filters` in the given `order`.
    ///
    /// Results are fetched page by page with at most 25 schedules requested
    /// per page. When `limit` is `Some(n)` the stream yields at most `n`
    /// items: each page request is capped to the number of items still
    /// needed, and surplus items in a page are not yielded.
    ///
    /// Each yielded handle carries the schedule message it was built from as
    /// its cached raw value.
    ///
    /// # Errors
    ///
    /// The stream yields an error if a page request fails or if a returned
    /// schedule ID is not a valid UUID.
    pub fn list_schedules(
        &self,
        filters: ScheduleFilters,
        order: ScheduleOrderBy,
        limit: Option<u32>,
    ) -> impl Stream<Item = crate::Result<Schedule<AnyJobType>>> {
        const MAX_PAGE_SIZE: u32 = 25;

        async_stream::try_stream!({
            let mut total_count: u32 = 0;
            let mut next_page_token = None;
            let filters: proto::admin::v1::ScheduleFilters = filters.into();

            // The ordering does not change between pages, so translate it once.
            let order_by = match order {
                ScheduleOrderBy::CreatedAtAsc => {
                    proto::admin::v1::ScheduleOrderBy::CreatedAtAsc as i32
                }
                ScheduleOrderBy::CreatedAtDesc => {
                    proto::admin::v1::ScheduleOrderBy::CreatedAtDesc as i32
                }
            };

            loop {
                // Number of items that may still be yielded; `None` means unlimited.
                let remaining = limit.map(|limit| limit.saturating_sub(total_count));
                if remaining == Some(0) {
                    break;
                }

                let response = self
                    .inner
                    .list_schedules(Request::new(ListSchedulesRequest {
                        filters: Some(filters.clone()),
                        order_by,
                        pagination: Some(PaginationOptions {
                            // Never ask for more than is still needed, otherwise
                            // later pages would push the stream past `limit`.
                            page_size: remaining.map_or(MAX_PAGE_SIZE, |remaining| {
                                cmp::min(MAX_PAGE_SIZE, remaining)
                            }),
                            next_page_token: next_page_token.clone(),
                        }),
                    }))
                    .await?
                    .into_inner();

                for schedule_proto in response.schedules {
                    // Guard against a page that is larger than requested.
                    if limit.is_some_and(|limit| total_count >= limit) {
                        break;
                    }

                    let schedule_id = ScheduleId(
                        schedule_proto
                            .id
                            .parse::<Uuid>()
                            .wrap_err("server returned invalid schedule ID")?,
                    );

                    yield Schedule {
                        client: self.clone(),
                        id: schedule_id,
                        raw: Some(schedule_proto),
                        phantom: PhantomData,
                    };

                    total_count += 1;
                }

                next_page_token = response.next_page_token;
                if next_page_token.is_none() {
                    break;
                }
            }
        })
    }

    /// Returns the first schedule produced by [`AdminClient::list_schedules`]
    /// for `filters` and `order`, or `None` if the listing is empty.
    ///
    /// # Errors
    ///
    /// Fails if the underlying listing yields an error.
    pub async fn first_schedule(
        &self,
        filters: ScheduleFilters,
        order: ScheduleOrderBy,
    ) -> crate::Result<Option<Schedule<AnyJobType>>> {
        pin!(self.list_schedules(filters, order, Some(1)))
            .try_next()
            .await
    }

    /// Returns the `count` reported by the server for schedules matching
    /// `filters`.
    ///
    /// # Errors
    ///
    /// Fails if the request fails.
    pub async fn count_schedules(&self, filters: ScheduleFilters) -> crate::Result<u64> {
        let response = self
            .inner
            .count_schedules(Request::new(proto::admin::v1::CountSchedulesRequest {
                filters: Some(filters.into()),
            }))
            .await?
            .into_inner();

        Ok(response.count)
    }

    /// Sends a stop request for the schedules matching `filters`.
    ///
    /// `cancel_active_jobs` is set in the request exactly when `job_action`
    /// is [`StoppedScheduleJobAction::Cancel`]. One untyped handle is
    /// returned per ID in the response's `cancelled_schedule_ids` field.
    ///
    /// # Errors
    ///
    /// Fails if the request fails or if any returned ID is not a valid UUID.
    pub async fn stop_schedules(
        &self,
        filters: ScheduleFilters,
        job_action: StoppedScheduleJobAction,
    ) -> crate::Result<Vec<Schedule<AnyJobType>>> {
        self.inner
            .stop_schedules(Request::new(proto::admin::v1::StopSchedulesRequest {
                filters: Some(filters.into()),
                cancel_active_jobs: matches!(job_action, StoppedScheduleJobAction::Cancel),
            }))
            .await?
            .into_inner()
            .cancelled_schedule_ids
            .into_iter()
            .map(|id| {
                Result::<_, crate::Error>::Ok(Schedule {
                    client: self.clone(),
                    id: ScheduleId(
                        id.parse::<Uuid>()
                            .wrap_err("server returned invalid schedule ID")?,
                    ),
                    raw: None,
                    phantom: PhantomData,
                })
            })
            .collect::<Result<Vec<_>, _>>()
    }

    /// Returns `true` if [`AdminClient::count_schedules`] reports at least
    /// one schedule for `filters`.
    ///
    /// # Errors
    ///
    /// Fails if the count request fails.
    pub async fn schedule_exists(&self, filters: ScheduleFilters) -> crate::Result<bool> {
        let count = self.count_schedules(filters).await?;
        Ok(count > 0)
    }
}
/// Handle to a single schedule, bound to the [`AdminClient`] it came from.
///
/// `J` is a compile-time marker for the job type associated with the
/// schedule; no value of `J` is stored.
pub struct Schedule<J> {
/// Client used for follow-up requests concerning this schedule.
pub(super) client: AdminClient,
/// Identifier of the schedule.
pub(super) id: ScheduleId,
/// Cached schedule message, if any. It is set by `list_schedules` and by
/// `fetch_raw` when the client's caching strategy is `Cache`.
pub(super) raw: Option<proto::admin::v1::Schedule>,
/// Zero-sized marker tying the handle to the job type `J`.
pub(super) phantom: PhantomData<J>,
}
impl<J> Schedule<J> {
#[must_use]
pub fn id(&self) -> ScheduleId {
self.id
}
pub async fn stop(&self, job_action: StoppedScheduleJobAction) -> crate::Result<()> {
self.client
.stop_schedules(
ScheduleFilters {
schedule_ids: Some(vec![self.id]),
..Default::default()
},
job_action,
)
.await?;
Ok(())
}
pub fn list_jobs(
&self,
mut filters: JobFilters,
order: JobOrderBy,
limit: Option<u32>,
) -> impl Stream<Item = crate::Result<Job<J>>> {
filters.schedule_ids = Some(vec![self.id]);
self.client
.list_jobs(filters, order, limit)
.map_ok(Job::cast_any)
}
pub async fn status(&mut self) -> crate::Result<ScheduleStatus> {
let raw = self.fetch_raw().await?;
let raw = raw
.as_ref()
.or(self.raw.as_ref())
.ok_or_eyre("failed to retrieve schedule")?;
match raw.status() {
proto::admin::v1::ScheduleStatus::Active
| proto::admin::v1::ScheduleStatus::Unspecified => Ok(ScheduleStatus::Active),
proto::admin::v1::ScheduleStatus::Stopped => Ok(ScheduleStatus::Stopped),
}
}
pub async fn raw(&mut self) -> crate::Result<proto::admin::v1::Schedule> {
let job = self.fetch_raw().await?;
Ok(job
.or_else(|| self.raw.clone())
.ok_or_eyre("failed to retrieve schedule")?)
}
#[must_use]
pub fn raw_cached(&self) -> Option<&proto::admin::v1::Schedule> {
self.raw.as_ref()
}
async fn fetch_raw(&mut self) -> crate::Result<Option<proto::admin::v1::Schedule>> {
let schedule = self
.client
.inner
.list_schedules(Request::new(ListSchedulesRequest {
filters: Some(proto::admin::v1::ScheduleFilters {
schedule_ids: vec![self.id.to_string()],
..Default::default()
}),
order_by: proto::admin::v1::JobOrderBy::Unspecified as _,
pagination: Some(PaginationOptions {
page_size: 1,
next_page_token: None,
}),
}))
.await?
.into_inner()
.schedules
.pop()
.ok_or_eyre("schedule not found")?;
match self.client.caching_strategy {
crate::admin::CachingStrategy::Cache => {
self.raw = Some(schedule);
Ok(None)
}
crate::admin::CachingStrategy::NoCache => Ok(Some(schedule)),
}
}
}
impl Schedule<AnyJobType> {
    /// Converts this untyped handle into one typed with `J`, after checking
    /// the job type recorded on the server.
    ///
    /// The schedule is fetched, and the `job_type_id` of its job template is
    /// compared with `J::job_type_id()`. Returns `Ok(None)` when they differ.
    ///
    /// # Errors
    ///
    /// Fails if the schedule cannot be fetched, or if the fetched message has
    /// no schedule definition or no job template.
    pub async fn cast<J>(mut self) -> crate::Result<Option<Schedule<J>>>
    where
        J: JobType,
    {
        let fetched = self.fetch_raw().await?;

        // `fetch_raw` returns `None` when it stored the message in `self.raw`.
        let schedule = fetched
            .as_ref()
            .or(self.raw.as_ref())
            .ok_or_eyre("failed to fetch schedule")?;

        let job_template = schedule
            .schedule
            .as_ref()
            .ok_or_eyre("schedule definition is missing")?
            .job_template
            .as_ref()
            .ok_or_eyre("job template is missing")?;

        let job_type_id = &job_template.job_type_id;
        if job_type_id != J::job_type_id().as_str() {
            return Ok(None);
        }

        Ok(Some(Schedule {
            client: self.client,
            id: self.id,
            raw: self.raw,
            phantom: PhantomData,
        }))
    }

    /// Converts this untyped handle into one typed with `J` without any
    /// check or server request.
    ///
    /// The client, ID and cached raw message are carried over unchanged.
    #[must_use]
    pub fn cast_unchecked<J>(self) -> Schedule<J>
    where
        J: JobType,
    {
        Schedule {
            client: self.client,
            id: self.id,
            raw: self.raw,
            phantom: PhantomData,
        }
    }
}
/// Status of a schedule as exposed by this client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleStatus {
    /// The schedule is active.
    Active,
    /// The schedule has been stopped.
    Stopped,
}

impl ScheduleStatus {
    /// Returns `true` for [`ScheduleStatus::Active`] and `false` otherwise.
    #[must_use]
    pub fn is_active(&self) -> bool {
        *self == Self::Active
    }
}
/// Criteria used to select schedules in list, count, stop and
/// add-if-not-exists requests.
///
/// Every field defaults to `None`. Presumably a `None` field places no
/// restriction on the selection (see `all`); the conversion to the wire
/// type is not part of this section — TODO confirm.
#[derive(Debug, Default)]
#[must_use]
pub struct ScheduleFilters {
/// Schedule IDs to select.
pub schedule_ids: Option<Vec<ScheduleId>>,
/// Job type IDs to select.
pub job_type_ids: Option<Vec<JobTypeId>>,
/// Schedule statuses to select.
pub statuses: Option<Vec<ScheduleStatus>>,
/// Creation-time range to select.
pub created_at: Option<TimeRange>,
/// Label filters to apply.
pub labels: Option<Vec<LabelFilter>>,
}
impl ScheduleFilters {
    /// Creates a filter set with every criterion left as `None`.
    pub fn all() -> Self {
        Default::default()
    }

    /// Replaces the job type criterion with the single job type `J`.
    pub fn job_type<J: JobType>(self) -> Self {
        Self {
            job_type_ids: Some(vec![J::job_type_id()]),
            ..self
        }
    }

    /// Replaces the status criterion with [`ScheduleStatus::Active`] only.
    pub fn active_only(self) -> Self {
        Self {
            statuses: Some(vec![ScheduleStatus::Active]),
            ..self
        }
    }

    /// Replaces the status criterion with [`ScheduleStatus::Stopped`] only.
    pub fn stopped_only(self) -> Self {
        Self {
            statuses: Some(vec![ScheduleStatus::Stopped]),
            ..self
        }
    }

    /// Appends a label filter for `key` with no value constraint.
    pub fn has_label<K: Into<String>>(mut self, key: K) -> Self {
        let filter = LabelFilter {
            key: key.into(),
            value: None,
        };
        let labels = self.labels.get_or_insert_with(Vec::new);
        labels.push(filter);
        self
    }

    /// Appends a label filter for `key` constrained to `value`.
    pub fn has_label_value<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
        let filter = LabelFilter {
            key: key.into(),
            value: Some(value.into()),
        };
        let labels = self.labels.get_or_insert_with(Vec::new);
        labels.push(filter);
        self
    }
}
/// Ordering requested when listing schedules.
#[derive(Debug, Default, Clone, Copy)]
pub enum ScheduleOrderBy {
/// Sent as the proto `ScheduleOrderBy::CreatedAtAsc` value; this is the
/// default.
#[default]
CreatedAtAsc,
/// Sent as the proto `ScheduleOrderBy::CreatedAtDesc` value.
CreatedAtDesc,
}
/// Controls the `cancel_active_jobs` flag of a stop-schedules request.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum StoppedScheduleJobAction {
/// Sets `cancel_active_jobs` to `true` in the request; this is the default.
#[default]
Cancel,
/// Sets `cancel_active_jobs` to `false` in the request.
Ignore,
}