ora-server 0.12.7

Part of the Ora scheduler framework.
Documentation
use std::time::SystemTime;

use crate::{
    grpc::GrpcImpl,
    proto::admin::v1::{self, admin_service_server::AdminService},
    util::{deduplicate_labels, inherit_labels, validate_json},
};
use ora_backend::{
    Backend,
    common::NextPageToken,
    jobs::{JobDefinition, JobFilters, NewJob},
};
use tonic::{Request, Response, Status, async_trait};

use crate::grpc::conv::ResultErrExt;

#[async_trait]
impl<B> AdminService for GrpcImpl<B>
where
    B: Backend,
{
    async fn list_job_types(
        &self,
        _request: Request<v1::ListJobTypesRequest>,
    ) -> Result<Response<v1::ListJobTypesResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let mut job_types = self.executor_pool.list_job_types();
        job_types.extend(self.backend.list_job_types().await.err_status()?);

        job_types.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
        job_types.dedup_by(|a, b| a.id.as_str() == b.id.as_str());

        Ok(Response::new(v1::ListJobTypesResponse {
            job_types: job_types.into_iter().map(Into::into).collect(),
        }))
    }

    async fn add_jobs(
        &self,
        request: Request<v1::AddJobsRequest>,
    ) -> Result<Response<v1::AddJobsResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let mut jobs: Vec<JobDefinition> = request
            .jobs
            .into_iter()
            .map(TryInto::try_into)
            .collect::<Result<Vec<_>, _>>()?;

        for job in &mut jobs {
            deduplicate_labels(&mut job.labels);

            validate_json(&job.input_payload_json).map_err(|e| {
                Status::invalid_argument(format!(
                    "invalid JSON payload for job '{}': {e}",
                    job.job_type_id
                ))
            })?;
        }

        let if_not_exists = match request.if_not_exists {
            Some(f) => Some(f.try_into().err_status()?),
            None => None,
        };

        let added_jobs = self
            .backend
            .add_jobs(
                &jobs
                    .into_iter()
                    .map(|job| NewJob {
                        job,
                        schedule_id: None,
                    })
                    .collect::<Vec<_>>(),
                if_not_exists,
            )
            .await
            .err_status()?;

        let (added_job_ids, existing_job_ids) = match added_jobs {
            ora_backend::jobs::AddedJobs::Added(job_ids) => (job_ids, Vec::new()),
            ora_backend::jobs::AddedJobs::Existing(job_ids) => (Vec::new(), job_ids),
        };

        Ok(Response::new(v1::AddJobsResponse {
            job_ids: added_job_ids
                .into_iter()
                .map(|id| id.0.to_string())
                .collect(),
            existing_job_ids: existing_job_ids
                .into_iter()
                .map(|id| id.0.to_string())
                .collect(),
        }))
    }

    async fn list_jobs(
        &self,
        request: Request<v1::ListJobsRequest>,
    ) -> Result<Response<v1::ListJobsResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let order_by = request.order_by().try_into()?;
        let filters = request.filters.unwrap_or_default();
        let pagination = request.pagination.unwrap_or_default();

        let (jobs, next_page_token) = self
            .backend
            .list_jobs(
                filters.try_into()?,
                order_by,
                pagination.page_size,
                pagination.next_page_token.map(NextPageToken),
            )
            .await
            .err_status()?;

        Ok(Response::new(v1::ListJobsResponse {
            jobs: jobs.into_iter().map(Into::into).collect(),
            next_page_token: next_page_token.map(|t| t.0),
        }))
    }

    async fn count_jobs(
        &self,
        request: Request<v1::CountJobsRequest>,
    ) -> Result<Response<v1::CountJobsResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let filters = request.filters.unwrap_or_default();

        let count = self
            .backend
            .count_jobs(filters.try_into()?)
            .await
            .err_status()?;

        Ok(Response::new(v1::CountJobsResponse { count }))
    }

    async fn cancel_jobs(
        &self,
        request: Request<v1::CancelJobsRequest>,
    ) -> Result<Response<v1::CancelJobsResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let filters = request.filters.unwrap_or_default();

        let cancelled_jobs = self
            .backend
            .cancel_jobs(filters.try_into()?)
            .await
            .err_status()?;

        self.executor_pool.cancel_executions(&cancelled_jobs);

        Ok(Response::new(v1::CancelJobsResponse {
            cancelled_job_ids: cancelled_jobs
                .into_iter()
                .map(|j| j.job_id.to_string())
                .collect(),
        }))
    }

    async fn add_schedules(
        &self,
        request: Request<v1::AddSchedulesRequest>,
    ) -> Result<Response<v1::AddSchedulesResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let if_not_exists = match request.if_not_exists {
            Some(f) => Some(f.try_into().err_status()?),
            None => None,
        };

        let mut schedules: Vec<ora_backend::schedules::ScheduleDefinition> = request
            .schedules
            .into_iter()
            .map(TryInto::try_into)
            .collect::<Result<Vec<_>, _>>()?;

        for schedule in &mut schedules {
            deduplicate_labels(&mut schedule.labels);
        }

        if request.inherit_labels.unwrap_or(true) {
            for schedule in &mut schedules {
                inherit_labels(&schedule.labels, &mut schedule.job_template.labels);
            }
        }

        let added_schedules = self
            .backend
            .add_schedules(&schedules, if_not_exists)
            .await
            .err_status()?;

        let (schedule_ids, existing_schedule_ids) = match added_schedules {
            ora_backend::schedules::AddedSchedules::Added(ids) => (ids, Vec::new()),
            ora_backend::schedules::AddedSchedules::Existing(ids) => (Vec::new(), ids),
        };

        Ok(Response::new(v1::AddSchedulesResponse {
            schedule_ids: schedule_ids
                .into_iter()
                .map(|id| id.0.to_string())
                .collect(),
            existing_schedule_ids: existing_schedule_ids
                .into_iter()
                .map(|id| id.0.to_string())
                .collect(),
        }))
    }

    async fn list_schedules(
        &self,
        request: Request<v1::ListSchedulesRequest>,
    ) -> Result<Response<v1::ListSchedulesResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let order_by = request.order_by().try_into()?;
        let filters = request.filters.unwrap_or_default();
        let pagination = request.pagination.unwrap_or_default();

        let (schedules, next_page_token) = self
            .backend
            .list_schedules(
                filters.try_into()?,
                order_by,
                pagination.page_size,
                pagination.next_page_token.map(NextPageToken),
            )
            .await
            .err_status()?;

        Ok(Response::new(v1::ListSchedulesResponse {
            schedules: schedules.into_iter().map(Into::into).collect(),
            next_page_token: next_page_token.map(|t| t.0),
        }))
    }

    async fn count_schedules(
        &self,
        request: Request<v1::CountSchedulesRequest>,
    ) -> Result<Response<v1::CountSchedulesResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let filters = request.filters.unwrap_or_default();

        let count = self
            .backend
            .count_schedules(filters.try_into()?)
            .await
            .err_status()?;

        Ok(Response::new(v1::CountSchedulesResponse { count }))
    }

    async fn stop_schedules(
        &self,
        request: Request<v1::StopSchedulesRequest>,
    ) -> Result<Response<v1::StopSchedulesResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let filters = request.filters.unwrap_or_default();

        let stopped_schedules = self
            .backend
            .stop_schedules(filters.try_into()?)
            .await
            .err_status()?;

        if request.cancel_active_jobs {
            self.backend
                .cancel_jobs(JobFilters {
                    schedule_ids: Some(stopped_schedules.iter().map(|s| s.schedule_id).collect()),
                    ..Default::default()
                })
                .await
                .err_status()?;
        }

        Ok(Response::new(v1::StopSchedulesResponse {
            cancelled_schedule_ids: stopped_schedules
                .into_iter()
                .map(|s| s.schedule_id.to_string())
                .collect(),
        }))
    }

    async fn list_executors(
        &self,
        _request: Request<v1::ListExecutorsRequest>,
    ) -> Result<Response<v1::ListExecutorsResponse>, Status> {
        if self.wg.is_waiting() {
            return Err(Status::unavailable("server is shutting down"));
        }

        Ok(Response::new(v1::ListExecutorsResponse {
            executors: self.executor_pool.list_executors(),
        }))
    }

    async fn delete_historical_data(
        &self,
        request: tonic::Request<v1::DeleteHistoricalDataRequest>,
    ) -> std::result::Result<tonic::Response<v1::DeleteHistoricalDataResponse>, tonic::Status> {
        if self.wg.is_waiting() {
            return Err(tonic::Status::unavailable("server is shutting down"));
        }

        let request = request.into_inner();

        let before = request
            .before
            .map(TryInto::try_into)
            .transpose()
            .err_status()?;

        self.backend
            .delete_history(before.unwrap_or_else(SystemTime::now))
            .await
            .err_status()?;

        Ok(tonic::Response::new(v1::DeleteHistoricalDataResponse {}))
    }
}