ora_server/
lib.rs

//! Ora server implementation.

use std::{
    future::ready,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};

use executor_registry::{ExecutorRegistry, ExecutorRegistryOptions};
use futures::{future::select, StreamExt};
use ora_proto::{
    server::v1::{
        admin_service_client::AdminServiceClient, admin_service_server::AdminService,
        executor_service_client::ExecutorServiceClient, executor_service_server::ExecutorService,
    },
    snapshot::v1::{
        snapshot_service_client::SnapshotServiceClient, snapshot_service_server::SnapshotService,
    },
};
use ora_storage::{JobQueryFilters, ScheduleQueryFilters};
use scheduling::{
    create_executions, create_schedule_jobs, mark_executions_ready, schedule_executions,
    timer::spawn_timer,
};
use storage::StorageWrapper;
use tokio::time::{sleep, timeout};
use tonic::transport::Channel;
use tracing::Instrument;
use wgroup::WaitGroup;

pub(crate) mod scheduling;
pub(crate) mod time;

pub(crate) mod admin;
pub(crate) mod events;
pub(crate) mod executor_registry;
pub(crate) mod snapshot;
pub(crate) mod storage;

#[cfg(feature = "metrics")]
pub(crate) mod metrics;

pub use ora_storage::{Storage, StorageSnapshot};

pub use events::{AuditEvent, AuditEventKind};

/// An [`indexmap::IndexMap`] using [`ahash`] hashing.
pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
/// An [`indexmap::IndexSet`] using [`ahash`] hashing.
pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;

pub use ora_timer::TimerOptions;

/// Options for the server.
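///
/// A minimal configuration sketch; the overridden values below are
/// illustrative, not recommendations:
///
/// ```
/// use std::time::Duration;
/// use ora_server::ServerOptions;
///
/// // Start from the defaults and override selected knobs.
/// let options = ServerOptions {
///     bookkeeping_interval: Duration::from_secs(1),
///     max_job_age: Some(Duration::from_secs(60 * 60 * 24)),
///     ..Default::default()
/// };
/// ```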
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Options for the timer.
    pub timer: ora_timer::TimerOptions,
    /// How long an executor may go without a heartbeat
    /// before it is considered dead.
    pub executor_heartbeat_timeout: std::time::Duration,
    /// Buffer size for the internal timer channels.
    pub timer_buffer_size: NonZeroUsize,
    /// Buffer size for the internal event bus.
    pub event_buffer_size: NonZeroUsize,
    /// Bookkeeping interval for various tasks.
    ///
    /// The server is mostly event-driven, but some tasks
    /// run periodically. Even event-driven tasks are re-run
    /// on this interval to ensure that they do not get stuck.
    pub bookkeeping_interval: std::time::Duration,
    /// Interval for cleaning up old jobs and schedules.
    ///
    /// If not provided, the bookkeeping interval is used.
    pub cleanup_interval: Option<std::time::Duration>,
    /// How long to wait for executors to shut down gracefully.
    pub executor_shutdown_timeout: std::time::Duration,
    /// Delete inactive jobs after this duration.
    ///
    /// By default, jobs are never deleted.
    pub max_job_age: Option<std::time::Duration>,
    /// Delete inactive schedules after this duration.
    ///
    /// By default, schedules are never deleted.
    pub max_schedule_age: Option<std::time::Duration>,
}

impl Default for ServerOptions {
    fn default() -> Self {
        Self {
            timer: Default::default(),
            executor_heartbeat_timeout: Duration::from_secs(60),
            timer_buffer_size: NonZeroUsize::new(100_000).unwrap(),
            event_buffer_size: NonZeroUsize::new(100_000).unwrap(),
            bookkeeping_interval: Duration::from_secs(5),
            cleanup_interval: None,
            executor_shutdown_timeout: Duration::from_secs(10),
            max_job_age: None,
            max_schedule_age: None,
        }
    }
}

/// A running server instance.
#[must_use = "the server needs to be explicitly stopped"]
pub struct Server<S>
where
    S: ora_storage::Storage,
{
    storage: S,
    executor_registry: executor_registry::ExecutorRegistry<StorageWrapper<S>>,
    admin: admin::Admin<StorageWrapper<S>>,
    options: ServerOptions,
    event_bus: events::EventBus,
    wg: Option<WaitGroup>,
}

impl<S> Server<S>
where
    S: ora_storage::Storage,
{
    /// Start a new server with the given storage backend and options.
    ///
    /// The server will spawn background tasks that
    /// run until the server is stopped.
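    ///
    /// A usage sketch, assuming some `Storage` implementation named
    /// `MemoryStorage` exists (the name is illustrative):
    ///
    /// ```ignore
    /// let server = Server::spawn(MemoryStorage::default(), ServerOptions::default())?;
    /// // ... serve clients, run executors ...
    /// server.shutdown().await?;
    /// ```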
    pub fn spawn(storage: S, options: ServerOptions) -> eyre::Result<Self> {
        let wg = WaitGroup::new();
        let event_bus = events::EventBus::new(options.event_buffer_size.get());

        let storage = StorageWrapper::new(storage, event_bus.clone());

        let executor_registry = executor_registry::ExecutorRegistry::new(
            storage.clone(),
            ExecutorRegistryOptions {
                executor_timeout: options.executor_heartbeat_timeout,
            },
            wg.handle(),
            event_bus.clone(),
        );
        let admin = admin::Admin::new(
            storage.clone(),
            wg.handle(),
            event_bus.clone(),
            executor_registry.clone(),
        );

        let (mut pending_buf_producer, pending_buf_consumer) =
            rtrb::RingBuffer::new(options.timer_buffer_size.get());

        let (ready_buf_producer, mut ready_buf_consumer) =
            rtrb::RingBuffer::new(options.timer_buffer_size.get());

        spawn_timer(
            ready_buf_producer,
            pending_buf_consumer,
            wg.add_with("timer"),
            options.timer,
            event_bus.clone(),
        )?;

        tokio::spawn({
            let guard = wg.add_with("create_job_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running create_job_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = create_executions(&event_bus, &backend).await {
                        tracing::error!(?error, "failed to create job executions");
                    }

                    // Wake on the next relevant event or after the
                    // bookkeeping interval, whichever comes first.
                    let mut events = event_bus
                        .subscribe_job_events()
                        .filter(|event| ready(matches!(event, events::JobEvent::JobsCreated)));
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("create_job_executions"))
        });

        tokio::spawn({
            let guard = wg.add_with("schedule_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                let mut last_scheduled_id = None;

                loop {
                    tracing::trace!("running schedule_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    match schedule_executions(
                        &backend,
                        &mut pending_buf_producer,
                        last_scheduled_id,
                    )
                    .await
                    {
                        Ok(last_id) => {
                            last_scheduled_id = last_id.or(last_scheduled_id);
                        }
                        Err(error) => {
                            tracing::error!(?error, "failed to schedule executions");
                        }
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(event, events::ExecutionEvent::ExecutionsAdded))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("schedule_executions"))
        });

        tokio::spawn({
            let guard = wg.add_with("ready_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running ready_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) =
                        mark_executions_ready(&event_bus, &backend, &mut ready_buf_consumer).await
                    {
                        tracing::error!(?error, "failed to mark executions ready");
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(
                            event,
                            events::ExecutionEvent::TimedExecutionsReady
                        ))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("ready_executions"))
        });

        tokio::spawn({
            let guard = wg.add_with("assign_executions");
            let executor_registry = executor_registry.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running assign_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.assign_executions().await {
                        tracing::error!(?error, "failed to assign executions");
                    }

                    let mut execution_events =
                        event_bus.subscribe_execution_events().filter(|event| {
                            ready(matches!(
                                event,
                                events::ExecutionEvent::ExecutionsReadyToRun
                            ))
                        });
                    let mut executor_events =
                        event_bus.subscribe_executor_events().filter(|event| {
                            ready(matches!(event, events::ExecutorEvent::ExecutorReady))
                        });

                    _ = timeout(
                        options.bookkeeping_interval,
                        select(execution_events.next(), executor_events.next()),
                    )
                    .await;
                }
            }
            .instrument(tracing::info_span!("assign_executions"))
        });

        tokio::spawn({
            let guard = wg.add_with("execution_timeouts");
            let executor_registry = executor_registry.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running execution_timeouts");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.fail_timed_out_executions().await {
                        tracing::error!(?error, "failed to mark timed-out executions as failed");
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(
                            event,
                            events::ExecutionEvent::TimedExecutionsReady
                        ))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("execution_timeouts"))
        });

        tokio::spawn({
            let guard = wg.add_with("reap_dead_executors");
            let executor_registry = executor_registry.clone();

            async move {
                loop {
                    tracing::trace!("running reap_dead_executors");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.reap_dead_executors().await {
                        tracing::error!(?error, "failed to reap dead executors");
                    }

                    sleep(options.bookkeeping_interval).await;
                }
            }
            .instrument(tracing::info_span!("reap_dead_executors"))
        });

        tokio::spawn({
            let guard = wg.add_with("clean_up_orphan_executions");
            let executor_registry = executor_registry.clone();

            async move {
                loop {
                    tracing::trace!("running clean_up_orphan_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.clean_up_orphan_executions().await {
                        tracing::error!(?error, "failed to clean up orphan executions");
                    }

                    sleep(options.bookkeeping_interval).await;
                }
            }
            .instrument(tracing::info_span!("clean_up_orphan_executions"))
        });

        if let Some(max_job_age) = options.max_job_age {
            tokio::spawn({
                let guard = wg.add_with("remove_old_jobs");
                let storage = storage.clone();

                async move {
                    loop {
                        tracing::trace!("running remove_old_jobs");

                        if guard.is_waiting() {
                            break;
                        }

                        let Some(cutoff) = SystemTime::now().checked_sub(max_job_age) else {
                            sleep(
                                options
                                    .cleanup_interval
                                    .unwrap_or(options.bookkeeping_interval),
                            )
                            .await;
                            continue;
                        };

                        // Delete inactive jobs that were created before the cutoff.
                        if let Err(error) = storage
                            .delete_jobs(JobQueryFilters {
                                active: Some(false),
                                created_before: Some(cutoff),
                                ..Default::default()
                            })
                            .await
                        {
                            tracing::error!(?error, "failed to clean up jobs");
                        }

                        sleep(
                            options
                                .cleanup_interval
                                .unwrap_or(options.bookkeeping_interval),
                        )
                        .await;
                    }
                }
                .instrument(tracing::info_span!("remove_old_jobs"))
            });
        }

        if let Some(max_schedule_age) = options.max_schedule_age {
            tokio::spawn({
                let guard = wg.add_with("remove_old_schedules");
                let storage = storage.clone();

                async move {
                    loop {
                        tracing::trace!("running remove_old_schedules");

                        if guard.is_waiting() {
                            break;
                        }

                        let Some(cutoff) = SystemTime::now().checked_sub(max_schedule_age) else {
                            sleep(
                                options
                                    .cleanup_interval
                                    .unwrap_or(options.bookkeeping_interval),
                            )
                            .await;
                            continue;
                        };

                        // Delete inactive schedules that were created before the cutoff.
                        if let Err(error) = storage
                            .delete_schedules(ScheduleQueryFilters {
                                active: Some(false),
                                created_before: Some(cutoff),
                                ..Default::default()
                            })
                            .await
                        {
                            tracing::error!(?error, "failed to clean up schedules");
                        }

                        sleep(
                            options
                                .cleanup_interval
                                .unwrap_or(options.bookkeeping_interval),
                        )
                        .await;
                    }
                }
                .instrument(tracing::info_span!("remove_old_schedules"))
            });
        }

        tokio::spawn({
            let guard = wg.add_with("create_schedule_jobs");
            let event_bus = event_bus.clone();
            let storage = storage.clone();

            async move {
                loop {
                    tracing::trace!("running create_schedule_jobs");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = create_schedule_jobs(&event_bus, &storage).await {
                        tracing::error!(?error, "failed to create schedule jobs");
                    }

                    let mut execution_events =
                        event_bus.subscribe_execution_events().filter(|event| {
                            ready(matches!(event, events::ExecutionEvent::ExecutionsFinished))
                        });
                    let mut schedule_events =
                        event_bus.subscribe_schedule_events().filter(|event| {
                            ready(matches!(event, events::ScheduleEvent::SchedulesAdded))
                        });

                    _ = timeout(
                        options.bookkeeping_interval,
                        select(execution_events.next(), schedule_events.next()),
                    )
                    .await;
                }
            }
            .instrument(tracing::info_span!("create_schedule_jobs"))
        });

        #[cfg(feature = "metrics")]
        tokio::spawn({
            let storage = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                if let Err(error) = crate::metrics::collect_metrics(&storage, &event_bus).await {
                    tracing::error!(?error, "error collecting metrics");
                }
            }
        });

        Ok(Self {
            storage: storage.inner.clone(),
            executor_registry,
            admin,
            options,
            event_bus,
            wg: Some(wg),
        })
    }

    /// Get a handle to the admin gRPC service.
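    ///
    /// A sketch of serving it over a network transport with `tonic`
    /// (the bind address is illustrative):
    ///
    /// ```ignore
    /// use ora_proto::server::v1::admin_service_server::AdminServiceServer;
    ///
    /// tonic::transport::Server::builder()
    ///     .add_service(AdminServiceServer::new(server.admin_service()))
    ///     .serve("127.0.0.1:50051".parse()?)
    ///     .await?;
    /// ```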
    pub fn admin_service(&self) -> impl AdminService {
        self.admin.clone()
    }

    /// Get a handle to the executor service.
    pub fn executor_service(&self) -> impl ExecutorService {
        self.executor_registry.clone()
    }

    /// Get an admin service client that connects to the server
    /// over an in-memory transport.
    ///
    /// Make sure to reuse the client: every call to this function
    /// spawns a new task that is only cleaned up
    /// once the server is dropped.
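    ///
    /// A reuse sketch; clone the client instead of calling this
    /// function again (`tonic` clients are cheap to clone):
    ///
    /// ```ignore
    /// let client = server.admin_service_client();
    /// let background_copy = client.clone(); // shares the same in-memory transport
    /// ```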
    #[allow(clippy::missing_panics_doc)]
    pub fn admin_service_client(&self) -> AdminServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::server::v1::admin_service_server::AdminServiceServer;
        use tonic::transport::{Endpoint, Uri};

        // In-memory duplex pipe standing in for a network socket.
        let (client, server) = tokio::io::duplex(1024);

        let admin = self.admin_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(AdminServiceServer::new(admin).max_decoding_message_size(usize::MAX))
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during admin service serve");
                    }
                }
            }
        });

        // The URI is never used for routing; the connector below always
        // hands out the client half of the in-memory pipe.
        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        AdminServiceClient::new(channel)
    }

    /// Get an executor service client that connects to the server
    /// over an in-memory transport.
    ///
    /// Make sure to reuse the client: every call to this function
    /// spawns a new task that is only cleaned up
    /// once the server is dropped.
    #[allow(clippy::missing_panics_doc)]
    pub fn executor_service_client(&self) -> ExecutorServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::server::v1::executor_service_server::ExecutorServiceServer;
        use tonic::transport::{Endpoint, Uri};

        let (client, server) = tokio::io::duplex(1024);

        let executor_registry = self.executor_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(
                    ExecutorServiceServer::new(executor_registry)
                        .max_decoding_message_size(usize::MAX),
                )
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during executor service serve");
                    }
                }
            }
        });

        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        ExecutorServiceClient::new(channel)
    }

    /// Get the server options.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }

    /// Subscribe to system events.
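    ///
    /// A consumption sketch:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut events = server.events();
    /// while let Some(event) = events.next().await {
    ///     // React to the audit event here.
    /// }
    /// ```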
    pub fn events(
        &self,
    ) -> impl futures::Stream<Item = AuditEvent> + Unpin + Send + Sync + 'static {
        self.event_bus.subscribe_audit_events()
    }

    /// Shut down the server, waiting for all components to stop.
    #[tracing::instrument(skip_all)]
    pub async fn shutdown(mut self) -> eyre::Result<()> {
        Self::shutdown_impl(
            self.wg.take().unwrap(),
            self.executor_registry.clone(),
            self.options.clone(),
        )
        .await
    }

    async fn shutdown_impl(
        wg: WaitGroup,
        executor_registry: ExecutorRegistry<StorageWrapper<S>>,
        options: ServerOptions,
    ) -> eyre::Result<()> {
        tracing::info!("shutting down ora server");

        let mut running_components = wg.all_done_stream();

        tokio::spawn(async move {
            if let Err(error) = executor_registry
                .shutdown(Some(options.executor_shutdown_timeout))
                .await
            {
                tracing::error!(?error, "error during executor registry shutdown");
            }
        });

        while let Some((component_count, components)) = running_components.next().await {
            tracing::debug!(
                component_count = component_count,
                components = ?components,
                "waiting for components to shut down",
            );
        }

        Ok(())
    }
}

impl<S> Server<S>
where
    S: ora_storage::Storage + StorageSnapshot,
{
    /// Get a handle to the snapshot service.
    #[allow(clippy::missing_panics_doc)]
    pub fn snapshot_service(&self) -> impl SnapshotService {
        snapshot::SnapshotInterface::new(
            self.storage.clone(),
            self.wg.as_ref().unwrap().handle(),
            self.event_bus.clone(),
        )
    }

    /// Get a snapshot service client that connects to the server
    /// over an in-memory transport.
    ///
    /// Make sure to reuse the client: every call to this function
    /// spawns a new task that is only cleaned up
    /// once the server is dropped.
    #[allow(clippy::missing_panics_doc)]
    pub fn snapshot_service_client(&self) -> SnapshotServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::snapshot::v1::snapshot_service_server::SnapshotServiceServer;
        use tonic::transport::{Endpoint, Uri};

        let (client, server) = tokio::io::duplex(1024);

        let svc = self.snapshot_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(SnapshotServiceServer::new(svc).max_decoding_message_size(usize::MAX))
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during snapshot service serve");
                    }
                }
            }
        });

        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        SnapshotServiceClient::new(channel)
    }
}

impl<S> Drop for Server<S>
where
    S: ora_storage::Storage,
{
    fn drop(&mut self) {
        let Some(wg) = self.wg.take() else {
            return;
        };

        let stop_fut =
            Self::shutdown_impl(wg, self.executor_registry.clone(), self.options.clone());

        tracing::warn!("server instance dropped, attempting shutdown in the background");
        tokio::spawn(async move {
            if let Err(error) = stop_fut.await {
                tracing::error!(?error, "error during server shutdown");
            }
        });
    }
}