// forge/runtime.rs
//! FORGE - The Rust Full-Stack Framework
//!
//! Single binary runtime that provides:
//! - HTTP Gateway with RPC endpoints
//! - SSE server for real-time subscriptions
//! - Background job workers
//! - Cron scheduler
//! - Workflow engine
//! - Cluster coordination

11use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::function::{ForgeMutation, ForgeQuery};
29use forge_core::mcp::ForgeMcpTool;
30use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
31
32use forge_runtime::cluster::{
33    GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
34    ShutdownConfig,
35};
36use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
37use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
38use forge_runtime::db::Database;
39use forge_runtime::function::FunctionRegistry;
40use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
41use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
42use forge_runtime::mcp::McpToolRegistry;
43use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
44use forge_runtime::workflow::{
45    EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
46};
47use tokio_util::sync::CancellationToken;
48
/// Type alias for a frontend handler function.
///
/// Takes the raw HTTP request and returns a boxed future resolving to the
/// response. Installed as the router's fallback to serve an embedded SPA
/// (see `ForgeBuilder::frontend_handler`).
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
51
52/// Prelude module for common imports.
53pub mod prelude {
54    // Common types
55    pub use chrono::{DateTime, Utc};
56    pub use uuid::Uuid;
57
58    // Serde re-exports for user code
59    pub use serde::{Deserialize, Serialize};
60    pub use serde_json;
61
62    /// Timestamp type alias for convenience.
63    pub type Timestamp = DateTime<Utc>;
64
65    // Core types
66    pub use forge_core::auth::TokenPair;
67    pub use forge_core::cluster::NodeRole;
68    pub use forge_core::config::ForgeConfig;
69    pub use forge_core::cron::{CronContext, ForgeCron};
70    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
71    pub use forge_core::env::EnvAccess;
72    pub use forge_core::error::{ForgeError, Result};
73    pub use forge_core::function::{
74        AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
75    };
76    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
77    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
78    pub use forge_core::realtime::Delta;
79    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
80    pub use forge_core::schemars::JsonSchema;
81    pub use forge_core::types::Upload;
82    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
83    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};
84
85    // Same axum version the runtime uses, avoids type mismatches in custom handlers
86    pub use axum;
87
88    pub use crate::{Forge, ForgeBuilder};
89}
90
/// The main FORGE runtime.
///
/// Built via [`ForgeBuilder`]; call [`Forge::run`] to start all services.
pub struct Forge {
    // Full runtime configuration (gateway, cluster, worker, auth, ...).
    config: ForgeConfig,
    // Database handle; `None` until `run()` connects, `Some` afterwards.
    db: Option<Database>,
    // This node's cluster identity; reassigned in `run()` from NodeInfo.
    node_id: NodeId,
    // Registered queries and mutations served by the gateway.
    function_registry: FunctionRegistry,
    // Registered MCP tools exposed over the MCP endpoint.
    mcp_registry: McpToolRegistry,
    // Registered background job handlers.
    job_registry: JobRegistry,
    // Registered cron entries (shared with the cron runner task).
    cron_registry: Arc<CronRegistry>,
    // Registered workflows.
    workflow_registry: WorkflowRegistry,
    // Registered daemons (singletons, run on the scheduler node).
    daemon_registry: Arc<DaemonRegistry>,
    // Registered webhook handlers.
    webhook_registry: Arc<WebhookRegistry>,
    // Broadcast channel used to fan out the shutdown signal to tasks.
    shutdown_tx: broadcast::Sender<()>,
    /// Path to user migrations directory (default: ./migrations).
    migrations_dir: PathBuf,
    /// Additional migrations provided programmatically.
    extra_migrations: Vec<Migration>,
    /// Optional frontend handler for embedded SPA.
    frontend_handler: Option<FrontendHandler>,
    /// Custom axum routes merged into the top-level router.
    custom_routes: Option<Router>,
}
113
impl Forge {
    /// Create a new builder for configuring FORGE.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// Get the node ID.
    ///
    /// NOTE(review): the definitive ID is assigned inside [`Forge::run`]
    /// once `NodeInfo` is created; before `run()` this returns whatever
    /// placeholder the builder set — confirm against the builder's `build`.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Get the configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Get the function registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Get the function registry mutably.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Get the MCP tool registry mutably.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool without manually accessing the registry.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Get the job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Get the job registry mutably.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Get the cron registry (cheap `Arc` clone).
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Get the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Get the workflow registry mutably.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Get the daemon registry (cheap `Arc` clone).
    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
        self.daemon_registry.clone()
    }

    /// Get the webhook registry (cheap `Arc` clone).
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        self.webhook_registry.clone()
    }

    /// Run the FORGE server until shutdown.
    ///
    /// Startup sequence: telemetry → database + migrations → cluster
    /// registration → role-gated background tasks (heartbeat, leader
    /// election, job worker, cron runner, workflow scheduler, daemon
    /// runner, HTTP gateway) → block on ctrl-c or [`Forge::shutdown`] →
    /// graceful teardown (workflow token, cluster shutdown, leader
    /// election, reactor, database, telemetry).
    ///
    /// # Errors
    /// Returns an error if the database connection, migration loading or
    /// application, or gateway auth configuration fails.
    ///
    /// # Panics
    /// The spawned gateway task panics (`expect`) if the TCP listener
    /// cannot bind to the configured address.
    pub async fn run(mut self) -> Result<()> {
        // Users shouldn't need tracing_subscriber boilerplate to see logs
        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
            &self.config.observability,
            &self.config.project.name,
            &self.config.project.version,
        );
        match forge_runtime::init_telemetry(
            &telemetry_config,
            &self.config.project.name,
            &self.config.observability.log_level,
        ) {
            Ok(true) => {}
            Ok(false) => {
                // Subscriber already exists, user set one up manually
            }
            Err(e) => {
                // Telemetry failure is non-fatal: the server still runs.
                eprintln!("forge: failed to initialize telemetry: {e}");
            }
        }

        tracing::debug!("Connecting to database");

        // Connect to database
        let db =
            Database::from_config_with_service(&self.config.database, &self.config.project.name)
                .await?;
        let pool = db.primary().clone();
        let jobs_pool = db.jobs_pool().clone();
        let observability_pool = db.observability_pool().clone();
        if let Some(handle) = db.start_health_monitor() {
            // Tie the health monitor's lifetime to the shutdown channel so it
            // does not outlive the runtime.
            let mut shutdown_rx = self.shutdown_tx.subscribe();
            tokio::spawn(async move {
                tokio::select! {
                    _ = shutdown_rx.recv() => {}
                    _ = handle => {}
                }
            });
        }
        self.db = Some(db);

        tracing::debug!("Database connected");

        // Run migrations with mesh-safe locking
        // This acquires an advisory lock, so only one node runs migrations at a time
        let runner = MigrationRunner::new(pool.clone());

        // Load user migrations from directory + any programmatic ones
        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::debug!("Migrations applied");

        // Get local node info
        let hostname = get_hostname();

        // NOTE(review): advertised IP is hard-coded to loopback — presumably
        // fine for single-host meshes; confirm for multi-host clusters.
        let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        let node_id = node_info.id;
        self.node_id = node_id;

        // Create node registry
        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Register node in cluster. Failures are downgraded to debug logs:
        // cluster tables may legitimately not exist yet on first boot.
        if let Err(e) = node_registry.register().await {
            tracing::debug!("Failed to register node (tables may not exist): {}", e);
        }

        // Set node status to active
        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::debug!("Failed to set node status: {}", e);
        }

        // Create leader election for scheduler role
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            // Try to become leader; losing is normal when another node leads.
            if let Err(e) = election.try_become_leader().await {
                tracing::debug!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        // Create graceful shutdown coordinator
        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Create HTTP client with circuit breaker for actions and crons
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        // Start background tasks based on roles; handles are collected but
        // tasks are torn down via their own shutdown mechanisms below.
        let mut handles = Vec::new();

        // Start heartbeat loop
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // Start leader election loop if scheduler role
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // Start job worker if worker role
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(jobs_pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::new(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                jobs_pool.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::debug!("Job worker started");
        }

        // Start cron runner if scheduler role and is leader
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = jobs_pool.clone();
            let cron_http = http_client.clone();
            let cron_leader_election = leader_election.clone();

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                // With no election (single scheduler), act as leader outright.
                is_leader: cron_leader_election.is_none(),
                leader_election: cron_leader_election,
                run_stale_threshold: Duration::from_secs(15 * 60),
            };

            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::debug!("Cron scheduler started");
        }

        // Start workflow scheduler if scheduler role.
        // The cancellation token is created unconditionally so teardown can
        // cancel it without checking the role again.
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                jobs_pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(jobs_pool.clone()));
            let scheduler = WorkflowScheduler::new(
                jobs_pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::debug!("Workflow scheduler started");
        }

        // Create job dispatcher and workflow executor for dispatch capabilities
        let job_queue_for_dispatch = JobQueue::new(jobs_pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(
            job_queue_for_dispatch,
            self.job_registry.clone(),
        ));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            jobs_pool.clone(),
            http_client.clone(),
        ));

        // Start daemon runner if scheduler role (daemons run as singletons)
        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
            let daemon_registry = self.daemon_registry.clone();
            let daemon_pool = jobs_pool.clone();
            let daemon_http = http_client.clone();
            let daemon_shutdown_rx = self.shutdown_tx.subscribe();

            let daemon_runner = DaemonRunner::new(
                daemon_registry,
                daemon_pool,
                daemon_http,
                node_id.as_uuid(),
                daemon_shutdown_rx,
            )
            .with_job_dispatch(job_dispatcher.clone())
            .with_workflow_dispatch(workflow_executor.clone());

            handles.push(tokio::spawn(async move {
                if let Err(e) = daemon_runner.run().await {
                    tracing::error!("Daemon runner error: {}", e);
                }
            }));

            tracing::debug!("Daemon runner started");
        }

        // Reactor handle for shutdown
        let mut reactor_handle = None;

        // Start HTTP gateway if gateway role
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: self.config.gateway.port,
                max_connections: self.config.gateway.max_connections,
                sse_max_sessions: self.config.gateway.sse_max_sessions,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                // Listing any origin implicitly enables CORS.
                cors_enabled: self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty(),
                cors_origins: self.config.gateway.cors_origins.clone(),
                auth: AuthConfig::from_forge_config(&self.config.auth)
                    .map_err(|e| ForgeError::Config(e.to_string()))?,
                mcp: self.config.mcp.clone(),
                quiet_routes: self.config.gateway.quiet_routes.clone(),
                token_ttl: forge_core::AuthTokenTtl {
                    access_token_secs: self.config.auth.access_token_ttl_secs(),
                    refresh_token_days: self.config.auth.refresh_token_ttl_days(),
                },
                project_name: self.config.project.name.clone(),
            };

            // Build gateway server (pass Database wrapper for read replica routing)
            let gateway = GatewayServer::new(
                gateway_config,
                self.function_registry.clone(),
                self.db
                    .clone()
                    .ok_or_else(|| ForgeError::Internal("Database not initialized".into()))?,
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone())
            .with_mcp_registry(self.mcp_registry.clone());

            // Start the reactor for real-time updates; gateway still serves
            // RPC if the reactor fails, only subscriptions are degraded.
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::debug!("Reactor started");
                reactor_handle = Some(reactor);
            }

            // Build API router (all under /_api)
            let api_router = gateway.router();

            // Build final router with API
            let mut router = Router::new().nest("/_api", api_router);

            // Mount webhook routes under /_api (bypasses gateway auth middleware)
            if !self.webhook_registry.is_empty() {
                use axum::routing::post;
                use tower_http::cors::{Any, CorsLayer};

                let webhook_state = Arc::new(
                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
                        .with_job_dispatcher(job_dispatcher.clone()),
                );

                // Webhook routes need their own CORS layer since they're outside the API router.
                // Reuse gateway CORS policy rather than forcing wildcard access.
                let webhook_cors = if self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty()
                {
                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
                        CorsLayer::new()
                            .allow_origin(Any)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    } else {
                        // Silently drop origins that fail to parse as header values.
                        let origins: Vec<_> = self
                            .config
                            .gateway
                            .cors_origins
                            .iter()
                            .filter_map(|o| o.parse().ok())
                            .collect();
                        CorsLayer::new()
                            .allow_origin(origins)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    }
                } else {
                    CorsLayer::new()
                };

                // 1 MiB body cap, concurrency limit, and request timeout with
                // mapped error responses (408 timeout / 503 overload).
                let webhook_router = Router::new()
                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
                    .layer(
                        tower::ServiceBuilder::new()
                            .layer(axum::error_handling::HandleErrorLayer::new(
                                |err: tower::BoxError| async move {
                                    if err.is::<tower::timeout::error::Elapsed>() {
                                        return (
                                            axum::http::StatusCode::REQUEST_TIMEOUT,
                                            "Request timed out",
                                        );
                                    }
                                    (
                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
                                        "Server overloaded",
                                    )
                                },
                            ))
                            .layer(tower::limit::ConcurrencyLimitLayer::new(
                                self.config.gateway.max_connections,
                            ))
                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
                                self.config.gateway.request_timeout_secs,
                            ))),
                    )
                    .layer(webhook_cors);

                router = router.nest("/_api/webhooks", webhook_router);

                tracing::debug!(
                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
                    "Webhook routes registered"
                );
            }

            // MCP OAuth: mount OAuth routes or return JSON 404 for discovery
            if self.config.mcp.enabled {
                use axum::routing::get;

                if let Some((oauth_api_router, oauth_state)) = gateway.oauth_router() {
                    // OAuth API routes under /_api/oauth/* (bypass auth middleware)
                    router = router.nest("/_api", oauth_api_router);

                    // Well-known metadata at root level
                    router = router
                        .route(
                            "/.well-known/oauth-authorization-server",
                            get(forge_runtime::gateway::oauth::well_known_oauth_metadata)
                                .with_state(oauth_state.clone()),
                        )
                        .route(
                            "/.well-known/oauth-protected-resource",
                            get(forge_runtime::gateway::oauth::well_known_resource_metadata)
                                .with_state(oauth_state),
                        );

                    tracing::info!("OAuth 2.1 endpoints enabled for MCP");
                } else {
                    // OAuth not configured: return parseable JSON 404
                    async fn oauth_not_supported() -> impl axum::response::IntoResponse {
                        (
                            axum::http::StatusCode::NOT_FOUND,
                            axum::Json(serde_json::json!({
                                "error": "oauth_not_supported",
                                "error_description": "This server does not support OAuth. Connect without authentication."
                            })),
                        )
                    }
                    router = router
                        .route(
                            "/.well-known/oauth-authorization-server",
                            get(oauth_not_supported),
                        )
                        .route(
                            "/.well-known/oauth-protected-resource",
                            get(oauth_not_supported),
                        );
                }
            }

            // Merge custom routes before frontend fallback so they take precedence
            if let Some(custom) = self.custom_routes.take() {
                router = router.merge(custom);
                tracing::debug!("Custom routes merged");
            }

            // Add frontend handler as fallback if configured
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::debug!("Frontend handler enabled");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::debug!(addr = %addr, "Gateway server binding");
                // Bind failure is unrecoverable for the gateway task.
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
        }

        tracing::info!(
            queries = self.function_registry.queries().count(),
            mutations = self.function_registry.mutations().count(),
            jobs = self.job_registry.len(),
            crons = self.cron_registry.len(),
            workflows = self.workflow_registry.len(),
            daemons = self.daemon_registry.len(),
            webhooks = self.webhook_registry.len(),
            mcp_tools = self.mcp_registry.len(),
            "Functions registered"
        );

        // Pool-metrics sampler. Detached (not in `handles`, no shutdown
        // listener): it simply dies with the process.
        {
            let metrics_pool = observability_pool;
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(15)).await;
                    forge_runtime::observability::record_pool_metrics(&metrics_pool);
                }
            });
        }

        tracing::info!(
            node_id = %node_id,
            roles = ?roles,
            port = self.config.gateway.port,
            "Forge started"
        );

        // Wait for shutdown signal: either OS ctrl-c or Forge::shutdown().
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::debug!("Received ctrl-c");
            }
            _ = shutdown_rx.recv() => {
                tracing::debug!("Received shutdown notification");
            }
        }

        // Graceful shutdown
        tracing::debug!("Graceful shutdown starting");

        // Stop workflow scheduler
        workflow_shutdown_token.cancel();

        // Cluster-level teardown (deregister node, release leadership, ...).
        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!(error = %e, "Shutdown error");
        }

        // Stop leader election
        if let Some(ref election) = leader_election {
            election.stop();
        }

        // Stop reactor before closing database
        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
        }

        // Close database connections
        if let Some(ref db) = self.db {
            db.close().await;
        }

        forge_runtime::shutdown_telemetry();
        tracing::info!("Forge stopped");
        Ok(())
    }

    /// Request shutdown.
    ///
    /// Broadcasts on the internal shutdown channel; `run()` observes it and
    /// begins graceful teardown. A send error (no live receiver, i.e. the
    /// runtime is not running) is deliberately ignored.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}
716
/// Builder for configuring the FORGE runtime.
///
/// Mirrors the fields of [`Forge`]; consumed when the runtime is built.
pub struct ForgeBuilder {
    // Optional explicit configuration; `None` presumably falls back to a
    // default/discovered config at build time — confirm in `build`.
    config: Option<ForgeConfig>,
    // Registries populated via `auto_register` or the `_mut` accessors.
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    daemon_registry: DaemonRegistry,
    webhook_registry: WebhookRegistry,
    // Directory migrations are loaded from (default: "migrations").
    migrations_dir: PathBuf,
    // Migrations added programmatically via `migration()`.
    extra_migrations: Vec<Migration>,
    // Optional SPA fallback handler.
    frontend_handler: Option<FrontendHandler>,
    // Optional user routes merged at the top level of the router.
    custom_routes: Option<Router>,
}
732
733impl ForgeBuilder {
734    /// Create a new builder.
735    pub fn new() -> Self {
736        Self {
737            config: None,
738            function_registry: FunctionRegistry::new(),
739            mcp_registry: McpToolRegistry::new(),
740            job_registry: JobRegistry::new(),
741            cron_registry: CronRegistry::new(),
742            workflow_registry: WorkflowRegistry::new(),
743            daemon_registry: DaemonRegistry::new(),
744            webhook_registry: WebhookRegistry::new(),
745            migrations_dir: PathBuf::from("migrations"),
746            extra_migrations: Vec::new(),
747            frontend_handler: None,
748            custom_routes: None,
749        }
750    }
751
752    /// Set the directory to load migrations from.
753    ///
754    /// Defaults to `./migrations`. Migration files should be named like:
755    /// - `0001_create_users.sql`
756    /// - `0002_add_posts.sql`
757    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
758        self.migrations_dir = path.into();
759        self
760    }
761
762    /// Add a migration programmatically.
763    ///
764    /// Use this for migrations that need to be generated at runtime,
765    /// or for testing. For most cases, use migration files instead.
766    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
767        self.extra_migrations.push(Migration::new(name, sql));
768        self
769    }
770
771    /// Set a frontend handler for serving embedded SPA assets.
772    ///
773    /// Use with the `embedded-frontend` feature to build a single binary
774    /// that includes both backend and frontend.
775    pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
776        self.frontend_handler = Some(handler);
777        self
778    }
779
780    /// Add custom axum routes to the server.
781    ///
782    /// Routes are merged at the top level, outside `/_api`, giving full
783    /// control over headers, extractors, and response types. Avoid paths
784    /// starting with `/_api` as they conflict with internal routes.
785    ///
786    /// ```ignore
787    /// use axum::{Router, routing::get};
788    ///
789    /// let routes = Router::new()
790    ///     .route("/custom/health", get(|| async { "ok" }));
791    ///
792    /// builder.custom_routes(routes);
793    /// ```
794    pub fn custom_routes(mut self, router: Router) -> Self {
795        self.custom_routes = Some(router);
796        self
797    }
798
799    /// Automatically register all functions discovered via `#[forge::query]`,
800    /// `#[forge::mutation]`, `#[forge::job]`, `#[forge::cron]`, `#[forge::workflow]`,
801    /// `#[forge::daemon]`, `#[forge::webhook]`, and `#[forge::mcp_tool]` macros.
802    ///
803    /// This replaces the need to manually call `.register_query::<T>()` etc.
804    /// for every function in your application.
805    pub fn auto_register(mut self) -> Self {
806        crate::auto_register::auto_register_all(
807            &mut self.function_registry,
808            &mut self.job_registry,
809            &mut self.cron_registry,
810            &mut self.workflow_registry,
811            &mut self.daemon_registry,
812            &mut self.webhook_registry,
813            &mut self.mcp_registry,
814        );
815        self
816    }
817
818    /// Set the configuration.
819    pub fn config(mut self, config: ForgeConfig) -> Self {
820        self.config = Some(config);
821        self
822    }
823
824    /// Get mutable access to the function registry.
825    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
826        &mut self.function_registry
827    }
828
829    /// Get mutable access to the job registry.
830    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
831        &mut self.job_registry
832    }
833
834    /// Get mutable access to the MCP tool registry.
835    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
836        &mut self.mcp_registry
837    }
838
839    /// Register an MCP tool without manually accessing the registry.
840    pub fn register_mcp_tool<T: ForgeMcpTool>(mut self) -> Self {
841        self.mcp_registry.register::<T>();
842        self
843    }
844
845    /// Get mutable access to the cron registry.
846    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
847        &mut self.cron_registry
848    }
849
850    /// Get mutable access to the workflow registry.
851    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
852        &mut self.workflow_registry
853    }
854
855    /// Get mutable access to the daemon registry.
856    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
857        &mut self.daemon_registry
858    }
859
860    /// Get mutable access to the webhook registry.
861    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
862        &mut self.webhook_registry
863    }
864
865    /// Register a query function.
866    pub fn register_query<Q: ForgeQuery>(mut self) -> Self
867    where
868        Q::Args: serde::de::DeserializeOwned + Send + 'static,
869        Q::Output: serde::Serialize + Send + 'static,
870    {
871        self.function_registry.register_query::<Q>();
872        self
873    }
874
875    /// Register a mutation function.
876    pub fn register_mutation<M: ForgeMutation>(mut self) -> Self
877    where
878        M::Args: serde::de::DeserializeOwned + Send + 'static,
879        M::Output: serde::Serialize + Send + 'static,
880    {
881        self.function_registry.register_mutation::<M>();
882        self
883    }
884
885    /// Register a background job.
886    pub fn register_job<J: forge_core::ForgeJob>(mut self) -> Self
887    where
888        J::Args: serde::de::DeserializeOwned + Send + 'static,
889        J::Output: serde::Serialize + Send + 'static,
890    {
891        self.job_registry.register::<J>();
892        self
893    }
894
895    /// Register a cron handler.
896    pub fn register_cron<C: forge_core::ForgeCron>(mut self) -> Self {
897        self.cron_registry.register::<C>();
898        self
899    }
900
901    /// Register a workflow.
902    pub fn register_workflow<W: forge_core::ForgeWorkflow>(mut self) -> Self
903    where
904        W::Input: serde::de::DeserializeOwned,
905        W::Output: serde::Serialize,
906    {
907        self.workflow_registry.register::<W>();
908        self
909    }
910
911    /// Register a daemon.
912    pub fn register_daemon<D: forge_core::ForgeDaemon>(mut self) -> Self {
913        self.daemon_registry.register::<D>();
914        self
915    }
916
917    /// Register a webhook.
918    pub fn register_webhook<W: forge_core::ForgeWebhook>(mut self) -> Self {
919        self.webhook_registry.register::<W>();
920        self
921    }
922
923    /// Build the FORGE runtime.
924    pub fn build(self) -> Result<Forge> {
925        let config = self
926            .config
927            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
928
929        let (shutdown_tx, _) = broadcast::channel(1);
930
931        Ok(Forge {
932            config,
933            db: None,
934            node_id: NodeId::new(),
935            function_registry: self.function_registry,
936            mcp_registry: self.mcp_registry,
937            job_registry: self.job_registry,
938            cron_registry: Arc::new(self.cron_registry),
939            workflow_registry: self.workflow_registry,
940            daemon_registry: Arc::new(self.daemon_registry),
941            webhook_registry: Arc::new(self.webhook_registry),
942            shutdown_tx,
943            migrations_dir: self.migrations_dir,
944            extra_migrations: self.extra_migrations,
945            frontend_handler: self.frontend_handler,
946            custom_routes: self.custom_routes,
947        })
948    }
949}
950
951impl Default for ForgeBuilder {
952    fn default() -> Self {
953        Self::new()
954    }
955}
956
957#[cfg(unix)]
958fn get_hostname() -> String {
959    nix::unistd::gethostname()
960        .map(|h| h.to_string_lossy().to_string())
961        .unwrap_or_else(|_| "unknown".to_string())
962}
963
/// Best-effort hostname lookup on non-Unix platforms: `COMPUTERNAME`
/// (Windows) first, then `HOSTNAME`, falling back to `"unknown"`.
#[cfg(not(unix))]
fn get_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(name) = std::env::var(key) {
            return name;
        }
    }
    "unknown".to_string()
}
970
971/// Convert config NodeRole to cluster NodeRole.
972fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
973    match role {
974        ConfigNodeRole::Gateway => NodeRole::Gateway,
975        ConfigNodeRole::Function => NodeRole::Function,
976        ConfigNodeRole::Worker => NodeRole::Worker,
977        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
978    }
979}
980
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    /// Minimal MCP tool used only to exercise builder registration.
    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    #[test]
    fn test_forge_builder_new() {
        // A fresh builder starts without configuration.
        assert!(ForgeBuilder::new().config.is_none());
    }

    #[test]
    fn test_forge_builder_requires_config() {
        // Building without a config must fail.
        assert!(ForgeBuilder::new().build().is_err());
    }

    #[test]
    fn test_forge_builder_with_config() {
        let cfg = ForgeConfig::default_with_database_url("postgres://localhost/test");
        assert!(ForgeBuilder::new().config(cfg).build().is_ok());
    }

    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let builder = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    #[test]
    fn test_config_role_conversion() {
        // Table-driven check that every config role maps to the matching
        // cluster role.
        let cases = [
            (ConfigNodeRole::Gateway, NodeRole::Gateway),
            (ConfigNodeRole::Worker, NodeRole::Worker),
            (ConfigNodeRole::Scheduler, NodeRole::Scheduler),
            (ConfigNodeRole::Function, NodeRole::Function),
        ];
        for (input, expected) in &cases {
            assert_eq!(config_role_to_node_role(input), *expected);
        }
    }
}