ora_server/lib.rs

//! Ora server implementation.

use std::{
    future::ready,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};

use executor_registry::{ExecutorRegistry, ExecutorRegistryOptions};
use futures::{future::select, StreamExt};
use ora_proto::{
    server::v1::{
        admin_service_client::AdminServiceClient, admin_service_server::AdminService,
        executor_service_client::ExecutorServiceClient, executor_service_server::ExecutorService,
    },
    snapshot::v1::{
        snapshot_service_client::SnapshotServiceClient, snapshot_service_server::SnapshotService,
    },
};
use ora_storage::{JobQueryFilters, ScheduleQueryFilters};
use scheduling::{
    create_executions, create_schedule_jobs, mark_executions_ready, schedule_executions,
    timer::spawn_timer,
};
use storage::StorageWrapper;
use tokio::time::{sleep, timeout};
use tonic::transport::Channel;
use tracing::Instrument;
use wgroup::WaitGroup;

pub(crate) mod scheduling;
pub(crate) mod time;

pub(crate) mod admin;
pub(crate) mod events;
pub(crate) mod executor_registry;
pub(crate) mod snapshot;
pub(crate) mod storage;

#[cfg(feature = "metrics")]
pub(crate) mod metrics;

pub use ora_storage::{Storage, StorageSnapshot};

pub use events::{AuditEvent, AuditEventKind};

/// [`indexmap::IndexMap`] with [`ahash::RandomState`] as the hasher.
pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
/// [`indexmap::IndexSet`] with [`ahash::RandomState`] as the hasher.
pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;

pub use ora_timer::TimerOptions;

/// Options for the server.
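///
/// # Examples
///
/// A minimal sketch of overriding a couple of fields while keeping the
/// remaining defaults (this assumes the crate is published as `ora_server`):
///
/// ```
/// use std::time::Duration;
///
/// use ora_server::ServerOptions;
///
/// let options = ServerOptions {
///     executor_heartbeat_timeout: Duration::from_secs(30),
///     bookkeeping_interval: Duration::from_secs(1),
///     ..Default::default()
/// };
/// # let _ = options;
/// ```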
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Options for the timer.
    pub timer: ora_timer::TimerOptions,
    /// How long an executor may go without a heartbeat before it is
    /// considered dead.
    pub executor_heartbeat_timeout: std::time::Duration,
    /// Buffer size for the internal timer ring buffers.
    pub timer_buffer_size: NonZeroUsize,
    /// Buffer size for internal event channels.
    pub event_buffer_size: NonZeroUsize,
    /// Bookkeeping interval for various tasks.
    ///
    /// The server is mostly event-driven, but some tasks run periodically.
    /// Even event-driven tasks run periodically to ensure that they do not
    /// get stuck.
    pub bookkeeping_interval: std::time::Duration,
    /// Executor shutdown timeout.
    pub executor_shutdown_timeout: std::time::Duration,
    /// Delete inactive jobs after this duration.
    ///
    /// By default, jobs are never deleted.
    pub max_job_age: Option<std::time::Duration>,
    /// Delete inactive schedules after this duration.
    ///
    /// By default, schedules are never deleted.
    pub max_schedule_age: Option<std::time::Duration>,
}

impl Default for ServerOptions {
    fn default() -> Self {
        Self {
            timer: Default::default(),
            executor_heartbeat_timeout: Duration::from_secs(60),
            timer_buffer_size: NonZeroUsize::new(10_000).unwrap(),
            event_buffer_size: NonZeroUsize::new(10_000).unwrap(),
            bookkeeping_interval: Duration::from_secs(5),
            executor_shutdown_timeout: Duration::from_secs(10),
            max_job_age: None,
            max_schedule_age: None,
        }
    }
}

/// A running server instance.
#[must_use = "the server needs to be explicitly stopped"]
pub struct Server<S>
where
    S: ora_storage::Storage,
{
    storage: S,
    executor_registry: executor_registry::ExecutorRegistry<StorageWrapper<S>>,
    admin: admin::Admin<StorageWrapper<S>>,
    options: ServerOptions,
    event_bus: events::EventBus,
    wg: Option<WaitGroup>,
}

impl<S> Server<S>
where
    S: ora_storage::Storage,
{
    /// Start a new server with the given storage backend and options.
    ///
    /// The server will spawn background tasks that will
    /// run until the server is stopped.
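    ///
    /// # Examples
    ///
    /// A minimal usage sketch; the storage backend is left abstract here and
    /// may be any [`Storage`] implementation:
    ///
    /// ```no_run
    /// # async fn demo(storage: impl ora_server::Storage) -> eyre::Result<()> {
    /// use ora_server::{Server, ServerOptions};
    ///
    /// let server = Server::spawn(storage, ServerOptions::default())?;
    /// // ... hand out service clients, serve requests ...
    /// server.shutdown().await?;
    /// # Ok(())
    /// # }
    /// ```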
    pub fn spawn(storage: S, options: ServerOptions) -> eyre::Result<Self> {
        let wg = WaitGroup::new();
        let event_bus = events::EventBus::new(options.event_buffer_size.get());

        let storage = StorageWrapper::new(storage, event_bus.clone());

        let executor_registry = executor_registry::ExecutorRegistry::new(
            storage.clone(),
            ExecutorRegistryOptions {
                executor_timeout: options.executor_heartbeat_timeout,
            },
            wg.handle(),
            event_bus.clone(),
        );
        let admin = admin::Admin::new(
            storage.clone(),
            wg.handle(),
            event_bus.clone(),
            executor_registry.clone(),
        );

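        // Two SPSC ring buffers connect the scheduler to the timer: pending
        // (not-yet-due) executions are pushed to the timer through one, and
        // due executions come back through the other to be marked ready.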
        let (mut pending_buf_producer, pending_buf_consumer) =
            rtrb::RingBuffer::new(options.timer_buffer_size.get());

        let (ready_buf_producer, mut ready_buf_consumer) =
            rtrb::RingBuffer::new(options.timer_buffer_size.get());

        spawn_timer(
            ready_buf_producer,
            pending_buf_consumer,
            wg.add_with("timer"),
            options.timer,
            event_bus.clone(),
        )?;

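        // Each background task below follows the same pattern: do one round of
        // work, then wait for a relevant event or for the bookkeeping
        // interval, whichever comes first, so a missed event cannot leave the
        // task stuck.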
        tokio::spawn({
            let guard = wg.add_with("create_job_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running create_job_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = create_executions(&event_bus, &backend).await {
                        tracing::error!(?error, "failed to create job executions");
                    }

                    let mut events = event_bus
                        .subscribe_job_events()
                        .filter(|event| ready(matches!(event, events::JobEvent::JobsCreated)));
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("create_job_executions"))
        });

        tokio::spawn({
            let guard = wg.add_with("schedule_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                let mut last_scheduled_id = None;

                loop {
                    tracing::trace!("running schedule_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    match schedule_executions(
                        &backend,
                        &mut pending_buf_producer,
                        last_scheduled_id,
                    )
                    .await
                    {
                        Ok(last_id) => {
                            last_scheduled_id = last_id.or(last_scheduled_id);
                        }
                        Err(error) => {
                            tracing::error!(?error, "failed to schedule executions");
                        }
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(event, events::ExecutionEvent::ExecutionsAdded))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("schedule_executions"))
        });

        tokio::spawn({
            let guard = wg.add_with("ready_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running ready_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) =
                        mark_executions_ready(&event_bus, &backend, &mut ready_buf_consumer).await
                    {
                        tracing::error!(?error, "failed to mark executions ready");
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(
                            event,
                            events::ExecutionEvent::TimedExecutionsReady
                        ))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("ready_executions"))
        });

        tokio::spawn({
            let guard = wg.add_with("assign_executions");
            let executor_registry = executor_registry.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running assign_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.assign_executions().await {
                        tracing::error!(?error, "failed to assign executions");
                    }

                    let mut execution_events =
                        event_bus.subscribe_execution_events().filter(|event| {
                            ready(matches!(
                                event,
                                events::ExecutionEvent::ExecutionsReadyToRun
                            ))
                        });
                    let mut executor_events =
                        event_bus.subscribe_executor_events().filter(|event| {
                            ready(matches!(event, events::ExecutorEvent::ExecutorReady))
                        });

                    _ = timeout(
                        options.bookkeeping_interval,
                        select(execution_events.next(), executor_events.next()),
                    )
                    .await;
                }
            }
            .instrument(tracing::info_span!("assign_executions"))
        });

        tokio::spawn({
            let guard = wg.add_with("execution_timeouts");
            let executor_registry = executor_registry.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running execution_timeouts");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.fail_timed_out_executions().await {
                        tracing::error!(?error, "failed to mark timed-out executions as failed");
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(
                            event,
                            events::ExecutionEvent::TimedExecutionsReady
                        ))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("execution_timeouts"))
        });

        tokio::spawn({
            let guard = wg.add_with("reap_dead_executors");
            let executor_registry = executor_registry.clone();

            async move {
                loop {
                    tracing::trace!("running reap_dead_executors");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.reap_dead_executors().await {
                        tracing::error!(?error, "failed to reap dead executors");
                    }

                    sleep(options.bookkeeping_interval).await;
                }
            }
            .instrument(tracing::info_span!("reap_dead_executors"))
        });

        tokio::spawn({
            let guard = wg.add_with("clean_up_orphan_executions");
            let executor_registry = executor_registry.clone();

            async move {
                loop {
                    tracing::trace!("running clean_up_orphan_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.clean_up_orphan_executions().await {
                        tracing::error!(?error, "failed to clean up orphan executions");
                    }

                    sleep(options.bookkeeping_interval).await;
                }
            }
            .instrument(tracing::info_span!("clean_up_orphan_executions"))
        });

        if let Some(max_job_age) = options.max_job_age {
            tokio::spawn({
                let guard = wg.add_with("remove_old_jobs");
                let storage = storage.clone();

                async move {
                    loop {
                        tracing::trace!("running remove_old_jobs");

                        if guard.is_waiting() {
                            break;
                        }

                        let Some(cutoff) = SystemTime::now().checked_sub(max_job_age) else {
                            sleep(options.bookkeeping_interval).await;
                            continue;
                        };

                        if let Err(error) = storage
                            .delete_jobs(JobQueryFilters {
                                active: Some(false),
                                created_before: Some(cutoff),
                                ..Default::default()
                            })
                            .await
                        {
                            tracing::error!(?error, "failed to clean up jobs");
                        }

                        sleep(options.bookkeeping_interval).await;
                    }
                }
                .instrument(tracing::info_span!("remove_old_jobs"))
            });
        }

        if let Some(max_schedule_age) = options.max_schedule_age {
            tokio::spawn({
                let guard = wg.add_with("remove_old_schedules");
                let storage = storage.clone();

                async move {
                    loop {
                        tracing::trace!("running remove_old_schedules");

                        if guard.is_waiting() {
                            break;
                        }

                        let Some(cutoff) = SystemTime::now().checked_sub(max_schedule_age) else {
                            sleep(options.bookkeeping_interval).await;
                            continue;
                        };

                        if let Err(error) = storage
                            .delete_schedules(ScheduleQueryFilters {
                                active: Some(false),
                                created_before: Some(cutoff),
                                ..Default::default()
                            })
                            .await
                        {
                            tracing::error!(?error, "failed to clean up schedules");
                        }

                        sleep(options.bookkeeping_interval).await;
                    }
                }
                .instrument(tracing::info_span!("remove_old_schedules"))
            });
        }

        tokio::spawn({
            let guard = wg.add_with("create_schedule_jobs");
            let event_bus = event_bus.clone();
            let storage = storage.clone();

            async move {
                loop {
                    tracing::trace!("running create_schedule_jobs");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = create_schedule_jobs(&event_bus, &storage).await {
                        tracing::error!(?error, "failed to create schedule jobs");
                    }

                    let mut execution_events =
                        event_bus.subscribe_execution_events().filter(|event| {
                            ready(matches!(event, events::ExecutionEvent::ExecutionsFinished))
                        });
                    let mut schedule_events =
                        event_bus.subscribe_schedule_events().filter(|event| {
                            ready(matches!(event, events::ScheduleEvent::SchedulesAdded))
                        });

                    _ = timeout(
                        options.bookkeeping_interval,
                        select(execution_events.next(), schedule_events.next()),
                    )
                    .await;
                }
            }
            .instrument(tracing::info_span!("create_schedule_jobs"))
        });

        #[cfg(feature = "metrics")]
        tokio::spawn({
            let storage = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                if let Err(error) = crate::metrics::collect_metrics(&storage, &event_bus).await {
                    tracing::error!(?error, "error collecting metrics");
                }
            }
        });

        Ok(Self {
            storage: storage.inner.clone(),
            executor_registry,
            admin,
            options,
            event_bus,
            wg: Some(wg),
        })
    }

    /// Get a handle to the admin gRPC service.
    pub fn admin_service(&self) -> impl AdminService {
        self.admin.clone()
    }

    /// Get a handle to the executor service.
    pub fn executor_service(&self) -> impl ExecutorService {
        self.executor_registry.clone()
    }

    /// Get an admin service client that connects to the server
    /// using an in-memory transport.
    ///
    /// Make sure to reuse the client as every call to this function
    /// will spawn a new task that is only cleaned up
    /// once the server is dropped.
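    ///
    /// # Examples
    ///
    /// A sketch of creating the client once and reusing it; cloning a tonic
    /// client is cheap, and the clones share the underlying channel:
    ///
    /// ```no_run
    /// # fn demo<S: ora_server::Storage>(server: &ora_server::Server<S>) {
    /// let client = server.admin_service_client();
    /// let client_for_other_task = client.clone();
    /// # let _ = (client, client_for_other_task);
    /// # }
    /// ```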
    #[allow(clippy::missing_panics_doc)]
    pub fn admin_service_client(&self) -> AdminServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::server::v1::admin_service_server::AdminServiceServer;
        use tonic::transport::{Endpoint, Uri};

        let (client, server) = tokio::io::duplex(1024);

        let admin = self.admin_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(AdminServiceServer::new(admin).max_decoding_message_size(usize::MAX))
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during admin service serve");
                    }
                }
            }
        });

        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        AdminServiceClient::new(channel)
    }

    /// Get an executor service client that connects to the server
    /// using an in-memory transport.
    ///
    /// Make sure to reuse the client as every call to this function
    /// will spawn a new task that is only cleaned up
    /// once the server is dropped.
    #[allow(clippy::missing_panics_doc)]
    pub fn executor_service_client(&self) -> ExecutorServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::server::v1::executor_service_server::ExecutorServiceServer;
        use tonic::transport::{Endpoint, Uri};

        let (client, server) = tokio::io::duplex(1024);

        let executor_registry = self.executor_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(
                    ExecutorServiceServer::new(executor_registry)
                        .max_decoding_message_size(usize::MAX),
                )
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during executor service serve");
                    }
                }
            }
        });

        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        ExecutorServiceClient::new(channel)
    }

    /// Get the server options.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }

    /// Subscribe to system events.
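    ///
    /// # Examples
    ///
    /// A sketch of draining the stream (this assumes [`AuditEvent`]
    /// implements `Debug`):
    ///
    /// ```no_run
    /// # async fn demo<S: ora_server::Storage>(server: &ora_server::Server<S>) {
    /// use futures::StreamExt;
    ///
    /// let mut events = server.events();
    /// while let Some(event) = events.next().await {
    ///     tracing::debug!(?event, "audit event");
    /// }
    /// # }
    /// ```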
    pub fn events(
        &self,
    ) -> impl futures::Stream<Item = AuditEvent> + Unpin + Send + Sync + 'static {
        self.event_bus.subscribe_audit_events()
    }

    /// Shut down the server.
    #[tracing::instrument(skip_all)]
    pub async fn shutdown(mut self) -> eyre::Result<()> {
        Self::shutdown_impl(
            self.wg.take().unwrap(),
            self.executor_registry.clone(),
            self.options.clone(),
        )
        .await
    }

    async fn shutdown_impl(
        wg: WaitGroup,
        executor_registry: ExecutorRegistry<StorageWrapper<S>>,
        options: ServerOptions,
    ) -> eyre::Result<()> {
        tracing::info!("shutting down ora server");

        let mut running_components = wg.all_done_stream();

        tokio::spawn(async move {
            if let Err(error) = executor_registry
                .shutdown(Some(options.executor_shutdown_timeout))
                .await
            {
                tracing::error!(?error, "error during executor registry shutdown");
            }
        });

        while let Some((component_count, components)) = running_components.next().await {
            tracing::debug!(
                component_count = component_count,
                components = ?components,
                "waiting for components to shut down",
            );
        }

        Ok(())
    }
}

impl<S> Server<S>
where
    S: ora_storage::Storage + StorageSnapshot,
{
    /// Get a handle to the snapshot service.
    #[allow(clippy::missing_panics_doc)]
    pub fn snapshot_service(&self) -> impl SnapshotService {
        snapshot::SnapshotInterface::new(
            self.storage.clone(),
            self.wg.as_ref().unwrap().handle(),
            self.event_bus.clone(),
        )
    }

    /// Get a snapshot service client that connects to the server
    /// using an in-memory transport.
    ///
    /// Make sure to reuse the client as every call to this function
    /// will spawn a new task that is only cleaned up
    /// once the server is dropped.
    #[allow(clippy::missing_panics_doc)]
    pub fn snapshot_service_client(&self) -> SnapshotServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::snapshot::v1::snapshot_service_server::SnapshotServiceServer;
        use tonic::transport::{Endpoint, Uri};

        let (client, server) = tokio::io::duplex(1024);

        let svc = self.snapshot_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(SnapshotServiceServer::new(svc).max_decoding_message_size(usize::MAX))
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during snapshot service serve");
                    }
                }
            }
        });

        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        SnapshotServiceClient::new(channel)
    }
}

impl<S> Drop for Server<S>
where
    S: ora_storage::Storage,
{
    fn drop(&mut self) {
        let Some(wg) = self.wg.take() else {
            return;
        };

        let stop_fut =
            Self::shutdown_impl(wg, self.executor_registry.clone(), self.options.clone());

        tracing::warn!("server instance dropped, attempting shutdown in the background");
        tokio::spawn(async move {
            if let Err(error) = stop_fut.await {
                tracing::error!(?error, "error during server shutdown");
            }
        });
    }
}