Skip to main content

forge/runtime/
mod.rs

1//! FORGE runtime — single binary that runs the gateway, workers, scheduler, and cluster.
2
3mod builder;
4pub use builder::ForgeBuilder;
5
6#[cfg(feature = "gateway")]
7use std::future::Future;
8use std::net::IpAddr;
9use std::path::PathBuf;
10#[cfg(feature = "gateway")]
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use uuid::Uuid;
16
17#[cfg(feature = "gateway")]
18use axum::Router;
19#[cfg(feature = "gateway")]
20use axum::body::Body;
21#[cfg(feature = "gateway")]
22use axum::http::Request;
23#[cfg(feature = "gateway")]
24use axum::response::Response;
25use tokio::sync::broadcast;
26
27use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
28use forge_core::config::ForgeConfig;
29use forge_core::error::{ForgeError, Result};
30use forge_runtime::pg::migration::{Migration, MigrationRunner, load_migrations_from_dir};
31
32#[cfg(feature = "gateway")]
33use forge_core::mcp::ForgeMcpTool;
34use forge_runtime::cluster::{
35    GracefulShutdown, HeartbeatConfig, HeartbeatLoop, NodeRegistry, ShutdownConfig,
36};
37#[cfg(feature = "cron")]
38use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
39#[cfg(feature = "daemons")]
40use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
41use forge_runtime::function::FunctionRegistry;
42use forge_runtime::pg::Database;
43use forge_runtime::pg::{LeaderConfig, LeaderElection, PgNotifyBus};
44// CircuitBreakerClient wraps reqwest; used by cron/daemon/workflow for
45// outbound HTTP. (Gateway uses its own reqwest path.)
46#[cfg(any(feature = "cron", feature = "daemons", feature = "workflows"))]
47use forge_core::CircuitBreakerClient;
48#[cfg(feature = "gateway")]
49use forge_runtime::gateway::{
50    AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer, TlsListenConfig,
51    bind_listener,
52};
53#[cfg(feature = "jobs")]
54use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
55#[cfg(feature = "gateway")]
56use forge_runtime::mcp::McpToolRegistry;
57#[cfg(feature = "gateway")]
58use forge_runtime::realtime::{
59    InvalidationConfig, ListenerConfig, ReactorConfig, RealtimeConfig as RuntimeRealtimeConfig,
60};
61#[cfg(feature = "gateway")]
62use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
63#[cfg(feature = "workflows")]
64use forge_runtime::workflow::{
65    EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
66};
67#[cfg(feature = "workflows")]
68use tokio_util::sync::CancellationToken;
69
70use builder::{config_role_to_node_role, get_hostname};
71
/// Signature of the gateway's optional frontend request handler.
///
/// This is a plain `fn` pointer (no captured state): it takes ownership of the
/// incoming request and hands back a pinned, boxed, `Send` future that
/// resolves to the HTTP response.
#[cfg(feature = "gateway")]
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
75
76/// Common imports for Forge applications.
77///
78/// **Stability contract:** all `pub use` items here are part of Forge's stable
79/// surface. Removing or renaming an item is a breaking change.
80///
81/// **Upstream crates:** `axum` is re-exported so `custom_routes` factories
82/// compile against the same version the runtime uses. A breaking axum change
83/// requires a Forge minor or major bump.
84pub mod prelude {
85    pub use chrono::{DateTime, Utc};
86    pub use uuid::Uuid;
87
88    pub use serde::{Deserialize, Serialize};
89    pub use serde_json;
90    pub use serde_json::Value;
91
92    pub type Timestamp = DateTime<Utc>;
93
94    pub use forge_core::auth::TokenPair;
95    pub use forge_core::config::ForgeConfig;
96    pub use forge_core::cron::{CronContext, ForgeCron};
97    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
98    // EnvAccess adds `ctx.env(...)` / `ctx.env_require(...)` — kept in the
99    // glob so handlers don't need an explicit import.
100    pub use forge_core::env::EnvAccess;
101    pub use forge_core::error::{ForgeError, Result};
102    pub use forge_core::function::{
103        AuthContext, DbConn, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
104    };
105    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
106    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext};
107    pub use forge_core::realtime::Delta;
108    pub use forge_core::schemars::JsonSchema;
109    pub use forge_core::types::Upload;
110    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
111    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};
112
113    // Same axum version the runtime uses; avoids type mismatches in custom_routes factories.
114    #[cfg(feature = "gateway")]
115    pub use axum;
116
117    pub use crate::{Forge, ForgeBuilder};
118
119    pub use forge_core::testing::{
120        TestCronContext, TestDaemonContext, TestJobContext, TestMcpToolContext,
121        TestMutationContext, TestQueryContext, TestWebhookContext, TestWorkflowContext,
122    };
123}
124
/// The main FORGE runtime.
///
/// Holds the configuration plus every handler registry for the subsystems
/// compiled in (registry fields are feature-gated accordingly). Obtain a
/// builder via [`Forge::builder`]; [`Forge::run`] consumes the runtime to
/// connect to the database, apply migrations, register this node with the
/// cluster and spawn the enabled background tasks.
///
/// Note: struct fields are dropped in declaration order, so reordering them
/// changes teardown order.
pub struct Forge {
    /// Application configuration. `run` may override `gateway.port` from the
    /// `PORT` environment variable before using it.
    pub(super) config: ForgeConfig,
    /// Database handle; `None` until `run` connects and stores it.
    pub(super) db: Option<Database>,
    /// This node's id. `run` replaces it with the id of the `NodeInfo` it
    /// registers with the cluster.
    pub(super) node_id: NodeId,
    /// Query and mutation handlers (inspected by `run` to decide whether auth
    /// must be configured before the gateway starts).
    pub(super) function_registry: FunctionRegistry,
    /// MCP tool registry (gateway builds only).
    #[cfg(feature = "gateway")]
    pub(super) mcp_registry: McpToolRegistry,
    /// Background job handlers. `run` adds cron/workflow bridge job types and
    /// then clones this registry into each worker it spawns.
    #[cfg(feature = "jobs")]
    pub(super) job_registry: JobRegistry,
    /// Cron definitions, shared with the cron runner on scheduler nodes.
    #[cfg(feature = "cron")]
    pub(super) cron_registry: Arc<CronRegistry>,
    /// Workflow definitions; persisted to `forge_workflow_definitions` during
    /// startup when non-empty.
    #[cfg(feature = "workflows")]
    pub(super) workflow_registry: WorkflowRegistry,
    /// Daemon definitions, shared with the daemon runner on scheduler nodes.
    #[cfg(feature = "daemons")]
    pub(super) daemon_registry: Arc<DaemonRegistry>,
    /// Webhook registry (gateway builds only).
    #[cfg(feature = "gateway")]
    pub(super) webhook_registry: Arc<WebhookRegistry>,
    /// Shutdown broadcast; background tasks started by `run` subscribe to it.
    pub(super) shutdown_tx: broadcast::Sender<()>,
    /// Directory user migrations are loaded from at startup.
    pub(super) migrations_dir: PathBuf,
    /// Programmatically supplied migrations, appended after the ones loaded
    /// from `migrations_dir` and applied in the same run.
    pub(super) extra_migrations: Vec<Migration>,
    /// Optional frontend handler. NOTE(review): how the gateway mounts it is
    /// outside this view — presumably for non-API routes; confirm.
    #[cfg(feature = "gateway")]
    pub(super) frontend_handler: Option<FrontendHandler>,
    /// One-shot factory that receives a `PgPool` and returns extra axum routes.
    /// NOTE(review): presumably invoked once with the primary pool during
    /// gateway setup; confirm.
    #[cfg(feature = "gateway")]
    pub(super) custom_routes_factory: Option<Box<dyn FnOnce(sqlx::PgPool) -> Router + Send + Sync>>,
    /// Optional role resolver. NOTE(review): presumably consulted for
    /// `require_role` checks; confirm against the gateway setup.
    #[cfg(feature = "gateway")]
    pub(super) role_resolver: Option<forge_core::SharedRoleResolver>,
}
153
154impl Forge {
155    pub fn builder() -> ForgeBuilder {
156        ForgeBuilder::new()
157    }
158
159    pub fn node_id(&self) -> NodeId {
160        self.node_id
161    }
162
163    pub fn config(&self) -> &ForgeConfig {
164        &self.config
165    }
166
167    pub fn function_registry(&self) -> &FunctionRegistry {
168        &self.function_registry
169    }
170
171    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
172        &mut self.function_registry
173    }
174
175    #[cfg(feature = "gateway")]
176    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
177        &mut self.mcp_registry
178    }
179
180    #[cfg(feature = "gateway")]
181    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
182        self.mcp_registry.register::<T>();
183        self
184    }
185
186    #[cfg(feature = "jobs")]
187    pub fn job_registry(&self) -> &JobRegistry {
188        &self.job_registry
189    }
190
191    #[cfg(feature = "jobs")]
192    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
193        &mut self.job_registry
194    }
195
196    #[cfg(feature = "cron")]
197    pub fn cron_registry(&self) -> Arc<CronRegistry> {
198        self.cron_registry.clone()
199    }
200
201    #[cfg(feature = "workflows")]
202    pub fn workflow_registry(&self) -> &WorkflowRegistry {
203        &self.workflow_registry
204    }
205
206    #[cfg(feature = "workflows")]
207    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
208        &mut self.workflow_registry
209    }
210
211    #[cfg(feature = "daemons")]
212    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
213        self.daemon_registry.clone()
214    }
215
216    #[cfg(feature = "gateway")]
217    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
218        self.webhook_registry.clone()
219    }
220
221    /// Persist all workflow definitions. Fails startup if a signature conflicts
222    /// under the same name+version (contract changed without a version bump).
223    #[cfg(feature = "workflows")]
224    async fn persist_workflow_definitions(&self, pool: &sqlx::PgPool) -> Result<()> {
225        for info in self.workflow_registry.definitions() {
226            let status = info.status.as_str();
227
228            // Try to insert. If row exists, check signature matches.
229            let existing = sqlx::query!(
230                r#"
231                SELECT workflow_signature FROM forge_workflow_definitions
232                WHERE workflow_name = $1 AND workflow_version = $2
233                "#,
234                info.name,
235                info.version,
236            )
237            .fetch_optional(pool)
238            .await
239            .map_err(ForgeError::Database)?;
240
241            if let Some(row) = existing {
242                if row.workflow_signature != info.signature {
243                    return Err(ForgeError::config(format!(
244                        "Workflow '{}' version '{}' has a different signature than previously registered. \
245                         Persisted contract changed under the same version. \
246                         Expected signature: {}, got: {}. \
247                         Create a new version instead of modifying the existing one.",
248                        info.name, info.version, row.workflow_signature, info.signature
249                    )));
250                }
251                sqlx::query!(
252                    "UPDATE forge_workflow_definitions SET status = $3 WHERE workflow_name = $1 AND workflow_version = $2",
253                    info.name,
254                    info.version,
255                    status,
256                )
257                .execute(pool)
258                .await
259                .map_err(ForgeError::Database)?;
260            } else {
261                sqlx::query!(
262                    r#"
263                    INSERT INTO forge_workflow_definitions (workflow_name, workflow_version, workflow_signature, status)
264                    VALUES ($1, $2, $3, $4)
265                    "#,
266                    info.name,
267                    info.version,
268                    info.signature,
269                    status,
270                )
271                .execute(pool)
272                .await
273                .map_err(ForgeError::Database)?;
274            }
275
276            tracing::debug!(
277                workflow = info.name,
278                version = info.version,
279                signature = info.signature,
280                status = status,
281                "Workflow definition registered"
282            );
283        }
284
285        Ok(())
286    }
287
288    /// Start the runtime. Blocks until a ctrl-c or `Forge::shutdown()` is called.
289    pub async fn run(mut self) -> Result<()> {
290        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
291            &self.config.observability,
292            &self.config.project.name,
293            &self.config.project.version,
294        );
295        let telemetry_result = forge_runtime::init_telemetry(
296            &telemetry_config,
297            &self.config.project.name,
298            &self.config.observability.log_level,
299        );
300        match &telemetry_result {
301            Ok(true) | Ok(false) => {
302                tracing::debug!(
303                    endpoint = %telemetry_config.otlp_endpoint,
304                    traces = telemetry_config.enable_traces,
305                    metrics = telemetry_config.enable_metrics,
306                    logs = telemetry_config.enable_logs,
307                    sampling = telemetry_config.sampling_ratio,
308                    "Telemetry initialized"
309                );
310            }
311            // init_telemetry failed before a subscriber could be installed; tracing
312            // macros would be silently dropped, so fall back to eprintln!.
313            Err(e) => eprintln!("forge: failed to initialize telemetry: {e}"),
314        }
315
316        tracing::debug!("Connecting to database");
317
318        let db =
319            Database::from_config_with_service(&self.config.database, &self.config.project.name)
320                .await?;
321        let pool = db.primary().clone();
322        // Health monitor self-terminates on shutdown_tx; drop the handle.
323        let _ = db.start_health_monitor(self.shutdown_tx.subscribe());
324        self.db = Some(db);
325
326        tracing::debug!("Database connected");
327
328        let runner = MigrationRunner::new(pool.clone());
329
330        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
331        user_migrations.extend(self.extra_migrations.clone());
332
333        runner.run(user_migrations).await?;
334        tracing::debug!("Migrations applied");
335
336        #[cfg(feature = "workflows")]
337        if !self.workflow_registry.is_empty() {
338            self.persist_workflow_definitions(&pool).await?;
339        }
340
341        let hostname = get_hostname();
342
343        // HOST env var overrides bind address; PORT env var overrides config port.
344        let ip_address: IpAddr = std::env::var("HOST")
345            .unwrap_or_else(|_| "0.0.0.0".to_string())
346            .parse()
347            .unwrap_or_else(|_| "0.0.0.0".parse().expect("valid IP literal"));
348
349        if let Ok(port_str) = std::env::var("PORT")
350            && let Ok(port) = port_str.parse::<u16>()
351        {
352            self.config.gateway.port = port;
353        }
354
355        let roles: Vec<NodeRole> = self
356            .config
357            .node
358            .roles
359            .iter()
360            .map(config_role_to_node_role)
361            .collect();
362
363        let node_info = NodeInfo::new_local(
364            hostname,
365            ip_address,
366            self.config.gateway.port,
367            self.config.gateway.grpc_port,
368            roles.clone(),
369            self.config.node.worker_capabilities.clone(),
370            env!("CARGO_PKG_VERSION").to_string(),
371        );
372
373        let node_id = node_info.id;
374        self.node_id = node_id;
375
376        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));
377
378        if let Err(e) = node_registry.register().await {
379            tracing::debug!("Failed to register node (tables may not exist): {}", e);
380        }
381
382        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
383            tracing::debug!("Failed to set node status: {}", e);
384        }
385
386        // Construct the shared PG NOTIFY bus up front so the leader election
387        // can subscribe to `forge_leader_released` and react instantly to a
388        // sibling's voluntary release instead of waiting for the next tick.
389        let notify_bus = Arc::new(PgNotifyBus::new(
390            pool.clone(),
391            &[
392                "forge_changes",
393                "forge_jobs_available",
394                "forge_workflow_wakeup",
395                forge_runtime::pg::LEADER_RELEASED_CHANNEL,
396            ],
397        ));
398
399        let leader_election = if roles.contains(&NodeRole::Scheduler) {
400            let election = Arc::new(
401                LeaderElection::new(
402                    pool.clone(),
403                    node_id,
404                    LeaderRole::Scheduler,
405                    LeaderConfig::default(),
406                )
407                .with_notify_bus(notify_bus.clone()),
408            );
409
410            // Try to become leader
411            if let Err(e) = election.try_become_leader().await {
412                tracing::debug!("Failed to acquire leadership: {}", e);
413            }
414
415            Some(election)
416        } else {
417            None
418        };
419
420        let shutdown = Arc::new(GracefulShutdown::new(
421            node_registry.clone(),
422            leader_election.clone(),
423            ShutdownConfig::default(),
424        ));
425
426        #[cfg(any(feature = "cron", feature = "daemons", feature = "workflows"))]
427        let http_client = CircuitBreakerClient::with_ssrf_protection();
428
429        let mut handles = Vec::new();
430        // Leader handles: cron, daemon, workflow scheduler. Must finish before
431        // releasing the advisory lock so no sibling starts duplicate work mid-tick.
432        let mut leader_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
433
434        {
435            let heartbeat_pool = pool.clone();
436            let heartbeat_node_id = node_id;
437            let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
438            handles.push(tokio::spawn(async move {
439                match HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config).await {
440                    Ok(heartbeat) => heartbeat.run().await,
441                    Err(e) => tracing::error!(error = %e, "Failed to start heartbeat loop"),
442                }
443            }));
444        }
445
446        if let Some(ref election) = leader_election {
447            let election = election.clone();
448            handles.push(tokio::spawn(async move {
449                election.run().await;
450            }));
451        }
452
453        #[cfg(feature = "cron")]
454        {
455            forge_runtime::cron::register_cron_bridges(&self.cron_registry, &mut self.job_registry);
456        }
457
458        #[cfg(feature = "jobs")]
459        let job_queue = JobQueue::new(pool.clone());
460
461        // Gate the direct bus spawn on the absence of a gateway role: the reactor
462        // already calls bus.run() for gateway nodes, and two spawns on the same
463        // PgNotifyBus instance would race.
464        #[cfg(feature = "gateway")]
465        let notify_bus_needs_direct_spawn = !roles.contains(&NodeRole::Gateway);
466        #[cfg(not(feature = "gateway"))]
467        let notify_bus_needs_direct_spawn = true;
468        if notify_bus_needs_direct_spawn {
469            let (bus_shutdown_tx, bus_shutdown_rx) = tokio::sync::watch::channel(false);
470            let bus_for_task = notify_bus.clone();
471            handles.push(tokio::spawn(async move {
472                bus_for_task.run(bus_shutdown_rx).await;
473            }));
474            let mut bus_broadcast_rx = self.shutdown_tx.subscribe();
475            tokio::spawn(async move {
476                let _ = bus_broadcast_rx.recv().await;
477                let _ = bus_shutdown_tx.send(true);
478            });
479        }
480
481        let kv_handle: Arc<dyn forge_core::function::KvHandle> =
482            Arc::new(forge_runtime::KvStore::new(pool.clone(), "handlers"));
483
484        // Must register the workflow bridge BEFORE spawning workers: JobRegistry
485        // is cloned by value per worker, so late registration is invisible and
486        // `$workflow_resume` jobs would fail with "unknown job type".
487        #[cfg(feature = "workflows")]
488        let workflow_bridge_executor = Arc::new(
489            WorkflowExecutor::new(
490                Arc::new(self.workflow_registry.clone()),
491                pool.clone(),
492                job_queue.clone(),
493                http_client.clone(),
494            )
495            .with_kv(Arc::clone(&kv_handle)),
496        );
497        #[cfg(feature = "workflows")]
498        {
499            forge_runtime::workflow::register_workflow_bridge(
500                workflow_bridge_executor.clone(),
501                &mut self.job_registry,
502            );
503        }
504
505        // Dispatcher must be constructed before workers so it can be threaded
506        // into each JobExecutor. Without it, `ctx.start_workflow(...)` writes
507        // blank version/signature columns and immediately blocks on resume.
508        #[cfg(feature = "jobs")]
509        let job_dispatcher = {
510            let job_queue_for_dispatch = JobQueue::new(pool.clone());
511            Arc::new(JobDispatcher::new(
512                job_queue_for_dispatch,
513                self.job_registry.clone(),
514            ))
515        };
516
517        // One Worker per queue so `default` traffic can't starve `workflows` or `cron`.
518        // Only the `default` queue's worker claims untagged (NULL capability) jobs.
519        #[cfg(feature = "jobs")]
520        if roles.contains(&NodeRole::Worker) {
521            let mut node_capabilities: Vec<String> = self.config.node.worker_capabilities.clone();
522            for queue_name in self.config.worker.queues.keys() {
523                if !node_capabilities.iter().any(|c| c == queue_name) {
524                    node_capabilities.push(queue_name.clone());
525                }
526            }
527
528            for (queue_name, queue_cfg) in &self.config.worker.queues {
529                if queue_cfg.workers == 0 {
530                    continue;
531                }
532                let worker_id = Uuid::new_v4();
533                let claim_untagged = queue_name == forge_core::config::DEFAULT_QUEUE;
534                let worker_config = WorkerConfig {
535                    id: Some(worker_id),
536                    capabilities: vec![queue_name.clone()],
537                    claim_untagged,
538                    max_concurrent: queue_cfg.workers,
539                    poll_interval: *self.config.worker.poll_interval,
540                    ..Default::default()
541                };
542
543                let worker_base = Worker::new(
544                    worker_config,
545                    job_queue.clone(),
546                    self.job_registry.clone(),
547                    pool.clone(),
548                    notify_bus.clone(),
549                )
550                .with_kv(Arc::clone(&kv_handle))
551                .with_job_dispatch(job_dispatcher.clone());
552
553                #[cfg(feature = "workflows")]
554                let mut worker =
555                    worker_base.with_workflow_dispatch(workflow_bridge_executor.clone());
556                #[cfg(not(feature = "workflows"))]
557                let mut worker = worker_base;
558
559                let queue_label = queue_name.clone();
560                handles.push(tokio::spawn(async move {
561                    if let Err(e) = worker.run().await {
562                        tracing::error!(queue = %queue_label, "Worker error: {}", e);
563                    }
564                }));
565
566                tracing::debug!(
567                    queue = %queue_name,
568                    workers = queue_cfg.workers,
569                    "Job worker pool started",
570                );
571            }
572
573            // pool_size >= sum(workers) + 6 (persistent conns: change listener,
574            // leader lock, heartbeat, health monitor, migration lock, headroom).
575            // Gateway handlers draw from the same pool but hold connections briefly.
576            let total_worker_concurrency: usize =
577                self.config.worker.queues.values().map(|q| q.workers).sum();
578            const PERSISTENT_CONN_OVERHEAD: usize = 6;
579            let min_recommended = total_worker_concurrency + PERSISTENT_CONN_OVERHEAD;
580            if (self.config.database.pool_size as usize) < min_recommended {
581                tracing::warn!(
582                    pool_size = self.config.database.pool_size,
583                    total_worker_concurrency,
584                    min_recommended,
585                    "database.pool_size ({}) is below the recommended minimum ({}) for the \
586                     configured worker concurrency. \
587                     Formula: sum(workers per queue) + 6 = {} + 6 = {}. \
588                     Increase database.pool_size to avoid connection exhaustion under load.",
589                    self.config.database.pool_size,
590                    min_recommended,
591                    total_worker_concurrency,
592                    min_recommended,
593                );
594            }
595        }
596
597        #[cfg(feature = "jobs")]
598        if roles.contains(&NodeRole::Worker) {
599            let kv_pool = pool.clone();
600            let mut kv_shutdown = self.shutdown_tx.subscribe();
601            let kv_leader = leader_election.clone();
602            handles.push(tokio::spawn(async move {
603                let kv = forge_runtime::KvStore::new(kv_pool.clone(), "app");
604                let rate_limiter = forge_runtime::StrictRateLimiter::new(kv_pool);
605                loop {
606                    tokio::select! {
607                        _ = kv_shutdown.recv() => break,
608                        _ = tokio::time::sleep(Duration::from_secs(300)) => {}
609                    }
610                    let is_leader = kv_leader.as_ref().map(|e| e.is_leader()).unwrap_or(true);
611                    if !is_leader {
612                        continue;
613                    }
614                    match kv.cleanup_expired().await {
615                        Ok(n) if n > 0 => tracing::debug!(count = n, "KV TTL cleanup"),
616                        Err(e) => tracing::warn!(error = %e, "KV TTL cleanup failed"),
617                        _ => {}
618                    }
619                    let cutoff = chrono::Utc::now() - chrono::Duration::hours(24);
620                    match rate_limiter.cleanup(cutoff).await {
621                        Ok(n) if n > 0 => tracing::debug!(count = n, "Rate limit bucket cleanup"),
622                        Err(e) => tracing::warn!(error = %e, "Rate limit cleanup failed"),
623                        _ => {}
624                    }
625                }
626            }));
627        }
628
629        #[cfg(feature = "cron")]
630        let cron_runner_handle: Option<Arc<CronRunner>> = if roles.contains(&NodeRole::Scheduler) {
631            let cron_registry = self.cron_registry.clone();
632            let cron_pool = pool.clone();
633            let cron_leader_election = leader_election.clone();
634
635            let cron_config = CronRunnerConfig {
636                poll_interval: *self.config.cron.poll_interval,
637                node_id: node_id.as_uuid(),
638                is_leader: cron_leader_election.is_none(),
639                leader_election: cron_leader_election,
640                run_stale_threshold: Duration::from_secs(15 * 60),
641                ..Default::default()
642            };
643
644            let cron_runner = Arc::new(CronRunner::new(
645                cron_registry,
646                cron_pool,
647                job_queue.clone(),
648                cron_config,
649            ));
650            let cron_runner_clone = cron_runner.clone();
651
652            leader_handles.push(tokio::spawn(async move {
653                if let Err(e) = cron_runner_clone.run().await {
654                    tracing::error!("Cron runner error: {}", e);
655                }
656            }));
657
658            tracing::debug!("Cron scheduler started");
659            Some(cron_runner)
660        } else {
661            None
662        };
663
664        #[cfg(feature = "workflows")]
665        let workflow_shutdown_token = CancellationToken::new();
666        #[cfg(feature = "workflows")]
667        if roles.contains(&NodeRole::Scheduler) {
668            let event_store = Arc::new(EventStore::new(pool.clone()));
669            let scheduler = WorkflowScheduler::new(
670                pool.clone(),
671                job_queue.clone(),
672                event_store,
673                WorkflowSchedulerConfig {
674                    poll_interval: *self.config.workflow.poll_interval,
675                    leader_election: leader_election.clone(),
676                    ..WorkflowSchedulerConfig::default()
677                },
678                notify_bus.clone(),
679            );
680
681            let shutdown_token = workflow_shutdown_token.clone();
682            leader_handles.push(tokio::spawn(async move {
683                scheduler.run(shutdown_token).await;
684            }));
685
686            tracing::debug!("Workflow scheduler started");
687        }
688
689        #[cfg(feature = "workflows")]
690        let workflow_executor = workflow_bridge_executor;
691
692        #[cfg(feature = "daemons")]
693        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
694            let daemon_registry = self.daemon_registry.clone();
695            let daemon_pool = pool.clone();
696            let daemon_http = http_client.clone();
697            let daemon_shutdown_rx = self.shutdown_tx.subscribe();
698
699            let daemon_runner = DaemonRunner::new(
700                daemon_registry,
701                daemon_pool,
702                daemon_http,
703                node_id.as_uuid(),
704                daemon_shutdown_rx,
705            )
706            .with_config(forge_runtime::daemon::DaemonRunnerConfig {
707                health_check_interval: *self.config.daemon.health_check_interval,
708                heartbeat_interval: *self.config.daemon.heartbeat_interval,
709            });
710            #[cfg(feature = "jobs")]
711            let daemon_runner = daemon_runner.with_job_dispatch(job_dispatcher.clone());
712            #[cfg(feature = "workflows")]
713            let daemon_runner = daemon_runner.with_workflow_dispatch(workflow_executor.clone());
714            let daemon_runner = daemon_runner.with_kv(Arc::clone(&kv_handle));
715
716            leader_handles.push(tokio::spawn(async move {
717                if let Err(e) = daemon_runner.run().await {
718                    tracing::error!("Daemon runner error: {}", e);
719                }
720            }));
721
722            tracing::debug!("Daemon runner started");
723        }
724
725        #[cfg(feature = "gateway")]
726        let mut reactor_handle = None;
727
728        #[cfg(feature = "gateway")]
729        if roles.contains(&NodeRole::Gateway) {
730            // `from_core` enforces the both-or-neither contract here too, so
731            // a programmatically constructed `ForgeConfig` that bypasses
732            // `validate()` still can't slip a half-set TLS config through.
733            let tls: Option<TlsListenConfig> =
734                TlsListenConfig::from_core(&self.config.gateway.tls)?;
735
736            // Fail early if handlers require auth but no usable credentials are configured.
737            // The registry is populated at this point so we can inspect every handler.
738            let any_requires_auth = self
739                .function_registry
740                .queries()
741                .any(|(_, info)| !info.is_public || info.required_role.is_some())
742                || self
743                    .function_registry
744                    .mutations()
745                    .any(|(_, info)| !info.is_public || info.required_role.is_some());
746
747            if any_requires_auth && !self.config.auth.is_configured() {
748                return Err(ForgeError::config(
749                    "One or more handlers require authentication (private scope or require_role) \
750                     but auth is not configured. Set auth.jwt_secret (≥32 bytes) for HMAC or \
751                     auth.jwks_url for external identity providers.",
752                ));
753            }
754
755            // CORS wildcard is only allowed in development. Block startup
756            // unless FORGE_ENV is explicitly set to "development".
757            if self.config.gateway.cors_enabled
758                && self.config.gateway.cors_origins.iter().any(|o| o == "*")
759            {
760                let forge_env = std::env::var("FORGE_ENV").ok();
761                let is_dev = forge_env
762                    .as_deref()
763                    .is_some_and(|v| v.eq_ignore_ascii_case("development"));
764                if !is_dev {
765                    let production_indicators = [
766                        ("FORGE_ENV", std::env::var("FORGE_ENV").ok()),
767                        ("NODE_ENV", std::env::var("NODE_ENV").ok()),
768                        (
769                            "RAILWAY_ENVIRONMENT",
770                            std::env::var("RAILWAY_ENVIRONMENT").ok(),
771                        ),
772                        ("K_SERVICE", std::env::var("K_SERVICE").ok()),
773                        ("FLY_APP_NAME", std::env::var("FLY_APP_NAME").ok()),
774                        (
775                            "KUBERNETES_SERVICE_HOST",
776                            std::env::var("KUBERNETES_SERVICE_HOST").ok(),
777                        ),
778                        ("AWS_EXECUTION_ENV", std::env::var("AWS_EXECUTION_ENV").ok()),
779                    ];
780                    let hint = production_indicators
781                        .iter()
782                        .find_map(|(name, val)| {
783                            val.as_ref().map(|v| format!(" ({name}={v} detected)"))
784                        })
785                        .unwrap_or_default();
786                    return Err(ForgeError::config(format!(
787                        "gateway.cors_origins = [\"*\"] is only allowed when FORGE_ENV=development{hint}. \
788                         Set explicit origins (e.g. cors_origins = [\"https://yourdomain.com\"])."
789                    )));
790                }
791            }
792
793            let gateway_config = RuntimeGatewayConfig {
794                port: self.config.gateway.port,
795                max_connections: self.config.gateway.max_connections,
796                sse_max_sessions: self.config.realtime.sse_max_sessions,
797                request_timeout_secs: self.config.gateway.request_timeout.as_secs(),
798                cors_enabled: self.config.gateway.cors_enabled,
799                cors_origins: self.config.gateway.cors_origins.clone(),
800                auth: AuthConfig::from_forge_config(&self.config.auth)
801                    .map_err(|e| ForgeError::config(e.to_string()))?,
802                mcp: self.config.mcp.clone(),
803                quiet_paths: self.config.gateway.quiet_paths.clone(),
804                max_body_size_bytes: self.config.gateway.max_body_size.as_bytes(),
805                max_json_body_bytes: self.config.gateway.max_json_body_size.as_bytes(),
806                max_file_size_bytes: self.config.gateway.max_file_size.as_bytes(),
807                token_ttl: forge_core::AuthTokenTtl::new(
808                    self.config.auth.access_token_ttl_secs(),
809                    self.config.auth.refresh_token_ttl_days(),
810                ),
811                project_name: self.config.project.name.clone(),
812                tls,
813                reactor_config: {
814                    let rt = &self.config.realtime;
815                    ReactorConfig {
816                        listener: ListenerConfig {
817                            buffer_size: rt.postgres_change_buffer_size,
818                            ..ListenerConfig::default()
819                        },
820                        invalidation: InvalidationConfig {
821                            debounce_ms: rt.debounce_quiet_window.as_millis(),
822                            max_debounce_ms: rt.debounce_max_wait.as_millis(),
823                            ..InvalidationConfig::default()
824                        },
825                        realtime: RuntimeRealtimeConfig {
826                            max_subscriptions_per_session: rt.subscription_max_per_session,
827                        },
828                        max_concurrent_reexecutions: rt.max_concurrent_reexecutions,
829                        resync_interval_secs: rt.resync_interval.as_secs(),
830                        shard_count: rt.shard_count,
831                        ..ReactorConfig::default()
832                    }
833                },
834                max_multipart_fields: self.config.gateway.max_multipart_fields,
835                max_sessions_per_user: self.config.realtime.max_sessions_per_user,
836                max_sessions_per_ip: self.config.realtime.max_sessions_per_ip,
837                max_subscriptions_per_user: self.config.realtime.max_subscriptions_per_user,
838                security_headers: self.config.gateway.security_headers,
839                hsts: self.config.gateway.hsts,
840                trusted_proxies: self
841                    .config
842                    .gateway
843                    .trusted_proxies
844                    .iter()
845                    .filter_map(|s| {
846                        s.parse::<ipnet::IpNet>()
847                            .or_else(|_| s.parse::<std::net::IpAddr>().map(ipnet::IpNet::from))
848                            .ok()
849                    })
850                    .collect(),
851                max_jobs_per_request: self.config.gateway.max_jobs_per_request,
852                max_result_size_bytes: self.config.gateway.max_result_size_bytes,
853                max_json_depth: self.config.gateway.max_json_depth,
854            };
855
856            let db_ref = self
857                .db
858                .clone()
859                .ok_or_else(|| ForgeError::internal("Database not initialized"))?;
860
861            let gateway = GatewayServer::new(
862                gateway_config,
863                self.function_registry.clone(),
864                db_ref.clone(),
865                notify_bus.clone(),
866            )
867            .with_node_id(self.node_id);
868            #[cfg(feature = "jobs")]
869            let gateway = gateway.with_job_dispatcher(job_dispatcher.clone());
870            #[cfg(feature = "workflows")]
871            let gateway = gateway.with_workflow_dispatcher(workflow_executor.clone());
872            let gateway = gateway.with_kv(Arc::clone(&kv_handle));
873            let mut gateway = gateway.with_mcp_registry(self.mcp_registry.clone());
874
875            if matches!(
876                self.config.rate_limit.mode,
877                forge_core::config::RateLimitMode::Hybrid
878            ) {
879                // System table query runs at startup, not in offline-checked code path.
880                #[allow(clippy::disallowed_methods)]
881                let active_nodes: Option<i64> =
882                    sqlx::query_scalar("SELECT COUNT(*) FROM forge_nodes WHERE status = 'active'")
883                        .fetch_one(db_ref.primary())
884                        .await
885                        .ok();
886                if active_nodes.is_some_and(|n| n > 1) {
887                    let n = active_nodes.unwrap_or(0);
888                    tracing::warn!(
889                        active_nodes = n,
890                        "rate_limit.mode is 'hybrid' but {n} active nodes detected. \
891                         Per-user/per-IP limits are local-only and effectively multiply by the \
892                         node count. Set rate_limit.mode = \"strict\" for cluster deployments."
893                    );
894                }
895            }
896
897            let rate_limiter: std::sync::Arc<dyn forge_core::rate_limit::RateLimiterBackend> =
898                match self.config.rate_limit.mode {
899                    forge_core::config::RateLimitMode::Strict => std::sync::Arc::new(
900                        forge_runtime::StrictRateLimiter::new(db_ref.primary().clone()),
901                    ),
902                    forge_core::config::RateLimitMode::Hybrid => {
903                        std::sync::Arc::new(forge_runtime::HybridRateLimiter::with_max_buckets(
904                            db_ref.primary().clone(),
905                            self.config.rate_limit.max_local_buckets,
906                        ))
907                    }
908                };
909            gateway = gateway.with_rate_limiter(rate_limiter);
910            if let Some(resolver) = self.role_resolver.take() {
911                gateway = gateway.with_role_resolver(resolver);
912            }
913            if self.config.signals.enabled {
914                let signals_pool = std::sync::Arc::new(db_ref.primary().clone());
915                let collector = forge_runtime::signals::SignalsCollector::spawn(
916                    signals_pool.clone(),
917                    self.config.signals.batch_size,
918                    *self.config.signals.flush_interval,
919                    self.config.signals.channel_capacity,
920                );
921                // Explicit MMDB path means the operator wants city-level data.
922                // Fail fast rather than silently downgrading to the embedded DB.
923                let geoip = match &self.config.signals.geoip_db_path {
924                    Some(path) => {
925                        let resolver = forge_runtime::signals::geoip::GeoIpResolver::from_mmdb(
926                            std::path::Path::new(path),
927                        )?;
928                        tracing::info!(path, "GeoIP: MaxMind MMDB loaded (city-level)");
929                        resolver
930                    }
931                    None => forge_runtime::signals::geoip::GeoIpResolver::new(),
932                };
933                gateway = gateway
934                    .with_signals_collector(collector)
935                    .with_signals_anonymize_ip(self.config.signals.anonymize_ip)
936                    .with_signals_geoip(geoip);
937
938                forge_runtime::signals::session::spawn_session_reaper(
939                    signals_pool.clone(),
940                    (self.config.signals.session_timeout.as_secs() / 60) as u32,
941                );
942
943                forge_runtime::signals::partition::ensure_partitions(&signals_pool).await;
944
945                {
946                    let partition_pool = signals_pool.clone();
947                    let retention_days = self.config.signals.retention_days;
948                    let partition_leader = leader_election.clone();
949                    let mut partition_shutdown = self.shutdown_tx.subscribe();
950                    handles.push(tokio::spawn(async move {
951                        loop {
952                            tokio::select! {
953                                _ = partition_shutdown.recv() => break,
954                                _ = tokio::time::sleep(Duration::from_secs(21_600)) => {}
955                            }
956                            let is_leader = partition_leader
957                                .as_ref()
958                                .map(|e| e.is_leader())
959                                .unwrap_or(true);
960                            if is_leader {
961                                forge_runtime::signals::partition::ensure_partitions(
962                                    &partition_pool,
963                                )
964                                .await;
965                                forge_runtime::signals::partition::drop_old_partitions(
966                                    &partition_pool,
967                                    retention_days,
968                                )
969                                .await;
970                                forge_runtime::signals::partition::check_default_partition(
971                                    &partition_pool,
972                                )
973                                .await;
974                            }
975                        }
976                    }));
977                }
978
979                tracing::info!("Signals enabled (analytics + diagnostics)");
980            }
981
982            if let Some(factory) = self.custom_routes_factory.take() {
983                gateway = gateway.with_custom_routes(factory(pool.clone()));
984                tracing::debug!("Custom routes merged into gateway middleware stack");
985            }
986
987            let reactor = gateway.reactor();
988            if let Err(e) = reactor.start().await {
989                tracing::error!("Failed to start reactor: {}", e);
990            } else {
991                tracing::debug!("Reactor started");
992                reactor_handle = Some(reactor);
993            }
994
995            let api_router = gateway.router();
996            let mut router = Router::new().nest("/_api", api_router);
997
998            if !self.webhook_registry.is_empty() {
999                use axum::routing::post;
1000                use tower_http::cors::{Any, CorsLayer};
1001
1002                let webhook_state = WebhookState::new(self.webhook_registry.clone(), pool.clone());
1003                #[cfg(feature = "jobs")]
1004                let webhook_state = webhook_state.with_job_dispatcher(job_dispatcher.clone());
1005                #[cfg(feature = "workflows")]
1006                let webhook_state =
1007                    webhook_state.with_workflow_dispatcher(workflow_executor.clone());
1008                let webhook_state = webhook_state.with_kv(Arc::clone(&kv_handle));
1009                let webhook_state = Arc::new(webhook_state);
1010
1011                // Webhook routes sit outside the API router so they need their own CORS layer.
1012                let webhook_cors = if self.config.gateway.cors_enabled
1013                    || !self.config.gateway.cors_origins.is_empty()
1014                {
1015                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
1016                        CorsLayer::new()
1017                            .allow_origin(Any)
1018                            .allow_methods(Any)
1019                            .allow_headers(Any)
1020                    } else {
1021                        use axum::http::Method;
1022                        let origins: Vec<_> = self
1023                            .config
1024                            .gateway
1025                            .cors_origins
1026                            .iter()
1027                            .filter_map(|o| o.parse().ok())
1028                            .collect();
1029                        CorsLayer::new()
1030                            .allow_origin(origins)
1031                            .allow_methods([
1032                                Method::GET,
1033                                Method::POST,
1034                                Method::PUT,
1035                                Method::DELETE,
1036                                Method::PATCH,
1037                                Method::OPTIONS,
1038                            ])
1039                            .allow_headers([
1040                                axum::http::header::CONTENT_TYPE,
1041                                axum::http::header::AUTHORIZATION,
1042                                axum::http::header::ACCEPT,
1043                                axum::http::HeaderName::from_static("x-webhook-signature"),
1044                                axum::http::HeaderName::from_static("x-idempotency-key"),
1045                            ])
1046                            .allow_credentials(true)
1047                    }
1048                } else {
1049                    CorsLayer::new()
1050                };
1051
1052                let webhook_router = Router::new()
1053                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
1054                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
1055                    .layer(
1056                        tower::ServiceBuilder::new()
1057                            .layer(axum::error_handling::HandleErrorLayer::new(
1058                                |err: tower::BoxError| async move {
1059                                    if err.is::<tower::timeout::error::Elapsed>() {
1060                                        return (
1061                                            axum::http::StatusCode::REQUEST_TIMEOUT,
1062                                            "Request timed out",
1063                                        );
1064                                    }
1065                                    (
1066                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
1067                                        "Server overloaded",
1068                                    )
1069                                },
1070                            ))
1071                            .layer(tower::limit::ConcurrencyLimitLayer::new(
1072                                self.config.gateway.max_connections,
1073                            ))
1074                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
1075                                self.config.gateway.request_timeout.as_secs(),
1076                            ))),
1077                    )
1078                    .layer(webhook_cors);
1079
1080                router = router.nest("/_api/webhooks", webhook_router);
1081
1082                tracing::debug!(
1083                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
1084                    "Webhook routes registered"
1085                );
1086            }
1087
1088            if self.config.mcp.enabled {
1089                use axum::routing::get;
1090
1091                // Return a parseable JSON 404 when OAuth is not configured so
1092                // MCP clients get a clear signal rather than an HTML error page.
1093                async fn oauth_not_supported() -> impl axum::response::IntoResponse {
1094                    (
1095                        axum::http::StatusCode::NOT_FOUND,
1096                        axum::Json(serde_json::json!({
1097                            "error": "oauth_not_supported",
1098                            "error_description": "This server does not support OAuth. Connect without authentication."
1099                        })),
1100                    )
1101                }
1102
1103                #[cfg(feature = "mcp-oauth")]
1104                if let Some((oauth_api_router, oauth_state)) = gateway.oauth_router() {
1105                    router = router.nest("/_api", oauth_api_router);
1106                    router = router
1107                        .route(
1108                            "/.well-known/oauth-authorization-server",
1109                            get(forge_runtime::gateway::oauth::well_known_oauth_metadata)
1110                                .with_state(oauth_state.clone()),
1111                        )
1112                        .route(
1113                            "/.well-known/oauth-protected-resource",
1114                            get(forge_runtime::gateway::oauth::well_known_resource_metadata)
1115                                .with_state(oauth_state),
1116                        );
1117
1118                    tracing::info!("OAuth 2.1 endpoints enabled for MCP");
1119                } else {
1120                    router = router
1121                        .route(
1122                            "/.well-known/oauth-authorization-server",
1123                            get(oauth_not_supported),
1124                        )
1125                        .route(
1126                            "/.well-known/oauth-protected-resource",
1127                            get(oauth_not_supported),
1128                        );
1129                }
1130
1131                #[cfg(not(feature = "mcp-oauth"))]
1132                {
1133                    router = router
1134                        .route(
1135                            "/.well-known/oauth-authorization-server",
1136                            get(oauth_not_supported),
1137                        )
1138                        .route(
1139                            "/.well-known/oauth-protected-resource",
1140                            get(oauth_not_supported),
1141                        );
1142                }
1143            }
1144
1145            if let Some(handler) = self.frontend_handler {
1146                use axum::routing::get;
1147                router = router.fallback(get(handler));
1148                tracing::debug!("Frontend handler enabled");
1149            }
1150
1151            let addr = gateway.addr();
1152            let tls = gateway.tls().cloned();
1153            // Graceful shutdown drains in-flight requests before we release the
1154            // advisory lock, which also drains the mutation outbox (each flush
1155            // is part of the request transaction).
1156            let mut gateway_shutdown_rx = shutdown.subscribe();
1157
1158            handles.push(tokio::spawn(async move {
1159                tracing::debug!(addr = %addr, "Gateway server binding");
1160                let listener = match bind_listener(addr, tls.as_ref()).await {
1161                    Ok(l) => l,
1162                    Err(e) => {
1163                        tracing::error!(error = %e, "Failed to bind gateway listener");
1164                        return;
1165                    }
1166                };
1167                let serve = axum::serve(listener, router).with_graceful_shutdown(async move {
1168                    let _ = gateway_shutdown_rx.wait_for(|v| *v).await;
1169                    tracing::debug!("Gateway draining in-flight requests");
1170                });
1171                if let Err(e) = serve.await {
1172                    tracing::error!("Gateway server error: {}", e);
1173                }
1174            }));
1175        }
1176
1177        #[cfg(feature = "jobs")]
1178        let jobs_count = self.job_registry.len();
1179        #[cfg(not(feature = "jobs"))]
1180        let jobs_count: usize = 0;
1181        #[cfg(feature = "cron")]
1182        let crons_count = self.cron_registry.len();
1183        #[cfg(not(feature = "cron"))]
1184        let crons_count: usize = 0;
1185        #[cfg(feature = "workflows")]
1186        let workflows_count = self.workflow_registry.len();
1187        #[cfg(not(feature = "workflows"))]
1188        let workflows_count: usize = 0;
1189        #[cfg(feature = "daemons")]
1190        let daemons_count = self.daemon_registry.len();
1191        #[cfg(not(feature = "daemons"))]
1192        let daemons_count: usize = 0;
1193        #[cfg(feature = "gateway")]
1194        let webhooks_count = self.webhook_registry.len();
1195        #[cfg(not(feature = "gateway"))]
1196        let webhooks_count: usize = 0;
1197        #[cfg(feature = "gateway")]
1198        let mcp_tools_count = self.mcp_registry.len();
1199        #[cfg(not(feature = "gateway"))]
1200        let mcp_tools_count: usize = 0;
1201
1202        tracing::info!(
1203            queries = self.function_registry.queries().count(),
1204            mutations = self.function_registry.mutations().count(),
1205            jobs = jobs_count,
1206            crons = crons_count,
1207            workflows = workflows_count,
1208            daemons = daemons_count,
1209            webhooks = webhooks_count,
1210            mcp_tools = mcp_tools_count,
1211            "Functions registered"
1212        );
1213
1214        {
1215            let pool = pool.clone();
1216            tokio::spawn(async move {
1217                loop {
1218                    tokio::time::sleep(Duration::from_secs(15)).await;
1219                    forge_runtime::observability::record_pool_metrics(&pool);
1220                }
1221            });
1222        }
1223
1224        let role_names: Vec<&str> = roles.iter().map(|r| r.as_str()).collect();
1225        let capabilities = &self.config.node.worker_capabilities;
1226        tracing::info!(
1227            node_id = %node_id,
1228            project = %self.config.project.name,
1229            version = env!("CARGO_PKG_VERSION"),
1230            roles = ?role_names,
1231            worker_capabilities = ?capabilities,
1232            port = self.config.gateway.port,
1233            db_pool_size = self.config.database.pool_size,
1234            cluster_discovery = ?self.config.cluster.discovery,
1235            observability = self.config.observability.enabled,
1236            mcp = self.config.mcp.enabled,
1237            "Forge started"
1238        );
1239
1240        let mut shutdown_rx = self.shutdown_tx.subscribe();
1241
1242        tokio::select! {
1243            _ = tokio::signal::ctrl_c() => {
1244                tracing::debug!("Received ctrl-c");
1245            }
1246            _ = shutdown_rx.recv() => {
1247                tracing::debug!("Received shutdown notification");
1248            }
1249        }
1250
1251        tracing::debug!("Graceful shutdown starting");
1252
1253        // No-op if already triggered via `Forge::shutdown()`.
1254        let _ = self.shutdown_tx.send(());
1255
1256        // Cancel leader tasks before joining so they wind down concurrently.
1257        #[cfg(feature = "workflows")]
1258        workflow_shutdown_token.cancel();
1259
1260        #[cfg(feature = "cron")]
1261        if let Some(ref runner) = cron_runner_handle {
1262            runner.stop().await;
1263        }
1264
1265        // Drain leader tasks before releasing the advisory lock to prevent a
1266        // sibling from starting duplicate work while we are mid-tick.
1267        tracing::debug!("Waiting for leader-held tasks to drain");
1268        for handle in leader_handles {
1269            let _ = handle.await;
1270        }
1271        tracing::debug!("Leader-held tasks drained");
1272
1273        // Advisory lock released here — only after leader tasks have fully stopped.
1274        if let Err(e) = shutdown.shutdown().await {
1275            tracing::warn!(error = %e, "Shutdown error");
1276        }
1277
1278        if let Some(ref election) = leader_election {
1279            election.stop();
1280        }
1281
1282        #[cfg(feature = "gateway")]
1283        if let Some(ref reactor) = reactor_handle {
1284            reactor.stop();
1285        }
1286
1287        if let Some(ref db) = self.db {
1288            db.close().await;
1289        }
1290
1291        forge_runtime::shutdown_telemetry();
1292        tracing::info!("Forge stopped");
1293        Ok(())
1294    }
1295
1296    /// Request shutdown.
1297    pub fn shutdown(&self) {
1298        let _ = self.shutdown_tx.send(());
1299    }
1300}
1301
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    // `Future`, `Pin` and the MCP metadata types are only used by the MCP
    // fixture below, so they share its `gateway` gate. Otherwise non-gateway
    // builds would warn about unused imports.
    #[cfg(feature = "gateway")]
    use std::future::Future;
    #[cfg(feature = "gateway")]
    use std::pin::Pin;

    use forge_core::config::NodeRole as ConfigNodeRole;
    #[cfg(feature = "gateway")]
    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    /// Minimal MCP tool fixture used to exercise `ForgeBuilder::register_mcp_tool`.
    ///
    /// The parent module imports `ForgeMcpTool` only under
    /// `#[cfg(feature = "gateway")]`. The fixture and the test that uses it
    /// must carry the same gate. Without it, the whole test module fails to
    /// compile when the `gateway` feature is disabled.
    #[cfg(feature = "gateway")]
    struct TestMcpTool;

    #[cfg(feature = "gateway")]
    impl forge_core::__sealed::Sealed for TestMcpTool {}

    #[cfg(feature = "gateway")]
    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    /// A freshly created builder carries no configuration.
    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    /// `build()` must refuse to produce a runtime when no config was supplied.
    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    /// A default config with only a database URL is enough for `build()` to succeed.
    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    /// Registering one tool on a fresh builder yields exactly one registry entry.
    #[cfg(feature = "gateway")]
    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let builder = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    /// Every config-level role maps onto the cluster `NodeRole` of the same name.
    #[test]
    fn test_config_role_conversion() {
        use builder::config_role_to_node_role;
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}