1use std::{num::NonZero, sync::Arc, time::SystemTime};
4
5use async_trait::async_trait;
6use eyre::bail;
7use ora_proto::server::v1::executor_service_client::ExecutorServiceClient;
8use tokio_util::sync::CancellationToken;
9use tonic::transport::Channel;
10use uuid::Uuid;
11
12use crate::job_type::{JobType, JobTypeExt, JobTypeMetadata};
13
14mod run;
15
16pub use eyre::Result;
17
/// Configuration for an executor.
///
/// Use [`ExecutorOptions::default`] and override individual fields with
/// struct-update syntax when only some settings need to change.
#[derive(Debug, Clone)]
pub struct ExecutorOptions {
    /// Name of the executor. Empty by default.
    ///
    /// NOTE(review): how the name is consumed is not visible in this file;
    /// confirm against the `run` module.
    pub name: String,
    /// Maximum number of executions allowed to run concurrently.
    ///
    /// The `NonZero` type rules out a limit of zero. Defaults to `1`.
    pub max_concurrent_executions: NonZero<u32>,
    /// Grace period associated with cancelling executions.
    ///
    /// Defaults to 30 seconds.
    ///
    /// NOTE(review): the exact point at which the grace period starts and what
    /// happens when it elapses is decided outside this file; confirm there.
    pub cancellation_grace_period: std::time::Duration,
}

impl Default for ExecutorOptions {
    /// Returns options with an empty name, a concurrency limit of one and a
    /// 30 second cancellation grace period.
    fn default() -> Self {
        Self {
            name: String::new(),
            // `NonZero::<u32>::MIN` is the constant `1`, so no runtime
            // `unwrap()` (and no panic path) is needed to build the default.
            max_concurrent_executions: NonZero::<u32>::MIN,
            cancellation_grace_period: std::time::Duration::from_secs(30),
        }
    }
}
41
42pub struct Executor<C = Channel> {
44 options: ExecutorOptions,
45 client: ExecutorServiceClient<C>,
46 handlers: Vec<Arc<dyn ExecutionHandlerRaw + Send + Sync>>,
47}
48
49impl<C> Executor<C> {
50 pub fn new(client: ExecutorServiceClient<C>) -> Self {
52 Self::with_options(client, ExecutorOptions::default())
53 }
54
55 pub fn with_options(client: ExecutorServiceClient<C>, options: ExecutorOptions) -> Self {
57 Self {
58 client,
59 options,
60 handlers: Vec::new(),
61 }
62 }
63
64 pub fn add_handler(&mut self, handler: Arc<dyn ExecutionHandlerRaw + Send + Sync>) {
70 assert!(
71 !self
72 .handlers
73 .iter()
74 .any(|h| h.job_type_metadata().id == handler.job_type_metadata().id),
75 "A handler for job type {} is already registered",
76 handler.job_type_metadata().id
77 );
78
79 self.handlers.push(handler);
80 }
81
82 pub fn try_add_handler(
87 &mut self,
88 handler: Arc<dyn ExecutionHandlerRaw + Send + Sync>,
89 ) -> eyre::Result<()> {
90 if self
91 .handlers
92 .iter()
93 .any(|h| h.job_type_metadata().id == handler.job_type_metadata().id)
94 {
95 bail!(
96 "A handler for job type {} is already registered",
97 handler.job_type_metadata().id
98 );
99 }
100
101 self.handlers.push(handler);
102 Ok(())
103 }
104}
105
106#[derive(Debug, Clone)]
108pub struct ExecutionContext {
109 execution_id: Uuid,
110 job_id: Uuid,
111 target_execution_time: SystemTime,
112 attempt_number: u64,
113 job_type_id: String,
114 cancellation_token: CancellationToken,
115}
116
117impl ExecutionContext {
118 #[must_use]
120 pub fn execution_id(&self) -> Uuid {
121 self.execution_id
122 }
123
124 #[must_use]
126 pub fn job_id(&self) -> Uuid {
127 self.job_id
128 }
129
130 #[must_use]
132 pub fn target_execution_time(&self) -> SystemTime {
133 self.target_execution_time
134 }
135
136 #[must_use]
140 pub fn attempt_number(&self) -> u64 {
141 self.attempt_number
142 }
143
144 #[must_use]
146 pub fn job_type_id(&self) -> &str {
147 &self.job_type_id
148 }
149
150 pub async fn cancelled(&self) {
152 self.cancellation_token.cancelled().await;
153 }
154
155 #[must_use]
157 pub fn is_cancelled(&self) -> bool {
158 self.cancellation_token.is_cancelled()
159 }
160}
161
162#[async_trait]
164pub trait ExecutionHandler<J>
165where
166 J: JobType,
167{
168 async fn execute(&self, context: ExecutionContext, input: J) -> eyre::Result<J::Output>;
170
171 fn raw_handler(self) -> Arc<dyn ExecutionHandlerRaw + Send + Sync>
173 where
174 Self: Sized + Send + Sync + 'static,
175 {
176 struct H<J, F>(F, std::marker::PhantomData<J>, JobTypeMetadata);
177
178 #[async_trait]
179 impl<J, F> ExecutionHandlerRaw for H<J, F>
180 where
181 J: JobType,
182 F: ExecutionHandler<J> + Send + Sync + 'static,
183 {
184 fn can_execute(&self, context: &ExecutionContext) -> bool {
185 context.job_type_id == J::id()
186 }
187
188 async fn execute(
189 &self,
190 context: ExecutionContext,
191 input_json: &str,
192 ) -> Result<String, String> {
193 let input = serde_json::from_str::<J>(input_json)
194 .map_err(|e| format!("Failed to parse job input JSON: {e}"))?;
195
196 let result = self
197 .0
198 .execute(context, input)
199 .await
200 .map_err(|e| e.to_string())?;
201
202 let output_json = serde_json::to_string(&result)
203 .map_err(|e| format!("Failed to serialize job output JSON: {e}"))?;
204
205 Ok(output_json)
206 }
207
208 fn job_type_metadata(&self) -> &JobTypeMetadata {
209 &self.2
210 }
211 }
212
213 Arc::new(H(self, std::marker::PhantomData, J::metadata()))
214 }
215}
216
217#[async_trait]
218impl<J, F, Fut> ExecutionHandler<J> for F
219where
220 J: JobType,
221 F: Fn(ExecutionContext, J) -> Fut + Send + Sync + 'static,
222 Fut: std::future::Future<Output = eyre::Result<J::Output>> + Send + 'static,
223{
224 async fn execute(&self, context: ExecutionContext, input: J) -> eyre::Result<J::Output> {
225 self(context, input).await
226 }
227}
228
229#[async_trait]
231pub trait ExecutionHandlerRaw {
232 fn can_execute(&self, context: &ExecutionContext) -> bool;
235
236 async fn execute(&self, context: ExecutionContext, input_json: &str) -> Result<String, String>;
244
245 fn job_type_metadata(&self) -> &JobTypeMetadata;
247}
248
249pub trait IntoExecutionHandler: Sized + Send + Sync + 'static {
252 fn handler<J>(self) -> Arc<dyn ExecutionHandlerRaw + Send + Sync>
255 where
256 Self: ExecutionHandler<J>,
257 J: JobType,
258 {
259 <Self as ExecutionHandler<J>>::raw_handler(self)
260 }
261}
262
263impl<W> IntoExecutionHandler for W where W: Sized + Send + Sync + 'static {}