Skip to main content

ora_server/
lib.rs

1//! Ora server implementation.
2
3use std::{sync::Arc, time::Duration};
4
5use either::Either;
6use wgroup::{WaitGroup, WaitGroupHandle, WaitGuard};
7
8use crate::{executor_pool::ExecutorPool, grpc::GrpcImpl};
9
10mod broadcast;
11mod executor_pool;
12mod grpc;
13pub mod proto;
14mod server;
15mod util;
16
17pub use grpc::GrpcServices;
18pub use ora_backend::Backend;
19
20/// Options for configuring and spawning an Ora server.
21#[derive(Debug, Clone)]
22pub struct ServerOptions {
23    /// Duration after which historical data is deleted.
24    ///
25    /// By default, historical data is retained indefinitely.
26    pub delete_history_after: Duration,
27    /// Shutdown grace period after which all executions are cancelled.
28    ///
29    /// Defaults to 15 seconds.
30    pub shutdown_grace_period: Duration,
31}
32
33impl Default for ServerOptions {
34    fn default() -> Self {
35        Self {
36            delete_history_after: Default::default(),
37            shutdown_grace_period: Duration::from_secs(15),
38        }
39    }
40}
41
42/// A builder for an Ora server.
43#[must_use]
44pub struct ServerBuilder<B> {
45    backend: Arc<B>,
46    options: ServerOptions,
47}
48
49impl<B> ServerBuilder<B> {
50    /// Create a new Ora server builder with the given backend.
51    pub fn new(backend: B, options: ServerOptions) -> Self {
52        Self {
53            backend: Arc::new(backend),
54            options,
55        }
56    }
57
58    /// Set the duration after which historical data is deleted.
59    ///
60    /// By default, historical data is retained indefinitely.
61    pub fn delete_history_after(mut self, duration: Duration) -> Self {
62        self.options.delete_history_after = duration;
63        self
64    }
65
66    /// Set the shutdown grace period after which all executions are cancelled.
67    ///
68    /// Defaults to 15 seconds.
69    pub fn shutdown_grace_period(mut self, duration: Duration) -> Self {
70        self.options.shutdown_grace_period = duration;
71        self
72    }
73}
74
75impl<B> ServerBuilder<B>
76where
77    B: Backend,
78{
79    /// Spawn the server, returning a handle to it.
80    ///
81    /// # Panics
82    ///
83    /// Must be called from within a Tokio runtime.
84    pub fn spawn(self) -> ServerHandle<B> {
85        server::spawn_server(self.backend, self.options, Either::Left(WaitGroup::new()))
86    }
87
88    /// Spawn the server using the given wait group handle.
89    ///
90    /// Stopping the server will be possible
91    /// only via the provided wait group.
92    pub fn spawn_with_wg(self, wg: WaitGroupHandle) -> ServerHandle<B> {
93        server::spawn_server(self.backend, self.options, Either::Right(wg))
94    }
95}
96
97/// A handle to a running Ora server.
98///
99/// The server is automatically stopped when
100/// the handle is dropped.
101#[derive(Debug)]
102#[must_use]
103pub struct ServerHandle<B> {
104    backend: Arc<B>,
105    executor_pool: ExecutorPool,
106    wg: Either<WaitGroup, WaitGroupHandle>,
107}
108
109impl<B> ServerHandle<B> {
110    /// Stop the server gracefully.
111    ///
112    /// Does nothing if the server was
113    /// created with an external wait group handle.
114    #[allow(clippy::missing_panics_doc)]
115    pub async fn stop(self) {
116        if let Either::Left(wg) = self.wg {
117            wg.all_done().await;
118        }
119    }
120
121    // Not part of the public API.
122    #[doc(hidden)]
123    pub fn add_wg_internal(&self) -> WaitGuard {
124        match &self.wg {
125            Either::Left(wg) => wg.add(),
126            Either::Right(handle) => handle.add(),
127        }
128    }
129}
130
131impl<B> ServerHandle<B>
132where
133    B: Backend,
134{
135    /// Return a type implementing ora gRPC services.
136    #[must_use]
137    pub fn grpc(&self) -> impl GrpcServices + Send + Sync + 'static {
138        GrpcImpl {
139            backend: self.backend.clone(),
140            executor_pool: self.executor_pool.clone(),
141            wg: match &self.wg {
142                Either::Left(wg) => wg.handle(),
143                Either::Right(handle) => handle.clone(),
144            },
145        }
146    }
147}