use std::{
    future::ready,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};

use executor_registry::{ExecutorRegistry, ExecutorRegistryOptions};
use futures::{future::select, StreamExt};
use ora_proto::{
    server::v1::{
        admin_service_client::AdminServiceClient, admin_service_server::AdminService,
        executor_service_client::ExecutorServiceClient, executor_service_server::ExecutorService,
    },
    snapshot::v1::{
        snapshot_service_client::SnapshotServiceClient, snapshot_service_server::SnapshotService,
    },
};
use ora_storage::{JobQueryFilters, ScheduleQueryFilters};
use scheduling::{
    create_executions, create_schedule_jobs, mark_executions_ready, schedule_executions,
    timer::spawn_timer,
};
use storage::StorageWrapper;
use tokio::time::{sleep, timeout};
use tonic::transport::Channel;
use tracing::Instrument;
use wgroup::WaitGroup;

pub(crate) mod scheduling;
pub(crate) mod time;

pub(crate) mod admin;
pub(crate) mod events;
pub(crate) mod executor_registry;
pub(crate) mod snapshot;
pub(crate) mod storage;

#[cfg(feature = "metrics")]
pub(crate) mod metrics;

pub use ora_storage::{Storage, StorageSnapshot};

pub use events::{AuditEvent, AuditEventKind};

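/// An [`indexmap::IndexMap`] parameterized with [`ahash::RandomState`] as the hasher.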
pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
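/// An [`indexmap::IndexSet`] parameterized with [`ahash::RandomState`] as the hasher.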
pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;

pub use ora_timer::TimerOptions;

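/// Options for a [`Server`].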
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Options for the internal timer that fires timed executions.
    pub timer: ora_timer::TimerOptions,
    /// How long an executor may go without a heartbeat before it is considered dead.
    pub executor_heartbeat_timeout: std::time::Duration,
    /// Capacity of the ring buffers between the scheduler and the timer.
    pub timer_buffer_size: NonZeroUsize,
    /// Capacity of the internal event bus.
    pub event_buffer_size: NonZeroUsize,
    /// How long the background bookkeeping tasks wait between runs
    /// when no relevant events arrive.
    pub bookkeeping_interval: std::time::Duration,
    /// How often old jobs and schedules are cleaned up, falls back to
    /// [`bookkeeping_interval`](Self::bookkeeping_interval) if not set.
    pub cleanup_interval: Option<std::time::Duration>,
    /// How long to wait for connected executors during shutdown.
    pub executor_shutdown_timeout: std::time::Duration,
    /// If set, inactive jobs created longer ago than this are periodically deleted.
    pub max_job_age: Option<std::time::Duration>,
    /// If set, inactive schedules created longer ago than this are periodically deleted.
    pub max_schedule_age: Option<std::time::Duration>,
}

impl Default for ServerOptions {
    fn default() -> Self {
        Self {
            timer: Default::default(),
            executor_heartbeat_timeout: Duration::from_secs(60),
            timer_buffer_size: NonZeroUsize::new(100_000).unwrap(),
            event_buffer_size: NonZeroUsize::new(100_000).unwrap(),
            bookkeeping_interval: Duration::from_secs(5),
            cleanup_interval: None,
            executor_shutdown_timeout: Duration::from_secs(10),
            max_job_age: None,
            max_schedule_age: None,
        }
    }
}

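/// A running Ora scheduler server backed by the given [`Storage`] implementation.
///
/// All background tasks are spawned by [`Server::spawn`] and stopped by
/// [`Server::shutdown`].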
#[must_use = "the server needs to be explicitly stopped"]
pub struct Server<S>
where
    S: ora_storage::Storage,
{
    storage: S,
    executor_registry: executor_registry::ExecutorRegistry<StorageWrapper<S>>,
    admin: admin::Admin<StorageWrapper<S>>,
    options: ServerOptions,
    event_bus: events::EventBus,
    wg: Option<WaitGroup>,
}

impl<S> Server<S>
where
    S: ora_storage::Storage,
{
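    /// Spawns the server and all of its background tasks on the current
    /// Tokio runtime, returning a handle that can be used to interact with
    /// it and to shut it down.
    ///
    /// A minimal usage sketch (the storage value is illustrative, any
    /// [`Storage`] implementation works):
    ///
    /// ```ignore
    /// let server = Server::spawn(storage, ServerOptions::default())?;
    /// // ... serve `server.admin_service()` / `server.executor_service()` ...
    /// server.shutdown().await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the timer task could not be spawned.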
    pub fn spawn(storage: S, options: ServerOptions) -> eyre::Result<Self> {
        let wg = WaitGroup::new();
        let event_bus = events::EventBus::new(options.event_buffer_size.get());

        let storage = StorageWrapper::new(storage, event_bus.clone());

        let executor_registry = executor_registry::ExecutorRegistry::new(
            storage.clone(),
            ExecutorRegistryOptions {
                executor_timeout: options.executor_heartbeat_timeout,
            },
            wg.handle(),
            event_bus.clone(),
        );
        let admin = admin::Admin::new(
            storage.clone(),
            wg.handle(),
            event_bus.clone(),
            executor_registry.clone(),
        );

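        // Ring buffers connecting the scheduler to the timer: pending timed
        // executions are pushed in, and executions whose target time has
        // passed come back out.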
        let (mut pending_buf_producer, pending_buf_consumer) =
            rtrb::RingBuffer::new(options.timer_buffer_size.get());

        let (ready_buf_producer, mut ready_buf_consumer) =
            rtrb::RingBuffer::new(options.timer_buffer_size.get());

        spawn_timer(
            ready_buf_producer,
            pending_buf_consumer,
            wg.add_with("timer"),
            options.timer,
            event_bus.clone(),
        )?;

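        // Background task: create executions for newly added jobs.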
        tokio::spawn({
            let guard = wg.add_with("create_job_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running create_job_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = create_executions(&event_bus, &backend).await {
                        tracing::error!(?error, "failed to create job executions");
                    }

                    let mut events = event_bus
                        .subscribe_job_events()
                        .filter(|event| ready(matches!(event, events::JobEvent::JobsCreated)));
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("create_job_executions"))
        });

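        // Background task: hand pending executions to the timer so that they
        // become ready at their target time.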
        tokio::spawn({
            let guard = wg.add_with("schedule_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                let mut last_scheduled_id = None;

                loop {
                    tracing::trace!("running schedule_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    match schedule_executions(
                        &backend,
                        &mut pending_buf_producer,
                        last_scheduled_id,
                    )
                    .await
                    {
                        Ok(last_id) => {
                            last_scheduled_id = last_id.or(last_scheduled_id);
                        }
                        Err(error) => {
                            tracing::error!(?error, "failed to schedule executions");
                        }
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(event, events::ExecutionEvent::ExecutionsAdded))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("schedule_executions"))
        });

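        // Background task: mark executions as ready once the timer reports
        // that their target time has passed.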
        tokio::spawn({
            let guard = wg.add_with("ready_executions");
            let backend = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running ready_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) =
                        mark_executions_ready(&event_bus, &backend, &mut ready_buf_consumer).await
                    {
                        tracing::error!(?error, "failed to mark executions ready");
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(
                            event,
                            events::ExecutionEvent::TimedExecutionsReady
                        ))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("ready_executions"))
        });

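        // Background task: assign ready executions to registered executors.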
        tokio::spawn({
            let guard = wg.add_with("assign_executions");
            let executor_registry = executor_registry.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running assign_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.assign_executions().await {
                        tracing::error!(?error, "failed to assign executions");
                    }

                    let mut execution_events =
                        event_bus.subscribe_execution_events().filter(|event| {
                            ready(matches!(
                                event,
                                events::ExecutionEvent::ExecutionsReadyToRun
                            ))
                        });
                    let mut executor_events =
                        event_bus.subscribe_executor_events().filter(|event| {
                            ready(matches!(event, events::ExecutorEvent::ExecutorReady))
                        });

                    _ = timeout(
                        options.bookkeeping_interval,
                        select(execution_events.next(), executor_events.next()),
                    )
                    .await;
                }
            }
            .instrument(tracing::info_span!("assign_executions"))
        });

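        // Background task: fail executions that have exceeded their timeout.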
        tokio::spawn({
            let guard = wg.add_with("execution_timeouts");
            let executor_registry = executor_registry.clone();
            let event_bus = event_bus.clone();

            async move {
                loop {
                    tracing::trace!("running execution_timeouts");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.fail_timed_out_executions().await {
                        tracing::error!(?error, "failed to fail timed out executions");
                    }

                    let mut events = event_bus.subscribe_execution_events().filter(|event| {
                        ready(matches!(
                            event,
                            events::ExecutionEvent::TimedExecutionsReady
                        ))
                    });
                    _ = timeout(options.bookkeeping_interval, events.next()).await;
                }
            }
            .instrument(tracing::info_span!("execution_timeouts"))
        });

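        // Background task: remove executors that have missed their
        // heartbeat window.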
        tokio::spawn({
            let guard = wg.add_with("reap_dead_executors");
            let executor_registry = executor_registry.clone();

            async move {
                loop {
                    tracing::trace!("running reap_dead_executors");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.reap_dead_executors().await {
                        tracing::error!(?error, "failed to reap dead executors");
                    }

                    sleep(options.bookkeeping_interval).await;
                }
            }
            .instrument(tracing::info_span!("reap_dead_executors"))
        });

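        // Background task: clean up executions that are no longer tied to
        // a live executor.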
        tokio::spawn({
            let guard = wg.add_with("clean_up_orphan_executions");
            let executor_registry = executor_registry.clone();

            async move {
                loop {
                    tracing::trace!("running clean_up_orphan_executions");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = executor_registry.clean_up_orphan_executions().await {
                        tracing::error!(?error, "failed to clean up orphan executions");
                    }

                    sleep(options.bookkeeping_interval).await;
                }
            }
            .instrument(tracing::info_span!("clean_up_orphan_executions"))
        });

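        // Optional cleanup task: periodically delete inactive jobs that are
        // older than `max_job_age`.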
        if let Some(max_job_age) = options.max_job_age {
            tokio::spawn({
                let guard = wg.add_with("remove_old_jobs");
                let storage = storage.clone();

                async move {
                    loop {
                        tracing::trace!("running remove_old_jobs");

                        if guard.is_waiting() {
                            break;
                        }

                        let Some(cutoff) = SystemTime::now().checked_sub(max_job_age) else {
                            sleep(
                                options
                                    .cleanup_interval
                                    .unwrap_or(options.bookkeeping_interval),
                            )
                            .await;
                            continue;
                        };

                        if let Err(error) = storage
                            .delete_jobs(JobQueryFilters {
                                active: Some(false),
                                created_before: Some(cutoff),
                                ..Default::default()
                            })
                            .await
                        {
                            tracing::error!(?error, "failed to clean up jobs");
                        }

                        sleep(
                            options
                                .cleanup_interval
                                .unwrap_or(options.bookkeeping_interval),
                        )
                        .await;
                    }
                }
                .instrument(tracing::info_span!("remove_old_jobs"))
            });
        }

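        // Optional cleanup task: periodically delete inactive schedules that
        // are older than `max_schedule_age`.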
        if let Some(max_schedule_age) = options.max_schedule_age {
            tokio::spawn({
                let guard = wg.add_with("remove_old_schedules");
                let storage = storage.clone();

                async move {
                    loop {
                        tracing::trace!("running remove_old_schedules");

                        if guard.is_waiting() {
                            break;
                        }

                        let Some(cutoff) = SystemTime::now().checked_sub(max_schedule_age) else {
                            sleep(
                                options
                                    .cleanup_interval
                                    .unwrap_or(options.bookkeeping_interval),
                            )
                            .await;
                            continue;
                        };

                        if let Err(error) = storage
                            .delete_schedules(ScheduleQueryFilters {
                                active: Some(false),
                                created_before: Some(cutoff),
                                ..Default::default()
                            })
                            .await
                        {
                            tracing::error!(?error, "failed to clean up schedules");
                        }

                        sleep(
                            options
                                .cleanup_interval
                                .unwrap_or(options.bookkeeping_interval),
                        )
                        .await;
                    }
                }
                .instrument(tracing::info_span!("remove_old_schedules"))
            });
        }

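        // Background task: create the next jobs for active schedules.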
        tokio::spawn({
            let guard = wg.add_with("create_schedule_jobs");
            let event_bus = event_bus.clone();
            let storage = storage.clone();

            async move {
                loop {
                    tracing::trace!("running create_schedule_jobs");

                    if guard.is_waiting() {
                        break;
                    }

                    if let Err(error) = create_schedule_jobs(&event_bus, &storage).await {
                        tracing::error!(?error, "failed to create schedule jobs");
                    }

                    let mut execution_events =
                        event_bus.subscribe_execution_events().filter(|event| {
                            ready(matches!(event, events::ExecutionEvent::ExecutionsFinished))
                        });
                    let mut schedule_events =
                        event_bus.subscribe_schedule_events().filter(|event| {
                            ready(matches!(event, events::ScheduleEvent::SchedulesAdded))
                        });

                    _ = timeout(
                        options.bookkeeping_interval,
                        select(execution_events.next(), schedule_events.next()),
                    )
                    .await;
                }
            }
            .instrument(tracing::info_span!("create_schedule_jobs"))
        });

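        // With the `metrics` feature enabled, collect metrics from storage
        // and the event bus in the background.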
        #[cfg(feature = "metrics")]
        tokio::spawn({
            let storage = storage.clone();
            let event_bus = event_bus.clone();

            async move {
                if let Err(error) = crate::metrics::collect_metrics(&storage, &event_bus).await {
                    tracing::error!(?error, "error collecting metrics");
                }
            }
        });

        Ok(Self {
            storage: storage.inner.clone(),
            executor_registry,
            admin,
            options,
            event_bus,
            wg: Some(wg),
        })
    }

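    /// Returns the [`AdminService`] implementation backed by this server.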
    pub fn admin_service(&self) -> impl AdminService {
        self.admin.clone()
    }

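    /// Returns the [`ExecutorService`] implementation backed by this server.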
    pub fn executor_service(&self) -> impl ExecutorService {
        self.executor_registry.clone()
    }

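    /// Returns an [`AdminServiceClient`] that is connected to this server
    /// in-process over an in-memory duplex stream, without going through
    /// the network.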
    #[allow(clippy::missing_panics_doc)]
    pub fn admin_service_client(&self) -> AdminServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::server::v1::admin_service_server::AdminServiceServer;
        use tonic::transport::{Endpoint, Uri};

        let (client, server) = tokio::io::duplex(1024);

        let admin = self.admin_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(AdminServiceServer::new(admin).max_decoding_message_size(usize::MAX))
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during admin service serve");
                    }
                }
            }
        });

        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        AdminServiceClient::new(channel)
    }

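    /// Returns an [`ExecutorServiceClient`] that is connected to this server
    /// in-process over an in-memory duplex stream, without going through
    /// the network.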
    #[allow(clippy::missing_panics_doc)]
    pub fn executor_service_client(&self) -> ExecutorServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::server::v1::executor_service_server::ExecutorServiceServer;
        use tonic::transport::{Endpoint, Uri};

        let (client, server) = tokio::io::duplex(1024);

        let executor_registry = self.executor_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(
                    ExecutorServiceServer::new(executor_registry)
                        .max_decoding_message_size(usize::MAX),
                )
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during executor service serve");
                    }
                }
            }
        });

        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        ExecutorServiceClient::new(channel)
    }

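    /// Returns the options the server was spawned with.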
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }

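    /// Returns a stream of [`AuditEvent`]s emitted by the server.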
    pub fn events(
        &self,
    ) -> impl futures::Stream<Item = AuditEvent> + Unpin + Send + Sync + 'static {
        self.event_bus.subscribe_audit_events()
    }

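    /// Shuts the server down gracefully, waiting for connected executors
    /// and all background components to stop.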
    #[tracing::instrument(skip_all)]
    pub async fn shutdown(mut self) -> eyre::Result<()> {
        Self::shutdown_impl(
            self.wg.take().unwrap(),
            self.executor_registry.clone(),
            self.options.clone(),
        )
        .await
    }

    async fn shutdown_impl(
        wg: WaitGroup,
        executor_registry: ExecutorRegistry<StorageWrapper<S>>,
        options: ServerOptions,
    ) -> eyre::Result<()> {
        tracing::info!("shutting down ora server");

        let mut running_components = wg.all_done_stream();

        tokio::spawn(async move {
            if let Err(error) = executor_registry
                .shutdown(Some(options.executor_shutdown_timeout))
                .await
            {
                tracing::error!(?error, "error during executor registry shutdown");
            }
        });

        while let Some((component_count, components)) = running_components.next().await {
            tracing::debug!(
                component_count = component_count,
                components = ?components,
                "waiting for components to shut down",
            );
        }

        Ok(())
    }
}

impl<S> Server<S>
where
    S: ora_storage::Storage + StorageSnapshot,
{
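    /// Returns the [`SnapshotService`] implementation backed by this server.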
    #[allow(clippy::missing_panics_doc)]
    pub fn snapshot_service(&self) -> impl SnapshotService {
        snapshot::SnapshotInterface::new(
            self.storage.clone(),
            self.wg.as_ref().unwrap().handle(),
            self.event_bus.clone(),
        )
    }

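    /// Returns a [`SnapshotServiceClient`] that is connected to this server
    /// in-process over an in-memory duplex stream, without going through
    /// the network.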
    #[allow(clippy::missing_panics_doc)]
    pub fn snapshot_service_client(&self) -> SnapshotServiceClient<Channel> {
        use hyper_util::rt::TokioIo;
        use ora_proto::snapshot::v1::snapshot_service_server::SnapshotServiceServer;
        use tonic::transport::{Endpoint, Uri};

        let (client, server) = tokio::io::duplex(1024);

        let svc = self.snapshot_service();
        let wg = self.wg.as_ref().unwrap().handle().add();

        tokio::spawn(async move {
            let srv = tonic::transport::Server::builder()
                .add_service(SnapshotServiceServer::new(svc).max_decoding_message_size(usize::MAX))
                .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)));

            let waiting = wg.waiting();

            tokio::select! {
                _ = waiting => {}
                serve_result = srv => {
                    if let Err(error) = serve_result {
                        tracing::error!(?error, "error during snapshot service serve");
                    }
                }
            }
        });

        let mut client = Some(client);
        let channel = Endpoint::try_from("http://[::]:50051")
            .unwrap()
            .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
                let client = client.take();

                async move {
                    if let Some(client) = client {
                        Ok(TokioIo::new(client))
                    } else {
                        Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            "Client already taken",
                        ))
                    }
                }
            }));

        SnapshotServiceClient::new(channel)
    }
}

impl<S> Drop for Server<S>
where
    S: ora_storage::Storage,
{
    fn drop(&mut self) {
        let Some(wg) = self.wg.take() else {
            return;
        };

        let stop_fut =
            Self::shutdown_impl(wg, self.executor_registry.clone(), self.options.clone());

        tracing::warn!("server instance dropped, attempting shutdown in the background");
        tokio::spawn(async move {
            if let Err(error) = stop_fut.await {
                tracing::error!(?error, "error during server shutdown");
            }
        });
    }
}