Skip to main content

forge/runtime/
mod.rs

1//! FORGE runtime — single binary that runs the gateway, workers, scheduler, and cluster.
2
3mod builder;
4pub use builder::ForgeBuilder;
5
6#[cfg(feature = "gateway")]
7use std::future::Future;
8use std::net::IpAddr;
9use std::path::PathBuf;
10#[cfg(feature = "gateway")]
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use uuid::Uuid;
16
17#[cfg(feature = "gateway")]
18use axum::Router;
19#[cfg(feature = "gateway")]
20use axum::body::Body;
21#[cfg(feature = "gateway")]
22use axum::http::Request;
23#[cfg(feature = "gateway")]
24use axum::response::Response;
25use tokio::sync::broadcast;
26
27use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
28use forge_core::config::ForgeConfig;
29use forge_core::error::{ForgeError, Result};
30use forge_runtime::pg::migration::{Migration, MigrationRunner, load_migrations_from_dir};
31
32#[cfg(feature = "gateway")]
33use forge_core::mcp::ForgeMcpTool;
34use forge_runtime::cluster::{
35    GracefulShutdown, HeartbeatConfig, HeartbeatLoop, NodeRegistry, ShutdownConfig,
36};
37#[cfg(feature = "cron")]
38use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
39#[cfg(feature = "daemons")]
40use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
41use forge_runtime::function::FunctionRegistry;
42use forge_runtime::pg::Database;
43use forge_runtime::pg::{LeaderConfig, LeaderElection, PgNotifyBus};
44// CircuitBreakerClient wraps reqwest; used by cron/daemon/workflow for
45// outbound HTTP. (Gateway uses its own reqwest path.)
46#[cfg(any(feature = "cron", feature = "daemons", feature = "workflows"))]
47use forge_core::CircuitBreakerClient;
48#[cfg(feature = "gateway")]
49use forge_runtime::gateway::{
50    AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer, TlsListenConfig,
51    bind_listener,
52};
53#[cfg(feature = "jobs")]
54use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
55#[cfg(feature = "gateway")]
56use forge_runtime::mcp::McpToolRegistry;
57#[cfg(feature = "gateway")]
58use forge_runtime::realtime::{
59    InvalidationConfig, ListenerConfig, ReactorConfig, RealtimeConfig as RuntimeRealtimeConfig,
60};
61#[cfg(feature = "gateway")]
62use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
63#[cfg(feature = "workflows")]
64use forge_runtime::workflow::{
65    EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
66};
67#[cfg(feature = "workflows")]
68use tokio_util::sync::CancellationToken;
69
70use builder::{config_role_to_node_role, get_hostname};
71
72/// Type alias for frontend handler function.
73#[cfg(feature = "gateway")]
74pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
75
76/// Common imports for Forge applications.
77///
78/// **Stability contract:** all `pub use` items here are part of Forge's stable
79/// surface. Removing or renaming an item is a breaking change.
80///
81/// **Upstream crates:** `axum` is re-exported so `custom_routes` factories
82/// compile against the same version the runtime uses. A breaking axum change
83/// requires a Forge minor or major bump.
84pub mod prelude {
85    pub use chrono::{DateTime, Utc};
86    pub use uuid::Uuid;
87
88    pub use serde::{Deserialize, Serialize};
89    pub use serde_json;
90    pub use serde_json::Value;
91
92    pub type Timestamp = DateTime<Utc>;
93
94    pub use forge_core::auth::TokenPair;
95    pub use forge_core::config::ForgeConfig;
96    pub use forge_core::cron::{CronContext, ForgeCron};
97    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
98    // EnvAccess adds `ctx.env(...)` / `ctx.env_require(...)` — kept in the
99    // glob so handlers don't need an explicit import.
100    pub use forge_core::env::EnvAccess;
101    pub use forge_core::error::{ForgeError, Result};
102    pub use forge_core::function::{
103        AuthContext, DbConn, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
104    };
105    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
106    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext};
107    pub use forge_core::realtime::Delta;
108    pub use forge_core::schemars::JsonSchema;
109    pub use forge_core::types::Upload;
110    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
111    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};
112
113    // Same axum version the runtime uses; avoids type mismatches in custom_routes factories.
114    #[cfg(feature = "gateway")]
115    pub use axum;
116
117    pub use crate::{Forge, ForgeBuilder};
118
119    pub use forge_core::testing::{
120        TestCronContext, TestDaemonContext, TestJobContext, TestMcpToolContext,
121        TestMutationContext, TestQueryContext, TestWebhookContext, TestWorkflowContext,
122    };
123}
124
125/// The main FORGE runtime.
126pub struct Forge {
127    pub(super) config: ForgeConfig,
128    pub(super) db: Option<Database>,
129    pub(super) node_id: NodeId,
130    pub(super) function_registry: FunctionRegistry,
131    #[cfg(feature = "gateway")]
132    pub(super) mcp_registry: McpToolRegistry,
133    #[cfg(feature = "jobs")]
134    pub(super) job_registry: JobRegistry,
135    #[cfg(feature = "cron")]
136    pub(super) cron_registry: Arc<CronRegistry>,
137    #[cfg(feature = "workflows")]
138    pub(super) workflow_registry: WorkflowRegistry,
139    #[cfg(feature = "daemons")]
140    pub(super) daemon_registry: Arc<DaemonRegistry>,
141    #[cfg(feature = "gateway")]
142    pub(super) webhook_registry: Arc<WebhookRegistry>,
143    pub(super) shutdown_tx: broadcast::Sender<()>,
144    pub(super) migrations_dir: PathBuf,
145    pub(super) extra_migrations: Vec<Migration>,
146    #[cfg(feature = "gateway")]
147    pub(super) frontend_handler: Option<FrontendHandler>,
148    #[cfg(feature = "gateway")]
149    pub(super) custom_routes_factory: Option<Box<dyn FnOnce(sqlx::PgPool) -> Router + Send + Sync>>,
150    #[cfg(feature = "gateway")]
151    pub(super) role_resolver: Option<forge_core::SharedRoleResolver>,
152}
153
154impl Forge {
155    pub fn builder() -> ForgeBuilder {
156        ForgeBuilder::new()
157    }
158
159    pub fn node_id(&self) -> NodeId {
160        self.node_id
161    }
162
163    pub fn config(&self) -> &ForgeConfig {
164        &self.config
165    }
166
167    pub fn function_registry(&self) -> &FunctionRegistry {
168        &self.function_registry
169    }
170
171    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
172        &mut self.function_registry
173    }
174
175    #[cfg(feature = "gateway")]
176    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
177        &mut self.mcp_registry
178    }
179
180    #[cfg(feature = "gateway")]
181    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
182        self.mcp_registry.register::<T>();
183        self
184    }
185
186    #[cfg(feature = "jobs")]
187    pub fn job_registry(&self) -> &JobRegistry {
188        &self.job_registry
189    }
190
191    #[cfg(feature = "jobs")]
192    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
193        &mut self.job_registry
194    }
195
196    #[cfg(feature = "cron")]
197    pub fn cron_registry(&self) -> Arc<CronRegistry> {
198        self.cron_registry.clone()
199    }
200
201    #[cfg(feature = "workflows")]
202    pub fn workflow_registry(&self) -> &WorkflowRegistry {
203        &self.workflow_registry
204    }
205
206    #[cfg(feature = "workflows")]
207    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
208        &mut self.workflow_registry
209    }
210
211    #[cfg(feature = "daemons")]
212    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
213        self.daemon_registry.clone()
214    }
215
216    #[cfg(feature = "gateway")]
217    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
218        self.webhook_registry.clone()
219    }
220
221    /// Persist all workflow definitions. Fails startup if a signature conflicts
222    /// under the same name+version (contract changed without a version bump).
223    #[cfg(feature = "workflows")]
224    async fn persist_workflow_definitions(&self, pool: &sqlx::PgPool) -> Result<()> {
225        self.workflow_registry.persist_definitions(pool).await
226    }
227
228    /// Start the runtime. Blocks until a ctrl-c or `Forge::shutdown()` is called.
229    pub async fn run(mut self) -> Result<()> {
230        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
231            &self.config.observability,
232            &self.config.project.name,
233            &self.config.project.version,
234        );
235        let telemetry_result = forge_runtime::init_telemetry(
236            &telemetry_config,
237            &self.config.project.name,
238            &self.config.observability.log_level,
239        );
240        match &telemetry_result {
241            Ok(true) | Ok(false) => {
242                tracing::debug!(
243                    endpoint = %telemetry_config.otlp_endpoint,
244                    traces = telemetry_config.enable_traces,
245                    metrics = telemetry_config.enable_metrics,
246                    logs = telemetry_config.enable_logs,
247                    sampling = telemetry_config.sampling_ratio,
248                    "Telemetry initialized"
249                );
250            }
251            // init_telemetry failed before a subscriber could be installed; tracing
252            // macros would be silently dropped, so fall back to eprintln!.
253            Err(e) => eprintln!("forge: failed to initialize telemetry: {e}"),
254        }
255
256        tracing::debug!("Connecting to database");
257
258        let db =
259            Database::from_config_with_service(&self.config.database, &self.config.project.name)
260                .await?;
261        let pool = db.primary().clone();
262        // Health monitor self-terminates on shutdown_tx; drop the handle.
263        let _ = db.start_health_monitor(self.shutdown_tx.subscribe());
264        self.db = Some(db);
265
266        tracing::debug!("Database connected");
267
268        let runner = MigrationRunner::new(pool.clone());
269
270        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
271        user_migrations.extend(self.extra_migrations.clone());
272
273        runner.run(user_migrations).await?;
274        tracing::debug!("Migrations applied");
275
276        #[cfg(feature = "workflows")]
277        if !self.workflow_registry.is_empty() {
278            self.persist_workflow_definitions(&pool).await?;
279        }
280
281        let hostname = get_hostname();
282
283        // HOST env var overrides bind address; PORT env var overrides config port.
284        let ip_address: IpAddr = std::env::var("HOST")
285            .unwrap_or_else(|_| "0.0.0.0".to_string())
286            .parse()
287            .unwrap_or_else(|_| "0.0.0.0".parse().expect("valid IP literal"));
288
289        if let Ok(port_str) = std::env::var("PORT")
290            && let Ok(port) = port_str.parse::<u16>()
291        {
292            self.config.gateway.port = port;
293        }
294
295        let roles: Vec<NodeRole> = self
296            .config
297            .node
298            .roles
299            .iter()
300            .map(config_role_to_node_role)
301            .collect();
302
303        let node_info = NodeInfo::new_local(
304            hostname,
305            ip_address,
306            self.config.gateway.port,
307            self.config.gateway.grpc_port,
308            roles.clone(),
309            self.config.node.worker_capabilities.clone(),
310            env!("CARGO_PKG_VERSION").to_string(),
311        );
312
313        let node_id = node_info.id;
314        self.node_id = node_id;
315
316        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));
317
318        if let Err(e) = node_registry.register().await {
319            tracing::debug!("Failed to register node (tables may not exist): {}", e);
320        }
321
322        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
323            tracing::debug!("Failed to set node status: {}", e);
324        }
325
326        // Construct the shared PG NOTIFY bus up front so the leader election
327        // can subscribe to `forge_leader_released` and react instantly to a
328        // sibling's voluntary release instead of waiting for the next tick.
329        let notify_bus = Arc::new(PgNotifyBus::new(
330            pool.clone(),
331            &[
332                "forge_changes",
333                "forge_jobs_available",
334                "forge_workflow_wakeup",
335                forge_runtime::pg::LEADER_RELEASED_CHANNEL,
336            ],
337        ));
338
339        let leader_election = if roles.contains(&NodeRole::Scheduler) {
340            let election = Arc::new(
341                LeaderElection::new(
342                    pool.clone(),
343                    node_id,
344                    LeaderRole::Scheduler,
345                    LeaderConfig::default(),
346                )
347                .with_notify_bus(notify_bus.clone()),
348            );
349
350            // Try to become leader
351            if let Err(e) = election.try_become_leader().await {
352                tracing::debug!("Failed to acquire leadership: {}", e);
353            }
354
355            Some(election)
356        } else {
357            None
358        };
359
360        let shutdown = Arc::new(GracefulShutdown::new(
361            node_registry.clone(),
362            leader_election.clone(),
363            ShutdownConfig::default(),
364        ));
365
366        #[cfg(any(feature = "cron", feature = "daemons", feature = "workflows"))]
367        let http_client = CircuitBreakerClient::with_ssrf_protection();
368
369        let mut handles = Vec::new();
370        // Leader handles: cron, daemon, workflow scheduler. Must finish before
371        // releasing the advisory lock so no sibling starts duplicate work mid-tick.
372        let mut leader_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
373
374        {
375            let heartbeat_pool = pool.clone();
376            let heartbeat_node_id = node_id;
377            let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
378            handles.push(tokio::spawn(async move {
379                match HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config).await {
380                    Ok(heartbeat) => heartbeat.run().await,
381                    Err(e) => tracing::error!(error = %e, "Failed to start heartbeat loop"),
382                }
383            }));
384        }
385
386        if let Some(ref election) = leader_election {
387            let election = election.clone();
388            handles.push(tokio::spawn(async move {
389                election.run().await;
390            }));
391        }
392
393        #[cfg(feature = "cron")]
394        {
395            forge_runtime::cron::register_cron_bridges(&self.cron_registry, &mut self.job_registry);
396        }
397
398        #[cfg(feature = "jobs")]
399        let job_queue = JobQueue::new(pool.clone());
400
401        // Gate the direct bus spawn on the absence of a gateway role: the reactor
402        // already calls bus.run() for gateway nodes, and two spawns on the same
403        // PgNotifyBus instance would race.
404        #[cfg(feature = "gateway")]
405        let notify_bus_needs_direct_spawn = !roles.contains(&NodeRole::Gateway);
406        #[cfg(not(feature = "gateway"))]
407        let notify_bus_needs_direct_spawn = true;
408        if notify_bus_needs_direct_spawn {
409            let (bus_shutdown_tx, bus_shutdown_rx) = tokio::sync::watch::channel(false);
410            let bus_for_task = notify_bus.clone();
411            handles.push(tokio::spawn(async move {
412                bus_for_task.run(bus_shutdown_rx).await;
413            }));
414            let mut bus_broadcast_rx = self.shutdown_tx.subscribe();
415            tokio::spawn(async move {
416                let _ = bus_broadcast_rx.recv().await;
417                let _ = bus_shutdown_tx.send(true);
418            });
419        }
420
421        let kv_handle: Arc<dyn forge_core::function::KvHandle> =
422            Arc::new(forge_runtime::KvStore::new(pool.clone(), "handlers"));
423
424        // Must register the workflow bridge BEFORE spawning workers: JobRegistry
425        // is cloned by value per worker, so late registration is invisible and
426        // `$workflow_resume` jobs would fail with "unknown job type".
427        #[cfg(feature = "workflows")]
428        let workflow_bridge_executor = Arc::new(
429            WorkflowExecutor::new(
430                Arc::new(self.workflow_registry.clone()),
431                pool.clone(),
432                job_queue.clone(),
433                http_client.clone(),
434            )
435            .with_kv(Arc::clone(&kv_handle)),
436        );
437        #[cfg(feature = "workflows")]
438        {
439            forge_runtime::workflow::register_workflow_bridge(
440                workflow_bridge_executor.clone(),
441                &mut self.job_registry,
442            );
443        }
444
445        // Dispatcher must be constructed before workers so it can be threaded
446        // into each JobExecutor. Without it, `ctx.start_workflow(...)` writes
447        // blank version/signature columns and immediately blocks on resume.
448        #[cfg(feature = "jobs")]
449        let job_dispatcher = {
450            let job_queue_for_dispatch = JobQueue::new(pool.clone());
451            Arc::new(JobDispatcher::new(
452                job_queue_for_dispatch,
453                self.job_registry.clone(),
454            ))
455        };
456
457        // One Worker per queue so `default` traffic can't starve `workflows` or `cron`.
458        // Only the `default` queue's worker claims untagged (NULL capability) jobs.
459        #[cfg(feature = "jobs")]
460        if roles.contains(&NodeRole::Worker) {
461            let mut node_capabilities: Vec<String> = self.config.node.worker_capabilities.clone();
462            for queue_name in self.config.worker.queues.keys() {
463                if !node_capabilities.iter().any(|c| c == queue_name) {
464                    node_capabilities.push(queue_name.clone());
465                }
466            }
467
468            for (queue_name, queue_cfg) in &self.config.worker.queues {
469                if queue_cfg.workers == 0 {
470                    continue;
471                }
472                let worker_id = Uuid::new_v4();
473                let claim_untagged = queue_name == forge_core::config::DEFAULT_QUEUE;
474                let worker_config = WorkerConfig {
475                    id: Some(worker_id),
476                    capabilities: vec![queue_name.clone()],
477                    claim_untagged,
478                    max_concurrent: queue_cfg.workers,
479                    poll_interval: *self.config.worker.poll_interval,
480                    ..Default::default()
481                };
482
483                let worker_base = Worker::new(
484                    worker_config,
485                    job_queue.clone(),
486                    self.job_registry.clone(),
487                    pool.clone(),
488                    notify_bus.clone(),
489                )
490                .with_kv(Arc::clone(&kv_handle))
491                .with_job_dispatch(job_dispatcher.clone());
492
493                #[cfg(feature = "workflows")]
494                let mut worker =
495                    worker_base.with_workflow_dispatch(workflow_bridge_executor.clone());
496                #[cfg(not(feature = "workflows"))]
497                let mut worker = worker_base;
498
499                let queue_label = queue_name.clone();
500                handles.push(tokio::spawn(async move {
501                    if let Err(e) = worker.run().await {
502                        tracing::error!(queue = %queue_label, "Worker error: {}", e);
503                    }
504                }));
505
506                tracing::debug!(
507                    queue = %queue_name,
508                    workers = queue_cfg.workers,
509                    "Job worker pool started",
510                );
511            }
512
513            // pool_size >= sum(workers) + 6 (persistent conns: change listener,
514            // leader lock, heartbeat, health monitor, migration lock, headroom).
515            // Gateway handlers draw from the same pool but hold connections briefly.
516            let total_worker_concurrency: usize =
517                self.config.worker.queues.values().map(|q| q.workers).sum();
518            const PERSISTENT_CONN_OVERHEAD: usize = 6;
519            let min_recommended = total_worker_concurrency + PERSISTENT_CONN_OVERHEAD;
520            if (self.config.database.pool_size as usize) < min_recommended {
521                tracing::warn!(
522                    pool_size = self.config.database.pool_size,
523                    total_worker_concurrency,
524                    min_recommended,
525                    "database.pool_size ({}) is below the recommended minimum ({}) for the \
526                     configured worker concurrency. \
527                     Formula: sum(workers per queue) + 6 = {} + 6 = {}. \
528                     Increase database.pool_size to avoid connection exhaustion under load.",
529                    self.config.database.pool_size,
530                    min_recommended,
531                    total_worker_concurrency,
532                    min_recommended,
533                );
534            }
535        }
536
537        #[cfg(feature = "jobs")]
538        if roles.contains(&NodeRole::Worker) {
539            let kv_pool = pool.clone();
540            let mut kv_shutdown = self.shutdown_tx.subscribe();
541            let kv_leader = leader_election.clone();
542            handles.push(tokio::spawn(async move {
543                let kv = forge_runtime::KvStore::new(kv_pool.clone(), "app");
544                let rate_limiter = forge_runtime::StrictRateLimiter::new(kv_pool);
545                loop {
546                    tokio::select! {
547                        _ = kv_shutdown.recv() => break,
548                        _ = tokio::time::sleep(Duration::from_secs(300)) => {}
549                    }
550                    let is_leader = kv_leader.as_ref().map(|e| e.is_leader()).unwrap_or(true);
551                    if !is_leader {
552                        continue;
553                    }
554                    match kv.cleanup_expired().await {
555                        Ok(n) if n > 0 => tracing::debug!(count = n, "KV TTL cleanup"),
556                        Err(e) => tracing::warn!(error = %e, "KV TTL cleanup failed"),
557                        _ => {}
558                    }
559                    let cutoff = chrono::Utc::now() - chrono::Duration::hours(24);
560                    match rate_limiter.cleanup(cutoff).await {
561                        Ok(n) if n > 0 => tracing::debug!(count = n, "Rate limit bucket cleanup"),
562                        Err(e) => tracing::warn!(error = %e, "Rate limit cleanup failed"),
563                        _ => {}
564                    }
565                }
566            }));
567        }
568
569        #[cfg(feature = "cron")]
570        let cron_runner_handle: Option<Arc<CronRunner>> = if roles.contains(&NodeRole::Scheduler) {
571            let cron_registry = self.cron_registry.clone();
572            let cron_pool = pool.clone();
573            let cron_leader_election = leader_election.clone();
574
575            let cron_config = CronRunnerConfig {
576                poll_interval: *self.config.cron.poll_interval,
577                node_id: node_id.as_uuid(),
578                is_leader: cron_leader_election.is_none(),
579                leader_election: cron_leader_election,
580                run_stale_threshold: Duration::from_secs(15 * 60),
581                ..Default::default()
582            };
583
584            let cron_runner = Arc::new(CronRunner::new(
585                cron_registry,
586                cron_pool,
587                job_queue.clone(),
588                cron_config,
589            ));
590            let cron_runner_clone = cron_runner.clone();
591
592            leader_handles.push(tokio::spawn(async move {
593                if let Err(e) = cron_runner_clone.run().await {
594                    tracing::error!("Cron runner error: {}", e);
595                }
596            }));
597
598            tracing::debug!("Cron scheduler started");
599            Some(cron_runner)
600        } else {
601            None
602        };
603
604        #[cfg(feature = "workflows")]
605        let workflow_shutdown_token = CancellationToken::new();
606        #[cfg(feature = "workflows")]
607        if roles.contains(&NodeRole::Scheduler) {
608            let event_store = Arc::new(EventStore::new(pool.clone()));
609            let scheduler = WorkflowScheduler::new(
610                pool.clone(),
611                job_queue.clone(),
612                event_store,
613                WorkflowSchedulerConfig {
614                    poll_interval: *self.config.workflow.poll_interval,
615                    leader_election: leader_election.clone(),
616                    ..WorkflowSchedulerConfig::default()
617                },
618                notify_bus.clone(),
619            );
620
621            let shutdown_token = workflow_shutdown_token.clone();
622            leader_handles.push(tokio::spawn(async move {
623                scheduler.run(shutdown_token).await;
624            }));
625
626            tracing::debug!("Workflow scheduler started");
627        }
628
629        #[cfg(feature = "workflows")]
630        let workflow_executor = workflow_bridge_executor;
631
632        #[cfg(feature = "daemons")]
633        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
634            let daemon_registry = self.daemon_registry.clone();
635            let daemon_pool = pool.clone();
636            let daemon_http = http_client.clone();
637            let daemon_shutdown_rx = self.shutdown_tx.subscribe();
638
639            let daemon_runner = DaemonRunner::new(
640                daemon_registry,
641                daemon_pool,
642                daemon_http,
643                node_id.as_uuid(),
644                daemon_shutdown_rx,
645            )
646            .with_config(forge_runtime::daemon::DaemonRunnerConfig {
647                health_check_interval: *self.config.daemon.health_check_interval,
648                heartbeat_interval: *self.config.daemon.heartbeat_interval,
649            });
650            #[cfg(feature = "jobs")]
651            let daemon_runner = daemon_runner.with_job_dispatch(job_dispatcher.clone());
652            #[cfg(feature = "workflows")]
653            let daemon_runner = daemon_runner.with_workflow_dispatch(workflow_executor.clone());
654            let daemon_runner = daemon_runner.with_kv(Arc::clone(&kv_handle));
655
656            leader_handles.push(tokio::spawn(async move {
657                if let Err(e) = daemon_runner.run().await {
658                    tracing::error!("Daemon runner error: {}", e);
659                }
660            }));
661
662            tracing::debug!("Daemon runner started");
663        }
664
665        #[cfg(feature = "gateway")]
666        let mut reactor_handle = None;
667
668        #[cfg(feature = "gateway")]
669        if roles.contains(&NodeRole::Gateway) {
670            // `from_core` enforces the both-or-neither contract here too, so
671            // a programmatically constructed `ForgeConfig` that bypasses
672            // `validate()` still can't slip a half-set TLS config through.
673            let tls: Option<TlsListenConfig> =
674                TlsListenConfig::from_core(&self.config.gateway.tls)?;
675
676            // Fail early if handlers require auth but no usable credentials are configured.
677            // The registry is populated at this point so we can inspect every handler.
678            let any_requires_auth = self
679                .function_registry
680                .queries()
681                .any(|(_, info)| !info.is_public || info.required_role.is_some())
682                || self
683                    .function_registry
684                    .mutations()
685                    .any(|(_, info)| !info.is_public || info.required_role.is_some());
686
687            if any_requires_auth && !self.config.auth.is_configured() {
688                return Err(ForgeError::config(
689                    "One or more handlers require authentication (private scope or require_role) \
690                     but auth is not configured. Set auth.jwt_secret (≥32 bytes) for HMAC or \
691                     auth.jwks_url for external identity providers.",
692                ));
693            }
694
695            // CORS wildcard is only allowed in development. Block startup
696            // unless FORGE_ENV is explicitly set to "development".
697            if self.config.gateway.cors_enabled
698                && self.config.gateway.cors_origins.iter().any(|o| o == "*")
699            {
700                let forge_env = std::env::var("FORGE_ENV").ok();
701                let is_dev = forge_env
702                    .as_deref()
703                    .is_some_and(|v| v.eq_ignore_ascii_case("development"));
704                if !is_dev {
705                    let production_indicators = [
706                        ("FORGE_ENV", std::env::var("FORGE_ENV").ok()),
707                        ("NODE_ENV", std::env::var("NODE_ENV").ok()),
708                        (
709                            "RAILWAY_ENVIRONMENT",
710                            std::env::var("RAILWAY_ENVIRONMENT").ok(),
711                        ),
712                        ("K_SERVICE", std::env::var("K_SERVICE").ok()),
713                        ("FLY_APP_NAME", std::env::var("FLY_APP_NAME").ok()),
714                        (
715                            "KUBERNETES_SERVICE_HOST",
716                            std::env::var("KUBERNETES_SERVICE_HOST").ok(),
717                        ),
718                        ("AWS_EXECUTION_ENV", std::env::var("AWS_EXECUTION_ENV").ok()),
719                    ];
720                    let hint = production_indicators
721                        .iter()
722                        .find_map(|(name, val)| {
723                            val.as_ref().map(|v| format!(" ({name}={v} detected)"))
724                        })
725                        .unwrap_or_default();
726                    return Err(ForgeError::config(format!(
727                        "gateway.cors_origins = [\"*\"] is only allowed when FORGE_ENV=development{hint}. \
728                         Set explicit origins (e.g. cors_origins = [\"https://yourdomain.com\"])."
729                    )));
730                }
731            }
732
733            let gateway_config = RuntimeGatewayConfig {
734                port: self.config.gateway.port,
735                max_connections: self.config.gateway.max_connections,
736                sse_max_sessions: self.config.realtime.sse_max_sessions,
737                request_timeout_secs: self.config.gateway.request_timeout.as_secs(),
738                cors_enabled: self.config.gateway.cors_enabled,
739                cors_origins: self.config.gateway.cors_origins.clone(),
740                auth: AuthConfig::from_forge_config(&self.config.auth)
741                    .map_err(|e| ForgeError::config(e.to_string()))?,
742                mcp: self.config.mcp.clone(),
743                quiet_paths: self.config.gateway.quiet_paths.clone(),
744                max_body_size_bytes: self.config.gateway.max_body_size.as_bytes(),
745                max_json_body_bytes: self.config.gateway.max_json_body_size.as_bytes(),
746                max_file_size_bytes: self.config.gateway.max_file_size.as_bytes(),
747                token_ttl: forge_core::AuthTokenTtl::new(
748                    self.config.auth.access_token_ttl_secs(),
749                    self.config.auth.refresh_token_ttl_days(),
750                ),
751                project_name: self.config.project.name.clone(),
752                tls,
753                reactor_config: {
754                    let rt = &self.config.realtime;
755                    ReactorConfig {
756                        listener: ListenerConfig {
757                            buffer_size: rt.postgres_change_buffer_size,
758                            ..ListenerConfig::default()
759                        },
760                        invalidation: InvalidationConfig {
761                            debounce_ms: rt.debounce_quiet_window.as_millis(),
762                            max_debounce_ms: rt.debounce_max_wait.as_millis(),
763                            ..InvalidationConfig::default()
764                        },
765                        realtime: RuntimeRealtimeConfig {
766                            max_subscriptions_per_session: rt.subscription_max_per_session,
767                        },
768                        max_concurrent_reexecutions: rt.max_concurrent_reexecutions,
769                        resync_interval_secs: rt.resync_interval.as_secs(),
770                        shard_count: rt.shard_count,
771                        ..ReactorConfig::default()
772                    }
773                },
774                max_multipart_fields: self.config.gateway.max_multipart_fields,
775                max_sessions_per_user: self.config.realtime.max_sessions_per_user,
776                max_sessions_per_ip: self.config.realtime.max_sessions_per_ip,
777                max_subscriptions_per_user: self.config.realtime.max_subscriptions_per_user,
778                security_headers: self.config.gateway.security_headers,
779                hsts: self.config.gateway.hsts,
780                trusted_proxies: self
781                    .config
782                    .gateway
783                    .trusted_proxies
784                    .iter()
785                    .filter_map(|s| {
786                        s.parse::<ipnet::IpNet>()
787                            .or_else(|_| s.parse::<std::net::IpAddr>().map(ipnet::IpNet::from))
788                            .ok()
789                    })
790                    .collect(),
791                max_jobs_per_request: self.config.gateway.max_jobs_per_request,
792                max_result_size_bytes: self.config.gateway.max_result_size_bytes,
793                max_json_depth: self.config.gateway.max_json_depth,
794            };
795
796            let db_ref = self
797                .db
798                .clone()
799                .ok_or_else(|| ForgeError::internal("Database not initialized"))?;
800
801            let gateway = GatewayServer::new(
802                gateway_config,
803                self.function_registry.clone(),
804                db_ref.clone(),
805                notify_bus.clone(),
806            )
807            .with_node_id(self.node_id);
808            #[cfg(feature = "jobs")]
809            let gateway = gateway.with_job_dispatcher(job_dispatcher.clone());
810            #[cfg(feature = "workflows")]
811            let gateway = gateway.with_workflow_dispatcher(workflow_executor.clone());
812            let gateway = gateway.with_kv(Arc::clone(&kv_handle));
813            let mut gateway = gateway.with_mcp_registry(self.mcp_registry.clone());
814
815            if matches!(
816                self.config.rate_limit.mode,
817                forge_core::config::RateLimitMode::Hybrid
818            ) {
819                // System table query runs at startup, not in offline-checked code path.
820                #[allow(clippy::disallowed_methods)]
821                let active_nodes: Option<i64> =
822                    sqlx::query_scalar("SELECT COUNT(*) FROM forge_nodes WHERE status = 'active'")
823                        .fetch_one(db_ref.primary())
824                        .await
825                        .ok();
826                if active_nodes.is_some_and(|n| n > 1) {
827                    let n = active_nodes.unwrap_or(0);
828                    tracing::warn!(
829                        active_nodes = n,
830                        "rate_limit.mode is 'hybrid' but {n} active nodes detected. \
831                         Per-user/per-IP limits are local-only and effectively multiply by the \
832                         node count. Set rate_limit.mode = \"strict\" for cluster deployments."
833                    );
834                }
835            }
836
837            let rate_limiter: std::sync::Arc<dyn forge_core::rate_limit::RateLimiterBackend> =
838                match self.config.rate_limit.mode {
839                    forge_core::config::RateLimitMode::Strict => std::sync::Arc::new(
840                        forge_runtime::StrictRateLimiter::new(db_ref.primary().clone()),
841                    ),
842                    forge_core::config::RateLimitMode::Hybrid => {
843                        std::sync::Arc::new(forge_runtime::HybridRateLimiter::with_max_buckets(
844                            db_ref.primary().clone(),
845                            self.config.rate_limit.max_local_buckets,
846                        ))
847                    }
848                };
849            gateway = gateway.with_rate_limiter(rate_limiter);
850            if let Some(resolver) = self.role_resolver.take() {
851                gateway = gateway.with_role_resolver(resolver);
852            }
853            if self.config.signals.enabled {
854                let signals_pool = std::sync::Arc::new(db_ref.primary().clone());
855                let collector = forge_runtime::signals::SignalsCollector::spawn(
856                    signals_pool.clone(),
857                    self.config.signals.batch_size,
858                    *self.config.signals.flush_interval,
859                    self.config.signals.channel_capacity,
860                );
861                // Explicit MMDB path means the operator wants city-level data.
862                // Fail fast rather than silently downgrading to the embedded DB.
863                let geoip = match &self.config.signals.geoip_db_path {
864                    Some(path) => {
865                        let resolver = forge_runtime::signals::geoip::GeoIpResolver::from_mmdb(
866                            std::path::Path::new(path),
867                        )?;
868                        tracing::info!(path, "GeoIP: MaxMind MMDB loaded (city-level)");
869                        resolver
870                    }
871                    None => forge_runtime::signals::geoip::GeoIpResolver::new(),
872                };
873                gateway = gateway
874                    .with_signals_collector(collector)
875                    .with_signals_anonymize_ip(self.config.signals.anonymize_ip)
876                    .with_signals_geoip(geoip);
877
878                forge_runtime::signals::session::spawn_session_reaper(
879                    signals_pool.clone(),
880                    (self.config.signals.session_timeout.as_secs() / 60) as u32,
881                );
882
883                forge_runtime::signals::partition::ensure_partitions(&signals_pool).await;
884
885                {
886                    let partition_pool = signals_pool.clone();
887                    let retention_days = self.config.signals.retention_days;
888                    let partition_leader = leader_election.clone();
889                    let mut partition_shutdown = self.shutdown_tx.subscribe();
890                    handles.push(tokio::spawn(async move {
891                        loop {
892                            tokio::select! {
893                                _ = partition_shutdown.recv() => break,
894                                _ = tokio::time::sleep(Duration::from_secs(21_600)) => {}
895                            }
896                            let is_leader = partition_leader
897                                .as_ref()
898                                .map(|e| e.is_leader())
899                                .unwrap_or(true);
900                            if is_leader {
901                                forge_runtime::signals::partition::ensure_partitions(
902                                    &partition_pool,
903                                )
904                                .await;
905                                forge_runtime::signals::partition::drop_old_partitions(
906                                    &partition_pool,
907                                    retention_days,
908                                )
909                                .await;
910                                forge_runtime::signals::partition::check_default_partition(
911                                    &partition_pool,
912                                )
913                                .await;
914                            }
915                        }
916                    }));
917                }
918
919                tracing::info!("Signals enabled (analytics + diagnostics)");
920            }
921
922            // Webhooks ride the gateway middleware stack via `with_custom_routes`.
923            // The previous separate router had a hand-rolled CORS stack that
924            // deadlocked post-preflight browser POSTs and omitted the
925            // `x-webhook-timestamp` header.
926            let mut custom_routes: Option<Router> = self
927                .custom_routes_factory
928                .take()
929                .map(|factory| factory(pool.clone()));
930
931            if !self.webhook_registry.is_empty() {
932                use axum::extract::DefaultBodyLimit;
933                use axum::routing::post;
934
935                let webhook_state = WebhookState::new(self.webhook_registry.clone(), pool.clone());
936                #[cfg(feature = "jobs")]
937                let webhook_state = webhook_state.with_job_dispatcher(job_dispatcher.clone());
938                #[cfg(feature = "workflows")]
939                let webhook_state =
940                    webhook_state.with_workflow_dispatcher(workflow_executor.clone());
941                let webhook_state = webhook_state.with_kv(Arc::clone(&kv_handle));
942                let webhook_state = Arc::new(webhook_state);
943
944                let webhook_routes = Router::new()
945                    .route(
946                        "/webhooks/{*path}",
947                        post(webhook_handler).with_state(webhook_state),
948                    )
949                    .layer(DefaultBodyLimit::max(1024 * 1024));
950
951                custom_routes = Some(match custom_routes {
952                    Some(existing) => existing.merge(webhook_routes),
953                    None => webhook_routes,
954                });
955
956                tracing::debug!(
957                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
958                    "Webhook routes registered"
959                );
960            }
961
962            if let Some(routes) = custom_routes {
963                gateway = gateway.with_custom_routes(routes);
964                tracing::debug!("Custom and webhook routes merged into gateway middleware stack");
965            }
966
967            let reactor = gateway.reactor();
968            if let Err(e) = reactor.start().await {
969                tracing::error!("Failed to start reactor: {}", e);
970            } else {
971                tracing::debug!("Reactor started");
972                reactor_handle = Some(reactor);
973            }
974
975            let api_router = gateway.router();
976            let mut router = Router::new().nest("/_api", api_router);
977
978            if self.config.mcp.enabled {
979                use axum::routing::get;
980
981                // Return a parseable JSON 404 when OAuth is not configured so
982                // MCP clients get a clear signal rather than an HTML error page.
983                async fn oauth_not_supported() -> impl axum::response::IntoResponse {
984                    (
985                        axum::http::StatusCode::NOT_FOUND,
986                        axum::Json(serde_json::json!({
987                            "error": "oauth_not_supported",
988                            "error_description": "This server does not support OAuth. Connect without authentication."
989                        })),
990                    )
991                }
992
993                #[cfg(feature = "mcp-oauth")]
994                if let Some((oauth_api_router, oauth_state)) = gateway.oauth_router() {
995                    router = router.nest("/_api", oauth_api_router);
996                    router = router
997                        .route(
998                            "/.well-known/oauth-authorization-server",
999                            get(forge_runtime::gateway::oauth::well_known_oauth_metadata)
1000                                .with_state(oauth_state.clone()),
1001                        )
1002                        .route(
1003                            "/.well-known/oauth-protected-resource",
1004                            get(forge_runtime::gateway::oauth::well_known_resource_metadata)
1005                                .with_state(oauth_state),
1006                        );
1007
1008                    tracing::info!("OAuth 2.1 endpoints enabled for MCP");
1009                } else {
1010                    router = router
1011                        .route(
1012                            "/.well-known/oauth-authorization-server",
1013                            get(oauth_not_supported),
1014                        )
1015                        .route(
1016                            "/.well-known/oauth-protected-resource",
1017                            get(oauth_not_supported),
1018                        );
1019                }
1020
1021                #[cfg(not(feature = "mcp-oauth"))]
1022                {
1023                    router = router
1024                        .route(
1025                            "/.well-known/oauth-authorization-server",
1026                            get(oauth_not_supported),
1027                        )
1028                        .route(
1029                            "/.well-known/oauth-protected-resource",
1030                            get(oauth_not_supported),
1031                        );
1032                }
1033            }
1034
1035            if let Some(handler) = self.frontend_handler {
1036                use axum::routing::get;
1037                router = router.fallback(get(handler));
1038                tracing::debug!("Frontend handler enabled");
1039            }
1040
1041            let addr = gateway.addr();
1042            let tls = gateway.tls().cloned();
1043            // Graceful shutdown drains in-flight requests before we release the
1044            // advisory lock, which also drains the mutation outbox (each flush
1045            // is part of the request transaction).
1046            let mut gateway_shutdown_rx = shutdown.subscribe();
1047
1048            handles.push(tokio::spawn(async move {
1049                tracing::debug!(addr = %addr, "Gateway server binding");
1050                let listener = match bind_listener(addr, tls.as_ref()).await {
1051                    Ok(l) => l,
1052                    Err(e) => {
1053                        tracing::error!(error = %e, "Failed to bind gateway listener");
1054                        return;
1055                    }
1056                };
1057                let serve = axum::serve(listener, router).with_graceful_shutdown(async move {
1058                    let _ = gateway_shutdown_rx.wait_for(|v| *v).await;
1059                    tracing::debug!("Gateway draining in-flight requests");
1060                });
1061                if let Err(e) = serve.await {
1062                    tracing::error!("Gateway server error: {}", e);
1063                }
1064            }));
1065        }
1066
1067        #[cfg(feature = "jobs")]
1068        let jobs_count = self.job_registry.len();
1069        #[cfg(not(feature = "jobs"))]
1070        let jobs_count: usize = 0;
1071        #[cfg(feature = "cron")]
1072        let crons_count = self.cron_registry.len();
1073        #[cfg(not(feature = "cron"))]
1074        let crons_count: usize = 0;
1075        #[cfg(feature = "workflows")]
1076        let workflows_count = self.workflow_registry.len();
1077        #[cfg(not(feature = "workflows"))]
1078        let workflows_count: usize = 0;
1079        #[cfg(feature = "daemons")]
1080        let daemons_count = self.daemon_registry.len();
1081        #[cfg(not(feature = "daemons"))]
1082        let daemons_count: usize = 0;
1083        #[cfg(feature = "gateway")]
1084        let webhooks_count = self.webhook_registry.len();
1085        #[cfg(not(feature = "gateway"))]
1086        let webhooks_count: usize = 0;
1087        #[cfg(feature = "gateway")]
1088        let mcp_tools_count = self.mcp_registry.len();
1089        #[cfg(not(feature = "gateway"))]
1090        let mcp_tools_count: usize = 0;
1091
1092        tracing::info!(
1093            queries = self.function_registry.queries().count(),
1094            mutations = self.function_registry.mutations().count(),
1095            jobs = jobs_count,
1096            crons = crons_count,
1097            workflows = workflows_count,
1098            daemons = daemons_count,
1099            webhooks = webhooks_count,
1100            mcp_tools = mcp_tools_count,
1101            "Functions registered"
1102        );
1103
1104        {
1105            let pool = pool.clone();
1106            tokio::spawn(async move {
1107                loop {
1108                    tokio::time::sleep(Duration::from_secs(15)).await;
1109                    forge_runtime::observability::record_pool_metrics(&pool);
1110                }
1111            });
1112        }
1113
1114        let role_names: Vec<&str> = roles.iter().map(|r| r.as_str()).collect();
1115        let capabilities = &self.config.node.worker_capabilities;
1116        tracing::info!(
1117            node_id = %node_id,
1118            project = %self.config.project.name,
1119            version = env!("CARGO_PKG_VERSION"),
1120            roles = ?role_names,
1121            worker_capabilities = ?capabilities,
1122            port = self.config.gateway.port,
1123            db_pool_size = self.config.database.pool_size,
1124            cluster_discovery = ?self.config.cluster.discovery,
1125            observability = self.config.observability.enabled,
1126            mcp = self.config.mcp.enabled,
1127            "Forge started"
1128        );
1129
1130        let mut shutdown_rx = self.shutdown_tx.subscribe();
1131
1132        tokio::select! {
1133            _ = tokio::signal::ctrl_c() => {
1134                tracing::debug!("Received ctrl-c");
1135            }
1136            _ = shutdown_rx.recv() => {
1137                tracing::debug!("Received shutdown notification");
1138            }
1139        }
1140
1141        tracing::debug!("Graceful shutdown starting");
1142
1143        // No-op if already triggered via `Forge::shutdown()`.
1144        let _ = self.shutdown_tx.send(());
1145
1146        // Cancel leader tasks before joining so they wind down concurrently.
1147        #[cfg(feature = "workflows")]
1148        workflow_shutdown_token.cancel();
1149
1150        #[cfg(feature = "cron")]
1151        if let Some(ref runner) = cron_runner_handle {
1152            runner.stop().await;
1153        }
1154
1155        // Drain leader tasks before releasing the advisory lock to prevent a
1156        // sibling from starting duplicate work while we are mid-tick.
1157        tracing::debug!("Waiting for leader-held tasks to drain");
1158        for handle in leader_handles {
1159            let _ = handle.await;
1160        }
1161        tracing::debug!("Leader-held tasks drained");
1162
1163        // Advisory lock released here — only after leader tasks have fully stopped.
1164        if let Err(e) = shutdown.shutdown().await {
1165            tracing::warn!(error = %e, "Shutdown error");
1166        }
1167
1168        if let Some(ref election) = leader_election {
1169            election.stop();
1170        }
1171
1172        #[cfg(feature = "gateway")]
1173        if let Some(ref reactor) = reactor_handle {
1174            reactor.stop();
1175        }
1176
1177        if let Some(ref db) = self.db {
1178            db.close().await;
1179        }
1180
1181        forge_runtime::shutdown_telemetry();
1182        tracing::info!("Forge stopped");
1183        Ok(())
1184    }
1185
1186    /// Request shutdown.
1187    pub fn shutdown(&self) {
1188        let _ = self.shutdown_tx.send(());
1189    }
1190}
1191
1192#[cfg(test)]
1193#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
1194mod tests {
1195    use super::*;
1196    use std::future::Future;
1197    use std::pin::Pin;
1198
1199    use forge_core::config::NodeRole as ConfigNodeRole;
1200    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};
1201
1202    struct TestMcpTool;
1203
1204    impl forge_core::__sealed::Sealed for TestMcpTool {}
1205
1206    impl ForgeMcpTool for TestMcpTool {
1207        type Args = serde_json::Value;
1208        type Output = serde_json::Value;
1209
1210        fn info() -> McpToolInfo {
1211            McpToolInfo {
1212                name: "test.mcp.tool",
1213                title: None,
1214                description: None,
1215                required_role: None,
1216                is_public: false,
1217                timeout: None,
1218                rate_limit_requests: None,
1219                rate_limit_per_secs: None,
1220                rate_limit_key: None,
1221                annotations: McpToolAnnotations::default(),
1222                icons: &[],
1223            }
1224        }
1225
1226        fn execute(
1227            _ctx: &forge_core::McpToolContext,
1228            _args: Self::Args,
1229        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
1230            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
1231        }
1232    }
1233
1234    #[test]
1235    fn test_forge_builder_new() {
1236        let builder = ForgeBuilder::new();
1237        assert!(builder.config.is_none());
1238    }
1239
1240    #[test]
1241    fn test_forge_builder_requires_config() {
1242        let builder = ForgeBuilder::new();
1243        let result = builder.build();
1244        assert!(result.is_err());
1245    }
1246
1247    #[test]
1248    fn test_forge_builder_with_config() {
1249        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
1250        let result = ForgeBuilder::new().config(config).build();
1251        assert!(result.is_ok());
1252    }
1253
1254    #[test]
1255    fn test_forge_builder_register_mcp_tool() {
1256        let builder = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
1257        assert_eq!(builder.mcp_registry.len(), 1);
1258    }
1259
1260    #[test]
1261    fn test_config_role_conversion() {
1262        use builder::config_role_to_node_role;
1263        assert_eq!(
1264            config_role_to_node_role(&ConfigNodeRole::Gateway),
1265            NodeRole::Gateway
1266        );
1267        assert_eq!(
1268            config_role_to_node_role(&ConfigNodeRole::Worker),
1269            NodeRole::Worker
1270        );
1271        assert_eq!(
1272            config_role_to_node_role(&ConfigNodeRole::Scheduler),
1273            NodeRole::Scheduler
1274        );
1275        assert_eq!(
1276            config_role_to_node_role(&ConfigNodeRole::Function),
1277            NodeRole::Function
1278        );
1279    }
1280}