Skip to main content

ironflow_runtime/
runtime.rs

1//! The runtime server builder and HTTP serving logic.
2//!
3//! [`Runtime`] is the central entry-point for configuring and launching an
4//! ironflow daemon. It uses a builder pattern to register webhook routes and
5//! cron jobs, then starts an [Axum](https://docs.rs/axum) HTTP server with
6//! graceful shutdown on `Ctrl+C`.
7//!
8//! Webhook handlers are executed in the background via [`tokio::spawn`], so
9//! the HTTP endpoint responds with **202 Accepted** immediately while the
10//! workflow runs asynchronously.
11
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use axum::Router;
17use axum::body::Bytes;
18use axum::extract::{DefaultBodyLimit, State};
19use axum::http::{HeaderMap, StatusCode, header};
20use axum::middleware;
21use axum::routing::{get, post};
22use serde_json::{Value, from_slice};
23use tokio::sync::{Mutex, Semaphore};
24use tokio::task::JoinSet;
25use tokio_cron_scheduler::{Job, JobScheduler};
26use tracing::{error, info, warn};
27
28use crate::cron::CronJob;
29use crate::error::RuntimeError;
30use crate::webhook::WebhookAuth;
31
/// Default maximum body size for webhook payloads (2 MiB).
///
/// Used by [`Runtime::new`]; override it with [`Runtime::max_body_size`].
const DEFAULT_MAX_BODY_SIZE: usize = 2 * 1024 * 1024;

/// Default maximum number of concurrently running webhook handlers.
///
/// Used by [`Runtime::new`]; override it with
/// [`Runtime::max_concurrent_handlers`].
const DEFAULT_MAX_CONCURRENT_HANDLERS: usize = 64;

/// Type-erased webhook handler: a shareable function that takes the parsed
/// JSON payload and returns a boxed, `Send` future with no output.
type WebhookHandler = Arc<dyn Fn(Value) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Type-erased future whose completion tells the runtime to shut down.
type ShutdownSignal = Pin<Box<dyn Future<Output = ()> + Send>>;
40
/// Metric name constants for the runtime (webhook + cron).
///
/// Only compiled when the `prometheus` feature is enabled.
#[cfg(feature = "prometheus")]
mod metric_names {
    /// Counter incremented once per webhook request, labelled with the
    /// route path and one of the `AUTH_*` outcomes below.
    pub const WEBHOOK_RECEIVED_TOTAL: &str = "ironflow_webhook_received_total";
    /// Counter incremented each time a cron job actually starts running,
    /// labelled with the job name.
    pub const CRON_RUNS_TOTAL: &str = "ironflow_cron_runs_total";

    /// `auth` label value: the request failed authentication.
    pub const AUTH_REJECTED: &str = "rejected";
    /// `auth` label value: the request was authenticated and its body parsed.
    pub const AUTH_ACCEPTED: &str = "accepted";
    /// `auth` label value: the request was authenticated but its body was
    /// not valid JSON.
    pub const AUTH_INVALID_BODY: &str = "invalid_body";
}
51
/// A webhook registration collected by [`Runtime::webhook`] and later turned
/// into an axum route when the router is built.
struct WebhookRoute {
    /// URL path the route is mounted on; always starts with `/`.
    path: String,
    /// Authentication strategy checked before the handler is spawned.
    auth: WebhookAuth,
    /// Type-erased user handler invoked with the parsed JSON payload.
    handler: WebhookHandler,
}
57
/// The ironflow runtime server builder.
///
/// `Runtime` uses a builder pattern: create one with [`Runtime::new`], register
/// webhook routes with [`Runtime::webhook`] and cron jobs with
/// [`Runtime::cron`], then call [`Runtime::serve`] to start both the HTTP
/// server and the cron scheduler, or [`Runtime::run_crons`] to run only the
/// cron scheduler without an HTTP listener.
///
/// # Built-in endpoints
///
/// | Method | Path | Description |
/// |--------|------|-------------|
/// | `GET`  | `/health` | Returns `200 OK` with body `"ok"`. Useful for load-balancer health checks. |
/// | `GET`  | `/metrics` | Prometheus metrics. Only registered by [`Runtime::serve`] when the `prometheus` feature is enabled and that call installed the metrics recorder. |
/// | `POST` | *user-defined* | Webhook endpoints registered via [`Runtime::webhook`]. |
///
/// # Examples
///
/// ```no_run
/// use ironflow_runtime::prelude::*;
///
/// #[tokio::main]
/// async fn main() -> Result<(), ironflow_runtime::error::RuntimeError> {
///     Runtime::new()
///         .webhook("/hooks/deploy", WebhookAuth::github("secret"), |payload| async move {
///             println!("deploy triggered: {payload}");
///         })
///         .cron("0 0 * * * *", "hourly-sync", || async {
///             println!("syncing...");
///         })
///         .serve("0.0.0.0:3000")
///         .await?;
///
///     Ok(())
/// }
/// ```
pub struct Runtime {
    /// Webhook routes registered so far, in registration order.
    webhooks: Vec<WebhookRoute>,
    /// Cron jobs registered so far, in registration order.
    crons: Vec<CronJob>,
    /// Request body size limit applied to the whole router.
    max_body_size: usize,
    /// Number of webhook handlers allowed to run at the same time; never `0`.
    max_concurrent_handlers: usize,
    /// Optional replacement for the default `Ctrl+C` / `SIGTERM` shutdown signal.
    custom_shutdown: Option<ShutdownSignal>,
}
100
101impl Runtime {
102    /// Creates a new, empty `Runtime` with no webhooks or cron jobs.
103    ///
104    /// # Examples
105    ///
106    /// ```no_run
107    /// use ironflow_runtime::runtime::Runtime;
108    ///
109    /// let runtime = Runtime::new();
110    /// ```
111    pub fn new() -> Self {
112        Self {
113            webhooks: Vec::new(),
114            crons: Vec::new(),
115            max_body_size: DEFAULT_MAX_BODY_SIZE,
116            max_concurrent_handlers: DEFAULT_MAX_CONCURRENT_HANDLERS,
117            custom_shutdown: None,
118        }
119    }
120
121    /// Set the maximum allowed body size for webhook payloads.
122    ///
123    /// Requests exceeding this limit are rejected by axum before reaching the
124    /// handler. Defaults to 2 MiB.
125    ///
126    /// # Examples
127    ///
128    /// ```no_run
129    /// use ironflow_runtime::runtime::Runtime;
130    ///
131    /// let runtime = Runtime::new().max_body_size(512 * 1024); // 512 KiB
132    /// ```
133    pub fn max_body_size(mut self, bytes: usize) -> Self {
134        self.max_body_size = bytes;
135        self
136    }
137
138    /// Set the maximum number of concurrently running webhook handlers.
139    ///
140    /// When the limit is reached, new webhook requests still receive
141    /// **202 Accepted** but their handlers are queued until a slot is
142    /// available. Defaults to 64.
143    ///
144    /// # Panics
145    ///
146    /// Panics if `limit` is `0`.
147    ///
148    /// # Examples
149    ///
150    /// ```no_run
151    /// use ironflow_runtime::runtime::Runtime;
152    ///
153    /// let runtime = Runtime::new().max_concurrent_handlers(16);
154    /// ```
155    pub fn max_concurrent_handlers(mut self, limit: usize) -> Self {
156        assert!(limit > 0, "max_concurrent_handlers must be greater than 0");
157        self.max_concurrent_handlers = limit;
158        self
159    }
160
161    /// Override the default shutdown signal (`Ctrl+C` / `SIGTERM`).
162    ///
163    /// By default, [`Runtime::serve`] and [`Runtime::run_crons`] block until
164    /// the process receives `Ctrl+C` or `SIGTERM`. Use this method to provide
165    /// a custom future that resolves when the runtime should shut down.
166    ///
167    /// This is useful in tests where you want to trigger a clean shutdown
168    /// (including `scheduler.shutdown()`) without relying on OS signals.
169    ///
170    /// # Examples
171    ///
172    /// ```no_run
173    /// use ironflow_runtime::runtime::Runtime;
174    /// use tokio::sync::oneshot;
175    ///
176    /// # async fn example() -> Result<(), ironflow_runtime::error::RuntimeError> {
177    /// let (tx, rx) = oneshot::channel::<()>();
178    ///
179    /// let rt = Runtime::new()
180    ///     .with_shutdown(async { let _ = rx.await; })
181    ///     .cron("0 */5 * * * *", "check", || async {});
182    ///
183    /// // In another task: tx.send(()) to trigger shutdown.
184    /// rt.run_crons().await?;
185    /// # Ok(())
186    /// # }
187    /// ```
188    pub fn with_shutdown<F>(mut self, signal: F) -> Self
189    where
190        F: Future<Output = ()> + Send + 'static,
191    {
192        self.custom_shutdown = Some(Box::pin(signal));
193        self
194    }
195
196    /// Registers a webhook route.
197    ///
198    /// The handler receives the parsed JSON body as a [`serde_json::Value`].
199    /// When a request arrives, the server verifies authentication using `auth`,
200    /// then spawns the handler in the background and immediately returns
201    /// **202 Accepted** to the caller.
202    ///
203    /// # Arguments
204    ///
205    /// * `path` - The URL path to listen on (e.g. `"/hooks/github"`).
206    /// * `auth` - The [`WebhookAuth`] strategy for this endpoint.
207    /// * `handler` - An async function receiving the JSON payload.
208    ///
209    /// # Examples
210    ///
211    /// ```no_run
212    /// use ironflow_runtime::prelude::*;
213    ///
214    /// let runtime = Runtime::new()
215    ///     .webhook("/hooks/github", WebhookAuth::github("secret"), |payload| async move {
216    ///         println!("payload: {payload}");
217    ///     });
218    /// ```
219    pub fn webhook<F, Fut>(mut self, path: &str, auth: WebhookAuth, handler: F) -> Self
220    where
221        F: Fn(Value) -> Fut + Send + Sync + Clone + 'static,
222        Fut: Future<Output = ()> + Send + 'static,
223    {
224        assert!(
225            path.starts_with('/'),
226            "webhook path must start with '/', got: {path}"
227        );
228        if matches!(auth, WebhookAuth::None) {
229            warn!(path = %path, "webhook registered with WebhookAuth::None - all requests will be accepted without authentication");
230        }
231        let handler: WebhookHandler = Arc::new(move |payload| {
232            let handler = handler.clone();
233            Box::pin(async move { handler(payload).await })
234        });
235        self.webhooks.push(WebhookRoute {
236            path: path.to_string(),
237            auth,
238            handler,
239        });
240        self
241    }
242
243    /// Registers a cron job.
244    ///
245    /// The `schedule` uses a **6-field cron expression** (seconds granularity):
246    /// `sec min hour day-of-month month day-of-week`.
247    ///
248    /// # Arguments
249    ///
250    /// * `schedule` - A 6-field cron expression, e.g. `"0 */5 * * * *"` for every 5 minutes.
251    /// * `name` - A human-readable name for logging.
252    /// * `handler` - An async function to execute on each tick.
253    ///
254    /// # Examples
255    ///
256    /// ```no_run
257    /// use ironflow_runtime::prelude::*;
258    ///
259    /// let runtime = Runtime::new()
260    ///     .cron("0 0 * * * *", "hourly-cleanup", || async {
261    ///         println!("cleaning up...");
262    ///     });
263    /// ```
264    pub fn cron<F, Fut>(mut self, schedule: &str, name: &str, handler: F) -> Self
265    where
266        F: Fn() -> Fut + Send + Sync + 'static,
267        Fut: Future<Output = ()> + Send + 'static,
268    {
269        let handler_fn: Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync> =
270            Box::new(move || Box::pin(handler()));
271        self.crons.push(CronJob {
272            schedule: schedule.to_string(),
273            name: name.to_string(),
274            handler: handler_fn,
275        });
276        self
277    }
278
279    /// Build the axum [`Router`] from the registered webhooks.
280    ///
281    /// This is separated from [`Runtime::serve`] so the router can be tested
282    /// independently (e.g. with `tower::ServiceExt::oneshot` or by
283    /// binding to a random port in integration tests).
284    fn build_router(
285        webhooks: Vec<WebhookRoute>,
286        handler_tracker: Arc<HandlerTracker>,
287        max_body_size: usize,
288        #[cfg(feature = "prometheus")] prom_handle: Option<
289            metrics_exporter_prometheus::PrometheusHandle,
290        >,
291    ) -> Router {
292        let mut router = Router::new();
293
294        for webhook in webhooks {
295            let auth = Arc::new(webhook.auth);
296            let handler = webhook.handler;
297            let path = webhook.path.clone();
298
299            let name: Arc<str> = Arc::from(path.as_str());
300            let route_state = WebhookState {
301                auth,
302                handler,
303                name,
304                tracker: handler_tracker.clone(),
305            };
306
307            router = router.route(&path, post(webhook_handler).with_state(route_state));
308            info!(path = %path, "registered webhook");
309        }
310
311        router = router.route("/health", get(|| async { "ok" }));
312
313        #[cfg(feature = "prometheus")]
314        if let Some(handle) = prom_handle {
315            router = router.route(
316                "/metrics",
317                get(move || {
318                    let h = handle.clone();
319                    async move { h.render() }
320                }),
321            );
322            info!("registered /metrics endpoint");
323        }
324
325        router
326            .layer(middleware::from_fn(security_headers))
327            .layer(DefaultBodyLimit::max(max_body_size))
328    }
329
330    /// Consumes the runtime and returns only the axum [`Router`].
331    ///
332    /// Cron jobs are **not** started. This is useful for testing the HTTP
333    /// layer in isolation without side-effects (e.g. with
334    /// `tower::ServiceExt::oneshot`).
335    ///
336    /// # Examples
337    ///
338    /// ```no_run
339    /// use ironflow_runtime::prelude::*;
340    ///
341    /// let router = Runtime::new()
342    ///     .webhook("/hooks/test", WebhookAuth::none(), |_payload| async {})
343    ///     .into_router();
344    /// ```
345    pub fn into_router(self) -> Router {
346        if !self.crons.is_empty() {
347            warn!(
348                cron_count = self.crons.len(),
349                "into_router() drops registered cron jobs - use serve() or run_crons() to start them"
350            );
351        }
352        let tracker = Arc::new(HandlerTracker::new(self.max_concurrent_handlers));
353        Self::build_router(
354            self.webhooks,
355            tracker,
356            self.max_body_size,
357            #[cfg(feature = "prometheus")]
358            None,
359        )
360    }
361
362    /// Starts the cron scheduler with all registered cron jobs.
363    ///
364    /// This is an internal helper used by both [`Runtime::serve`] and
365    /// [`Runtime::run_crons`].
366    async fn start_scheduler(crons: Vec<CronJob>) -> Result<JobScheduler, RuntimeError> {
367        let scheduler = JobScheduler::new().await?;
368
369        for cron_job in crons {
370            let handler = Arc::new(cron_job.handler);
371            let name = cron_job.name.clone();
372            let running = Arc::new(std::sync::atomic::AtomicBool::new(false));
373            let job = Job::new_async(cron_job.schedule.as_str(), move |_uuid, _lock| {
374                let handler = handler.clone();
375                let name = name.clone();
376                let running = running.clone();
377                Box::pin(async move {
378                    if running.swap(true, std::sync::atomic::Ordering::AcqRel) {
379                        warn!(cron = %name, "cron job still running, skipping this tick");
380                        return;
381                    }
382                    info!(cron = %name, "cron job triggered");
383                    #[cfg(feature = "prometheus")]
384                    metrics::counter!(metric_names::CRON_RUNS_TOTAL, "job" => name.clone())
385                        .increment(1);
386                    (handler)().await;
387                    running.store(false, std::sync::atomic::Ordering::Release);
388                })
389            })?;
390            info!(cron = %cron_job.name, schedule = %cron_job.schedule, "registered cron job");
391            scheduler.add(job).await?;
392        }
393
394        scheduler.start().await?;
395        Ok(scheduler)
396    }
397
398    /// Starts only the cron scheduler, blocking until a shutdown signal is
399    /// received (`Ctrl+C` / `SIGTERM`).
400    ///
401    /// Unlike [`Runtime::serve`], this does **not** start an HTTP server. Any
402    /// registered webhooks are ignored (a warning is logged if webhooks were
403    /// registered).
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if:
408    ///
409    /// - The cron scheduler fails to initialise or a cron expression is invalid.
410    /// - The scheduler fails to shut down cleanly.
411    ///
412    /// # Examples
413    ///
414    /// ```no_run
415    /// use ironflow_runtime::prelude::*;
416    ///
417    /// #[tokio::main]
418    /// async fn main() -> Result<(), ironflow_runtime::error::RuntimeError> {
419    ///     Runtime::new()
420    ///         .cron("0 0 * * * *", "hourly-sync", || async {
421    ///             println!("syncing...");
422    ///         })
423    ///         .run_crons()
424    ///         .await?;
425    ///     Ok(())
426    /// }
427    /// ```
428    pub async fn run_crons(self) -> Result<(), RuntimeError> {
429        let _ = dotenvy::dotenv();
430
431        if !self.webhooks.is_empty() {
432            warn!(
433                webhook_count = self.webhooks.len(),
434                "run_crons() ignores registered webhooks - use serve() to start both webhooks and crons"
435            );
436        }
437
438        #[cfg(feature = "prometheus")]
439        {
440            match metrics_exporter_prometheus::PrometheusBuilder::new().install_recorder() {
441                Ok(_) => info!("prometheus metrics recorder installed"),
442                Err(_) => {
443                    info!("prometheus metrics recorder already installed, reusing existing")
444                }
445            }
446        }
447
448        let mut scheduler = Self::start_scheduler(self.crons).await?;
449
450        info!("ironflow cron scheduler running (no HTTP server)");
451        match self.custom_shutdown {
452            Some(signal) => signal.await,
453            None => shutdown_signal().await,
454        }
455
456        info!("shutting down scheduler");
457        scheduler.shutdown().await.map_err(RuntimeError::Shutdown)?;
458        info!("ironflow cron scheduler stopped");
459
460        Ok(())
461    }
462
463    /// Starts the HTTP server and cron scheduler, blocking until shutdown.
464    ///
465    /// This method:
466    ///
467    /// 1. Loads environment variables from `.env` via [`dotenvy`].
468    /// 2. Starts the [`tokio_cron_scheduler`] scheduler with all registered cron jobs.
469    /// 3. Builds an [Axum](https://docs.rs/axum) router with all registered webhook
470    ///    routes plus a `GET /health` endpoint.
471    /// 4. Binds to `addr` and serves until a `Ctrl+C` signal is received.
472    /// 5. Gracefully shuts down the scheduler before returning.
473    ///
474    /// If you only need cron jobs without an HTTP server, use
475    /// [`Runtime::run_crons`] instead.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if:
480    ///
481    /// - The cron scheduler fails to initialise or a cron expression is invalid.
482    /// - The TCP listener cannot bind to `addr`.
483    /// - The Axum server encounters a fatal I/O error.
484    ///
485    /// # Examples
486    ///
487    /// ```no_run
488    /// use ironflow_runtime::prelude::*;
489    ///
490    /// #[tokio::main]
491    /// async fn main() -> Result<(), ironflow_runtime::error::RuntimeError> {
492    ///     Runtime::new()
493    ///         .serve("127.0.0.1:3000")
494    ///         .await?;
495    ///     Ok(())
496    /// }
497    /// ```
498    pub async fn serve(self, addr: &str) -> Result<(), RuntimeError> {
499        let _ = dotenvy::dotenv();
500
501        #[cfg(feature = "prometheus")]
502        let prom_handle = {
503            match metrics_exporter_prometheus::PrometheusBuilder::new().install_recorder() {
504                Ok(handle) => {
505                    info!("prometheus metrics recorder installed");
506                    Some(handle)
507                }
508                Err(_) => {
509                    info!("prometheus metrics recorder already installed, reusing existing");
510                    None
511                }
512            }
513        };
514
515        let mut scheduler = Self::start_scheduler(self.crons).await?;
516
517        let tracker = Arc::new(HandlerTracker::new(self.max_concurrent_handlers));
518        let router = Self::build_router(
519            self.webhooks,
520            tracker.clone(),
521            self.max_body_size,
522            #[cfg(feature = "prometheus")]
523            prom_handle,
524        );
525
526        let listener = tokio::net::TcpListener::bind(addr)
527            .await
528            .map_err(RuntimeError::Bind)?;
529        info!(addr = %addr, "ironflow runtime listening");
530
531        let graceful_shutdown = match self.custom_shutdown {
532            Some(signal) => signal,
533            None => Box::pin(shutdown_signal()),
534        };
535        axum::serve(listener, router)
536            .with_graceful_shutdown(graceful_shutdown)
537            .await
538            .map_err(RuntimeError::Serve)?;
539
540        // Wait for all in-flight webhook handlers to finish.
541        info!("waiting for in-flight webhook handlers to complete");
542        tracker.wait().await;
543
544        info!("shutting down scheduler");
545        scheduler.shutdown().await.map_err(RuntimeError::Shutdown)?;
546        info!("ironflow runtime stopped");
547
548        Ok(())
549    }
550}
551
/// `Runtime::default()` is the same as [`Runtime::new`]: nothing registered
/// and every limit at its default.
impl Default for Runtime {
    /// Delegates to [`Runtime::new`].
    fn default() -> Self {
        Self::new()
    }
}
557
/// Tracks in-flight webhook handlers and enforces a concurrency limit.
///
/// Combines a [`Semaphore`] for backpressure with a [`JoinSet`] so that
/// [`Runtime::serve`] can wait for all running handlers before exiting.
struct HandlerTracker {
    /// One permit per handler allowed to run at the same time; each spawned
    /// task acquires a permit before invoking its handler.
    semaphore: Arc<Semaphore>,
    /// Every spawned handler task, including tasks still waiting for a
    /// permit. Guarded by an async mutex because it is locked from request
    /// handlers and from the shutdown path.
    join_set: Mutex<JoinSet<()>>,
}
566
567impl HandlerTracker {
568    fn new(max_concurrent: usize) -> Self {
569        Self {
570            semaphore: Arc::new(Semaphore::new(max_concurrent)),
571            join_set: Mutex::new(JoinSet::new()),
572        }
573    }
574
575    /// Spawn a handler task, respecting the concurrency limit.
576    async fn spawn(&self, name: String, handler: WebhookHandler, payload: Value) {
577        let semaphore = self.semaphore.clone();
578        let mut js = self.join_set.lock().await;
579        // Reap completed tasks to detect panics early.
580        while let Some(result) = js.try_join_next() {
581            if let Err(e) = result {
582                error!(error = %e, "webhook handler panicked");
583            }
584        }
585        use tracing::Instrument;
586        let span = tracing::info_span!("webhook", path = %name);
587        js.spawn(
588            async move {
589                let _permit = semaphore
590                    .acquire()
591                    .await
592                    .expect("semaphore closed unexpectedly");
593                info!("webhook workflow started");
594                handler(payload).await;
595                info!("webhook workflow completed");
596            }
597            .instrument(span),
598        );
599    }
600
601    /// Wait for all in-flight handlers to complete.
602    async fn wait(&self) {
603        let mut js = self.join_set.lock().await;
604        while let Some(result) = js.join_next().await {
605            if let Err(e) = result {
606                error!(error = %e, "webhook handler panicked");
607            }
608        }
609    }
610}
611
/// Per-route state handed to [`webhook_handler`] through axum's `State`
/// extractor. Every field is reference-counted, so cloning is cheap.
#[derive(Clone)]
struct WebhookState {
    /// Authentication strategy used to verify each request.
    auth: Arc<WebhookAuth>,
    /// User handler to run for accepted requests.
    handler: WebhookHandler,
    /// Route path, used in logs, metric labels, and the tracing span.
    name: Arc<str>,
    /// Tracker shared by all routes of the same router.
    tracker: Arc<HandlerTracker>,
}
619
620async fn webhook_handler(
621    State(state): State<WebhookState>,
622    headers: HeaderMap,
623    body: Bytes,
624) -> StatusCode {
625    let name = &state.name;
626    if !state.auth.verify(&headers, &body) {
627        warn!(webhook = %name, "webhook auth failed");
628        #[cfg(feature = "prometheus")]
629        {
630            let label: String = name.to_string();
631            metrics::counter!(metric_names::WEBHOOK_RECEIVED_TOTAL, "path" => label, "auth" => metric_names::AUTH_REJECTED).increment(1);
632        }
633        return StatusCode::UNAUTHORIZED;
634    }
635
636    let payload: Value = match from_slice(&body) {
637        Ok(v) => v,
638        Err(e) => {
639            warn!(webhook = %name, error = %e, "invalid JSON body");
640            #[cfg(feature = "prometheus")]
641            {
642                let label: String = name.to_string();
643                metrics::counter!(metric_names::WEBHOOK_RECEIVED_TOTAL, "path" => label, "auth" => metric_names::AUTH_INVALID_BODY).increment(1);
644            }
645            return StatusCode::BAD_REQUEST;
646        }
647    };
648
649    #[cfg(feature = "prometheus")]
650    {
651        let label: String = name.to_string();
652        metrics::counter!(metric_names::WEBHOOK_RECEIVED_TOTAL, "path" => label, "auth" => metric_names::AUTH_ACCEPTED).increment(1);
653    }
654
655    state
656        .tracker
657        .spawn(name.to_string(), state.handler.clone(), payload)
658        .await;
659
660    StatusCode::ACCEPTED
661}
662
663async fn security_headers(
664    request: axum::http::Request<axum::body::Body>,
665    next: axum::middleware::Next,
666) -> axum::response::Response {
667    let mut response = next.run(request).await;
668    let headers = response.headers_mut();
669    headers.insert(
670        header::X_CONTENT_TYPE_OPTIONS,
671        "nosniff".parse().expect("valid header value"),
672    );
673    headers.insert(
674        header::X_FRAME_OPTIONS,
675        "DENY".parse().expect("valid header value"),
676    );
677    headers.insert(
678        "x-xss-protection",
679        "1; mode=block".parse().expect("valid header value"),
680    );
681    headers.insert(
682        header::STRICT_TRANSPORT_SECURITY,
683        "max-age=31536000; includeSubDomains"
684            .parse()
685            .expect("valid header value"),
686    );
687    headers.insert(
688        header::CONTENT_SECURITY_POLICY,
689        "default-src 'none'".parse().expect("valid header value"),
690    );
691    response
692}
693
694async fn shutdown_signal() {
695    let ctrl_c = async {
696        if let Err(e) = tokio::signal::ctrl_c().await {
697            warn!("failed to install ctrl+c handler: {e}");
698        }
699    };
700
701    #[cfg(unix)]
702    {
703        use tokio::signal::unix::{SignalKind, signal};
704        let mut sigterm =
705            signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
706        tokio::select! {
707            () = ctrl_c => info!("received SIGINT, shutting down"),
708            _ = sigterm.recv() => info!("received SIGTERM, shutting down"),
709        }
710    }
711
712    #[cfg(not(unix))]
713    {
714        ctrl_c.await;
715        info!("received ctrl+c, shutting down");
716    }
717}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722
723    /// Test that Runtime::new() creates a runtime with default values.
724    #[test]
725    fn runtime_new_creates_with_defaults() {
726        let rt = Runtime::new();
727        assert_eq!(rt.webhooks.len(), 0);
728        assert_eq!(rt.crons.len(), 0);
729        assert_eq!(rt.max_body_size, DEFAULT_MAX_BODY_SIZE);
730        assert_eq!(rt.max_concurrent_handlers, DEFAULT_MAX_CONCURRENT_HANDLERS);
731        assert!(rt.custom_shutdown.is_none());
732    }
733
734    /// Test that Runtime::default() is equivalent to Runtime::new().
735    #[test]
736    fn runtime_default_equals_new() {
737        let rt_new = Runtime::new();
738        let rt_default = Runtime::default();
739        assert_eq!(rt_new.webhooks.len(), rt_default.webhooks.len());
740        assert_eq!(rt_new.crons.len(), rt_default.crons.len());
741        assert_eq!(rt_new.max_body_size, rt_default.max_body_size);
742        assert_eq!(
743            rt_new.max_concurrent_handlers,
744            rt_default.max_concurrent_handlers
745        );
746    }
747
748    /// Test that max_body_size() builder method sets the value and returns self.
749    #[test]
750    fn max_body_size_sets_value_and_returns_self() {
751        let rt = Runtime::new().max_body_size(512 * 1024);
752        assert_eq!(rt.max_body_size, 512 * 1024);
753    }
754
755    /// Test that max_body_size() can be chained with other builder methods.
756    #[test]
757    fn max_body_size_chainable() {
758        let rt =
759            Runtime::new()
760                .max_body_size(1024)
761                .webhook("/test", WebhookAuth::none(), |_| async {});
762        assert_eq!(rt.max_body_size, 1024);
763        assert_eq!(rt.webhooks.len(), 1);
764    }
765
766    /// Test that max_body_size() can be set to zero.
767    #[test]
768    fn max_body_size_can_be_zero() {
769        let rt = Runtime::new().max_body_size(0);
770        assert_eq!(rt.max_body_size, 0);
771    }
772
773    /// Test that max_body_size() can be set to large values.
774    #[test]
775    fn max_body_size_can_be_large() {
776        let large_size = 1024 * 1024 * 1024; // 1 GiB
777        let rt = Runtime::new().max_body_size(large_size);
778        assert_eq!(rt.max_body_size, large_size);
779    }
780
781    /// Test that max_concurrent_handlers() panics when given 0.
782    #[test]
783    #[should_panic(expected = "max_concurrent_handlers must be greater than 0")]
784    fn max_concurrent_handlers_zero_panics() {
785        let _ = Runtime::new().max_concurrent_handlers(0);
786    }
787
788    /// Test that max_concurrent_handlers() sets the value for valid inputs.
789    #[test]
790    fn max_concurrent_handlers_sets_valid_values() {
791        let rt = Runtime::new().max_concurrent_handlers(16);
792        assert_eq!(rt.max_concurrent_handlers, 16);
793    }
794
795    /// Test that max_concurrent_handlers() with 1 is allowed.
796    #[test]
797    fn max_concurrent_handlers_one_is_valid() {
798        let rt = Runtime::new().max_concurrent_handlers(1);
799        assert_eq!(rt.max_concurrent_handlers, 1);
800    }
801
802    /// Test that max_concurrent_handlers() with large values is allowed.
803    #[test]
804    fn max_concurrent_handlers_large_value_is_valid() {
805        let large_limit = 10000;
806        let rt = Runtime::new().max_concurrent_handlers(large_limit);
807        assert_eq!(rt.max_concurrent_handlers, large_limit);
808    }
809
810    /// Test that max_concurrent_handlers() returns self for chaining.
811    #[test]
812    fn max_concurrent_handlers_chainable() {
813        let rt = Runtime::new().max_concurrent_handlers(32).webhook(
814            "/test",
815            WebhookAuth::none(),
816            |_| async {},
817        );
818        assert_eq!(rt.max_concurrent_handlers, 32);
819        assert_eq!(rt.webhooks.len(), 1);
820    }
821
822    /// Test that with_shutdown() sets a custom shutdown signal and returns self.
823    #[tokio::test]
824    async fn with_shutdown_sets_signal_and_returns_self() {
825        let (tx, rx) = tokio::sync::oneshot::channel();
826        let rt = Runtime::new().with_shutdown(async move {
827            let _ = rx.await;
828        });
829        assert!(rt.custom_shutdown.is_some());
830
831        // Signal to verify it was set properly.
832        let _ = tx.send(());
833    }
834
835    /// Test that with_shutdown() is chainable.
836    #[tokio::test]
837    async fn with_shutdown_chainable() {
838        let (tx, rx) = tokio::sync::oneshot::channel();
839        let rt = Runtime::new()
840            .with_shutdown(async move {
841                let _ = rx.await;
842            })
843            .webhook("/test", WebhookAuth::none(), |_| async {});
844        assert!(rt.custom_shutdown.is_some());
845        assert_eq!(rt.webhooks.len(), 1);
846
847        let _ = tx.send(());
848    }
849
850    /// Test that webhook() registers a route and returns self.
851    #[test]
852    fn webhook_registers_route_and_returns_self() {
853        let rt = Runtime::new().webhook("/hooks/test", WebhookAuth::none(), |_| async {});
854        assert_eq!(rt.webhooks.len(), 1);
855        assert_eq!(rt.webhooks[0].path, "/hooks/test");
856    }
857
858    /// Test that webhook() panics if path does not start with '/'.
859    #[test]
860    #[should_panic(expected = "webhook path must start with '/'")]
861    fn webhook_path_without_slash_panics() {
862        let _ = Runtime::new().webhook("no-slash", WebhookAuth::none(), |_| async {});
863    }
864
/// Paths of several shapes (root, nested, dashes, underscores, digits) are
/// all accepted; only the resulting route count is checked.
#[test]
fn webhook_accepts_valid_paths() {
    let mut rt = Runtime::new();

    // Registered one at a time, in this order.
    for path in [
        "/",
        "/simple",
        "/nested/path",
        "/with-dashes",
        "/with_underscores",
        "/with/numbers/123",
    ] {
        rt = rt.webhook(path, WebhookAuth::none(), |_| async {});
    }

    assert_eq!(rt.webhooks.len(), 6);
}
877
/// Chained `webhook()` calls accumulate routes, and the routes are stored in
/// the order they were registered.
#[test]
fn webhook_chainable() {
    let runtime = Runtime::new()
        .webhook("/hook-a", WebhookAuth::none(), |_| async {})
        .webhook("/hook-b", WebhookAuth::none(), |_| async {})
        .webhook("/hook-c", WebhookAuth::none(), |_| async {});

    assert_eq!(runtime.webhooks.len(), 3);

    // The length was checked above, so the zip visits every route.
    for (route, expected_path) in runtime.webhooks.iter().zip(["/hook-a", "/hook-b", "/hook-c"]) {
        assert_eq!(route.path, expected_path);
    }
}
890
/// `webhook()` accepts every `WebhookAuth` constructor exercised here
/// (`none`, `header`, `github`, `gitlab`); four routes are registered.
#[test]
fn webhook_with_various_auth_types() {
    let rt = Runtime::new();
    let rt = rt.webhook("/none", WebhookAuth::none(), |_| async {});
    let rt = rt.webhook(
        "/header",
        WebhookAuth::header("x-api-key", "secret"),
        |_| async {},
    );
    let rt = rt.webhook("/github", WebhookAuth::github("secret"), |_| async {});
    let rt = rt.webhook("/gitlab", WebhookAuth::gitlab("token"), |_| async {});

    assert_eq!(rt.webhooks.len(), 4);
}
905
/// A single `cron()` call leaves exactly one job, carrying the name and
/// schedule string that were passed in.
#[test]
fn cron_registers_job_and_returns_self() {
    let runtime = Runtime::new().cron("0 0 * * * *", "daily-task", || async {});
    assert_eq!(runtime.crons.len(), 1);

    let job = &runtime.crons[0];
    assert_eq!(job.name, "daily-task");
    assert_eq!(job.schedule, "0 0 * * * *");
}
914
/// Two consecutive `cron()` calls on the same builder yield two jobs.
#[test]
fn cron_chainable() {
    let rt = Runtime::new();
    let rt = rt.cron("0 0 * * * *", "midnight", || async {});
    let rt = rt.cron("0 */5 * * * *", "every-5-minutes", || async {});

    assert_eq!(rt.crons.len(), 2);
}
923
/// Each registered cron job keeps the exact name and schedule string it was
/// created with, at the position matching its registration order.
#[test]
fn cron_preserves_schedule_and_name() {
    let runtime = Runtime::new()
        .cron("0 12 * * * MON", "noon-mondays", || async {})
        .cron("0 0 1 * * *", "first-of-month", || async {});

    // (name, schedule) pairs in registration order. The names are labels
    // only; nothing here checks that a schedule means what its name says.
    let expected = [
        ("noon-mondays", "0 12 * * * MON"),
        ("first-of-month", "0 0 1 * * *"),
    ];

    for (index, &(name, schedule)) in expected.iter().enumerate() {
        assert_eq!(runtime.crons[index].name, name);
        assert_eq!(runtime.crons[index].schedule, schedule);
    }
}
935
/// Smoke test: converting a freshly constructed runtime with `into_router()`
/// compiles and completes without panicking.
#[test]
fn into_router_returns_router() {
    // No assertions; reaching the end of the test is the success condition.
    let _router = Runtime::new().into_router();
}
943
/// Smoke test: `into_router()` completes without panicking when two webhooks
/// (one unauthenticated, one GitHub-authenticated) are registered.
#[test]
fn into_router_with_webhooks_returns_router() {
    let runtime = Runtime::new();
    let runtime = runtime.webhook("/hook-a", WebhookAuth::none(), |_| async {});
    let runtime = runtime.webhook("/hook-b", WebhookAuth::github("secret"), |_| async {});

    // No assertions; success means the conversion did not panic.
    let _router = runtime.into_router();
}
953
/// Smoke test: `into_router()` completes without panicking when cron jobs
/// have been registered on the runtime.
#[test]
fn into_router_with_crons_returns_router() {
    let runtime = Runtime::new();
    let runtime = runtime.cron("0 0 * * * *", "daily", || async {});
    let runtime = runtime.cron("0 */5 * * * *", "every-5-min", || async {});

    // The crons are expected to be discarded rather than treated as an
    // error; this test only observes that the call returns.
    let _router = runtime.into_router();
}
963
/// Smoke test: `into_router()` completes without panicking when a custom
/// `max_body_size` has been configured alongside a webhook.
#[test]
fn into_router_respects_max_body_size_config() {
    let runtime = Runtime::new().max_body_size(100);
    let runtime = runtime.webhook("/hook", WebhookAuth::none(), |_| async {});

    // Only construction is checked here; enforcement of the limit itself is
    // left to integration tests.
    let _router = runtime.into_router();
}
974
/// Smoke test: `into_router()` completes without panicking when a custom
/// `max_concurrent_handlers` has been configured alongside a webhook.
#[test]
fn into_router_respects_max_concurrent_handlers_config() {
    let runtime = Runtime::new().max_concurrent_handlers(16);
    let runtime = runtime.webhook("/hook", WebhookAuth::none(), |_| async {});

    // Only construction is checked here; enforcement of the concurrency
    // limit is left to integration tests.
    let _router = runtime.into_router();
}
986
/// Exercises the whole builder surface in one chain and checks that every
/// setting and registration is reflected on the resulting runtime.
#[test]
fn builder_chain_multiple_methods() {
    // The configured limits are named once so the assertions below compare
    // against the very same values.
    let body_limit = 512 * 1024;
    let handler_limit = 32;

    let runtime = Runtime::new()
        .max_body_size(body_limit)
        .max_concurrent_handlers(handler_limit)
        .webhook("/hook-a", WebhookAuth::none(), |_| async {})
        .webhook("/hook-b", WebhookAuth::github("secret"), |_| async {})
        .cron("0 0 * * * *", "daily", || async {});

    assert_eq!(runtime.max_body_size, body_limit);
    assert_eq!(runtime.max_concurrent_handlers, handler_limit);
    assert_eq!(runtime.webhooks.len(), 2);
    assert_eq!(runtime.crons.len(), 1);
}
1002
/// Smoke test: `into_router()` on a runtime holding one cron job returns
/// without panicking. The test does not observe whether the job is started
/// or whether anything is logged.
#[test]
fn into_router_with_crons_doesnt_start_them() {
    let runtime = Runtime::new().cron("0 0 * * * *", "test-cron", || async {});

    // Must not panic; the cron job is expected to be dropped here.
    let _router = runtime.into_router();
}
1010}