// forge/runtime.rs
//! FORGE - The Rust Full-Stack Framework
//!
//! Single binary runtime that provides:
//! - HTTP Gateway with RPC endpoints
//! - SSE server for real-time subscriptions
//! - Background job workers
//! - Cron scheduler
//! - Workflow engine
//! - Cluster coordination

11use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::function::{ForgeMutation, ForgeQuery};
29use forge_core::mcp::ForgeMcpTool;
30use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
31
32use forge_runtime::cluster::{
33    GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
34    ShutdownConfig,
35};
36use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
37use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
38use forge_runtime::db::Database;
39use forge_runtime::function::FunctionRegistry;
40use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
41use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
42use forge_runtime::mcp::McpToolRegistry;
43use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
44use forge_runtime::workflow::{
45    EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
46};
47use tokio_util::sync::CancellationToken;
48
/// Type alias for frontend handler function.
///
/// A plain `fn` pointer that takes the raw HTTP request and returns a boxed
/// `Send` future resolving to the response. Installed as the router's
/// fallback when an embedded SPA frontend is configured.
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
51
52/// Prelude module for common imports.
53pub mod prelude {
54    // Common types
55    pub use chrono::{DateTime, Utc};
56    pub use uuid::Uuid;
57
58    // Serde re-exports for user code
59    pub use serde::{Deserialize, Serialize};
60    pub use serde_json;
61
62    /// Timestamp type alias for convenience.
63    pub type Timestamp = DateTime<Utc>;
64
65    // Core types
66    pub use forge_core::auth::TokenPair;
67    pub use forge_core::cluster::NodeRole;
68    pub use forge_core::config::ForgeConfig;
69    pub use forge_core::cron::{CronContext, ForgeCron};
70    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
71    pub use forge_core::env::EnvAccess;
72    pub use forge_core::error::{ForgeError, Result};
73    pub use forge_core::function::{
74        AuthContext, DbConn, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
75    };
76    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
77    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
78    pub use forge_core::realtime::Delta;
79    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
80    pub use forge_core::schemars::JsonSchema;
81    pub use forge_core::types::Upload;
82    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
83    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};
84
85    // Same axum version the runtime uses, avoids type mismatches in custom handlers
86    pub use axum;
87
88    pub use crate::{Forge, ForgeBuilder};
89}
90
/// The main FORGE runtime.
///
/// Constructed through [`ForgeBuilder`]; owns every registry plus the
/// database handle, and drives all background services from `run`.
pub struct Forge {
    /// Loaded runtime configuration (project, gateway, cluster, worker, ...).
    config: ForgeConfig,
    /// Database handle; `None` until `run` connects, then populated.
    db: Option<Database>,
    /// This node's cluster identity; reassigned in `run` once local
    /// `NodeInfo` is built.
    node_id: NodeId,
    /// Registered queries and mutations served by the gateway.
    function_registry: FunctionRegistry,
    /// Registered MCP tools exposed through the gateway.
    mcp_registry: McpToolRegistry,
    /// Registered background jobs executed by workers.
    job_registry: JobRegistry,
    /// Registered cron entries, shared with the cron runner.
    cron_registry: Arc<CronRegistry>,
    /// Registered workflow definitions, persisted and validated at startup.
    workflow_registry: WorkflowRegistry,
    /// Registered daemons, run as singletons on the scheduler node.
    daemon_registry: Arc<DaemonRegistry>,
    /// Registered webhook handlers, mounted under /_api/webhooks.
    webhook_registry: Arc<WebhookRegistry>,
    /// Broadcast channel used to signal shutdown to spawned tasks.
    shutdown_tx: broadcast::Sender<()>,
    /// Path to user migrations directory (default: ./migrations).
    migrations_dir: PathBuf,
    /// Additional migrations provided programmatically.
    extra_migrations: Vec<Migration>,
    /// Optional frontend handler for embedded SPA.
    frontend_handler: Option<FrontendHandler>,
    /// Custom axum routes merged into the top-level router.
    custom_routes: Option<Router>,
}
113
114impl Forge {
    /// Create a new builder for configuring FORGE.
    ///
    /// This is the intended entry point for constructing a [`Forge`].
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }
119
    /// Get the node ID.
    ///
    /// Note: the id is reassigned during `run` once local `NodeInfo` is
    /// built, so a value read before `run` is only a placeholder.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }
124
    /// Get the configuration.
    ///
    /// Borrowed view of the full runtime configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }
129
    /// Get the function registry.
    ///
    /// Read-only access to registered queries and mutations.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }
134
    /// Get the function registry mutably.
    ///
    /// Use to register queries/mutations before calling `run`.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }
139
    /// Get the MCP tool registry mutably.
    ///
    /// Prefer [`Forge::register_mcp_tool`] for simple registrations.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }
144
    /// Register an MCP tool without manually accessing the registry.
    ///
    /// Returns `&mut Self` so registrations can be chained.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }
150
    /// Get the job registry.
    ///
    /// Read-only access to registered background jobs.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }
155
    /// Get the job registry mutably.
    ///
    /// Use to register jobs before calling `run`.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }
160
161    /// Get the cron registry.
162    pub fn cron_registry(&self) -> Arc<CronRegistry> {
163        self.cron_registry.clone()
164    }
165
    /// Get the workflow registry.
    ///
    /// Read-only access to registered workflow definitions.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }
170
    /// Get the workflow registry mutably.
    ///
    /// Use to register workflows before calling `run`.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }
175
176    /// Get the daemon registry.
177    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
178        self.daemon_registry.clone()
179    }
180
181    /// Get the webhook registry.
182    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
183        self.webhook_registry.clone()
184    }
185
    /// Persist all registered workflow definitions to the database.
    /// Fails startup if a definition's signature conflicts with a previously
    /// registered one under the same name+version.
    ///
    /// # Errors
    ///
    /// Returns [`ForgeError::Config`] on a signature mismatch and
    /// [`ForgeError::Database`] when any query fails.
    async fn persist_workflow_definitions(&self, pool: &sqlx::PgPool) -> Result<()> {
        for info in self.workflow_registry.definitions() {
            // Derive the persisted status string: "deprecated" only when the
            // definition is flagged deprecated and NOT active; everything
            // else (including neither flag set) is stored as "active".
            let status = if info.is_active {
                "active"
            } else if info.is_deprecated {
                "deprecated"
            } else {
                "active"
            };

            // Try to insert. If row exists, check signature matches.
            // NOTE(review): this check-then-write is not atomic. Two nodes
            // booting concurrently can both see no row and race on the
            // INSERT — presumably a unique index on (workflow_name,
            // workflow_version) makes the loser error out of startup.
            // Confirm that behavior is intended.
            let existing = sqlx::query!(
                r#"
                SELECT workflow_signature FROM forge_workflow_definitions
                WHERE workflow_name = $1 AND workflow_version = $2
                "#,
                info.name,
                info.version,
            )
            .fetch_optional(pool)
            .await
            .map_err(|e| ForgeError::Database(e.to_string()))?;

            if let Some(row) = existing {
                // A name+version pair must keep the same signature forever:
                // the persisted contract is immutable per version.
                if row.workflow_signature != info.signature {
                    return Err(ForgeError::Config(format!(
                        "Workflow '{}' version '{}' has a different signature than previously registered. \
                         Persisted contract changed under the same version. \
                         Expected signature: {}, got: {}. \
                         Create a new version instead of modifying the existing one.",
                        info.name, info.version, row.workflow_signature, info.signature
                    )));
                }
                // Update status if changed
                sqlx::query!(
                    "UPDATE forge_workflow_definitions SET status = $3 WHERE workflow_name = $1 AND workflow_version = $2",
                    info.name,
                    info.version,
                    status,
                )
                .execute(pool)
                .await
                .map_err(|e| ForgeError::Database(e.to_string()))?;
            } else {
                // First registration of this name+version: insert the full row.
                sqlx::query!(
                    r#"
                    INSERT INTO forge_workflow_definitions (workflow_name, workflow_version, workflow_signature, status)
                    VALUES ($1, $2, $3, $4)
                    "#,
                    info.name,
                    info.version,
                    info.signature,
                    status,
                )
                .execute(pool)
                .await
                .map_err(|e| ForgeError::Database(e.to_string()))?;
            }

            tracing::debug!(
                workflow = info.name,
                version = info.version,
                signature = info.signature,
                status = status,
                "Workflow definition registered"
            );
        }

        Ok(())
    }
259
260    /// Run the FORGE server.
261    pub async fn run(mut self) -> Result<()> {
262        // Apply FORGE_OTEL_* environment variable overrides before initializing
263        self.config.observability.apply_env_overrides();
264
265        // Users shouldn't need tracing_subscriber boilerplate to see logs
266        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
267            &self.config.observability,
268            &self.config.project.name,
269            &self.config.project.version,
270        );
271        match forge_runtime::init_telemetry(
272            &telemetry_config,
273            &self.config.project.name,
274            &self.config.observability.log_level,
275        ) {
276            Ok(true) => {}
277            Ok(false) => {
278                // Subscriber already exists, user set one up manually
279            }
280            Err(e) => {
281                eprintln!("forge: failed to initialize telemetry: {e}");
282            }
283        }
284
285        tracing::debug!("Connecting to database");
286
287        // Connect to database
288        let db =
289            Database::from_config_with_service(&self.config.database, &self.config.project.name)
290                .await?;
291        let pool = db.primary().clone();
292        let jobs_pool = db.jobs_pool().clone();
293        let observability_pool = db.observability_pool().clone();
294        if let Some(handle) = db.start_health_monitor() {
295            let mut shutdown_rx = self.shutdown_tx.subscribe();
296            tokio::spawn(async move {
297                tokio::select! {
298                    _ = shutdown_rx.recv() => {}
299                    _ = handle => {}
300                }
301            });
302        }
303        self.db = Some(db);
304
305        tracing::debug!("Database connected");
306
307        // Run migrations with mesh-safe locking
308        // This acquires an advisory lock, so only one node runs migrations at a time
309        let runner = MigrationRunner::new(pool.clone());
310
311        // Load user migrations from directory + any programmatic ones
312        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
313        user_migrations.extend(self.extra_migrations.clone());
314
315        runner.run(user_migrations).await?;
316        tracing::debug!("Migrations applied");
317
318        // Persist workflow definitions and validate signatures
319        if !self.workflow_registry.is_empty() {
320            self.persist_workflow_definitions(&pool).await?;
321        }
322
323        // Get local node info
324        let hostname = get_hostname();
325
326        // Support HOST env var (default 0.0.0.0), PORT env var (overrides config)
327        let ip_address: IpAddr = std::env::var("HOST")
328            .unwrap_or_else(|_| "0.0.0.0".to_string())
329            .parse()
330            .unwrap_or_else(|_| "0.0.0.0".parse().expect("valid IP literal"));
331
332        if let Ok(port_str) = std::env::var("PORT")
333            && let Ok(port) = port_str.parse::<u16>()
334        {
335            self.config.gateway.port = port;
336        }
337
338        let roles: Vec<NodeRole> = self
339            .config
340            .node
341            .roles
342            .iter()
343            .map(config_role_to_node_role)
344            .collect();
345
346        let node_info = NodeInfo::new_local(
347            hostname,
348            ip_address,
349            self.config.gateway.port,
350            self.config.gateway.grpc_port,
351            roles.clone(),
352            self.config.node.worker_capabilities.clone(),
353            env!("CARGO_PKG_VERSION").to_string(),
354        );
355
356        let node_id = node_info.id;
357        self.node_id = node_id;
358
359        // Create node registry
360        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));
361
362        // Register node in cluster
363        if let Err(e) = node_registry.register().await {
364            tracing::debug!("Failed to register node (tables may not exist): {}", e);
365        }
366
367        // Set node status to active
368        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
369            tracing::debug!("Failed to set node status: {}", e);
370        }
371
372        // Create leader election for scheduler role
373        let leader_election = if roles.contains(&NodeRole::Scheduler) {
374            let election = Arc::new(LeaderElection::new(
375                pool.clone(),
376                node_id,
377                LeaderRole::Scheduler,
378                LeaderConfig::default(),
379            ));
380
381            // Try to become leader
382            if let Err(e) = election.try_become_leader().await {
383                tracing::debug!("Failed to acquire leadership: {}", e);
384            }
385
386            Some(election)
387        } else {
388            None
389        };
390
391        // Create graceful shutdown coordinator
392        let shutdown = Arc::new(GracefulShutdown::new(
393            node_registry.clone(),
394            leader_election.clone(),
395            ShutdownConfig::default(),
396        ));
397
398        // Create HTTP client with circuit breaker for actions and crons
399        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());
400
401        // Start background tasks based on roles
402        let mut handles = Vec::new();
403
404        // Start heartbeat loop
405        {
406            let heartbeat_pool = pool.clone();
407            let heartbeat_node_id = node_id;
408            let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
409            handles.push(tokio::spawn(async move {
410                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
411                heartbeat.run().await;
412            }));
413        }
414
415        // Start leader election loop if scheduler role
416        if let Some(ref election) = leader_election {
417            let election = election.clone();
418            handles.push(tokio::spawn(async move {
419                election.run().await;
420            }));
421        }
422
423        // Start job worker if worker role
424        if roles.contains(&NodeRole::Worker) {
425            let job_queue = JobQueue::new(jobs_pool.clone());
426            let worker_config = WorkerConfig {
427                id: Some(node_id.as_uuid()),
428                capabilities: self.config.node.worker_capabilities.clone(),
429                max_concurrent: self.config.worker.max_concurrent_jobs,
430                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
431                ..Default::default()
432            };
433
434            let mut worker = Worker::new(
435                worker_config,
436                job_queue,
437                self.job_registry.clone(),
438                jobs_pool.clone(),
439            );
440
441            handles.push(tokio::spawn(async move {
442                if let Err(e) = worker.run().await {
443                    tracing::error!("Worker error: {}", e);
444                }
445            }));
446
447            tracing::debug!("Job worker started");
448        }
449
450        // Start cron runner if scheduler role and is leader
451        if roles.contains(&NodeRole::Scheduler) {
452            let cron_registry = self.cron_registry.clone();
453            let cron_pool = jobs_pool.clone();
454            let cron_http = http_client.clone();
455            let cron_leader_election = leader_election.clone();
456
457            let cron_config = CronRunnerConfig {
458                poll_interval: Duration::from_secs(1),
459                node_id: node_id.as_uuid(),
460                is_leader: cron_leader_election.is_none(),
461                leader_election: cron_leader_election,
462                run_stale_threshold: Duration::from_secs(15 * 60),
463            };
464
465            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);
466
467            handles.push(tokio::spawn(async move {
468                if let Err(e) = cron_runner.run().await {
469                    tracing::error!("Cron runner error: {}", e);
470                }
471            }));
472
473            tracing::debug!("Cron scheduler started");
474        }
475
476        // Start workflow scheduler if scheduler role
477        let workflow_shutdown_token = CancellationToken::new();
478        if roles.contains(&NodeRole::Scheduler) {
479            let scheduler_executor = Arc::new(WorkflowExecutor::new(
480                Arc::new(self.workflow_registry.clone()),
481                jobs_pool.clone(),
482                http_client.clone(),
483            ));
484            let event_store = Arc::new(EventStore::new(jobs_pool.clone()));
485            let scheduler = WorkflowScheduler::new(
486                jobs_pool.clone(),
487                scheduler_executor,
488                event_store,
489                WorkflowSchedulerConfig::default(),
490            );
491
492            let shutdown_token = workflow_shutdown_token.clone();
493            handles.push(tokio::spawn(async move {
494                scheduler.run(shutdown_token).await;
495            }));
496
497            tracing::debug!("Workflow scheduler started");
498        }
499
500        // Create job dispatcher and workflow executor for dispatch capabilities
501        let job_queue_for_dispatch = JobQueue::new(jobs_pool.clone());
502        let job_dispatcher = Arc::new(JobDispatcher::new(
503            job_queue_for_dispatch,
504            self.job_registry.clone(),
505        ));
506        let workflow_executor = Arc::new(WorkflowExecutor::new(
507            Arc::new(self.workflow_registry.clone()),
508            jobs_pool.clone(),
509            http_client.clone(),
510        ));
511
512        // Start daemon runner if scheduler role (daemons run as singletons)
513        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
514            let daemon_registry = self.daemon_registry.clone();
515            let daemon_pool = jobs_pool.clone();
516            let daemon_http = http_client.clone();
517            let daemon_shutdown_rx = self.shutdown_tx.subscribe();
518
519            let daemon_runner = DaemonRunner::new(
520                daemon_registry,
521                daemon_pool,
522                daemon_http,
523                node_id.as_uuid(),
524                daemon_shutdown_rx,
525            )
526            .with_job_dispatch(job_dispatcher.clone())
527            .with_workflow_dispatch(workflow_executor.clone());
528
529            handles.push(tokio::spawn(async move {
530                if let Err(e) = daemon_runner.run().await {
531                    tracing::error!("Daemon runner error: {}", e);
532                }
533            }));
534
535            tracing::debug!("Daemon runner started");
536        }
537
538        // Reactor handle for shutdown
539        let mut reactor_handle = None;
540
541        // Start HTTP gateway if gateway role
542        if roles.contains(&NodeRole::Gateway) {
543            let gateway_config = RuntimeGatewayConfig {
544                port: self.config.gateway.port,
545                max_connections: self.config.gateway.max_connections,
546                sse_max_sessions: self.config.gateway.sse_max_sessions,
547                request_timeout_secs: self.config.gateway.request_timeout_secs,
548                cors_enabled: self.config.gateway.cors_enabled
549                    || !self.config.gateway.cors_origins.is_empty(),
550                cors_origins: self.config.gateway.cors_origins.clone(),
551                auth: AuthConfig::from_forge_config(&self.config.auth)
552                    .map_err(|e| ForgeError::Config(e.to_string()))?,
553                mcp: self.config.mcp.clone(),
554                quiet_routes: self.config.gateway.quiet_routes.clone(),
555                max_body_size_bytes: self.config.gateway.max_body_size_bytes()?,
556                token_ttl: forge_core::AuthTokenTtl {
557                    access_token_secs: self.config.auth.access_token_ttl_secs(),
558                    refresh_token_days: self.config.auth.refresh_token_ttl_days(),
559                },
560                project_name: self.config.project.name.clone(),
561            };
562
563            // Build gateway server (pass Database wrapper for read replica routing)
564            let db_ref = self
565                .db
566                .clone()
567                .ok_or_else(|| ForgeError::Internal("Database not initialized".into()))?;
568
569            let mut gateway = GatewayServer::new(
570                gateway_config,
571                self.function_registry.clone(),
572                db_ref.clone(),
573            )
574            .with_job_dispatcher(job_dispatcher.clone())
575            .with_workflow_dispatcher(workflow_executor.clone())
576            .with_mcp_registry(self.mcp_registry.clone());
577
578            // Wire signals (product analytics + diagnostics)
579            if self.config.signals.enabled {
580                let signals_pool = std::sync::Arc::new(db_ref.analytics_pool().clone());
581                let collector = forge_runtime::signals::SignalsCollector::spawn(
582                    signals_pool.clone(),
583                    self.config.signals.batch_size,
584                    std::time::Duration::from_millis(self.config.signals.flush_interval_ms),
585                );
586                gateway = gateway
587                    .with_signals_collector(collector)
588                    .with_signals_anonymize_ip(self.config.signals.anonymize_ip);
589
590                // Spawn session reaper
591                forge_runtime::signals::session::spawn_session_reaper(
592                    signals_pool,
593                    self.config.signals.session_timeout_mins,
594                );
595
596                tracing::info!("Signals enabled (analytics + diagnostics)");
597            }
598
599            // Start the reactor for real-time updates
600            let reactor = gateway.reactor();
601            if let Err(e) = reactor.start().await {
602                tracing::error!("Failed to start reactor: {}", e);
603            } else {
604                tracing::debug!("Reactor started");
605                reactor_handle = Some(reactor);
606            }
607
608            // Build API router (all under /_api)
609            let api_router = gateway.router();
610
611            // Build final router with API
612            let mut router = Router::new().nest("/_api", api_router);
613
614            // Mount webhook routes under /_api (bypasses gateway auth middleware)
615            if !self.webhook_registry.is_empty() {
616                use axum::routing::post;
617                use tower_http::cors::{Any, CorsLayer};
618
619                let webhook_state = Arc::new(
620                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
621                        .with_job_dispatcher(job_dispatcher.clone()),
622                );
623
624                // Webhook routes need their own CORS layer since they're outside the API router.
625                // Reuse gateway CORS policy rather than forcing wildcard access.
626                let webhook_cors = if self.config.gateway.cors_enabled
627                    || !self.config.gateway.cors_origins.is_empty()
628                {
629                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
630                        CorsLayer::new()
631                            .allow_origin(Any)
632                            .allow_methods(Any)
633                            .allow_headers(Any)
634                    } else {
635                        use axum::http::Method;
636                        let origins: Vec<_> = self
637                            .config
638                            .gateway
639                            .cors_origins
640                            .iter()
641                            .filter_map(|o| o.parse().ok())
642                            .collect();
643                        CorsLayer::new()
644                            .allow_origin(origins)
645                            .allow_methods([
646                                Method::GET,
647                                Method::POST,
648                                Method::PUT,
649                                Method::DELETE,
650                                Method::PATCH,
651                                Method::OPTIONS,
652                            ])
653                            .allow_headers([
654                                axum::http::header::CONTENT_TYPE,
655                                axum::http::header::AUTHORIZATION,
656                                axum::http::header::ACCEPT,
657                                axum::http::HeaderName::from_static("x-webhook-signature"),
658                                axum::http::HeaderName::from_static("x-idempotency-key"),
659                            ])
660                            .allow_credentials(true)
661                    }
662                } else {
663                    CorsLayer::new()
664                };
665
666                let webhook_router = Router::new()
667                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
668                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
669                    .layer(
670                        tower::ServiceBuilder::new()
671                            .layer(axum::error_handling::HandleErrorLayer::new(
672                                |err: tower::BoxError| async move {
673                                    if err.is::<tower::timeout::error::Elapsed>() {
674                                        return (
675                                            axum::http::StatusCode::REQUEST_TIMEOUT,
676                                            "Request timed out",
677                                        );
678                                    }
679                                    (
680                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
681                                        "Server overloaded",
682                                    )
683                                },
684                            ))
685                            .layer(tower::limit::ConcurrencyLimitLayer::new(
686                                self.config.gateway.max_connections,
687                            ))
688                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
689                                self.config.gateway.request_timeout_secs,
690                            ))),
691                    )
692                    .layer(webhook_cors);
693
694                router = router.nest("/_api/webhooks", webhook_router);
695
696                tracing::debug!(
697                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
698                    "Webhook routes registered"
699                );
700            }
701
702            // MCP OAuth: mount OAuth routes or return JSON 404 for discovery
703            if self.config.mcp.enabled {
704                use axum::routing::get;
705
706                if let Some((oauth_api_router, oauth_state)) = gateway.oauth_router() {
707                    // OAuth API routes under /_api/oauth/* (bypass auth middleware)
708                    router = router.nest("/_api", oauth_api_router);
709
710                    // Well-known metadata at root level
711                    router = router
712                        .route(
713                            "/.well-known/oauth-authorization-server",
714                            get(forge_runtime::gateway::oauth::well_known_oauth_metadata)
715                                .with_state(oauth_state.clone()),
716                        )
717                        .route(
718                            "/.well-known/oauth-protected-resource",
719                            get(forge_runtime::gateway::oauth::well_known_resource_metadata)
720                                .with_state(oauth_state),
721                        );
722
723                    tracing::info!("OAuth 2.1 endpoints enabled for MCP");
724                } else {
725                    // OAuth not configured: return parseable JSON 404
726                    async fn oauth_not_supported() -> impl axum::response::IntoResponse {
727                        (
728                            axum::http::StatusCode::NOT_FOUND,
729                            axum::Json(serde_json::json!({
730                                "error": "oauth_not_supported",
731                                "error_description": "This server does not support OAuth. Connect without authentication."
732                            })),
733                        )
734                    }
735                    router = router
736                        .route(
737                            "/.well-known/oauth-authorization-server",
738                            get(oauth_not_supported),
739                        )
740                        .route(
741                            "/.well-known/oauth-protected-resource",
742                            get(oauth_not_supported),
743                        );
744                }
745            }
746
747            // Merge custom routes before frontend fallback so they take precedence
748            if let Some(custom) = self.custom_routes.take() {
749                router = router.merge(custom);
750                tracing::debug!("Custom routes merged");
751            }
752
753            // Add frontend handler as fallback if configured
754            if let Some(handler) = self.frontend_handler {
755                use axum::routing::get;
756                router = router.fallback(get(handler));
757                tracing::debug!("Frontend handler enabled");
758            }
759
760            let addr = gateway.addr();
761
762            handles.push(tokio::spawn(async move {
763                tracing::debug!(addr = %addr, "Gateway server binding");
764                let listener = tokio::net::TcpListener::bind(addr)
765                    .await
766                    .expect("Failed to bind");
767                if let Err(e) = axum::serve(listener, router).await {
768                    tracing::error!("Gateway server error: {}", e);
769                }
770            }));
771        }
772
773        tracing::info!(
774            queries = self.function_registry.queries().count(),
775            mutations = self.function_registry.mutations().count(),
776            jobs = self.job_registry.len(),
777            crons = self.cron_registry.len(),
778            workflows = self.workflow_registry.len(),
779            daemons = self.daemon_registry.len(),
780            webhooks = self.webhook_registry.len(),
781            mcp_tools = self.mcp_registry.len(),
782            "Functions registered"
783        );
784
785        {
786            let metrics_pool = observability_pool;
787            tokio::spawn(async move {
788                loop {
789                    tokio::time::sleep(Duration::from_secs(15)).await;
790                    forge_runtime::observability::record_pool_metrics(&metrics_pool);
791                }
792            });
793        }
794
795        // Startup banner: summary of config, roles, and capabilities
796        let role_names: Vec<&str> = roles.iter().map(|r| r.as_str()).collect();
797        let capabilities = &self.config.node.worker_capabilities;
798        tracing::info!(
799            node_id = %node_id,
800            project = %self.config.project.name,
801            version = env!("CARGO_PKG_VERSION"),
802            roles = ?role_names,
803            worker_capabilities = ?capabilities,
804            port = self.config.gateway.port,
805            db_pool_size = self.config.database.pool_size,
806            cluster_discovery = ?self.config.cluster.discovery,
807            observability = self.config.observability.enabled,
808            mcp = self.config.mcp.enabled,
809            "Forge started"
810        );
811
812        // Wait for shutdown signal
813        let mut shutdown_rx = self.shutdown_tx.subscribe();
814
815        tokio::select! {
816            _ = tokio::signal::ctrl_c() => {
817                tracing::debug!("Received ctrl-c");
818            }
819            _ = shutdown_rx.recv() => {
820                tracing::debug!("Received shutdown notification");
821            }
822        }
823
824        // Graceful shutdown
825        tracing::debug!("Graceful shutdown starting");
826
827        // Stop workflow scheduler
828        workflow_shutdown_token.cancel();
829
830        if let Err(e) = shutdown.shutdown().await {
831            tracing::warn!(error = %e, "Shutdown error");
832        }
833
834        // Stop leader election
835        if let Some(ref election) = leader_election {
836            election.stop();
837        }
838
839        // Stop reactor before closing database
840        if let Some(ref reactor) = reactor_handle {
841            reactor.stop();
842        }
843
844        // Close database connections
845        if let Some(ref db) = self.db {
846            db.close().await;
847        }
848
849        forge_runtime::shutdown_telemetry();
850        tracing::info!("Forge stopped");
851        Ok(())
852    }
853
854    /// Request shutdown.
855    pub fn shutdown(&self) {
856        let _ = self.shutdown_tx.send(());
857    }
858}
859
860/// Builder for configuring the FORGE runtime.
861pub struct ForgeBuilder {
862    config: Option<ForgeConfig>,
863    function_registry: FunctionRegistry,
864    mcp_registry: McpToolRegistry,
865    job_registry: JobRegistry,
866    cron_registry: CronRegistry,
867    workflow_registry: WorkflowRegistry,
868    daemon_registry: DaemonRegistry,
869    webhook_registry: WebhookRegistry,
870    migrations_dir: PathBuf,
871    extra_migrations: Vec<Migration>,
872    frontend_handler: Option<FrontendHandler>,
873    custom_routes: Option<Router>,
874}
875
876impl ForgeBuilder {
877    /// Create a new builder.
878    pub fn new() -> Self {
879        Self {
880            config: None,
881            function_registry: FunctionRegistry::new(),
882            mcp_registry: McpToolRegistry::new(),
883            job_registry: JobRegistry::new(),
884            cron_registry: CronRegistry::new(),
885            workflow_registry: WorkflowRegistry::new(),
886            daemon_registry: DaemonRegistry::new(),
887            webhook_registry: WebhookRegistry::new(),
888            migrations_dir: PathBuf::from("migrations"),
889            extra_migrations: Vec::new(),
890            frontend_handler: None,
891            custom_routes: None,
892        }
893    }
894
895    /// Set the directory to load migrations from.
896    ///
897    /// Defaults to `./migrations`. Migration files should be named like:
898    /// - `0001_create_users.sql`
899    /// - `0002_add_posts.sql`
900    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
901        self.migrations_dir = path.into();
902        self
903    }
904
905    /// Add a migration programmatically.
906    ///
907    /// Use this for migrations that need to be generated at runtime,
908    /// or for testing. For most cases, use migration files instead.
909    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
910        self.extra_migrations.push(Migration::new(name, sql));
911        self
912    }
913
914    /// Set a frontend handler for serving embedded SPA assets.
915    ///
916    /// Use with the `embedded-frontend` feature to build a single binary
917    /// that includes both backend and frontend.
918    pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
919        self.frontend_handler = Some(handler);
920        self
921    }
922
923    /// Add custom axum routes to the server.
924    ///
925    /// Routes are merged at the top level, outside `/_api`, giving full
926    /// control over headers, extractors, and response types. Avoid paths
927    /// starting with `/_api` as they conflict with internal routes.
928    ///
929    /// ```ignore
930    /// use axum::{Router, routing::get};
931    ///
932    /// let routes = Router::new()
933    ///     .route("/custom/health", get(|| async { "ok" }));
934    ///
935    /// builder.custom_routes(routes);
936    /// ```
937    pub fn custom_routes(mut self, router: Router) -> Self {
938        self.custom_routes = Some(router);
939        self
940    }
941
942    /// Automatically register all functions discovered via `#[forge::query]`,
943    /// `#[forge::mutation]`, `#[forge::job]`, `#[forge::cron]`, `#[forge::workflow]`,
944    /// `#[forge::daemon]`, `#[forge::webhook]`, and `#[forge::mcp_tool]` macros.
945    ///
946    /// This replaces the need to manually call `.register_query::<T>()` etc.
947    /// for every function in your application.
948    pub fn auto_register(mut self) -> Self {
949        crate::auto_register::auto_register_all(
950            &mut self.function_registry,
951            &mut self.job_registry,
952            &mut self.cron_registry,
953            &mut self.workflow_registry,
954            &mut self.daemon_registry,
955            &mut self.webhook_registry,
956            &mut self.mcp_registry,
957        );
958        self
959    }
960
961    /// Set the configuration.
962    pub fn config(mut self, config: ForgeConfig) -> Self {
963        self.config = Some(config);
964        self
965    }
966
967    /// Get mutable access to the function registry.
968    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
969        &mut self.function_registry
970    }
971
972    /// Get mutable access to the job registry.
973    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
974        &mut self.job_registry
975    }
976
977    /// Get mutable access to the MCP tool registry.
978    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
979        &mut self.mcp_registry
980    }
981
982    /// Register an MCP tool without manually accessing the registry.
983    pub fn register_mcp_tool<T: ForgeMcpTool>(mut self) -> Self {
984        self.mcp_registry.register::<T>();
985        self
986    }
987
988    /// Get mutable access to the cron registry.
989    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
990        &mut self.cron_registry
991    }
992
993    /// Get mutable access to the workflow registry.
994    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
995        &mut self.workflow_registry
996    }
997
998    /// Get mutable access to the daemon registry.
999    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
1000        &mut self.daemon_registry
1001    }
1002
1003    /// Get mutable access to the webhook registry.
1004    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
1005        &mut self.webhook_registry
1006    }
1007
1008    /// Register a query function.
1009    pub fn register_query<Q: ForgeQuery>(mut self) -> Self
1010    where
1011        Q::Args: serde::de::DeserializeOwned + Send + 'static,
1012        Q::Output: serde::Serialize + Send + 'static,
1013    {
1014        self.function_registry.register_query::<Q>();
1015        self
1016    }
1017
1018    /// Register a mutation function.
1019    pub fn register_mutation<M: ForgeMutation>(mut self) -> Self
1020    where
1021        M::Args: serde::de::DeserializeOwned + Send + 'static,
1022        M::Output: serde::Serialize + Send + 'static,
1023    {
1024        self.function_registry.register_mutation::<M>();
1025        self
1026    }
1027
1028    /// Register a background job.
1029    pub fn register_job<J: forge_core::ForgeJob>(mut self) -> Self
1030    where
1031        J::Args: serde::de::DeserializeOwned + Send + 'static,
1032        J::Output: serde::Serialize + Send + 'static,
1033    {
1034        self.job_registry.register::<J>();
1035        self
1036    }
1037
1038    /// Register a cron handler.
1039    pub fn register_cron<C: forge_core::ForgeCron>(mut self) -> Self {
1040        self.cron_registry.register::<C>();
1041        self
1042    }
1043
1044    /// Register a workflow.
1045    pub fn register_workflow<W: forge_core::ForgeWorkflow>(mut self) -> Self
1046    where
1047        W::Input: serde::de::DeserializeOwned,
1048        W::Output: serde::Serialize,
1049    {
1050        self.workflow_registry.register::<W>();
1051        self
1052    }
1053
1054    /// Register a daemon.
1055    pub fn register_daemon<D: forge_core::ForgeDaemon>(mut self) -> Self {
1056        self.daemon_registry.register::<D>();
1057        self
1058    }
1059
1060    /// Register a webhook.
1061    pub fn register_webhook<W: forge_core::ForgeWebhook>(mut self) -> Self {
1062        self.webhook_registry.register::<W>();
1063        self
1064    }
1065
1066    /// Build the FORGE runtime.
1067    pub fn build(self) -> Result<Forge> {
1068        let config = self
1069            .config
1070            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
1071
1072        let (shutdown_tx, _) = broadcast::channel(1);
1073
1074        Ok(Forge {
1075            config,
1076            db: None,
1077            node_id: NodeId::new(),
1078            function_registry: self.function_registry,
1079            mcp_registry: self.mcp_registry,
1080            job_registry: self.job_registry,
1081            cron_registry: Arc::new(self.cron_registry),
1082            workflow_registry: self.workflow_registry,
1083            daemon_registry: Arc::new(self.daemon_registry),
1084            webhook_registry: Arc::new(self.webhook_registry),
1085            shutdown_tx,
1086            migrations_dir: self.migrations_dir,
1087            extra_migrations: self.extra_migrations,
1088            frontend_handler: self.frontend_handler,
1089            custom_routes: self.custom_routes,
1090        })
1091    }
1092}
1093
1094impl Default for ForgeBuilder {
1095    fn default() -> Self {
1096        Self::new()
1097    }
1098}
1099
1100#[cfg(unix)]
1101fn get_hostname() -> String {
1102    nix::unistd::gethostname()
1103        .map(|h| h.to_string_lossy().to_string())
1104        .unwrap_or_else(|_| "unknown".to_string())
1105}
1106
/// Best-effort hostname lookup on non-Unix platforms.
///
/// Checks the Windows `COMPUTERNAME` variable first, then a generic
/// `HOSTNAME`, and falls back to `"unknown"` when neither is set (or is
/// not valid Unicode).
#[cfg(not(unix))]
fn get_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(name) = std::env::var(key) {
            return name;
        }
    }
    "unknown".to_string()
}
1113
1114/// Convert config NodeRole to cluster NodeRole.
1115fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
1116    match role {
1117        ConfigNodeRole::Gateway => NodeRole::Gateway,
1118        ConfigNodeRole::Function => NodeRole::Function,
1119        ConfigNodeRole::Worker => NodeRole::Worker,
1120        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
1121    }
1122}
1123
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    /// Minimal MCP tool used to exercise builder registration.
    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    #[test]
    fn test_forge_builder_new() {
        // A fresh builder starts with no configuration attached.
        let b = ForgeBuilder::new();
        assert!(b.config.is_none());
    }

    #[test]
    fn test_forge_builder_requires_config() {
        // Building without calling .config(...) must fail.
        let outcome = ForgeBuilder::new().build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_forge_builder_with_config() {
        // Supplying a configuration is sufficient to build.
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let outcome = ForgeBuilder::new().config(config).build();
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_forge_builder_register_mcp_tool() {
        // register_mcp_tool should add exactly one entry to the registry.
        let b = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(b.mcp_registry.len(), 1);
    }

    #[test]
    fn test_config_role_conversion() {
        // Each config-level role maps to its cluster-level counterpart.
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
    }
}