1use std::{
4 future::ready,
5 num::NonZeroUsize,
6 time::{Duration, SystemTime},
7};
8
9use executor_registry::{ExecutorRegistry, ExecutorRegistryOptions};
10use futures::{future::select, StreamExt};
11use ora_proto::{
12 server::v1::{
13 admin_service_client::AdminServiceClient, admin_service_server::AdminService,
14 executor_service_client::ExecutorServiceClient, executor_service_server::ExecutorService,
15 },
16 snapshot::v1::{
17 snapshot_service_client::SnapshotServiceClient, snapshot_service_server::SnapshotService,
18 },
19};
20use ora_storage::{JobQueryFilters, ScheduleQueryFilters};
21use scheduling::{
22 create_executions, create_schedule_jobs, mark_executions_ready, schedule_executions,
23 timer::spawn_timer,
24};
25use storage::StorageWrapper;
26use tokio::time::{sleep, timeout};
27use tonic::transport::Channel;
28use tracing::Instrument;
29use wgroup::WaitGroup;
30
31pub(crate) mod scheduling;
32pub(crate) mod time;
33
34pub(crate) mod admin;
35pub(crate) mod events;
36pub(crate) mod executor_registry;
37pub(crate) mod snapshot;
38pub(crate) mod storage;
39
40#[cfg(feature = "metrics")]
41pub(crate) mod metrics;
42
43pub use ora_storage::{Storage, StorageSnapshot};
44
45pub use events::{AuditEvent, AuditEventKind};
46
/// An [`indexmap::IndexMap`] that uses `ahash::RandomState` as its hasher.
pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
/// An [`indexmap::IndexSet`] that uses `ahash::RandomState` as its hasher.
pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;
51
52pub use ora_timer::TimerOptions;
53
#[derive(Debug, Clone)]
/// Configuration consumed by [`Server::spawn`] and by the server shutdown path.
pub struct ServerOptions {
    /// Options handed to the timer that is started when the server is spawned.
    pub timer: ora_timer::TimerOptions,
    /// Forwarded to the executor registry as its `executor_timeout`.
    pub executor_heartbeat_timeout: std::time::Duration,
    /// Capacity of each of the two ring buffers that connect the scheduling
    /// tasks with the timer.
    pub timer_buffer_size: NonZeroUsize,
    /// Size passed to the event bus when it is created.
    pub event_buffer_size: NonZeroUsize,
    /// Upper bound on how long each background loop waits between two runs;
    /// event-driven loops may run again earlier when a matching event arrives.
    pub bookkeeping_interval: std::time::Duration,
    /// Timeout passed to the executor registry's `shutdown` call when the
    /// server shuts down.
    pub executor_shutdown_timeout: std::time::Duration,
    /// When set, a background task periodically deletes inactive jobs that
    /// were created more than this long ago. `None` disables the task.
    pub max_job_age: Option<std::time::Duration>,
    /// When set, a background task periodically deletes inactive schedules
    /// that were created more than this long ago. `None` disables the task.
    pub max_schedule_age: Option<std::time::Duration>,
}
83
84impl Default for ServerOptions {
85 fn default() -> Self {
86 Self {
87 timer: Default::default(),
88 executor_heartbeat_timeout: Duration::from_secs(60),
89 timer_buffer_size: NonZeroUsize::new(10_1000).unwrap(),
90 event_buffer_size: NonZeroUsize::new(10_1000).unwrap(),
91 bookkeeping_interval: Duration::from_secs(5),
92 executor_shutdown_timeout: Duration::from_secs(10),
93 max_job_age: None,
94 max_schedule_age: None,
95 }
96 }
97}
98
#[must_use = "the server needs to be explicitly stopped"]
/// A running server instance together with the handles to its components.
///
/// Created with [`Server::spawn`]. Stop it with [`Server::shutdown`];
/// dropping it without shutting down only triggers a best-effort shutdown
/// in the background.
pub struct Server<S>
where
    S: ora_storage::Storage,
{
    /// The storage as passed to `spawn` (taken from the wrapper's `inner`).
    storage: S,
    /// Executor registry; also served as the executor gRPC service.
    executor_registry: executor_registry::ExecutorRegistry<StorageWrapper<S>>,
    /// Admin handle; also served as the admin gRPC service.
    admin: admin::Admin<StorageWrapper<S>>,
    /// The options the server was spawned with.
    options: ServerOptions,
    /// Event bus shared by all components.
    event_bus: events::EventBus,
    /// Wait group tracking the background components. It is `Some` until
    /// `shutdown` or `drop` takes it.
    wg: Option<WaitGroup>,
}
112
113impl<S> Server<S>
114where
115 S: ora_storage::Storage,
116{
    /// Create a server on top of `storage` and start all of its background
    /// tasks.
    ///
    /// This builds the event bus, the storage wrapper, the executor registry
    /// and the admin handle, starts the timer, and then spawns one Tokio task
    /// per bookkeeping loop. Every loop runs its work once, then waits for a
    /// triggering event or for `options.bookkeeping_interval`, whichever
    /// comes first, and exits once its wait-group guard reports waiting.
    ///
    /// Tasks are started with `tokio::spawn`, so this must be called from
    /// within a Tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if the timer cannot be started.
    pub fn spawn(storage: S, options: ServerOptions) -> eyre::Result<Self> {
        // Every background component registers with this wait group so that
        // shutdown can wait for all of them.
        let wg = WaitGroup::new();
        let event_bus = events::EventBus::new(options.event_buffer_size.get());

        // From here on `storage` is the wrapper that also carries the event bus.
        let storage = StorageWrapper::new(storage, event_bus.clone());

        let executor_registry = executor_registry::ExecutorRegistry::new(
            storage.clone(),
            ExecutorRegistryOptions {
                executor_timeout: options.executor_heartbeat_timeout,
            },
            wg.handle(),
            event_bus.clone(),
        );
        let admin = admin::Admin::new(
            storage.clone(),
            wg.handle(),
            event_bus.clone(),
            executor_registry.clone(),
        );

        // Two ring buffers connect the scheduling tasks with the timer:
        // the "pending" buffer is written by `schedule_executions` and read
        // by the timer ...
        let (mut pending_buf_producer, pending_buf_consumer) =
            rtrb::RingBuffer::new(options.timer_buffer_size.get());

        // ... and the "ready" buffer is written by the timer and read by
        // `mark_executions_ready`.
        let (ready_buf_producer, mut ready_buf_consumer) =
            rtrb::RingBuffer::new(options.timer_buffer_size.get());

        spawn_timer(
            ready_buf_producer,
            pending_buf_consumer,
            wg.add_with("timer"),
            options.timer,
            event_bus.clone(),
        )?;

        // Task: create executions for jobs. Re-runs on `JobsCreated` or after
        // the bookkeeping interval.
        tokio::spawn({
            let guard = wg.add_with("create_job_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running create_job_executions");

                    // Presumably true once shutdown has started waiting on
                    // the wait group; the loop then exits and drops its guard.
                    if guard.is_waiting() {
                        break;
                    }

                    // Failures are logged and retried on the next iteration.
                    if let Err(error) = create_executions(&event_bus, &backend).await {
                        tracing::error!(?error, "failed to create job executions");
                    }

                    // NOTE(review): the subscription is created after the work
                    // has run, so an event emitted while the work was in
                    // progress is presumably not observed and the loop falls
                    // back to the timeout — confirm against `EventBus`. The
                    // same pattern is used by the other event-driven loops.
                    let mut events = event_bus
                        .subscribe_job_events()
                        .filter(|event| ready(matches!(event, events::JobEvent::JobsCreated)));
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("create_job_executions"))
        });

        // Task: push executions to the timer through the pending buffer.
        // Re-runs on `ExecutionsAdded` or after the bookkeeping interval.
        tokio::spawn({
            let guard = wg.add_with("schedule_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                // Carried across iterations and passed back into
                // `schedule_executions` on the next run.
                let mut last_scheduled_id = None;

                loop {
                    tracing::trace!("running schedule_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    match schedule_executions(
                        &backend,
                        &mut pending_buf_producer,
                        last_scheduled_id,
                    )
                    .await
                    {
                        Ok(last_id) => {
                            // Keep the previous id when this run returned `None`.
                            last_scheduled_id = last_id.or(last_scheduled_id);
                        }
                        Err(error) => {
                            tracing::error!(?error, "failed to schedule executions");
                        }
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(event, events::ExecutionEvent::ExecutionsAdded))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("schedule_executions"))
        });

        // Task: drain the ready buffer filled by the timer and mark those
        // executions ready. Re-runs on `TimedExecutionsReady` or after the
        // bookkeeping interval.
        tokio::spawn({
            let guard = wg.add_with("ready_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running ready_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) =
                        mark_executions_ready(&event_bus, &backend, &mut ready_buf_consumer).await
                    {
                        tracing::error!(?error, "failed to mark executions ready");
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(
                            event,
                            events::ExecutionEvent::TimedExecutionsReady
                        ))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("ready_executions"))
        });

        // Task: assign executions through the executor registry. Re-runs when
        // executions become ready to run, when an executor becomes ready, or
        // after the bookkeeping interval.
        tokio::spawn({
            let guard = wg.add_with("assign_executions");
            let executor_registry = executor_registry.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running assign_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.assign_executions().await {
                        tracing::error!(?error, "failed to assign executions");
                    }

                    let mut execution_events =
                        event_bus.subscribe_execution_events().filter(|event| {
                            ready(matches!(
                                event,
                                events::ExecutionEvent::ExecutionsReadyToRun
                            ))
                        });
                    let mut executor_events =
                        event_bus.subscribe_executor_events().filter(|event| {
                            ready(matches!(event, events::ExecutorEvent::ExecutorReady))
                        });

                    // Wake up on whichever of the two event kinds arrives first.
                    _ = timeout(
                        options.bookkeeping_interval,
                        select(execution_events.next(), executor_events.next()),
                    )
                    .await;
                }
            }
            .instrument(tracing::info_span!("assign_executions"))
        });

        // Task: fail executions that timed out. Re-runs on
        // `TimedExecutionsReady` or after the bookkeeping interval.
        tokio::spawn({
            let guard = wg.add_with("execution_timeouts");
            let executor_registry = executor_registry.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running execution_timeouts");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.fail_timed_out_executions().await {
                        tracing::error!(?error, "failed to fail timed out executions");
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(
                            event,
                            events::ExecutionEvent::TimedExecutionsReady
                        ))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("execution_timeouts"))
        });

        // Task: reap dead executors on a fixed interval (no event trigger).
        tokio::spawn({
            let guard = wg.add_with("reap_dead_executors");
            let executor_registry = executor_registry.clone();

            async move {
                loop {
                    tracing::trace!("running reap_dead_executors");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.reap_dead_executors().await {
                        tracing::error!(?error, "failed to reap dead executors");
                    }

                    sleep(options.bookkeeping_interval).await;
                }
            }
            .instrument(tracing::info_span!("reap_dead_executors"))
        });

        // Task: clean up orphan executions on a fixed interval (no event
        // trigger).
        tokio::spawn({
            let guard = wg.add_with("clean_up_orphan_executions");
            let executor_registry = executor_registry.clone();

            async move {
                loop {
                    tracing::trace!("running clean_up_orphan_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.clean_up_orphan_executions().await {
                        tracing::error!(?error, "failed to clean up orphan executions");
                    }

                    sleep(options.bookkeeping_interval).await;
                }
            }
            .instrument(tracing::info_span!("clean_up_orphan_executions"))
        });

        // Optional task: delete old inactive jobs. Only started when
        // `max_job_age` is configured.
        if let Some(max_job_age) = options.max_job_age {
            tokio::spawn({
                let guard = wg.add_with("remove_old_jobs");
                let storage = storage.clone();

                async move {
                    loop {
                        tracing::trace!("running remove_old_jobs");

                        if guard.is_waiting() {
                            break;
                        }

                        // `after` is the cutoff time (now - max age). If the
                        // subtraction is not representable, skip this round.
                        let Some(after) = SystemTime::now().checked_sub(max_job_age) else {
                            sleep(options.bookkeeping_interval).await;
                            continue;
                        };

                        // Only inactive jobs created before the cutoff are
                        // selected for deletion.
                        if let Err(error) = storage
                            .delete_jobs(JobQueryFilters {
                                active: Some(false),
                                created_before: Some(after),
                                ..Default::default()
                            })
                            .await
                        {
                            tracing::error!(?error, "failed to clean up jobs");
                        }

                        sleep(options.bookkeeping_interval).await;
                    }
                }
                .instrument(tracing::info_span!("remove_old_jobs"))
            });
        }

        // Optional task: delete old inactive schedules. Only started when
        // `max_schedule_age` is configured.
        if let Some(max_schedule_age) = options.max_schedule_age {
            tokio::spawn({
                let guard = wg.add_with("remove_old_schedules");
                let storage = storage.clone();

                async move {
                    loop {
                        tracing::trace!("running remove_old_schedules");

                        if guard.is_waiting() {
                            break;
                        }

                        // `after` is the cutoff time (now - max age). If the
                        // subtraction is not representable, skip this round.
                        let Some(after) = SystemTime::now().checked_sub(max_schedule_age) else {
                            sleep(options.bookkeeping_interval).await;
                            continue;
                        };

                        // Only inactive schedules created before the cutoff
                        // are selected for deletion.
                        if let Err(error) = storage
                            .delete_schedules(ScheduleQueryFilters {
                                active: Some(false),
                                created_before: Some(after),
                                ..Default::default()
                            })
                            .await
                        {
                            tracing::error!(?error, "failed to clean up schedules");
                        }

                        sleep(options.bookkeeping_interval).await;
                    }
                }
                .instrument(tracing::info_span!("remove_old_schedules"))
            });
        }

        // Task: create jobs for schedules. Re-runs when executions finish,
        // when schedules are added, or after the bookkeeping interval.
        tokio::spawn({
            let guard = wg.add_with("create_schedule_jobs");
            let event_bus = event_bus.clone();
            let storage = storage.clone();

            async move {
                loop {
                    tracing::trace!("running create_schedule_jobs");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = create_schedule_jobs(&event_bus, &storage).await {
                        tracing::error!(?error, "failed to create schedule jobs");
                    }

                    let mut execution_events =
                        event_bus.subscribe_execution_events().filter(|event| {
                            ready(matches!(event, events::ExecutionEvent::ExecutionsFinished))
                        });
                    let mut schedule_events =
                        event_bus.subscribe_schedule_events().filter(|event| {
                            ready(matches!(event, events::ScheduleEvent::SchedulesAdded))
                        });

                    // Wake up on whichever of the two event kinds arrives first.
                    _ = timeout(
                        options.bookkeeping_interval,
                        select(execution_events.next(), schedule_events.next()),
                    )
                    .await;
                }
            }
            .instrument(tracing::info_span!("create_schedule_jobs"))
        });

        // Metrics collection (feature-gated).
        // NOTE(review): unlike the tasks above, this one takes no wait-group
        // guard, so shutdown does not wait for it — confirm this is intended.
        #[cfg(feature = "metrics")]
        tokio::spawn({
            let storage = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                if let Err(error) = crate::metrics::collect_metrics(&storage, &event_bus).await {
                    tracing::error!(?error, "error collecting metrics");
                }
            }
        });

        Ok(Self {
            // Keep the unwrapped storage rather than the wrapper.
            storage: storage.inner.clone(),
            executor_registry,
            admin,
            options,
            event_bus,
            wg: Some(wg),
        })
    }
493
494 pub fn admin_service(&self) -> impl AdminService {
496 self.admin.clone()
497 }
498
499 pub fn executor_service(&self) -> impl ExecutorService {
501 self.executor_registry.clone()
502 }
503
504 #[allow(clippy::missing_panics_doc)]
511 pub fn admin_service_client(&self) -> AdminServiceClient<Channel> {
512 use hyper_util::rt::TokioIo;
513 use ora_proto::server::v1::admin_service_server::AdminServiceServer;
514 use tonic::transport::{Endpoint, Uri};
515
516 let (client, server) = tokio::io::duplex(1024);
517
518 let admin = self.admin_service();
519 let wg = self.wg.as_ref().unwrap().handle().add();
520
521 tokio::spawn(async move {
522 let srv = tonic::transport::Server::builder()
523 .add_service(AdminServiceServer::new(admin).max_decoding_message_size(usize::MAX))
524 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));
525
526 let waiting = wg.waiting();
527
528 tokio::select! {
529 _ = waiting => {}
530 serve_result = srv => {
531 if let Err(error) = serve_result {
532 tracing::error!(?error, "error during admin service serve");
533 }
534 }
535 }
536 });
537
538 let mut client = Some(client);
539 let channel = Endpoint::try_from("http://[::]:50051")
540 .unwrap()
541 .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
542 let client = client.take();
543
544 async move {
545 if let Some(client) = client {
546 Ok(TokioIo::new(client))
547 } else {
548 Err(std::io::Error::new(
549 std::io::ErrorKind::Other,
550 "Client already taken",
551 ))
552 }
553 }
554 }));
555
556 AdminServiceClient::new(channel)
557 }
558
559 #[allow(clippy::missing_panics_doc)]
566 pub fn executor_service_client(&self) -> ExecutorServiceClient<Channel> {
567 use hyper_util::rt::TokioIo;
568 use ora_proto::server::v1::executor_service_server::ExecutorServiceServer;
569 use tonic::transport::{Endpoint, Uri};
570
571 let (client, server) = tokio::io::duplex(1024);
572
573 let executor_registry = self.executor_service();
574 let wg = self.wg.as_ref().unwrap().handle().add();
575
576 tokio::spawn(async move {
577 let srv = tonic::transport::Server::builder()
578 .add_service(
579 ExecutorServiceServer::new(executor_registry)
580 .max_decoding_message_size(usize::MAX),
581 )
582 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));
583
584 let waiting = wg.waiting();
585
586 tokio::select! {
587 _ = waiting => {}
588 serve_result = srv => {
589 if let Err(error) = serve_result {
590 tracing::error!(?error, "error during admin service serve");
591 }
592 }
593 }
594 });
595
596 let mut client = Some(client);
597 let channel = Endpoint::try_from("http://[::]:50051")
598 .unwrap()
599 .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
600 let client = client.take();
601
602 async move {
603 if let Some(client) = client {
604 Ok(TokioIo::new(client))
605 } else {
606 Err(std::io::Error::new(
607 std::io::ErrorKind::Other,
608 "Client already taken",
609 ))
610 }
611 }
612 }));
613
614 ExecutorServiceClient::new(channel)
615 }
616
617 pub fn options(&self) -> &ServerOptions {
619 &self.options
620 }
621
622 pub fn events(
624 &self,
625 ) -> impl futures::Stream<Item = AuditEvent> + Unpin + Send + Sync + 'static {
626 self.event_bus.subscribe_audit_events()
627 }
628
629 #[tracing::instrument(skip_all)]
631 pub async fn shutdown(mut self) -> eyre::Result<()> {
632 Self::shutdown_impl(
633 self.wg.take().unwrap(),
634 self.executor_registry.clone(),
635 self.options.clone(),
636 )
637 .await
638 }
639
640 async fn shutdown_impl(
641 wg: WaitGroup,
642 executor_registry: ExecutorRegistry<StorageWrapper<S>>,
643 options: ServerOptions,
644 ) -> eyre::Result<()> {
645 tracing::info!("shutting down ora server");
646
647 let mut running_components = wg.all_done_stream();
648
649 tokio::spawn(async move {
650 if let Err(error) = executor_registry
651 .shutdown(Some(options.executor_shutdown_timeout))
652 .await
653 {
654 tracing::error!(?error, "error during executor registry shutdown");
655 }
656 });
657
658 while let Some((component_count, components)) = running_components.next().await {
659 tracing::debug!(
660 component_count = component_count,
661 components = ?components,
662 "waiting for components to shut down",
663 );
664 }
665
666 Ok(())
667 }
668}
669
670impl<S> Server<S>
671where
672 S: ora_storage::Storage + StorageSnapshot,
673{
674 #[allow(clippy::missing_panics_doc)]
676 pub fn snapshot_service(&self) -> impl SnapshotService {
677 snapshot::SnapshotInterface::new(
678 self.storage.clone(),
679 self.wg.as_ref().unwrap().handle(),
680 self.event_bus.clone(),
681 )
682 }
683
684 #[allow(clippy::missing_panics_doc)]
691 pub fn snapshot_service_client(&self) -> SnapshotServiceClient<Channel> {
692 use hyper_util::rt::TokioIo;
693 use ora_proto::snapshot::v1::snapshot_service_server::SnapshotServiceServer;
694 use tonic::transport::{Endpoint, Uri};
695
696 let (client, server) = tokio::io::duplex(1024);
697
698 let svc = self.snapshot_service();
699 let wg = self.wg.as_ref().unwrap().handle().add();
700
701 tokio::spawn(async move {
702 let srv = tonic::transport::Server::builder()
703 .add_service(SnapshotServiceServer::new(svc).max_decoding_message_size(usize::MAX))
704 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));
705
706 let waiting = wg.waiting();
707
708 tokio::select! {
709 _ = waiting => {}
710 serve_result = srv => {
711 if let Err(error) = serve_result {
712 tracing::error!(?error, "error during admin service serve");
713 }
714 }
715 }
716 });
717
718 let mut client = Some(client);
719 let channel = Endpoint::try_from("http://[::]:50051")
720 .unwrap()
721 .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
722 let client = client.take();
723
724 async move {
725 if let Some(client) = client {
726 Ok(TokioIo::new(client))
727 } else {
728 Err(std::io::Error::new(
729 std::io::ErrorKind::Other,
730 "Client already taken",
731 ))
732 }
733 }
734 }));
735
736 SnapshotServiceClient::new(channel)
737 }
738}
739
740impl<S> Drop for Server<S>
741where
742 S: ora_storage::Storage,
743{
744 fn drop(&mut self) {
745 let Some(wg) = self.wg.take() else {
746 return;
747 };
748
749 let stop_fut =
750 Self::shutdown_impl(wg, self.executor_registry.clone(), self.options.clone());
751
752 tracing::warn!("server instance dropped, attempting shutdown in the background");
753 tokio::spawn(async move {
754 if let Err(error) = stop_fut.await {
755 tracing::error!(?error, "error during server shutdown");
756 }
757 });
758 }
759}