ora_client/
schedule_handle.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
//! A handle to a schedule.

use std::sync::Arc;

use futures::Stream;
use ora_proto::server::v1::admin_service_client::AdminServiceClient;
use parking_lot::Mutex;
use tonic::transport::Channel;
use uuid::Uuid;

use crate::{
    admin::AdminClientError,
    job_handle::JobHandle,
    job_query::{JobFilter, JobOrder},
    schedule_definition::ScheduleDetails,
    schedule_query::ScheduleFilter,
    AdminClient,
};

/// A handle to a schedule.
pub struct ScheduleHandle {
    /// The ID of the schedule.
    id: Uuid,
    /// The client used to interact with the schedule.
    client: AdminServiceClient<Channel>,
    details: Arc<Mutex<Option<Arc<ScheduleDetails>>>>,
}

impl ScheduleHandle {
    /// Create a new schedule handle.
    pub(crate) fn new(id: Uuid, client: AdminServiceClient<Channel>) -> Self {
        Self {
            id,
            client,
            details: Arc::new(Mutex::new(None)),
        }
    }

    /// Get the ID of the schedule.
    pub fn id(&self) -> Uuid {
        self.id
    }

    /// Get the details of the schedule.
    pub async fn details(&self) -> eyre::Result<Arc<ScheduleDetails>> {
        if let Some(details) = self.details.lock().as_ref() {
            if !details.active {
                return Ok(details.clone());
            }
        }

        let schedule = self
            .client
            .clone()
            .list_schedules(tonic::Request::new(
                ora_proto::server::v1::ListSchedulesRequest {
                    cursor: None,
                    limit: 1,
                    order: None,
                    filter: Some(ScheduleFilter::new().with_schedule_id(self.id).into()),
                },
            ))
            .await?
            .into_inner()
            .schedules
            .into_iter()
            .next()
            .ok_or_else(|| eyre::eyre!("schedule details not found"))?;

        let details = Arc::new(ScheduleDetails::try_from(schedule)?);
        self.set_details(details.clone());

        Ok(details)
    }

    /// Get the cached details of the schedule, if available.
    pub fn cached_details(&self) -> Option<Arc<ScheduleDetails>> {
        self.details.lock().as_ref().cloned()
    }

    /// Get the jobs that were created by this schedule with the given filter and order.
    pub fn jobs(
        &self,
        mut filter: JobFilter,
        order: JobOrder,
    ) -> impl Stream<Item = Result<JobHandle, AdminClientError>> + Send + Unpin + 'static {
        filter.schedule_ids = [self.id].into_iter().collect();
        AdminClient::new(self.client.clone()).jobs(filter, order)
    }

    /// Return the amount of jobs that were created by this schedule.
    pub async fn job_count(&self) -> Result<u64, AdminClientError> {
        let count = self
            .client
            .clone()
            .count_jobs(tonic::Request::new(
                ora_proto::server::v1::CountJobsRequest {
                    filter: Some(JobFilter::new().with_schedule_id(self.id).into()),
                },
            ))
            .await?
            .into_inner()
            .count;

        Ok(count)
    }

    /// Get the active job of the schedule.
    pub async fn active_job(&self) -> Result<Option<JobHandle>, AdminClientError> {
        let job = self
            .client
            .clone()
            .list_jobs(tonic::Request::new(
                ora_proto::server::v1::ListJobsRequest {
                    cursor: None,
                    limit: 1,
                    order: None,
                    filter: Some(
                        JobFilter::new()
                            .with_schedule_id(self.id)
                            .active_only()
                            .into(),
                    ),
                },
            ))
            .await?
            .into_inner()
            .jobs
            .into_iter()
            .next();

        Ok(job
            .map(|job| {
                let h = JobHandle::new(job.id.parse()?, self.client.clone());
                h.set_details(Arc::new(job.try_into()?));
                Result::<_, eyre::Report>::Ok(h)
            })
            .transpose()?)
    }

    /// Cancel the schedule, optionally cancelling all jobs created by the schedule.
    pub async fn cancel(&self, cancel_jobs: bool) -> Result<(), AdminClientError> {
        self.client
            .clone()
            .cancel_schedules(tonic::Request::new(
                ora_proto::server::v1::CancelSchedulesRequest {
                    filter: Some(ScheduleFilter::new().with_schedule_id(self.id).into()),
                    cancel_jobs,
                },
            ))
            .await?;

        Ok(())
    }

    pub(crate) fn set_details(&self, details: Arc<ScheduleDetails>) {
        *self.details.lock() = Some(details);
    }
}